From a824b6fe50247f40ea75d9171364bd524e341ed3 Mon Sep 17 00:00:00 2001 From: yuvalkh Date: Wed, 17 Jun 2026 13:57:09 +0300 Subject: [PATCH 1/6] replace stub evaluations with api evaluations --- agent/src/agent/main.py | 5 +- backend/app/config.py | 2 +- backend/app/routers/agent.py | 10 +- backend/app/routers/evaluation.py | 671 ++++++++---------------- backend/app/services/langfuse_client.py | 1 + 5 files changed, 219 insertions(+), 470 deletions(-) diff --git a/agent/src/agent/main.py b/agent/src/agent/main.py index 8ead61c..7ab4f3e 100644 --- a/agent/src/agent/main.py +++ b/agent/src/agent/main.py @@ -4,7 +4,4 @@ # Set up logging with correlation ID setup_logging() -# FastMCP SSE app is a full Starlette app. -# We expose it directly so its lifespan is triggered properly by Uvicorn. -# It exposes the endpoint at /sse. -app = mcp.sse_app() +app = mcp.streamable_http_app() diff --git a/backend/app/config.py b/backend/app/config.py index eb0c108..67e642b 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -12,7 +12,7 @@ class Settings(BaseSettings): # Agent MCP service URL (internal service-to-service) AGENT_URL: str = "http://localhost:8001" - + EVALUATION_SERVICE_URL: str = "http://localhost:8001" OPENMETADATA_TOKEN: str = "" APP_ENV: str = "development" diff --git a/backend/app/routers/agent.py b/backend/app/routers/agent.py index 6a8e4fa..8ee6100 100644 --- a/backend/app/routers/agent.py +++ b/backend/app/routers/agent.py @@ -13,7 +13,7 @@ from fastapi import APIRouter, HTTPException from mcp.client.session import ClientSession -from mcp.client.sse import sse_client +from mcp.client.streamable_http import streamablehttp_client from pydantic import BaseModel from app.config import settings @@ -60,15 +60,15 @@ class ChatResponse(BaseModel): async def _call_agent_mcp(tool_arguments: dict) -> dict: """ - Connects to the agent MCP server over SSE, initializes the session, + Connects to the agent MCP server over Streamable HTTP, initializes the session, calls the 'chat_with_agent' tool, and returns the parsed result. """ - url = f"{settings.AGENT_URL}/sse" + url = f"{settings.AGENT_URL}/mcp" logger.debug("Connecting to agent MCP: %s args=%s", url, tool_arguments) try: - async with sse_client(url) as streams: - async with ClientSession(*streams) as session: + async with streamablehttp_client(url) as (read_stream, write_stream, _): + async with ClientSession(read_stream, write_stream) as session: await session.initialize() # Call the tool using the MCP client session diff --git a/backend/app/routers/evaluation.py b/backend/app/routers/evaluation.py index 9fb06b3..1998aec 100644 --- a/backend/app/routers/evaluation.py +++ b/backend/app/routers/evaluation.py @@ -14,14 +14,10 @@ """ import logging -import random from datetime import datetime +from typing import Literal import requests -from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException -from langfuse.decorators import langfuse_context, observe -from sqlmodel import Session, desc, select - from core.db.engine import engine, get_session from core.models.models import ( AlertSeverity, @@ -36,13 +32,55 @@ Table, TableStatus, ) -from app.services.evaluator import TextToSQLEvaluator +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException +from langfuse.decorators import langfuse_context, observe +from pydantic import BaseModel +from sqlmodel import Session, desc, select + +from app.config import settings from app.services.langfuse_client import langfuse_client logger = logging.getLogger(__name__) router = APIRouter(tags=["evaluation"]) + +class EvalAPIRequest(BaseModel): + tables_names: list[str] + dataset_name: str + + +class EvalAPIQuestionMetrics(BaseModel): + exact_match: float + exact_execution_accuracy: float + contains_execution_accuracy: float + + +class EvalAPIQuestionResult(BaseModel): + question_id: str + generated_sql: str | None = None + metrics: EvalAPIQuestionMetrics + status: Literal["pass", "fail"] + error_message: str | None = None + row_count: int | None = None + + +class EvalAPIOverallMetrics(BaseModel): + contains_execution_accuracy: float + exact_execution_accuracy: float + exact_match: float + total_questions: int + pass_rate: float + fail_rate: float + + +class EvalAPIResponse(BaseModel): + run_id: str + status: Literal["completed", "failed"] + overall_metrics: EvalAPIOverallMetrics + results: list[EvalAPIQuestionResult] + + # Name of the single shared Langfuse dataset for all production table questions PRODUCTION_DATASET_NAME = "text2sql_production" @@ -115,61 +153,58 @@ def execute_single_table_eval(table_id: str, run_id: str, session: Session) -> f tags=["eval-run", f"table:{table_id}"], ) - # Score locally — stub returns 0.0 or 1.0 per question. - # MERGE: replace with real MCP/Trino calls via TextToSQLEvaluator. + table = session.get(Table, table_id) + dataset_name = f"text2sql_sandbox_{table_id}" - question_scores_contains: list[float] = [ - float(random.choice([0, 1])) for _ in questions - ] - question_scores_exact: list[float] = [ - float(random.choice([0, 1])) for _ in questions - ] - question_scores_ranking: list[float] = [ - float(random.choice([0, 1])) for _ in questions - ] - logger.info( - f"[Eval] Scored {len(questions)} questions for table {table_id} (local stubs)" - ) + if langfuse_client.enabled: + langfuse_client.ensure_dataset_synced( + dataset_name, _build_questions_payload(questions, table) + ) - avg_score_contains = round( - sum(question_scores_contains) / len(question_scores_contains), 3 - ) - pass_count_contains = sum(1 for s in question_scores_contains if s >= 0.50) - pass_rate_contains = round(pass_count_contains / len(question_scores_contains), 3) - avg_score_exact = round(sum(question_scores_exact) / len(question_scores_exact), 3) - pass_count_exact = sum(1 for s in question_scores_exact if s >= 0.50) - pass_rate_exact = round(pass_count_exact / len(question_scores_exact), 3) - avg_score_ranking = round( - sum(question_scores_ranking) / len(question_scores_ranking), 3 - ) - pass_count_ranking = sum(1 for s in question_scores_ranking if s >= 0.50) - pass_rate_ranking = round(pass_count_ranking / len(question_scores_ranking), 3) + try: + req = EvalAPIRequest(tables_names=[table.name], dataset_name=dataset_name) + resp = requests.post( + f"{settings.EVALUATION_SERVICE_URL}/text-to-sql/evaluation/run-single-dataset", + json=req.model_dump(), + timeout=600, + ) + resp.raise_for_status() + eval_resp = EvalAPIResponse(**resp.json()) + if eval_resp.status == "failed": + raise Exception("API returned failed status") + except Exception as e: + logger.error(f"[Eval] Table {table_id} evaluation failed via API: {e}") + run.status = EvalStatus.failed + run.score = -1.0 + session.add(run) + session.commit() + return -1.0 - run.score = avg_score_contains - run.pass_rate = pass_rate_contains - run.fail_rate = round(1.0 - pass_rate_contains, 3) - run.total_questions = len(questions) + metrics = eval_resp.overall_metrics + run.score = metrics.contains_execution_accuracy + run.pass_rate = metrics.pass_rate + run.fail_rate = metrics.fail_rate + run.total_questions = metrics.total_questions run.status = EvalStatus.completed run.completed_at = datetime.utcnow() run.dimension_averages = { - "contains_execution_accuracy": avg_score_contains, - "exact_execution_accuracy": avg_score_exact, - "ranking_accuracy": avg_score_ranking, + "contains_execution_accuracy": metrics.contains_execution_accuracy, + "exact_execution_accuracy": metrics.exact_execution_accuracy, + "exact_match": metrics.exact_match, } session.add(run) - for q, score in zip(questions, question_scores_contains, strict=False): + for q_res in eval_resp.results: session.add( EvalResult( run_id=run_id, - question_id=q.id, - score=score, - status="pass" if score >= 0.50 else "fail", + question_id=q_res.question_id, + score=q_res.metrics.contains_execution_accuracy, + status=q_res.status, ) ) # Lifecycle: draft → sandbox on first evaluation only - table = session.get(Table, table_id) if table and table.status == TableStatus.draft: table.status = TableStatus.sandbox session.add(table) @@ -177,14 +212,17 @@ def execute_single_table_eval(table_id: str, run_id: str, session: Session) -> f session.commit() logger.info( - f"[Eval] Table {table_id}: contains_exec_accuracy={avg_score_contains} " - f"exact_exec_accuracy={avg_score_exact} ranking_accuracy={avg_score_ranking} " - f"({len(questions)} questions, pass_rates=[{pass_rate_contains}, {pass_rate_exact}, {pass_rate_ranking}])" + f"[Eval] Table {table_id}: contains_exec_accuracy={metrics.contains_execution_accuracy} " + f"exact_exec_accuracy={metrics.exact_execution_accuracy} exact_match={metrics.exact_match} " + f"({metrics.total_questions} questions, pass_rate={metrics.pass_rate})" ) langfuse_context.update_current_trace( - output={"score": avg_score_contains, "pass_rate": pass_rate_contains} + output={ + "score": metrics.contains_execution_accuracy, + "pass_rate": metrics.pass_rate, + } ) - return avg_score_contains + return metrics.contains_execution_accuracy # ─── Phase A: measure baseline score on production dataset ──────────────────── @@ -193,19 +231,6 @@ def execute_single_table_eval(table_id: str, run_id: str, session: Session) -> f def _run_production_dataset_eval( session: Session, run_name_prefix: str, promotion_run_id: str ) -> float: - """ - Measures baseline contains_execution_accuracy on the unified - 'text2sql_production' Langfuse dataset. - - Dataset lifecycle: - - If the dataset does NOT exist: build it fresh from every production - table's golden questions, then run evaluation on it. - - If the dataset ALREADY exists: use it as-is (questions are appended - on each admin approval via _sync_questions_to_production_dataset). - - Returns the average contains_execution_accuracy score. - Returns 1.0 if there are no production tables (vacuously passing). - """ prod_tables = session.exec( select(Table).where(Table.status == TableStatus.production) ).all() @@ -214,7 +239,6 @@ def _run_production_dataset_eval( logger.info("[Promotion/Phase-A] No production tables — baseline score = 1.0") return 1.0 - # Load all production question objects so we can persist per-question EvalResult rows all_production_questions: list[GoldenQuestion] = [] for table in prod_tables: qs = session.exec( @@ -223,8 +247,6 @@ def _run_production_dataset_eval( all_production_questions.extend(qs) if langfuse_client.enabled: - # Always sync — ensure_dataset_synced is idempotent (skips already-present questions). - # This covers: new dataset, empty dataset, or dataset missing recently added questions. all_questions_payload = [] for table in prod_tables: qs_for_table = [ @@ -233,20 +255,12 @@ def _run_production_dataset_eval( all_questions_payload.extend(_build_questions_payload(qs_for_table, table)) if all_questions_payload: - logger.info( - f"[Promotion/Phase-A] Full sync of {len(all_questions_payload)} questions " - f"to '{PRODUCTION_DATASET_NAME}' (adds new, removes stale, updates changed)" - ) try: langfuse_client.sync_dataset( PRODUCTION_DATASET_NAME, all_questions_payload ) except Exception as e: logger.warning(f"[Promotion/Phase-A] Dataset sync failed: {e}") - else: - logger.info( - "[Promotion/Phase-A] No questions to sync — production tables have no golden questions" - ) run = EvalRun( table_id=None, @@ -258,181 +272,59 @@ def _run_production_dataset_eval( session.commit() session.refresh(run) - # Run evaluation against the production dataset - question_scores: list[float] = [] - if langfuse_client.enabled: - try: - evaluator = TextToSQLEvaluator( - run_name=f"{run_name_prefix}-PhaseA", - session=session, - table_id="production-baseline", - run_id=run.id, - question_scores=question_scores, - ) - evaluator.run_single_dataset(PRODUCTION_DATASET_NAME) - except Exception as e: - logger.error(f"[Promotion/Phase-A] Baseline eval failed: {e}") - - if not question_scores: - question_scores = [ - float(random.choice([0, 1])) for _ in (all_production_questions or range(5)) - ] - - question_scores_contains = question_scores - question_scores_exact = [ - float(random.choice([0, 1])) for _ in range(len(question_scores_contains)) - ] - question_scores_ranking = [ - float(random.choice([0, 1])) for _ in range(len(question_scores_contains)) - ] - - avg_score_contains = ( - round(sum(question_scores_contains) / len(question_scores_contains), 3) - if question_scores_contains - else 1.0 - ) - pass_count_contains = sum(1 for s in question_scores_contains if s >= 0.50) - pass_rate_contains = ( - round(pass_count_contains / len(question_scores_contains), 3) - if question_scores_contains - else 1.0 - ) - - avg_score_exact = ( - round(sum(question_scores_exact) / len(question_scores_exact), 3) - if question_scores_exact - else 1.0 - ) - pass_count_exact = sum(1 for s in question_scores_exact if s >= 0.50) - ( - round(pass_count_exact / len(question_scores_exact), 3) - if question_scores_exact - else 1.0 - ) - - avg_score_ranking = ( - round(sum(question_scores_ranking) / len(question_scores_ranking), 3) - if question_scores_ranking - else 1.0 - ) - pass_count_ranking = sum(1 for s in question_scores_ranking if s >= 0.50) - ( - round(pass_count_ranking / len(question_scores_ranking), 3) - if question_scores_ranking - else 1.0 - ) + table_names = [t.name for t in prod_tables] + try: + req = EvalAPIRequest( + tables_names=table_names, dataset_name=PRODUCTION_DATASET_NAME + ) + resp = requests.post( + f"{settings.EVALUATION_SERVICE_URL}/text-to-sql/evaluation/run-single-dataset", + json=req.model_dump(), + timeout=600, + ) + resp.raise_for_status() + eval_resp = EvalAPIResponse(**resp.json()) + if eval_resp.status == "failed": + raise Exception("API returned failed status") + except Exception as e: + logger.error(f"[Promotion/Phase-A] Baseline eval failed: {e}") + run.status = EvalStatus.failed + run.score = -1.0 + session.add(run) + session.commit() + return -1.0 - run.score = avg_score_contains - run.pass_rate = pass_rate_contains - run.fail_rate = round(1.0 - pass_rate_contains, 3) - run.total_questions = len(question_scores_contains) + metrics = eval_resp.overall_metrics + run.score = metrics.contains_execution_accuracy + run.pass_rate = metrics.pass_rate + run.fail_rate = metrics.fail_rate + run.total_questions = metrics.total_questions run.status = EvalStatus.completed run.completed_at = datetime.utcnow() run.dimension_averages = { - "contains_execution_accuracy": avg_score_contains, - "exact_execution_accuracy": avg_score_exact, - "ranking_accuracy": avg_score_ranking, + "contains_execution_accuracy": metrics.contains_execution_accuracy, + "exact_execution_accuracy": metrics.exact_execution_accuracy, + "exact_match": metrics.exact_match, } session.add(run) - # Persist per-question EvalResult rows so the regression diff can compare - # Only if the evaluator didn't already insert them - existing_results = session.exec( - select(EvalResult).where(EvalResult.run_id == run.id) - ).first() - if not existing_results and all_production_questions: - for q, score in zip( - all_production_questions, question_scores_contains, strict=False - ): - session.add( - EvalResult( - run_id=run.id, - question_id=q.id, - score=score, - status="pass" if score >= 0.50 else "fail", - ) + for q_res in eval_resp.results: + session.add( + EvalResult( + run_id=run.id, + question_id=q_res.question_id, + score=q_res.metrics.contains_execution_accuracy, + status=q_res.status, ) - + ) session.commit() - # Log per-question scores back to Langfuse so they appear in the Experiments UI - if ( - langfuse_client.enabled - and all_production_questions - and question_scores_contains - ): - try: - # Fetch dataset items to get their Langfuse item IDs (needed to link scores) - res = requests.get( - f"{langfuse_client._tracer.host}/api/public/dataset-items" - f"?datasetName={PRODUCTION_DATASET_NAME}&limit=500", - auth=( - langfuse_client._tracer.public_key, - langfuse_client._tracer.private_key, - ), - ) - item_map: dict[str, str] = {} # question_id → langfuse_item_id - if res.status_code == 200: - for item in res.json().get("data", []): - qid = item.get("metadata", {}).get("question_id") - if qid: - item_map[qid] = item["id"] - - run_name = f"{run_name_prefix}-PhaseA" - for q, score in zip( - all_production_questions, question_scores_contains, strict=False - ): - lf_item_id = item_map.get(q.id) - if not lf_item_id: - continue - try: - # Create a trace for this question result - trace = langfuse_client.client.trace( - name=f"production-baseline-q-{q.id[:8]}", - input={"question": q.question}, - output={ - "score": score, - "status": "pass" if score >= 0.50 else "fail", - }, - metadata={ - "question_id": q.id, - "run_id": run.id, - "run_name": run_name, - }, - ) - # Link the trace to the dataset item as an experiment run - langfuse_client.link_trace_to_dataset_run( - run_name=run_name, - run_description=f"Production baseline — {run_name_prefix}", - run_metadata={"promotion_run_id": promotion_run_id}, - dataset_item_id=lf_item_id, - trace_id=trace.id, - ) - # Score the trace - langfuse_client.client.score( - trace_id=trace.id, - name="contains_execution_accuracy", - value=score, - comment="pass" if score >= 0.50 else "fail", - ) - except Exception as exc: - logger.warning( - f"[Promotion/Phase-A] Failed to log score for question {q.id}: {exc}" - ) - - langfuse_client.flush() - logger.info( - f"[Promotion/Phase-A] Logged {len(all_production_questions)} question scores to Langfuse run '{run_name}'" - ) - except Exception as exc: - logger.warning(f"[Promotion/Phase-A] Langfuse score logging failed: {exc}") - logger.info( - f"[Promotion/Phase-A] Baseline contains_exec_accuracy = {avg_score_contains:.3f} " - f"exact_exec_accuracy = {avg_score_exact:.3f} ranking_accuracy = {avg_score_ranking:.3f} " - f"({len(question_scores_contains)} questions)" + f"[Promotion/Phase-A] Baseline contains_exec_accuracy = {metrics.contains_execution_accuracy:.3f} " + f"exact_exec_accuracy = {metrics.exact_execution_accuracy:.3f} exact_match = {metrics.exact_match:.3f} " + f"({metrics.total_questions} questions)" ) - return avg_score_contains + return metrics.contains_execution_accuracy # We no longer use a fixed dataset name to avoid soft-delete conflicts and question accumulation. @@ -446,18 +338,6 @@ def _run_candidate_eval( session: Session, promotion_run_id: str, ) -> float: - """ - Writes the candidate table's golden questions into the fixed - 'text2sql_candidate' Langfuse dataset (overwriting any prior run), - then evaluates it. Returns average contains_execution_accuracy. - - Using a fixed name ensures there is always exactly one candidate dataset - in Langfuse regardless of how many promotions have been attempted. - - Cleanup of dataset items is gated on confirmed Langfuse run item - finalization (via wait_for_run_items) to avoid the race condition where - items are deleted before the server finishes persisting evaluation run items. - """ run = EvalRun( table_id=table.id, status=EvalStatus.running, @@ -468,157 +348,83 @@ def _run_candidate_eval( session.commit() session.refresh(run) - question_scores: list[float] = [] - dataset_name = "text2sql_candidate" - run_name = f"{run_name_prefix}-Candidate" - score = 0.0 if langfuse_client.enabled: try: langfuse_client.ensure_dataset_synced( dataset_name, _build_questions_payload(questions, table) ) - - evaluator = TextToSQLEvaluator( - run_name=run_name, - session=session, - table_id=table.id, - run_id=run.id, - question_scores=question_scores, - ) - evaluator.run_single_dataset(dataset_name) except Exception as e: - logger.error(f"[Promotion/Phase-B] Candidate eval failed: {e}") - - if not question_scores: - question_scores = [float(random.choice([0, 1])) for _ in questions] - - # Generate stubs for exact and ranking based on the same length - - question_scores_contains = question_scores - question_scores_exact = [ - float(random.choice([0, 1])) for _ in range(len(question_scores_contains)) - ] - question_scores_ranking = [ - float(random.choice([0, 1])) for _ in range(len(question_scores_contains)) - ] - - avg_score_contains = ( - round(sum(question_scores_contains) / len(question_scores_contains), 3) - if question_scores_contains - else 0.0 - ) - pass_count_contains = sum(1 for s in question_scores_contains if s >= 0.50) - pass_rate_contains = ( - round(pass_count_contains / len(question_scores_contains), 3) - if question_scores_contains - else 1.0 - ) - - avg_score_exact = ( - round(sum(question_scores_exact) / len(question_scores_exact), 3) - if question_scores_exact - else 0.0 - ) - pass_count_exact = sum(1 for s in question_scores_exact if s >= 0.50) - ( - round(pass_count_exact / len(question_scores_exact), 3) - if question_scores_exact - else 1.0 - ) - - avg_score_ranking = ( - round(sum(question_scores_ranking) / len(question_scores_ranking), 3) - if question_scores_ranking - else 0.0 - ) - pass_count_ranking = sum(1 for s in question_scores_ranking if s >= 0.50) - ( - round(pass_count_ranking / len(question_scores_ranking), 3) - if question_scores_ranking - else 1.0 - ) + logger.error(f"[Promotion/Phase-B] Candidate eval prep failed: {e}") + + try: + req = EvalAPIRequest(tables_names=[table.name], dataset_name=dataset_name) + resp = requests.post( + f"{settings.EVALUATION_SERVICE_URL}/text-to-sql/evaluation/run-single-dataset", + json=req.model_dump(), + timeout=600, + ) + resp.raise_for_status() + eval_resp = EvalAPIResponse(**resp.json()) + if eval_resp.status == "failed": + raise Exception("API returned failed status") + except Exception as e: + logger.error(f"[Promotion/Phase-B] Candidate eval failed: {e}") + run.status = EvalStatus.failed + run.score = -1.0 + session.add(run) + session.commit() + return -1.0 - run.score = avg_score_contains - run.pass_rate = pass_rate_contains - run.fail_rate = round(1.0 - pass_rate_contains, 3) - run.total_questions = len(question_scores_contains) + metrics = eval_resp.overall_metrics + run.score = metrics.contains_execution_accuracy + run.pass_rate = metrics.pass_rate + run.fail_rate = metrics.fail_rate + run.total_questions = metrics.total_questions run.status = EvalStatus.completed run.completed_at = datetime.utcnow() run.dimension_averages = { - "contains_execution_accuracy": avg_score_contains, - "exact_execution_accuracy": avg_score_exact, - "ranking_accuracy": avg_score_ranking, + "contains_execution_accuracy": metrics.contains_execution_accuracy, + "exact_execution_accuracy": metrics.exact_execution_accuracy, + "exact_match": metrics.exact_match, } session.add(run) - # Persist per-question EvalResult rows so the report endpoint - # can show per-question scores in the UI. - existing_results = session.exec( - select(EvalResult).where(EvalResult.run_id == run.id) - ).first() - if not existing_results: - for q, score in zip(questions, question_scores_contains, strict=False): - session.add( - EvalResult( - run_id=run.id, - question_id=q.id, - score=score, - status="pass" if score >= 0.50 else "fail", - ) + for q_res in eval_resp.results: + session.add( + EvalResult( + run_id=run.id, + question_id=q_res.question_id, + score=q_res.metrics.contains_execution_accuracy, + status=q_res.status, ) - + ) session.commit() logger.info( - f"[Promotion/Phase-B] Candidate '{table.name}' contains_score = {avg_score_contains:.3f} " - f"exact_score = {avg_score_exact:.3f} ranking_score = {avg_score_ranking:.3f}" + f"[Promotion/Phase-B] Candidate '{table.name}' contains_score = {metrics.contains_execution_accuracy:.3f} " + f"exact_score = {metrics.exact_execution_accuracy:.3f} exact_match = {metrics.exact_match:.3f}" ) - # ── Gate cleanup on confirmed Langfuse run item finalization ─────────────── - # We must NOT delete dataset items until Langfuse has fully persisted all - # evaluation run items server-side. Deleting before that causes the run to - # record 0 items (race condition, especially on slow private networks). - # - # wait_for_run_items polls GET /api/public/dataset-run-items until the - # expected count is reached — deterministic, state-driven, no fixed sleep. if langfuse_client.enabled: - finalized = langfuse_client.wait_for_run_items( - dataset_name=dataset_name, - run_name=run_name, - expected_count=len(questions), - ) - if not finalized: - logger.warning( - "[Promotion/Phase-B] Langfuse run items were not fully persisted " - "within the configured wait window. Proceeding with cleanup to avoid " - "blocking the promotion pipeline. Check LANGFUSE_WAIT_MAX_ATTEMPTS " - "and LANGFUSE_WAIT_INITIAL_DELAY_SECS if this happens repeatedly." - ) + # Since API might be async or Langfuse is async, we may still need to clear dataset + # Here we don't wait for traces, just clear it after evaluation finishes langfuse_client.clear_dataset(dataset_name) - logger.info( - f"[Promotion/Phase-B] Cleared candidate dataset '{dataset_name}' after " - f"confirmed evaluation completion." - ) - return avg_score_contains + return metrics.contains_execution_accuracy def _run_regression_eval( run_name_prefix: str, session: Session, promotion_run_id: str ) -> float: - """ - Re-runs the production dataset evaluation AFTER the candidate table has been - temporarily added to the warehouse. Returns the new score. - """ - # Load the same production questions as the baseline so we can save per-question - # EvalResult rows and enable cross-run regression diff. prod_tables = session.exec( select(Table).where(Table.status == TableStatus.production) ).all() all_production_questions: list[GoldenQuestion] = [] + table_names = [] for table in prod_tables: + table_names.append(table.name) qs = session.exec( select(GoldenQuestion).where(GoldenQuestion.table_id == table.id) ).all() @@ -634,112 +440,57 @@ def _run_regression_eval( session.commit() session.refresh(run) - # Re-sync the production dataset (unchanged questions, but now the candidate - # table is in the warehouse so the agent can query it) - question_scores: list[float] = [] - if langfuse_client.enabled: - try: - evaluator = TextToSQLEvaluator( - run_name=f"{run_name_prefix}-Regression", - session=session, - table_id="production-regression", - run_id=run.id, - question_scores=question_scores, - ) - evaluator.run_single_dataset(PRODUCTION_DATASET_NAME) - except Exception as e: - logger.error(f"[Promotion/Phase-B] Regression eval failed: {e}") - - if not question_scores: - # Slight variance from baseline to simulate real regression testing. - # Use the same question count as the loaded production questions. - question_scores = [ - float(random.choice([0, 1])) for _ in (all_production_questions or range(5)) - ] - - question_scores_contains = question_scores - question_scores_exact = [ - float(random.choice([0, 1])) for _ in range(len(question_scores_contains)) - ] - question_scores_ranking = [ - float(random.choice([0, 1])) for _ in range(len(question_scores_contains)) - ] - - avg_score_contains = ( - round(sum(question_scores_contains) / len(question_scores_contains), 3) - if question_scores_contains - else 0.0 - ) - pass_count_contains = sum(1 for s in question_scores_contains if s >= 0.50) - pass_rate_contains = ( - round(pass_count_contains / len(question_scores_contains), 3) - if question_scores_contains - else 1.0 - ) - - avg_score_exact = ( - round(sum(question_scores_exact) / len(question_scores_exact), 3) - if question_scores_exact - else 0.0 - ) - pass_count_exact = sum(1 for s in question_scores_exact if s >= 0.50) - ( - round(pass_count_exact / len(question_scores_exact), 3) - if question_scores_exact - else 1.0 - ) - - avg_score_ranking = ( - round(sum(question_scores_ranking) / len(question_scores_ranking), 3) - if question_scores_ranking - else 0.0 - ) - pass_count_ranking = sum(1 for s in question_scores_ranking if s >= 0.50) - ( - round(pass_count_ranking / len(question_scores_ranking), 3) - if question_scores_ranking - else 1.0 - ) + try: + req = EvalAPIRequest( + tables_names=table_names, dataset_name=PRODUCTION_DATASET_NAME + ) + resp = requests.post( + f"{settings.EVALUATION_SERVICE_URL}/text-to-sql/evaluation/run-single-dataset", + json=req.model_dump(), + timeout=600, + ) + resp.raise_for_status() + eval_resp = EvalAPIResponse(**resp.json()) + if eval_resp.status == "failed": + raise Exception("API returned failed status") + except Exception as e: + logger.error(f"[Promotion/Phase-B] Regression eval failed: {e}") + run.status = EvalStatus.failed + run.score = -1.0 + session.add(run) + session.commit() + return -1.0 - run.score = avg_score_contains - run.pass_rate = pass_rate_contains - run.fail_rate = round(1.0 - pass_rate_contains, 3) - run.total_questions = len(question_scores_contains) + metrics = eval_resp.overall_metrics + run.score = metrics.contains_execution_accuracy + run.pass_rate = metrics.pass_rate + run.fail_rate = metrics.fail_rate + run.total_questions = metrics.total_questions run.status = EvalStatus.completed run.completed_at = datetime.utcnow() run.dimension_averages = { - "contains_execution_accuracy": avg_score_contains, - "exact_execution_accuracy": avg_score_exact, - "ranking_accuracy": avg_score_ranking, + "contains_execution_accuracy": metrics.contains_execution_accuracy, + "exact_execution_accuracy": metrics.exact_execution_accuracy, + "exact_match": metrics.exact_match, } session.add(run) - # Persist per-question EvalResult rows so the regression diff endpoint can - # compare which questions passed baseline but failed here. - # Only if the evaluator didn't already insert them - existing_results = session.exec( - select(EvalResult).where(EvalResult.run_id == run.id) - ).first() - if not existing_results and all_production_questions: - for q, score in zip( - all_production_questions, question_scores_contains, strict=False - ): - session.add( - EvalResult( - run_id=run.id, - question_id=q.id, - score=score, - status="pass" if score >= 0.50 else "fail", - ) + for q_res in eval_resp.results: + session.add( + EvalResult( + run_id=run.id, + question_id=q_res.question_id, + score=q_res.metrics.contains_execution_accuracy, + status=q_res.status, ) - + ) session.commit() logger.info( - f"[Promotion/Phase-B] Regression contains_score (with candidate) = {avg_score_contains:.3f} " - f"exact_score = {avg_score_exact:.3f} ranking_score = {avg_score_ranking:.3f}" + f"[Promotion/Phase-B] Regression contains_score (with candidate) = {metrics.contains_execution_accuracy:.3f} " + f"exact_score = {metrics.exact_execution_accuracy:.3f} exact_match = {metrics.exact_match:.3f}" ) - return avg_score_contains + return metrics.contains_execution_accuracy # ─── Main promotion workflow ─────────────────────────────────────────────────── diff --git a/backend/app/services/langfuse_client.py b/backend/app/services/langfuse_client.py index 8c2435f..f188f8a 100644 --- a/backend/app/services/langfuse_client.py +++ b/backend/app/services/langfuse_client.py @@ -402,6 +402,7 @@ def sync_dataset(self, dataset_name: str, questions: list) -> object: try: self._tracer.client.create_dataset_item( dataset_name=dataset_name, + id=q["question_id"], input={ "query": q["question_text"], "databases": [q.get("schema_name", q["table_id"])], From 5b5481cba8a4fac0be7549e058404857a49c0447 Mon Sep 17 00:00:00 2001 From: yuvalkh Date: Sun, 21 Jun 2026 11:05:05 +0300 Subject: [PATCH 2/6] updated embedder url --- agent/src/agent/nodes/schema_explorer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/src/agent/nodes/schema_explorer.py b/agent/src/agent/nodes/schema_explorer.py index 9a1dfcb..f7a20a4 100644 --- a/agent/src/agent/nodes/schema_explorer.py +++ b/agent/src/agent/nodes/schema_explorer.py @@ -42,7 +42,7 @@ class SchemaExplorerOutput(BaseModel): def get_query_embedding(text: str) -> list[float]: """Generate 768-dimensional embedding from nomic-embed-text.""" # TODO: support secret - url = f"{settings.EMBEDDER_URL}/api/embeddings" + url = settings.EMBEDDER_URL data = json.dumps({"model": settings.EMBEDDER_MODEL, "prompt": text}).encode() req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}) try: From 161af684d0b0b123f6fc43787b1473dfe6f63fae Mon Sep 17 00:00:00 2001 From: yuvalkh Date: Mon, 22 Jun 2026 13:29:50 +0300 Subject: [PATCH 3/6] updated endpoint of embedding to match the ts --- agent/src/agent/config.py | 3 ++- agent/src/agent/nodes/schema_explorer.py | 11 +++++++---- backend/app/config.py | 3 ++- backend/app/routers/tables.py | 18 ++++++++++++++---- backend/app/seed.py | 12 +++++++----- 5 files changed, 32 insertions(+), 15 deletions(-) diff --git a/agent/src/agent/config.py b/agent/src/agent/config.py index 4a12671..ac0cecd 100644 --- a/agent/src/agent/config.py +++ b/agent/src/agent/config.py @@ -9,8 +9,9 @@ class AgentSettings(BaseSettings): LLM_API_KEY: str = "ollama" LLM_BASE_URL: str = "http://localhost:11434/v1" LLM_MODEL: str = "gemma4:e4b" - EMBEDDER_URL: str = "http://localhost:11434" + EMBEDDER_URL: str = "http://localhost:11434/v1/embeddings" EMBEDDER_MODEL: str = "nomic-embed-text:latest" + EMBEDDER_KEY: str = "" HYBRID_SEARCH_MAX_TABLES: int = 10 MAX_PROFILES_TO_FETCH: int = 3 diff --git a/agent/src/agent/nodes/schema_explorer.py b/agent/src/agent/nodes/schema_explorer.py index f7a20a4..d34d7c1 100644 --- a/agent/src/agent/nodes/schema_explorer.py +++ b/agent/src/agent/nodes/schema_explorer.py @@ -41,13 +41,16 @@ class SchemaExplorerOutput(BaseModel): def get_query_embedding(text: str) -> list[float]: """Generate 768-dimensional embedding from nomic-embed-text.""" - # TODO: support secret url = settings.EMBEDDER_URL - data = json.dumps({"model": settings.EMBEDDER_MODEL, "prompt": text}).encode() - req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}) + data = json.dumps({"model": settings.EMBEDDER_MODEL, "input": text}).encode() + headers = {"Content-Type": "application/json"} + if settings.EMBEDDER_KEY: + headers["Authorization"] = f"Bearer {settings.EMBEDDER_KEY}" + req = urllib.request.Request(url, data=data, headers=headers) try: with urllib.request.urlopen(req, timeout=10) as res: - return json.loads(res.read().decode())["embedding"] + resp_data = json.loads(res.read().decode())["data"][0] + return resp_data["embedding"] except Exception as e: print(f"Error getting query embedding: {e}") return [0.0] * 768 diff --git a/backend/app/config.py b/backend/app/config.py index 67e642b..c4eaf32 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -52,8 +52,9 @@ class Settings(BaseSettings): JWT_EXPIRE_HOURS: int = 8 # Embedder Config - EMBEDDER_URL: str = "http://host.docker.internal:11434/api/embeddings" + EMBEDDER_URL: str = "http://host.docker.internal:11434/v1/embeddings" EMBEDDER_MODEL: str = "nomic-embed-text:latest" + EMBEDDER_KEY: str = "" settings = Settings() diff --git a/backend/app/routers/tables.py b/backend/app/routers/tables.py index 3936b7e..a8541b0 100644 --- a/backend/app/routers/tables.py +++ b/backend/app/routers/tables.py @@ -131,13 +131,18 @@ def create_table(payload: TableCreate, session: Session = Depends(get_session)): text_to_embed = f"Table name: {name}\nSchema: {schema_name}\nDescription: {description}\nColumns: {', '.join([c.get('name', '') for c in om_columns])}" embedding = None try: + headers = {} + if settings.EMBEDDER_KEY: + headers["Authorization"] = f"Bearer {settings.EMBEDDER_KEY}" embed_resp = httpx.post( settings.EMBEDDER_URL, - json={"model": settings.EMBEDDER_MODEL, "prompt": text_to_embed}, + json={"model": settings.EMBEDDER_MODEL, "input": text_to_embed}, + headers=headers, timeout=10.0, ) if embed_resp.status_code == 200: - embedding = embed_resp.json().get("embedding") + resp_data = embed_resp.json()["data"][0] + embedding = resp_data.get("embedding") if len(embedding) != EXPECTED_EMBEDDING_DIM: raise ValueError( f"Embedder returned embedding of length {len(embedding)}, " @@ -245,13 +250,18 @@ def sync_table_schema(table_id: str, session: Session = Depends(get_session)): # Generate embedding text_to_embed = f"Table name: {name}\nSchema: {schema_name}\nDescription: {description}\nColumns: {', '.join([c.get('name', '') for c in om_columns])}" try: + headers = {} + if settings.EMBEDDER_KEY: + headers["Authorization"] = f"Bearer {settings.EMBEDDER_KEY}" embed_resp = httpx.post( settings.EMBEDDER_URL, - json={"model": settings.EMBEDDER_MODEL, "prompt": text_to_embed}, + json={"model": settings.EMBEDDER_MODEL, "input": text_to_embed}, + headers=headers, timeout=10.0, ) if embed_resp.status_code == 200: - embedding = embed_resp.json().get("embedding") + resp_data = embed_resp.json()["data"][0] + embedding = resp_data.get("embedding") if len(embedding) != EXPECTED_EMBEDDING_DIM: raise ValueError( f"Embedder returned embedding of length {len(embedding)}, " diff --git a/backend/app/seed.py b/backend/app/seed.py index 94036e4..2c1b33f 100644 --- a/backend/app/seed.py +++ b/backend/app/seed.py @@ -23,13 +23,15 @@ def get_embedding(text: str) -> list[float]: url = settings.EMBEDDER_URL - data = json.dumps({"model": settings.EMBEDDER_MODEL, "prompt": text}).encode() - req = urllib.request.Request( - url, data=data, headers={"Content-Type": "application/json"} - ) + data = json.dumps({"model": settings.EMBEDDER_MODEL, "input": text}).encode() + headers = {"Content-Type": "application/json"} + if settings.EMBEDDER_KEY: + headers["Authorization"] = f"Bearer {settings.EMBEDDER_KEY}" + req = urllib.request.Request(url, data=data, headers=headers) try: with urllib.request.urlopen(req, timeout=10) as res: - embedding = json.loads(res.read().decode())["embedding"] + resp_data = json.loads(res.read().decode()) + embedding = resp_data["data"][0]["embedding"] if len(embedding) != EXPECTED_EMBEDDING_DIM: raise ValueError( f"Embedder returned embedding of length {len(embedding)}, " From 59cee5d9432ebdcaa35d456c75a2c032124270c9 Mon Sep 17 00:00:00 2001 From: yuvalkh Date: Mon, 22 Jun 2026 13:40:04 +0300 Subject: [PATCH 4/6] updated gitignore to ignore openshift-images --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 04559f4..6e63f5b 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ dist-ssr .venv *.python-version *.env +openshift-images/ # Python cache __pycache__/ From 980a76f99831454629b3f407354163e5ac72cb77 Mon Sep 17 00:00:00 2001 From: yuvalkh Date: Mon, 22 Jun 2026 15:10:42 +0300 Subject: [PATCH 5/6] fix ts evaluations after CR --- agent/src/agent/nodes/schema_explorer.py | 23 ++++---- backend/app/routers/admin_approval.py | 14 ++--- backend/app/routers/enrichment.py | 9 ++-- backend/app/routers/evaluation.py | 10 ++-- backend/app/routers/health.py | 9 ++-- backend/app/routers/orchestration.py | 6 +-- backend/app/routers/profiling.py | 6 +-- backend/app/routers/tables.py | 67 +++++++----------------- backend/app/seed.py | 33 ++++-------- backend/app/services/profiling_engine.py | 4 +- backend/app/services/scheduler.py | 7 ++- core/src/core/embeddings.py | 33 ++++++++++++ core/src/core/models/models.py | 46 ++++++++-------- 13 files changed, 126 insertions(+), 141 deletions(-) create mode 100644 core/src/core/embeddings.py diff --git a/agent/src/agent/nodes/schema_explorer.py b/agent/src/agent/nodes/schema_explorer.py index d34d7c1..81f9687 100644 --- a/agent/src/agent/nodes/schema_explorer.py +++ b/agent/src/agent/nodes/schema_explorer.py @@ -16,6 +16,7 @@ from sqlmodel import Session, select from agent.config import settings from agent.langfuse_client import langfuse_client +from core.embeddings import get_embedding # Initialize LLM llm = ChatOpenAI(model=settings.LLM_MODEL, base_url=settings.LLM_BASE_URL, api_key=settings.LLM_API_KEY, temperature=0) @@ -39,21 +40,19 @@ class SchemaExplorerOutput(BaseModel): description="List of strings (table names or options) for the user to choose from. Must be empty if ambiguity_detected is false." ) + def get_query_embedding(text: str) -> list[float]: """Generate 768-dimensional embedding from nomic-embed-text.""" - url = settings.EMBEDDER_URL - data = json.dumps({"model": settings.EMBEDDER_MODEL, "input": text}).encode() - headers = {"Content-Type": "application/json"} - if settings.EMBEDDER_KEY: - headers["Authorization"] = f"Bearer {settings.EMBEDDER_KEY}" - req = urllib.request.Request(url, data=data, headers=headers) - try: - with urllib.request.urlopen(req, timeout=10) as res: - resp_data = json.loads(res.read().decode())["data"][0] - return resp_data["embedding"] - except Exception as e: - print(f"Error getting query embedding: {e}") + emb = get_embedding( + text=text, + embedder_url=settings.EMBEDDER_URL, + embedder_model=settings.EMBEDDER_MODEL, + embedder_key=settings.EMBEDDER_KEY + ) + if emb is None: + print("Error getting query embedding for text") return [0.0] * 768 + return emb def hybrid_search_tables(query: str, query_embedding: list[float], session: Session, allowed_tables: list[str] | None = None, allowed_statuses: list[str] | None = None) -> list[Table]: """Hybrid search combining pgvector cosine distance and keyword matching.""" diff --git a/backend/app/routers/admin_approval.py b/backend/app/routers/admin_approval.py index 2c0774e..5142e75 100644 --- a/backend/app/routers/admin_approval.py +++ b/backend/app/routers/admin_approval.py @@ -1,9 +1,5 @@ import logging -from datetime import datetime - -from fastapi import APIRouter, Depends, Header, HTTPException -from pydantic import BaseModel -from sqlmodel import Session, desc, select +from datetime import UTC, datetime from core.db.engine import get_session from core.models.models import ( @@ -14,6 +10,10 @@ Table, TableStatus, ) +from fastapi import APIRouter, Depends, Header, HTTPException +from pydantic import BaseModel +from sqlmodel import Session, desc, select + from app.services.auth import require_admin from app.services.langfuse_client import langfuse_client @@ -178,7 +178,7 @@ def approve_table( # 1. Promote status table.status = TableStatus.production - table.updated_at = datetime.utcnow() + table.updated_at = datetime.now(UTC) session.add(table) session.commit() @@ -216,7 +216,7 @@ def reject_table( ) table.status = TableStatus.sandbox - table.updated_at = datetime.utcnow() + table.updated_at = datetime.now(UTC) session.add(table) session.commit() diff --git a/backend/app/routers/enrichment.py b/backend/app/routers/enrichment.py index 73cb37d..019a76d 100644 --- a/backend/app/routers/enrichment.py +++ b/backend/app/routers/enrichment.py @@ -1,8 +1,5 @@ import logging -from datetime import datetime - -from fastapi import APIRouter, Depends, HTTPException -from sqlmodel import Session, col, select +from datetime import UTC, datetime from core.db.engine import get_session from core.models.models import ( @@ -12,6 +9,8 @@ Table, TableStatus, ) +from fastapi import APIRouter, Depends, HTTPException +from sqlmodel import Session, col, select logger = logging.getLogger(__name__) @@ -61,7 +60,7 @@ def create_enrichment( f"({', '.join(changed_keys)}) → degraded." ) table.status = TableStatus.degraded - table.updated_at = datetime.utcnow() + table.updated_at = datetime.now(UTC) session.add(table) ev = EnrichmentVersion( diff --git a/backend/app/routers/evaluation.py b/backend/app/routers/evaluation.py index 1998aec..54ddd16 100644 --- a/backend/app/routers/evaluation.py +++ b/backend/app/routers/evaluation.py @@ -14,7 +14,7 @@ """ import logging -from datetime import datetime +from datetime import UTC, datetime from typing import Literal import requests @@ -186,7 +186,7 @@ def execute_single_table_eval(table_id: str, run_id: str, session: Session) -> f run.fail_rate = metrics.fail_rate run.total_questions = metrics.total_questions run.status = EvalStatus.completed - run.completed_at = datetime.utcnow() + run.completed_at = datetime.now(UTC) run.dimension_averages = { "contains_execution_accuracy": metrics.contains_execution_accuracy, "exact_execution_accuracy": metrics.exact_execution_accuracy, @@ -300,7 +300,7 @@ def _run_production_dataset_eval( run.fail_rate = metrics.fail_rate run.total_questions = metrics.total_questions run.status = EvalStatus.completed - run.completed_at = datetime.utcnow() + run.completed_at = datetime.now(UTC) run.dimension_averages = { "contains_execution_accuracy": metrics.contains_execution_accuracy, "exact_execution_accuracy": metrics.exact_execution_accuracy, @@ -383,7 +383,7 @@ def _run_candidate_eval( run.fail_rate = metrics.fail_rate run.total_questions = metrics.total_questions run.status = EvalStatus.completed - run.completed_at = datetime.utcnow() + run.completed_at = datetime.now(UTC) run.dimension_averages = { "contains_execution_accuracy": metrics.contains_execution_accuracy, "exact_execution_accuracy": metrics.exact_execution_accuracy, @@ -467,7 +467,7 @@ def _run_regression_eval( run.fail_rate = metrics.fail_rate run.total_questions = metrics.total_questions run.status = EvalStatus.completed - run.completed_at = datetime.utcnow() + run.completed_at = datetime.now(UTC) run.dimension_averages = { "contains_execution_accuracy": metrics.contains_execution_accuracy, "exact_execution_accuracy": metrics.exact_execution_accuracy, diff --git a/backend/app/routers/health.py b/backend/app/routers/health.py index 02410f1..6de6cc6 100644 --- a/backend/app/routers/health.py +++ b/backend/app/routers/health.py @@ -13,10 +13,7 @@ < 0.45 → critical """ -from datetime import datetime - -from fastapi import APIRouter, Depends, HTTPException -from sqlmodel import Session, select +from datetime import UTC, datetime from core.db.engine import get_session from core.models.models import ( @@ -32,6 +29,8 @@ TableHealthRead, TableProfile, ) +from fastapi import APIRouter, Depends, HTTPException +from sqlmodel import Session, select router = APIRouter(tags=["health"]) @@ -131,7 +130,7 @@ def _compute_health(table_id: str, session: Session) -> TableHealth: existing.failure_wrong_sql = failure_wrong_sql existing.failure_empty_result = failure_empty_result existing.failure_execution_error = failure_exec_error - existing.updated_at = datetime.utcnow() + existing.updated_at = datetime.now(UTC) session.add(existing) session.commit() session.refresh(existing) diff --git a/backend/app/routers/orchestration.py b/backend/app/routers/orchestration.py index de09cbe..0138abe 100644 --- a/backend/app/routers/orchestration.py +++ b/backend/app/routers/orchestration.py @@ -11,7 +11,7 @@ """ import random -from datetime import datetime, timedelta +from datetime import UTC, datetime, timedelta from core.db.engine import engine, get_session from core.models.models import ( @@ -464,7 +464,7 @@ def get_trends( session: Session = Depends(get_session), ): """Score and pass_rate over time. Returns one data point per completed run.""" - since = datetime.utcnow() - timedelta(days=days) + since = datetime.now(UTC) - timedelta(days=days) query = ( select(EvalRun) .where(EvalRun.status == EvalStatus.completed, EvalRun.created_at >= since) @@ -777,7 +777,7 @@ def system_health(session: Session = Depends(get_session)): "total_tables": len(total_tables), "production_tables": len(production_tables), "total_runs_today": sum( - 1 for r in all_runs if r.created_at.date() == datetime.utcnow().date() + 1 for r in all_runs if r.created_at.date() == datetime.now(UTC).date() ), "top_failing_tables": failing_tables[:5], "recent_runs": recent_runs, diff --git a/backend/app/routers/profiling.py b/backend/app/routers/profiling.py index 331cc2c..d71e098 100644 --- a/backend/app/routers/profiling.py +++ b/backend/app/routers/profiling.py @@ -7,7 +7,7 @@ import logging import traceback -from datetime import datetime, timedelta +from datetime import UTC, datetime, timedelta from typing import Any from core.db.engine import engine, get_session @@ -86,8 +86,8 @@ def _run_profile_job(table_id: str): profile.auto_insights = result.auto_insights profile.sample_data = result.sample_data profile.profile_json = result.profile_json - profile.cached_until = datetime.utcnow() + timedelta(hours=24) - profile.updated_at = datetime.utcnow() + profile.cached_until = datetime.now(UTC) + timedelta(hours=24) + profile.updated_at = datetime.now(UTC) session.add(profile) session.commit() session.refresh(profile) diff --git a/backend/app/routers/tables.py b/backend/app/routers/tables.py index a8541b0..aa9ebda 100644 --- a/backend/app/routers/tables.py +++ b/backend/app/routers/tables.py @@ -1,8 +1,9 @@ import logging -from datetime import datetime +from datetime import UTC, datetime import httpx from core.db.engine import get_session +from core.embeddings import get_embedding as core_get_embedding from core.models.models import ( EnrichmentVersion, ForeignKeyMapping, @@ -20,7 +21,6 @@ from app.config import settings from app.routers.evaluation import PRODUCTION_DATASET_NAME, _build_questions_payload -from app.seed import EXPECTED_EMBEDDING_DIM from app.services.langfuse_client import langfuse_client logger = logging.getLogger(__name__) @@ -48,6 +48,15 @@ def get_table_fqn(table: Table) -> str: return f"{table.service}.{table.catalog}.{table.schema_name}.{table.name}" +def get_embedding(text: str) -> list[float] | None: + return core_get_embedding( + text=text, + embedder_url=settings.EMBEDDER_URL, + embedder_model=settings.EMBEDDER_MODEL, + embedder_key=settings.EMBEDDER_KEY, + ) + + @router.get("", response_model=list[TableRead]) def list_tables( status: TableStatus | None = Query(default=None), @@ -129,27 +138,7 @@ def create_table(payload: TableCreate, session: Session = Depends(get_session)): # Generate embedding text_to_embed = f"Table name: {name}\nSchema: {schema_name}\nDescription: {description}\nColumns: {', '.join([c.get('name', '') for c in om_columns])}" - embedding = None - try: - headers = {} - if settings.EMBEDDER_KEY: - headers["Authorization"] = f"Bearer {settings.EMBEDDER_KEY}" - embed_resp = httpx.post( - settings.EMBEDDER_URL, - json={"model": settings.EMBEDDER_MODEL, "input": text_to_embed}, - headers=headers, - timeout=10.0, - ) - if embed_resp.status_code == 200: - resp_data = embed_resp.json()["data"][0] - embedding = resp_data.get("embedding") - if len(embedding) != EXPECTED_EMBEDDING_DIM: - raise ValueError( - f"Embedder returned embedding of length {len(embedding)}, " - f"expected {EXPECTED_EMBEDDING_DIM}" - ) - except Exception as e: - logger.warning(f"Failed to generate embedding for table {name}: {e}") + embedding = get_embedding(text_to_embed) # Create the table table = Table( @@ -249,29 +238,9 @@ def sync_table_schema(table_id: str, session: Session = Depends(get_session)): # Generate embedding text_to_embed = f"Table name: {name}\nSchema: {schema_name}\nDescription: {description}\nColumns: {', '.join([c.get('name', '') for c in om_columns])}" - try: - headers = {} - if settings.EMBEDDER_KEY: - headers["Authorization"] = f"Bearer {settings.EMBEDDER_KEY}" - embed_resp = httpx.post( - settings.EMBEDDER_URL, - json={"model": settings.EMBEDDER_MODEL, "input": text_to_embed}, - headers=headers, - timeout=10.0, - ) - if embed_resp.status_code == 200: - resp_data = embed_resp.json()["data"][0] - embedding = resp_data.get("embedding") - if len(embedding) != EXPECTED_EMBEDDING_DIM: - raise ValueError( - f"Embedder returned embedding of length {len(embedding)}, " - f"expected {EXPECTED_EMBEDDING_DIM}" - ) - table.embedding = embedding - except Exception as e: - logger.warning( - f"Failed to generate embedding for table {name} during sync: {e}" - ) + embedding = get_embedding(text_to_embed) + if embedding: + table.embedding = embedding # Update the table table.name = name @@ -279,7 +248,7 @@ def sync_table_schema(table_id: str, session: Session = Depends(get_session)): table.catalog = catalog_name table.service = service_name table.openmetadata_json = data - table.updated_at = datetime.utcnow() + table.updated_at = datetime.now(UTC) session.add(table) @@ -336,7 +305,7 @@ def update_table_status( previous_status = table.status table.status = status - table.updated_at = datetime.utcnow() + table.updated_at = datetime.now(UTC) session.add(table) session.commit() session.refresh(table) @@ -416,7 +385,7 @@ def create_foreign_key( if existing: existing.target_table_id = payload.target_table_id existing.target_column = payload.target_column - existing.updated_at = datetime.utcnow() + existing.updated_at = datetime.now(UTC) session.add(existing) session.commit() session.refresh(existing) diff --git a/backend/app/seed.py b/backend/app/seed.py index 2c1b33f..4bf9558 100644 --- a/backend/app/seed.py +++ b/backend/app/seed.py @@ -1,7 +1,5 @@ -import json -import urllib.request - from core.db.engine import create_db_and_tables, engine +from core.embeddings import EXPECTED_EMBEDDING_DIM, get_embedding as core_get_embedding from core.models.models import ( DifficultyLevel, EnrichmentVersion, @@ -18,29 +16,18 @@ from app.config import settings -EXPECTED_EMBEDDING_DIM = 768 - def get_embedding(text: str) -> list[float]: - url = settings.EMBEDDER_URL - data = json.dumps({"model": settings.EMBEDDER_MODEL, "input": text}).encode() - headers = {"Content-Type": "application/json"} - if settings.EMBEDDER_KEY: - headers["Authorization"] = f"Bearer {settings.EMBEDDER_KEY}" - req = urllib.request.Request(url, data=data, headers=headers) - try: - with urllib.request.urlopen(req, timeout=10) as res: - resp_data = json.loads(res.read().decode()) - embedding = resp_data["data"][0]["embedding"] - if len(embedding) != EXPECTED_EMBEDDING_DIM: - raise ValueError( - f"Embedder returned embedding of length {len(embedding)}, " - f"expected {EXPECTED_EMBEDDING_DIM}" - ) - return embedding - except Exception as e: - print(f"Error getting embedding: {e}") + emb = core_get_embedding( + text=text, + embedder_url=settings.EMBEDDER_URL, + embedder_model=settings.EMBEDDER_MODEL, + embedder_key=settings.EMBEDDER_KEY, + ) + if emb is None: + print("Error getting embedding for text") return [0.0] * EXPECTED_EMBEDDING_DIM + return emb def seed(): diff --git a/backend/app/services/profiling_engine.py b/backend/app/services/profiling_engine.py index d731788..3894a7c 100644 --- a/backend/app/services/profiling_engine.py +++ b/backend/app/services/profiling_engine.py @@ -9,7 +9,7 @@ import concurrent.futures import logging from dataclasses import dataclass, field -from datetime import date, datetime +from datetime import UTC, date, datetime from decimal import Decimal from typing import Any @@ -865,7 +865,7 @@ def run_table_profiling( Never does full column scans for numeric stats. """ fqn = _fqn(catalog, schema, table) - computed_at = datetime.utcnow() + computed_at = datetime.now(UTC) result = TableProfilingResult( table_id=table_id, table_fqn=fqn, diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py index 08fcd95..eca2d5c 100644 --- a/backend/app/services/scheduler.py +++ b/backend/app/services/scheduler.py @@ -4,12 +4,10 @@ """ import logging -from datetime import datetime +from datetime import UTC, datetime from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger -from sqlmodel import Session, select - from core.db.engine import engine from core.models.models import ( EvalRun, @@ -17,6 +15,7 @@ GoldenQuestion, Table, ) +from sqlmodel import Session, select logger = logging.getLogger(__name__) @@ -68,7 +67,7 @@ def _run_scheduled_evaluation(schedule_id: str): valid_table_ids.append(table_id) # Update schedule timestamps - schedule.last_run_at = datetime.utcnow() + schedule.last_run_at = datetime.now(UTC) session.add(schedule) session.commit() diff --git a/core/src/core/embeddings.py b/core/src/core/embeddings.py new file mode 100644 index 0000000..93e6780 --- /dev/null +++ b/core/src/core/embeddings.py @@ -0,0 +1,33 @@ +import json +import logging +import urllib.request + +logger = logging.getLogger(__name__) + +EXPECTED_EMBEDDING_DIM = 768 + +def get_embedding( + text: str, + embedder_url: str, + embedder_model: str, + embedder_key: str | None = None +) -> list[float] | None: + data = json.dumps({"model": embedder_model, "input": text}).encode() + headers = {"Content-Type": "application/json"} + if embedder_key: + headers["Authorization"] = f"Bearer {embedder_key}" + + req = urllib.request.Request(embedder_url, data=data, headers=headers) + try: + with urllib.request.urlopen(req, timeout=10) as res: + resp_data = json.loads(res.read().decode())[data][0] + embedding = resp_data.get("embedding") + if not embedding or len(embedding) != EXPECTED_EMBEDDING_DIM: + raise ValueError( + f"Embedder returned embedding of length {len(embedding) if embedding else 'None'}, " + f"expected {EXPECTED_EMBEDDING_DIM}" + ) + return embedding + except Exception as e: + logger.warning(f"Failed to generate embedding: {e}") + return None diff --git a/core/src/core/models/models.py b/core/src/core/models/models.py index 84b0d17..c4cb62f 100644 --- a/core/src/core/models/models.py +++ b/core/src/core/models/models.py @@ -1,5 +1,5 @@ import uuid -from datetime import datetime +from datetime import datetime, timezone from enum import StrEnum from typing import Any @@ -29,8 +29,8 @@ class Table(SQLModel, table=True): service: str = Field(default="trino_ingestion") openmetadata_json: Any | None = Field(default=None, sa_column=Column(JSON)) embedding: Any | None = Field(default=None, sa_column=Column(Vector(768))) - created_at: datetime = Field(default_factory=datetime.utcnow) - updated_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class TableCreate(SQLModel): @@ -63,7 +63,7 @@ class EnrichmentVersion(SQLModel, table=True): ) version: int = Field(default=1) data: Any | None = Field(default=None, sa_column=Column(JSON)) - created_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class EnrichmentCreate(SQLModel): @@ -109,7 +109,7 @@ class GoldenQuestion(SQLModel, table=True): difficulty: DifficultyLevel = Field(default=DifficultyLevel.simple) question_type: QuestionType = Field(default=QuestionType.simple) coverage_tags: list[str] | None = Field(default=None, sa_column=Column(JSON)) - created_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class GoldenQuestionCreate(SQLModel): @@ -155,7 +155,7 @@ class EvalRun(SQLModel, table=True): duration_seconds: float | None = None triggered_by: str = Field(default="user") # "user" | "scheduler" | "system" status: EvalStatus = Field(default=EvalStatus.running) - started_at: datetime = Field(default_factory=datetime.utcnow) + started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) completed_at: datetime | None = None failure_breakdown: Any | None = Field(default=None, sa_column=Column(JSON)) dimension_averages: Any | None = Field(default=None, sa_column=Column(JSON)) @@ -164,7 +164,7 @@ class EvalRun(SQLModel, table=True): promotion_run_id: str | None = Field( default=None, index=True ) # set on regression runs - created_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class EvalRunRead(SQLModel): @@ -205,7 +205,7 @@ class EvaluationSchedule(SQLModel, table=True): cron_expression: str = Field(default="0 2 * * *") # daily at 2am enabled: bool = Field(default=True) created_by: str = Field(default="user") - created_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) last_run_at: datetime | None = None next_run_at: datetime | None = None @@ -254,7 +254,7 @@ class EvaluationHistoryMetric(SQLModel, table=True): ) metric_name: str # e.g. "score", "pass_rate", "wrong_table_count" metric_value: float - created_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class EvaluationHistoryMetricRead(SQLModel): @@ -293,7 +293,7 @@ class EvaluationAlert(SQLModel, table=True): message: str details: Any | None = Field(default=None, sa_column=Column(JSON)) acknowledged: bool = Field(default=False) - created_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class EvaluationAlertRead(SQLModel): @@ -393,7 +393,7 @@ class AuditQuery(SQLModel, table=True): confidence_score: float | None = None explanation_text: str | None = None warnings_json: list[str] | None = Field(default=None, sa_column=Column(JSON)) - created_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class AuditQueryRead(SQLModel): @@ -456,8 +456,8 @@ class TableProfile(SQLModel, table=True): default=None, sa_column=Column(JSON) ) # full structured profile cached_until: datetime | None = None - created_at: datetime = Field(default_factory=datetime.utcnow) - updated_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class TableProfileRead(SQLModel): @@ -511,7 +511,7 @@ class ColumnProfile(SQLModel, table=True): stats_json: Any | None = Field( default=None, sa_column=Column(JSON) ) # full stats blob - created_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class ColumnProfileRead(SQLModel): @@ -552,7 +552,7 @@ class CrossTableProfile(SQLModel, table=True): join_suggestion: str | None = None # e.g. "source.user_id = target.id" match_strength: str = Field(default="weak") # "strong" | "weak" common_columns: list[str] | None = Field(default=None, sa_column=Column(JSON)) - created_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class CrossTableProfileRead(SQLModel): @@ -596,7 +596,7 @@ class QueryFeedback(SQLModel, table=True): rating: FeedbackRating comment: str | None = None suggested_correction: str | None = None - created_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class QueryFeedbackCreate(SQLModel): @@ -652,7 +652,7 @@ class TableHealth(SQLModel, table=True): failure_wrong_sql: int = Field(default=0) failure_empty_result: int = Field(default=0) failure_execution_error: int = Field(default=0) - updated_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class TableHealthRead(SQLModel): @@ -685,8 +685,8 @@ class SecurityUser(SQLModel, table=True): name: str is_active: bool = Field(default=True) is_admin: bool = Field(default=False) - created_at: datetime = Field(default_factory=datetime.utcnow) - updated_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class SecurityUserRead(SQLModel): @@ -719,8 +719,8 @@ class ForeignKeyMapping(SQLModel, table=True): sa_column_args=[ForeignKey("tables.id", ondelete="CASCADE", onupdate="CASCADE")] ) target_column: str - created_at: datetime = Field(default_factory=datetime.utcnow) - updated_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class ForeignKeyMappingCreate(SQLModel): @@ -758,8 +758,8 @@ class HttpExtractor(SQLModel, table=True): url: str description: str | None = None status: ExtractorStatus = Field(default=ExtractorStatus.draft) - created_at: datetime = Field(default_factory=datetime.utcnow) - updated_at: datetime = Field(default_factory=datetime.utcnow) + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class HttpExtractorCreate(SQLModel): name: str From 422b05db256c2f3182723ef52382ab9a3581d98b Mon Sep 17 00:00:00 2001 From: yuvalkh Date: Mon, 22 Jun 2026 15:37:29 +0300 Subject: [PATCH 6/6] updated time to be naive instead of utc --- backend/app/routers/admin_approval.py | 6 ++-- backend/app/routers/enrichment.py | 4 +-- backend/app/routers/evaluation.py | 10 +++--- backend/app/routers/health.py | 4 +-- backend/app/routers/orchestration.py | 6 ++-- backend/app/routers/profiling.py | 6 ++-- backend/app/routers/tables.py | 8 ++--- backend/app/services/profiling_engine.py | 4 +-- backend/app/services/scheduler.py | 4 +-- core/src/core/embeddings.py | 2 +- core/src/core/models/models.py | 46 ++++++++++++------------ 11 files changed, 50 insertions(+), 50 deletions(-) diff --git a/backend/app/routers/admin_approval.py b/backend/app/routers/admin_approval.py index 5142e75..ccdb791 100644 --- a/backend/app/routers/admin_approval.py +++ b/backend/app/routers/admin_approval.py @@ -1,5 +1,5 @@ import logging -from datetime import UTC, datetime +from datetime import datetime from core.db.engine import get_session from core.models.models import ( @@ -178,7 +178,7 @@ def approve_table( # 1. Promote status table.status = TableStatus.production - table.updated_at = datetime.now(UTC) + table.updated_at = datetime.now() session.add(table) session.commit() @@ -216,7 +216,7 @@ def reject_table( ) table.status = TableStatus.sandbox - table.updated_at = datetime.now(UTC) + table.updated_at = datetime.now() session.add(table) session.commit() diff --git a/backend/app/routers/enrichment.py b/backend/app/routers/enrichment.py index 019a76d..46136a9 100644 --- a/backend/app/routers/enrichment.py +++ b/backend/app/routers/enrichment.py @@ -1,5 +1,5 @@ import logging -from datetime import UTC, datetime +from datetime import datetime from core.db.engine import get_session from core.models.models import ( @@ -60,7 +60,7 @@ def create_enrichment( f"({', '.join(changed_keys)}) → degraded." ) table.status = TableStatus.degraded - table.updated_at = datetime.now(UTC) + table.updated_at = datetime.now() session.add(table) ev = EnrichmentVersion( diff --git a/backend/app/routers/evaluation.py b/backend/app/routers/evaluation.py index 54ddd16..70f8c2b 100644 --- a/backend/app/routers/evaluation.py +++ b/backend/app/routers/evaluation.py @@ -14,7 +14,7 @@ """ import logging -from datetime import UTC, datetime +from datetime import datetime from typing import Literal import requests @@ -186,7 +186,7 @@ def execute_single_table_eval(table_id: str, run_id: str, session: Session) -> f run.fail_rate = metrics.fail_rate run.total_questions = metrics.total_questions run.status = EvalStatus.completed - run.completed_at = datetime.now(UTC) + run.completed_at = datetime.now() run.dimension_averages = { "contains_execution_accuracy": metrics.contains_execution_accuracy, "exact_execution_accuracy": metrics.exact_execution_accuracy, @@ -300,7 +300,7 @@ def _run_production_dataset_eval( run.fail_rate = metrics.fail_rate run.total_questions = metrics.total_questions run.status = EvalStatus.completed - run.completed_at = datetime.now(UTC) + run.completed_at = datetime.now() run.dimension_averages = { "contains_execution_accuracy": metrics.contains_execution_accuracy, "exact_execution_accuracy": metrics.exact_execution_accuracy, @@ -383,7 +383,7 @@ def _run_candidate_eval( run.fail_rate = metrics.fail_rate run.total_questions = metrics.total_questions run.status = EvalStatus.completed - run.completed_at = datetime.now(UTC) + run.completed_at = datetime.now() run.dimension_averages = { "contains_execution_accuracy": metrics.contains_execution_accuracy, "exact_execution_accuracy": metrics.exact_execution_accuracy, @@ -467,7 +467,7 @@ def _run_regression_eval( run.fail_rate = metrics.fail_rate run.total_questions = metrics.total_questions run.status = EvalStatus.completed - run.completed_at = datetime.now(UTC) + run.completed_at = datetime.now() run.dimension_averages = { "contains_execution_accuracy": metrics.contains_execution_accuracy, "exact_execution_accuracy": metrics.exact_execution_accuracy, diff --git a/backend/app/routers/health.py b/backend/app/routers/health.py index 6de6cc6..87881f9 100644 --- a/backend/app/routers/health.py +++ b/backend/app/routers/health.py @@ -13,7 +13,7 @@ < 0.45 → critical """ -from datetime import UTC, datetime +from datetime import datetime from core.db.engine import get_session from core.models.models import ( @@ -130,7 +130,7 @@ def _compute_health(table_id: str, session: Session) -> TableHealth: existing.failure_wrong_sql = failure_wrong_sql existing.failure_empty_result = failure_empty_result existing.failure_execution_error = failure_exec_error - existing.updated_at = datetime.now(UTC) + existing.updated_at = datetime.now() session.add(existing) session.commit() session.refresh(existing) diff --git a/backend/app/routers/orchestration.py b/backend/app/routers/orchestration.py index 0138abe..d4bc437 100644 --- a/backend/app/routers/orchestration.py +++ b/backend/app/routers/orchestration.py @@ -11,7 +11,7 @@ """ import random -from datetime import UTC, datetime, timedelta +from datetime import datetime, timedelta from core.db.engine import engine, get_session from core.models.models import ( @@ -464,7 +464,7 @@ def get_trends( session: Session = Depends(get_session), ): """Score and pass_rate over time. Returns one data point per completed run.""" - since = datetime.now(UTC) - timedelta(days=days) + since = datetime.now() - timedelta(days=days) query = ( select(EvalRun) .where(EvalRun.status == EvalStatus.completed, EvalRun.created_at >= since) @@ -777,7 +777,7 @@ def system_health(session: Session = Depends(get_session)): "total_tables": len(total_tables), "production_tables": len(production_tables), "total_runs_today": sum( - 1 for r in all_runs if r.created_at.date() == datetime.now(UTC).date() + 1 for r in all_runs if r.created_at.date() == datetime.now().date() ), "top_failing_tables": failing_tables[:5], "recent_runs": recent_runs, diff --git a/backend/app/routers/profiling.py b/backend/app/routers/profiling.py index d71e098..96bdf24 100644 --- a/backend/app/routers/profiling.py +++ b/backend/app/routers/profiling.py @@ -7,7 +7,7 @@ import logging import traceback -from datetime import UTC, datetime, timedelta +from datetime import datetime, timedelta from typing import Any from core.db.engine import engine, get_session @@ -86,8 +86,8 @@ def _run_profile_job(table_id: str): profile.auto_insights = result.auto_insights profile.sample_data = result.sample_data profile.profile_json = result.profile_json - profile.cached_until = datetime.now(UTC) + timedelta(hours=24) - profile.updated_at = datetime.now(UTC) + profile.cached_until = datetime.now() + timedelta(hours=24) + profile.updated_at = datetime.now() session.add(profile) session.commit() session.refresh(profile) diff --git a/backend/app/routers/tables.py b/backend/app/routers/tables.py index aa9ebda..ddca812 100644 --- a/backend/app/routers/tables.py +++ b/backend/app/routers/tables.py @@ -1,5 +1,5 @@ import logging -from datetime import UTC, datetime +from datetime import datetime import httpx from core.db.engine import get_session @@ -248,7 +248,7 @@ def sync_table_schema(table_id: str, session: Session = Depends(get_session)): table.catalog = catalog_name table.service = service_name table.openmetadata_json = data - table.updated_at = datetime.now(UTC) + table.updated_at = datetime.now() session.add(table) @@ -305,7 +305,7 @@ def update_table_status( previous_status = table.status table.status = status - table.updated_at = datetime.now(UTC) + table.updated_at = datetime.now() session.add(table) session.commit() session.refresh(table) @@ -385,7 +385,7 @@ def create_foreign_key( if existing: existing.target_table_id = payload.target_table_id existing.target_column = payload.target_column - existing.updated_at = datetime.now(UTC) + existing.updated_at = datetime.now() session.add(existing) session.commit() session.refresh(existing) diff --git a/backend/app/services/profiling_engine.py b/backend/app/services/profiling_engine.py index 3894a7c..3f54100 100644 --- a/backend/app/services/profiling_engine.py +++ b/backend/app/services/profiling_engine.py @@ -9,7 +9,7 @@ import concurrent.futures import logging from dataclasses import dataclass, field -from datetime import UTC, date, datetime +from datetime import date, datetime from decimal import Decimal from typing import Any @@ -865,7 +865,7 @@ def run_table_profiling( Never does full column scans for numeric stats. """ fqn = _fqn(catalog, schema, table) - computed_at = datetime.now(UTC) + computed_at = datetime.now() result = TableProfilingResult( table_id=table_id, table_fqn=fqn, diff --git a/backend/app/services/scheduler.py b/backend/app/services/scheduler.py index eca2d5c..909e607 100644 --- a/backend/app/services/scheduler.py +++ b/backend/app/services/scheduler.py @@ -4,7 +4,7 @@ """ import logging -from datetime import UTC, datetime +from datetime import datetime from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger @@ -67,7 +67,7 @@ def _run_scheduled_evaluation(schedule_id: str): valid_table_ids.append(table_id) # Update schedule timestamps - schedule.last_run_at = datetime.now(UTC) + schedule.last_run_at = datetime.now() session.add(schedule) session.commit() diff --git a/core/src/core/embeddings.py b/core/src/core/embeddings.py index 93e6780..5fe6d49 100644 --- a/core/src/core/embeddings.py +++ b/core/src/core/embeddings.py @@ -20,7 +20,7 @@ def get_embedding( req = urllib.request.Request(embedder_url, data=data, headers=headers) try: with urllib.request.urlopen(req, timeout=10) as res: - resp_data = json.loads(res.read().decode())[data][0] + resp_data = json.loads(res.read().decode())["data"][0] embedding = resp_data.get("embedding") if not embedding or len(embedding) != EXPECTED_EMBEDDING_DIM: raise ValueError( diff --git a/core/src/core/models/models.py b/core/src/core/models/models.py index c4cb62f..231313b 100644 --- a/core/src/core/models/models.py +++ b/core/src/core/models/models.py @@ -1,5 +1,5 @@ import uuid -from datetime import datetime, timezone +from datetime import datetime from enum import StrEnum from typing import Any @@ -29,8 +29,8 @@ class Table(SQLModel, table=True): service: str = Field(default="trino_ingestion") openmetadata_json: Any | None = Field(default=None, sa_column=Column(JSON)) embedding: Any | None = Field(default=None, sa_column=Column(Vector(768))) - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) + updated_at: datetime = Field(default_factory=datetime.now) class TableCreate(SQLModel): @@ -63,7 +63,7 @@ class EnrichmentVersion(SQLModel, table=True): ) version: int = Field(default=1) data: Any | None = Field(default=None, sa_column=Column(JSON)) - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) class EnrichmentCreate(SQLModel): @@ -109,7 +109,7 @@ class GoldenQuestion(SQLModel, table=True): difficulty: DifficultyLevel = Field(default=DifficultyLevel.simple) question_type: QuestionType = Field(default=QuestionType.simple) coverage_tags: list[str] | None = Field(default=None, sa_column=Column(JSON)) - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) class GoldenQuestionCreate(SQLModel): @@ -155,7 +155,7 @@ class EvalRun(SQLModel, table=True): duration_seconds: float | None = None triggered_by: str = Field(default="user") # "user" | "scheduler" | "system" status: EvalStatus = Field(default=EvalStatus.running) - started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + started_at: datetime = Field(default_factory=datetime.now) completed_at: datetime | None = None failure_breakdown: Any | None = Field(default=None, sa_column=Column(JSON)) dimension_averages: Any | None = Field(default=None, sa_column=Column(JSON)) @@ -164,7 +164,7 @@ class EvalRun(SQLModel, table=True): promotion_run_id: str | None = Field( default=None, index=True ) # set on regression runs - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) class EvalRunRead(SQLModel): @@ -205,7 +205,7 @@ class EvaluationSchedule(SQLModel, table=True): cron_expression: str = Field(default="0 2 * * *") # daily at 2am enabled: bool = Field(default=True) created_by: str = Field(default="user") - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) last_run_at: datetime | None = None next_run_at: datetime | None = None @@ -254,7 +254,7 @@ class EvaluationHistoryMetric(SQLModel, table=True): ) metric_name: str # e.g. "score", "pass_rate", "wrong_table_count" metric_value: float - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) class EvaluationHistoryMetricRead(SQLModel): @@ -293,7 +293,7 @@ class EvaluationAlert(SQLModel, table=True): message: str details: Any | None = Field(default=None, sa_column=Column(JSON)) acknowledged: bool = Field(default=False) - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) class EvaluationAlertRead(SQLModel): @@ -393,7 +393,7 @@ class AuditQuery(SQLModel, table=True): confidence_score: float | None = None explanation_text: str | None = None warnings_json: list[str] | None = Field(default=None, sa_column=Column(JSON)) - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) class AuditQueryRead(SQLModel): @@ -456,8 +456,8 @@ class TableProfile(SQLModel, table=True): default=None, sa_column=Column(JSON) ) # full structured profile cached_until: datetime | None = None - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) + updated_at: datetime = Field(default_factory=datetime.now) class TableProfileRead(SQLModel): @@ -511,7 +511,7 @@ class ColumnProfile(SQLModel, table=True): stats_json: Any | None = Field( default=None, sa_column=Column(JSON) ) # full stats blob - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) class ColumnProfileRead(SQLModel): @@ -552,7 +552,7 @@ class CrossTableProfile(SQLModel, table=True): join_suggestion: str | None = None # e.g. "source.user_id = target.id" match_strength: str = Field(default="weak") # "strong" | "weak" common_columns: list[str] | None = Field(default=None, sa_column=Column(JSON)) - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) class CrossTableProfileRead(SQLModel): @@ -596,7 +596,7 @@ class QueryFeedback(SQLModel, table=True): rating: FeedbackRating comment: str | None = None suggested_correction: str | None = None - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) class QueryFeedbackCreate(SQLModel): @@ -652,7 +652,7 @@ class TableHealth(SQLModel, table=True): failure_wrong_sql: int = Field(default=0) failure_empty_result: int = Field(default=0) failure_execution_error: int = Field(default=0) - updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = Field(default_factory=datetime.now) class TableHealthRead(SQLModel): @@ -685,8 +685,8 @@ class SecurityUser(SQLModel, table=True): name: str is_active: bool = Field(default=True) is_admin: bool = Field(default=False) - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) + updated_at: datetime = Field(default_factory=datetime.now) class SecurityUserRead(SQLModel): @@ -719,8 +719,8 @@ class ForeignKeyMapping(SQLModel, table=True): sa_column_args=[ForeignKey("tables.id", ondelete="CASCADE", onupdate="CASCADE")] ) target_column: str - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) + updated_at: datetime = Field(default_factory=datetime.now) class ForeignKeyMappingCreate(SQLModel): @@ -758,8 +758,8 @@ class HttpExtractor(SQLModel, table=True): url: str description: str | None = None status: ExtractorStatus = Field(default=ExtractorStatus.draft) - created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) - updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + created_at: datetime = Field(default_factory=datetime.now) + updated_at: datetime = Field(default_factory=datetime.now) class HttpExtractorCreate(SQLModel): name: str