Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/flyquery/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,23 @@ class FlyquerySettings(BaseSettings):
reranker_top_n: int = 30
query_expansion_enabled: bool = False

# Value-anchoring / grounding-quality knobs. The ingest pipeline already

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

¿Por qué tenemos la config aquí en vez de en el .env?

# computes a per-column value catalogue (profile_json.top_values, min/max,
# distinct_estimate) + a semantic_type; these control how it is surfaced to
# the SQL writer at query time. All dataset-agnostic.
value_catalog_enabled: bool = True # render real column values into prompts

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value_catalog_enabled se define aquí pero nunca se comprueba en ningún sitio — column_value_catalog() siempre se ejecuta, sin importar esta opción. Ahora mismo no hay forma de desactivar esta función. (Relacionado: render_column_catalog_line() en value_anchoring.py línea 150 también está muerta — nada la llama; todos los puntos de llamada reales usan render_catalog_from_meta(). Vale la pena borrar ambas, o conectar la opción si de verdad debe hacer algo.)

value_catalog_max_values: int = 25 # distinct values shown per column
value_catalog_char_budget: int = 320 # char cap on the value list per column
value_catalog_max_columns: int = 80 # cap columns that get a value line (prompt budget)
entity_resolution_enabled: bool = True # map question literals -> owning column
entity_resolution_max_literals: int = 8 # cap live value-scan probes per query
zero_row_repair_enabled: bool = True # repair queries that run but return 0 rows
candidate_exec_selection: bool = True # execute top candidates, pick a non-empty/non-degenerate one
synthesis_function_firewall: bool = True # block read_csv_auto/pg_read_file/... in generated SQL
group_resolution_enabled: bool = True # term -> full set of catalogued values it umbrellas
signed_measure_repair_enabled: bool = True # observed-sign probe on subtractions over a signed measure
group_coverage_repair_enabled: bool = True # advise when an IN-list under-covers a detected value group

# PII
pii_scanner: Literal["regex", "presidio", "disabled"] = "regex"
pii_policy_samples: Literal["warn", "redact", "reject"] = "redact"
Expand Down
3 changes: 3 additions & 0 deletions src/flyquery/core/agents/column_name_proposer_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ def build_column_name_proposer_agent(settings):
output_type=ProposedColumnNames,
instructions=prompt.instructions,
settings=settings,
# Deterministic naming: identical re-ingests must yield identical
# column names, otherwise reconcile sees phantom schema churn.
extra_settings={"temperature": 0.0},
)


Expand Down
3 changes: 3 additions & 0 deletions src/flyquery/core/agents/describe_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,7 @@ def build_describe_agent(settings):
output_type=DescribedObjects,
instructions=prompt.instructions,
settings=settings,
# Deterministic descriptions/semantic types: identical columns on
# re-ingest must produce identical metadata, no schema-change churn.
extra_settings={"temperature": 0.0},
)
2 changes: 2 additions & 0 deletions src/flyquery/core/agents/relation_proposer_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@ def build_relation_proposer_agent(settings):
output_type=ProposedRelations,
instructions=prompt.instructions,
settings=settings,
# Deterministic relation proposals across re-ingests.
extra_settings={"temperature": 0.0},
)
3 changes: 3 additions & 0 deletions src/flyquery/core/agents/rename_detection_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,7 @@ def build_rename_detection_agent(settings):
settings=settings,
# Rename detection is a short task; cap output tokens tightly
max_output_tokens=2048,
# Deterministic: the same removed/candidate pair must always
# resolve the same way so re-ingests don't flip-flop renames.
extra_settings={"temperature": 0.0},
)
8 changes: 8 additions & 0 deletions src/flyquery/core/services/examples/auto_learner.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class AutoLearner:
Skips when:
- ``retries > 0`` (query required critic refinement)
- PII findings were detected in the result
- the query returned no rows (``row_count`` is 0, when provided)

Called by QueryService (Phase D) after a successful execution.
"""
Expand All @@ -46,6 +47,7 @@ async def maybe_propose(
retries: int,
pii_findings: list[Any],
query_id: uuid.UUID,
row_count: int | None = None,
) -> None:
"""Insert a flyquery_examples row when all criteria pass.

Expand All @@ -57,11 +59,17 @@ async def maybe_propose(
:param retries: number of critic refinement loops (must be 0 to propose)
:param pii_findings: any PII signals detected (must be empty to propose)
:param query_id: UUID of the parent query record
:param row_count: number of rows the query returned; when provided it
must be > 0 to propose (a valid-but-wrong query returning 0 rows
would otherwise poison grounding). When ``None`` the row gate is
skipped to preserve behaviour for callers that do not pass it.
"""
if retries > 0:
return
if pii_findings:
return
if row_count is not None and row_count <= 0:
return
await self._service.create(
tenant_id,
workspace_id,
Expand Down
19 changes: 17 additions & 2 deletions src/flyquery/core/services/execution/ast_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,23 @@ def classify(self, sql: str) -> AstClassification:
# pyright does not unify with the public ``Expression`` base below.
kind = self._kind(stmt) # pyright: ignore[reportArgumentType]

# Collect table refs — skip anonymous subquery aliases
tables = tuple(sorted({t.name for t in stmt.find_all(sqlglot.expressions.Table) if t.name}))
# Collect table refs — skip anonymous subquery aliases AND
# CTE-defined names. sqlglot represents a reference to a CTE
# (``FROM base`` where ``WITH base AS (...)``) as an ``exp.Table``
# node, so without this filter the CTE alias leaks into
# ``table_refs``; the downstream bad-tables guard then flags it
# as a non-existent table and the (otherwise valid) query is
# rejected — see QueryService bad-tables set-difference.
cte_names = {cte.alias_or_name for cte in stmt.find_all(sqlglot.expressions.CTE) if cte.alias_or_name}
tables = tuple(
sorted(
{
t.name
for t in stmt.find_all(sqlglot.expressions.Table)
if t.name and t.name not in cte_names
}
)
)
columns = tuple(sorted({c.name for c in stmt.find_all(sqlglot.expressions.Column) if c.name}))
has_subquery = bool(list(stmt.find_all(sqlglot.expressions.Subquery)))

Expand Down
51 changes: 49 additions & 2 deletions src/flyquery/core/services/execution/table_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from __future__ import annotations

import json
import uuid

import sqlalchemy as sa
Expand All @@ -54,6 +55,7 @@ async def resolve(
dataset_id: uuid.UUID,
table_names: list[str],
object_store_base: str | None = None,
pins: dict[str, str] | None = None,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolve() tiene un parámetro pins pensado para fijar una pregunta de drill-down al snapshot del esquema con el que se respondió la primera vez, para que un re-ingest a mitad de conversación no cambie la respuesta sin avisar. Pero ninguno de los tres puntos de llamada en query_service.py (líneas 1166, 1281, 1384) pasa pins=, y current_snapshots() (línea 122) nunca se llama. prior_snapshot_pins simplemente se guarda y se recarga sin cambios (línea 988). Ahora mismo esta funcionalidad de fijado de snapshots no hace nada: un re-ingest entre dos mensajes de la misma conversación cambiará la respuesta al nuevo esquema sin que nadie se entere.

) -> dict[str, str]:
"""Return a mapping of table name → absolute parquet path.

Expand All @@ -65,6 +67,11 @@ async def resolve(
:param dataset_id: dataset to scope the lookup
:param table_names: unqualified table names from the AST
:param object_store_base: override for ``settings.object_store_base``
:param pins: optional ``{table_name: snapshot_id}`` — a follow-up
drill-down turn pins each table to the snapshot it resolved to
on the first turn, so a mid-conversation re-ingest does not
silently switch the answer to a newer schema. Unpinned tables
fall back to ``current_snapshot_id``.
:return: ``{name: path}`` dict for all resolvable tables
"""
if not table_names:
Expand All @@ -77,16 +84,56 @@ async def resolve(
SELECT t.name, ss.parquet_object_key
FROM flyquery_tables t
JOIN flyquery_schema_snapshots ss
ON ss.id = t.current_snapshot_id
ON ss.table_id = t.id
AND ss.id = COALESCE(
(CAST(:pins AS jsonb) ->> t.name)::uuid,
t.current_snapshot_id
)
WHERE t.dataset_id = :ds
AND t.name = ANY(:names)
AND t.is_active = true
"""),
{"ds": dataset_id, "names": list(table_names)},
{"ds": dataset_id, "names": list(table_names), "pins": json.dumps(pins or {})},
)

out: dict[str, str] = {}
for r in rows.mappings():
key: str = r["parquet_object_key"]
out[r["name"]] = f"{base}/{key}"
return out

async def table_kinds_by_name(self, dataset_id: uuid.UUID, table_names: list[str]) -> dict[str, str]:
    """Look up the ``kind`` of each requested active table in a dataset.

    Serves the firewall/bad-tables guard. The query is kept in the service
    layer so that raw SQL does not end up in the web tier.

    :param dataset_id: dataset the lookup is scoped to
    :param table_names: table names to look up
    :return: ``{name: kind}`` for every matching active table; an empty
        dict when ``table_names`` is empty
    """
    # Nothing requested: answer without touching the database.
    if not table_names:
        return {}

    statement = sa.text("""
        SELECT name, kind FROM flyquery_tables
        WHERE dataset_id = :ds AND name = ANY(:names) AND is_active = true
    """)
    # Copy the names into a fresh list before binding them to ``:names``.
    params = {"ds": dataset_id, "names": list(table_names)}
    result = await self._session.execute(statement, params)

    kinds: dict[str, str] = {}
    for record in result.mappings():
        kinds[record["name"]] = record["kind"]
    return kinds

async def current_snapshots(self, dataset_id: uuid.UUID, table_names: list[str]) -> dict[str, str]:
    """Fetch the current snapshot id of each requested active table.

    The result records the snapshot pins of the present turn, so that a
    later drill-down turn can reproduce the exact schema version it
    answered against.

    :param dataset_id: dataset the lookup is scoped to
    :param table_names: table names to look up
    :return: ``{table_name: current_snapshot_id}`` with each id rendered
        via ``str``; tables whose ``current_snapshot_id`` is NULL are left
        out, and an empty dict is returned when ``table_names`` is empty
    """
    # Nothing requested: answer without touching the database.
    if not table_names:
        return {}

    statement = sa.text("""
        SELECT name, current_snapshot_id
        FROM flyquery_tables
        WHERE dataset_id = :ds AND name = ANY(:names) AND is_active = true
        AND current_snapshot_id IS NOT NULL
    """)
    params = {"ds": dataset_id, "names": list(table_names)}
    result = await self._session.execute(statement, params)

    snapshot_ids: dict[str, str] = {}
    for record in result.mappings():
        # Stringify the id so every value in the mapping is a plain ``str``.
        snapshot_ids[record["name"]] = str(record["current_snapshot_id"])
    return snapshot_ids
5 changes: 5 additions & 0 deletions src/flyquery/core/services/ingestion/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ class ColumnSchema:
data_type: str
is_nullable: bool
position: int
# The source's ORIGINAL header before any rename (e.g. an Excel year header
# '2024' that the column-name proposer collapsed to ``year_1``). Preserved so
# the query layer can recover what a renamed column actually meant, instead of
# relying on a fixed ordinal convention. None when the name was not renamed.
original_name: str | None = None


@dataclass(frozen=True)
Expand Down
Loading
Loading