-
Notifications
You must be signed in to change notification settings - Fork 0
fix(query+ingest): value-anchored NL→SQL grounding, hierarchy resolution & pipeline robustness #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
cbe6f83
277f96a
7847a81
3b30b74
c135bcc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -181,6 +181,23 @@ class FlyquerySettings(BaseSettings): | |
| reranker_top_n: int = 30 | ||
| query_expansion_enabled: bool = False | ||
|
|
||
| # Value-anchoring / grounding-quality knobs. The ingest pipeline already | ||
| # computes a per-column value catalogue (profile_json.top_values, min/max, | ||
| # distinct_estimate) + a semantic_type; these control how it is surfaced to | ||
| # the SQL writer at query time. All dataset-agnostic. | ||
| value_catalog_enabled: bool = True # render real column values into prompts | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| value_catalog_max_values: int = 25 # distinct values shown per column | ||
| value_catalog_char_budget: int = 320 # char cap on the value list per column | ||
| value_catalog_max_columns: int = 80 # cap columns that get a value line (prompt budget) | ||
| entity_resolution_enabled: bool = True # map question literals -> owning column | ||
| entity_resolution_max_literals: int = 8 # cap live value-scan probes per query | ||
| zero_row_repair_enabled: bool = True # repair queries that run but return 0 rows | ||
| candidate_exec_selection: bool = True # execute top candidates, pick a non-empty/non-degenerate one | ||
| synthesis_function_firewall: bool = True # block read_csv_auto/pg_read_file/... in generated SQL | ||
| group_resolution_enabled: bool = True # term -> full set of catalogued values it umbrellas | ||
| signed_measure_repair_enabled: bool = True # observed-sign probe on subtractions over a signed measure | ||
| group_coverage_repair_enabled: bool = True # advise when an IN-list under-covers a detected value group | ||
|
|
||
| # PII | ||
| pii_scanner: Literal["regex", "presidio", "disabled"] = "regex" | ||
| pii_policy_samples: Literal["warn", "redact", "reject"] = "redact" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
|
|
||
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import uuid | ||
|
|
||
| import sqlalchemy as sa | ||
|
|
@@ -54,6 +55,7 @@ async def resolve( | |
| dataset_id: uuid.UUID, | ||
| table_names: list[str], | ||
| object_store_base: str | None = None, | ||
| pins: dict[str, str] | None = None, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| ) -> dict[str, str]: | ||
| """Return a mapping of table name → absolute parquet path. | ||
|
|
||
|
|
@@ -65,6 +67,11 @@ async def resolve( | |
| :param dataset_id: dataset to scope the lookup | ||
| :param table_names: unqualified table names from the AST | ||
| :param object_store_base: override for ``settings.object_store_base`` | ||
| :param pins: optional ``{table_name: snapshot_id}`` — a follow-up | ||
| drill-down turn pins each table to the snapshot it resolved to | ||
| on the first turn, so a mid-conversation re-ingest does not | ||
| silently switch the answer to a newer schema. Unpinned tables | ||
| fall back to ``current_snapshot_id``. | ||
| :return: ``{name: path}`` dict for all resolvable tables | ||
| """ | ||
| if not table_names: | ||
|
|
@@ -77,16 +84,56 @@ async def resolve( | |
| SELECT t.name, ss.parquet_object_key | ||
| FROM flyquery_tables t | ||
| JOIN flyquery_schema_snapshots ss | ||
| ON ss.id = t.current_snapshot_id | ||
| ON ss.table_id = t.id | ||
| AND ss.id = COALESCE( | ||
| (CAST(:pins AS jsonb) ->> t.name)::uuid, | ||
| t.current_snapshot_id | ||
| ) | ||
| WHERE t.dataset_id = :ds | ||
| AND t.name = ANY(:names) | ||
| AND t.is_active = true | ||
| """), | ||
| {"ds": dataset_id, "names": list(table_names)}, | ||
| {"ds": dataset_id, "names": list(table_names), "pins": json.dumps(pins or {})}, | ||
| ) | ||
|
|
||
| out: dict[str, str] = {} | ||
| for r in rows.mappings(): | ||
| key: str = r["parquet_object_key"] | ||
| out[r["name"]] = f"{base}/{key}" | ||
| return out | ||
|
|
||
async def table_kinds_by_name(self, dataset_id: uuid.UUID, table_names: list[str]) -> dict[str, str]:
    """Map each active table name in the dataset to its ``kind``.

    Consumed by the firewall/bad-tables guard. The query is kept in the
    service layer, not a controller, so raw SQL stays out of the web tier.

    :param dataset_id: dataset that scopes the lookup
    :param table_names: table names to look up
    :return: ``{name: kind}`` for the matching active tables; an empty
        dict when ``table_names`` is empty
    """
    # Nothing requested: skip the database round-trip entirely.
    if not table_names:
        return {}

    statement = sa.text("""
        SELECT name, kind FROM flyquery_tables
        WHERE dataset_id = :ds AND name = ANY(:names) AND is_active = true
    """)
    params = {"ds": dataset_id, "names": list(table_names)}
    result = await self._session.execute(statement, params)

    kinds = {}
    for record in result.mappings():
        kinds[record["name"]] = record["kind"]
    return kinds
|
|
||
async def current_snapshots(self, dataset_id: uuid.UUID, table_names: list[str]) -> dict[str, str]:
    """Look up the current snapshot id of each requested active table.

    The result records THIS turn's snapshot pins, so that a later
    drill-down turn can reproduce the exact schema version it answered
    against.

    :param dataset_id: dataset that scopes the lookup
    :param table_names: table names to look up
    :return: ``{table_name: current_snapshot_id}`` with the id rendered as
        a string; tables whose ``current_snapshot_id`` is NULL are left
        out, and an empty ``table_names`` yields an empty dict
    """
    # Nothing requested: skip the database round-trip entirely.
    if not table_names:
        return {}

    statement = sa.text("""
        SELECT name, current_snapshot_id
        FROM flyquery_tables
        WHERE dataset_id = :ds AND name = ANY(:names) AND is_active = true
        AND current_snapshot_id IS NOT NULL
    """)
    params = {"ds": dataset_id, "names": list(table_names)}
    result = await self._session.execute(statement, params)

    pins = {}
    for record in result.mappings():
        # Stringify the id so the returned mapping holds plain strings.
        pins[record["name"]] = str(record["current_snapshot_id"])
    return pins
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
¿Por qué tenemos la config aquí en vez de en el .env?