From 61393822372cead3169e75a2dcb08115fb226378 Mon Sep 17 00:00:00 2001 From: houfu Date: Fri, 17 Apr 2026 08:19:21 +0800 Subject: [PATCH 1/3] Structured error reporting for build/deploy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resource failures now surface with full tracebacks and per-resource attribution instead of flat error strings, and the output is designed for both humans (rich table, colored streaming lines) and AI agents (--json, stable [OK]/[FAIL]/[SKIP] prefixes, CWD-relative paths in tracebacks). Core changes: - types.py: add ResourceOutcome / BuildReport dataclasses; extend ValidationResult additively with tracebacks, report, records - processor.py, deployer.py: capture traceback.format_exc() at every except, so real Python stacks survive to the CLI - builder.py: aggregate a BuildReport with per-resource timings and status; accept an optional progress_callback so the CLI can drive streaming output - helpers.py: render_build_report / render_resource_event — JSON, rich-table TTY, and plain non-TTY outputs share a single schema - cli.py: -v/--verbose and --json on build and deploy; documented exit codes (0 success, 1 resource failure, 2 fatal) Based on user feedback from a week of real builds, two watchability features go in with the refactor: - --fail-on-empty: treats resources that returned [] as failures (exit 1). Addresses the silent-success case where a resource swallows its own exception and produces zero rows. - --progress-file : atomically overwrites a JSON snapshot of BuildReport after each resource finishes. Lets trigger-and-wait callers distinguish "host asleep" from "build running" without parsing stdout. Test suite updated (54 CLI tests, 353 total) and rich>=13.0 added. Cross-resource parallelism and --post-hook explicitly left for follow-up. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/zeeker/pyproject.toml | 1 + packages/zeeker/tests/test_cli.py | 192 ++++++++++--- packages/zeeker/zeeker/cli.py | 186 ++++++++++--- packages/zeeker/zeeker/commands/helpers.py | 252 +++++++++++++++++- .../zeeker/zeeker/core/database/builder.py | 105 +++++++- .../zeeker/zeeker/core/database/processor.py | 8 + packages/zeeker/zeeker/core/deployer.py | 2 + packages/zeeker/zeeker/core/project.py | 18 +- packages/zeeker/zeeker/core/types.py | 40 ++- uv.lock | 36 +++ 10 files changed, 748 insertions(+), 92 deletions(-) diff --git a/packages/zeeker/pyproject.toml b/packages/zeeker/pyproject.toml index fb18d6e..aa97e19 100644 --- a/packages/zeeker/pyproject.toml +++ b/packages/zeeker/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "jinja2>=3.1.6", "python-dotenv>=1.0.0", "pyyaml>=6.0.2", + "rich>=13.0", "sqlite-utils>=3.38", ] diff --git a/packages/zeeker/tests/test_cli.py b/packages/zeeker/tests/test_cli.py index f08833e..eb9ed4b 100644 --- a/packages/zeeker/tests/test_cli.py +++ b/packages/zeeker/tests/test_cli.py @@ -7,7 +7,12 @@ from click.testing import CliRunner from zeeker.cli import cli -from zeeker.core.types import ValidationResult, ZeekerSchemaConflictError +from zeeker.core.types import ( + BuildReport, + ResourceOutcome, + ValidationResult, + ZeekerSchemaConflictError, +) class TestCLIInit: @@ -181,77 +186,107 @@ def mock_manager(self): mock_class.return_value = mock_instance yield mock_instance + @staticmethod + def _assert_build_kwargs(mock_manager, **expected): + """Assert build_database was called with the expected kwargs (ignoring the + progress_callback, which is a closure the CLI constructs internally).""" + assert mock_manager.build_database.call_count == 1 + kwargs = mock_manager.build_database.call_args.kwargs + for key, value in expected.items(): + assert kwargs[key] == value, f"{key}: expected {value!r}, got {kwargs[key]!r}" + def test_build_successful(self, runner, mock_manager): """Test successful database build.""" mock_result = ValidationResult(is_valid=True) - mock_result.info.extend( - [ - "Built table: users (150 records)", - "Built table: posts (200 records)", - "Generated metadata.json", - ] + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=150)], + total_duration_s=0.1, ) mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build"]) assert result.exit_code == 0 - for info in mock_result.info: - assert f"✅ {info}" in result.output - mock_manager.build_database.assert_called_once_with( - force_schema_reset=False, sync_from_s3=False, resources=None, setup_fts=False + # Plain-mode renderer emits a SUMMARY line + assert "SUMMARY: 1 succeeded" in result.output + self._assert_build_kwargs( + mock_manager, + force_schema_reset=False, + sync_from_s3=False, + resources=None, + setup_fts=False, ) def test_build_with_force_schema_reset(self, runner, mock_manager): """Test build with force schema reset flag.""" mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport() mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", "--force-schema-reset"]) assert result.exit_code == 0 - mock_manager.build_database.assert_called_once_with( - force_schema_reset=True, sync_from_s3=False, resources=None, setup_fts=False + self._assert_build_kwargs( + mock_manager, + force_schema_reset=True, + sync_from_s3=False, + resources=None, + setup_fts=False, ) def test_build_with_s3_sync(self, runner, mock_manager): """Test build with S3 sync flag.""" mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport() mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", "--sync-from-s3"]) assert result.exit_code == 0 - mock_manager.build_database.assert_called_once_with( - force_schema_reset=False, sync_from_s3=True, resources=None, setup_fts=False + self._assert_build_kwargs( + mock_manager, + force_schema_reset=False, + sync_from_s3=True, + resources=None, + setup_fts=False, ) def test_build_with_both_flags(self, runner, mock_manager): """Test build with both flags.""" mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport() mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", "--force-schema-reset", "--sync-from-s3"]) assert result.exit_code == 0 - mock_manager.build_database.assert_called_once_with( - force_schema_reset=True, sync_from_s3=True, resources=None, setup_fts=False + self._assert_build_kwargs( + mock_manager, + force_schema_reset=True, + sync_from_s3=True, + resources=None, + setup_fts=False, ) def test_build_with_setup_fts(self, runner, mock_manager): """Test build with setup-fts flag.""" mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport() mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", "--setup-fts"]) assert result.exit_code == 0 - mock_manager.build_database.assert_called_once_with( - force_schema_reset=False, sync_from_s3=False, resources=None, setup_fts=True + self._assert_build_kwargs( + mock_manager, + force_schema_reset=False, + sync_from_s3=False, + resources=None, + setup_fts=True, ) def test_build_schema_conflict_error(self, runner, mock_manager): - """Test build with schema conflict error.""" + """Schema conflict is a fatal error (exit code 2) routed through the renderer.""" mock_manager.build_database.side_effect = ZeekerSchemaConflictError( "users", {"id": "INTEGER", "name": "TEXT"}, @@ -260,44 +295,48 @@ def test_build_schema_conflict_error(self, runner, mock_manager): result = runner.invoke(cli, ["build"]) - assert result.exit_code == 0 # CLI doesn't exit with error code, just returns early - assert "❌ Schema conflict detected" in result.output + assert result.exit_code == 2 + # Plain-mode renderer emits FATAL: prefix with the conflict message + assert "FATAL:" in result.output assert "users" in result.output def test_build_general_error(self, runner, mock_manager): - """Test build with general error.""" + """A ValidationResult with errors but no report is treated as fatal.""" mock_result = ValidationResult(is_valid=False) mock_result.errors.append("Database file is locked") mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build"]) - assert result.exit_code == 1 # Build errors should exit with code 1 - assert "❌ Database build failed" in result.output + assert result.exit_code == 2 + assert "FATAL:" in result.output assert "Database file is locked" in result.output - def test_build_displays_completion_info(self, runner, mock_manager): - """Test that build completion info is displayed.""" + def test_build_displays_summary(self, runner, mock_manager): + """Build completion shows the structured SUMMARY line.""" mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport(total_duration_s=0.0) mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build"]) assert result.exit_code == 0 - assert "Built with sqlite-utils" in result.output - assert "Ready for deployment with 'zeeker deploy'" in result.output + assert "SUMMARY:" in result.output def test_build_with_specific_resources(self, runner, mock_manager): """Test build with specific resources.""" mock_result = ValidationResult(is_valid=True) - mock_result.info.extend(["Built table: users (150 records)"]) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=150)] + ) mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", "users", "posts"]) assert result.exit_code == 0 assert "Building specific resources: users, posts" in result.output - mock_manager.build_database.assert_called_once_with( + self._assert_build_kwargs( + mock_manager, force_schema_reset=False, sync_from_s3=False, resources=["users", "posts"], @@ -307,19 +346,25 @@ def test_build_with_specific_resources(self, runner, mock_manager): def test_build_with_single_resource(self, runner, mock_manager): """Test build with single resource.""" mock_result = ValidationResult(is_valid=True) - mock_result.info.extend(["Built table: users (150 records)"]) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=150)] + ) mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", "users"]) assert result.exit_code == 0 assert "Building specific resources: users" in result.output - mock_manager.build_database.assert_called_once_with( - force_schema_reset=False, sync_from_s3=False, resources=["users"], setup_fts=False + self._assert_build_kwargs( + mock_manager, + force_schema_reset=False, + sync_from_s3=False, + resources=["users"], + setup_fts=False, ) def test_build_with_invalid_resources(self, runner, mock_manager): - """Test build with invalid resource names.""" + """Unknown resources are pre-flight errors and exit as fatal (code 2).""" mock_result = ValidationResult(is_valid=False) mock_result.errors.extend( ["Unknown resources: invalid_resource", "Available resources: users, posts"] @@ -328,27 +373,88 @@ def test_build_with_invalid_resources(self, runner, mock_manager): result = runner.invoke(cli, ["build", "users", "invalid_resource"]) - assert result.exit_code == 1 + assert result.exit_code == 2 assert "Unknown resources: invalid_resource" in result.output - assert "Available resources: users, posts" in result.output def test_build_resources_with_options(self, runner, mock_manager): """Test build with resources and options combined.""" mock_result = ValidationResult(is_valid=True) - mock_result.info.extend(["Built table: users (150 records)"]) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=150)] + ) mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", "--force-schema-reset", "users", "posts"]) assert result.exit_code == 0 assert "Building specific resources: users, posts" in result.output - mock_manager.build_database.assert_called_once_with( + self._assert_build_kwargs( + mock_manager, force_schema_reset=True, sync_from_s3=False, resources=["users", "posts"], setup_fts=False, ) + def test_build_json_output(self, runner, mock_manager): + """--json emits a single structured payload with fixed schema.""" + import json as _json + + mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=42, duration_s=0.5)], + total_duration_s=0.7, + ) + mock_manager.build_database.return_value = mock_result + + result = runner.invoke(cli, ["build", "--json"]) + + assert result.exit_code == 0 + payload = _json.loads(result.output) + assert payload["status"] == "success" + assert payload["resources"][0]["name"] == "users" + assert payload["resources"][0]["records"] == 42 + + def test_build_fail_on_empty_exits_one(self, runner, mock_manager): + """--fail-on-empty turns a skipped-only build into exit 1.""" + mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="skipped", records=0)], + total_duration_s=0.1, + ) + mock_manager.build_database.return_value = mock_result + + # Without the flag, skipped is OK + result = runner.invoke(cli, ["build"]) + assert result.exit_code == 0 + + # With the flag, skipped is a failure + mock_manager.build_database.reset_mock() + mock_manager.build_database.return_value = mock_result + result = runner.invoke(cli, ["build", "--fail-on-empty"]) + assert result.exit_code == 1 + + def test_build_progress_file_written(self, runner, mock_manager, tmp_path): + """--progress-file writes a JSON BuildReport snapshot to the given path.""" + import json as _json + + progress_path = tmp_path / "progress.json" + + mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=5)], + total_duration_s=0.1, + ) + mock_manager.build_database.return_value = mock_result + + result = runner.invoke(cli, ["build", "--progress-file", str(progress_path)]) + + assert result.exit_code == 0 + assert progress_path.exists() + payload = _json.loads(progress_path.read_text()) + assert payload["status"] == "success" + assert payload["resources"][0]["name"] == "users" + class TestCLIDeploy: """Test 'zeeker deploy' command.""" @@ -375,12 +481,12 @@ def mock_deployer(self): yield mock_instance def test_deploy_not_in_project(self, runner, mock_manager): - """Test deploy when not in a project directory.""" + """Deploy outside a project is a configuration/fatal error (exit 2).""" mock_manager.is_project_root.return_value = False result = runner.invoke(cli, ["deploy"]) - assert result.exit_code == 0 + assert result.exit_code == 2 assert "❌ Not in a Zeeker project directory" in result.output def test_deploy_successful(self, runner, mock_manager, mock_deployer): @@ -430,7 +536,7 @@ def test_deploy_dry_run(self, runner, mock_manager, mock_deployer): assert "Would upload" in result.output def test_deploy_error(self, runner, mock_manager, mock_deployer): - """Test deployment error.""" + """Missing database file is a setup error (exit 2).""" mock_manager.is_project_root.return_value = True # Mock project loading @@ -443,7 +549,7 @@ def test_deploy_error(self, runner, mock_manager, mock_deployer): with patch("pathlib.Path.exists", return_value=False): result = runner.invoke(cli, ["deploy"]) - assert result.exit_code == 0 + assert result.exit_code == 2 assert "❌ Database not found: nonexistent.db" in result.output assert "Run 'zeeker build' first" in result.output diff --git a/packages/zeeker/zeeker/cli.py b/packages/zeeker/zeeker/cli.py index 6d3b88d..73235a8 100644 --- a/packages/zeeker/zeeker/cli.py +++ b/packages/zeeker/zeeker/cli.py @@ -5,9 +5,11 @@ """ import subprocess +import traceback as tb_mod from pathlib import Path import click +from rich.console import Console from .commands.assets import assets from .commands.backup import backup @@ -16,12 +18,14 @@ echo_errors, echo_warnings, load_env, + render_build_report, + render_resource_event, require_database, require_project, ) from .commands.metadata import metadata from .core.project import ZeekerProjectManager -from .core.types import ZeekerSchemaConflictError +from .core.types import BuildReport, ValidationResult, ZeekerSchemaConflictError # Main CLI group @@ -175,83 +179,205 @@ def add( @click.option( "--setup-fts", is_flag=True, help="Set up full-text search (FTS) indexes on configured fields" ) -def build(resources, force_schema_reset, sync_from_s3, setup_fts): +@click.option("-v", "--verbose", is_flag=True, help="Show full Python tracebacks for failures") +@click.option( + "--json", + "as_json", + is_flag=True, + help="Emit a single structured JSON BuildReport to stdout (tracebacks always included)", +) +@click.option( + "--fail-on-empty", + is_flag=True, + help="Treat resources that returned no data as failures (exit 1 instead of passing)", +) +@click.option( + "--progress-file", + type=click.Path(), + help="Write a JSON BuildReport snapshot to this path after each resource (atomic overwrite). " + "Useful for trigger-and-wait callers that poll externally.", +) +def build( + resources, + force_schema_reset, + sync_from_s3, + setup_fts, + verbose, + as_json, + fail_on_empty, + progress_file, +): """Build database from resources using sqlite-utils. Runs fetch_data() for specified resources and creates/updates the SQLite database. If no resources are specified, builds all resources in the project. + Exit codes: + 0 all resources succeeded + 1 one or more resources failed, or FTS setup failed + 2 fatal error (schema conflict, DB open failure, config error) + Examples: - zeeker build # Build all resources - zeeker build --setup-fts # Build all resources and set up FTS indexes - zeeker build users posts # Build specific resources + zeeker build # Build all resources + zeeker build --setup-fts # Build + FTS indexes + zeeker build users posts # Build specific resources + zeeker build --json | jq # Machine-readable output + zeeker build --fail-on-empty # Empty fetch_data() -> exit 1 + zeeker build --progress-file build.json # Watchable progress from outside """ + from .commands.helpers import write_progress_file + load_env() resource_list = list(resources) if resources else None - if resource_list: - click.echo(f"Building specific resources: {', '.join(resource_list)}") + + # Route any non-JSON chatter through the console; JSON mode keeps stdout clean. + console = Console() + if resource_list and not as_json: + console.print(f"Building specific resources: {', '.join(resource_list)}") manager = ZeekerProjectManager() + # A BuildReport we can mutate progressively for --progress-file watchers. + progress_report = BuildReport() if progress_file else None + + def _callback(name, outcome): + # Stream per-resource line unless emitting JSON. + if not as_json: + render_resource_event(name, outcome, console=console) + + # Update the progress file atomically on finish events. + if progress_file and outcome is not None and progress_report is not None: + progress_report.resources.append(outcome) + write_progress_file(progress_file, progress_report) + + # Write an initial (empty) snapshot so watchers see the file exist immediately. + if progress_file and progress_report is not None: + write_progress_file(progress_file, progress_report) + + progress_callback = None if (as_json and not progress_file) else _callback + try: result = manager.build_database( force_schema_reset=force_schema_reset, sync_from_s3=sync_from_s3, resources=resource_list, setup_fts=setup_fts, + progress_callback=progress_callback, ) except ZeekerSchemaConflictError as e: - click.echo("❌ Schema conflict detected:") - click.echo(str(e)) - click.echo("\n💡 To resolve this, you can:") - click.echo(" • Use --force-schema-reset flag to ignore conflicts") - click.echo(" • Add a migrate_schema() function to handle the change") - click.echo(" • Delete the database file to rebuild from scratch") - return - - if result.errors: - click.echo("❌ Database build failed:") - echo_errors(result) - raise click.ClickException("Build failed") - - if result.warnings: + result = ValidationResult(is_valid=False) + result.errors.append(str(e)) + result.tracebacks.append(tb_mod.format_exc()) + result.report = BuildReport(fatal_error=str(e)) + if progress_file: + write_progress_file(progress_file, result.report) + render_build_report(result, verbose=verbose, as_json=as_json, console=console) + raise click.exceptions.Exit(2) + except Exception as e: + result = ValidationResult(is_valid=False) + result.errors.append(f"Build failed: {e}") + result.tracebacks.append(tb_mod.format_exc()) + result.report = BuildReport(fatal_error=f"Build failed: {e}") + if progress_file: + write_progress_file(progress_file, result.report) + render_build_report(result, verbose=verbose, as_json=as_json, console=console) + raise click.exceptions.Exit(2) + + # Pre-flight failures (e.g., "Unknown resources") return a ValidationResult with no + # report — treat them as fatal so agents get a structured fatal_error message. + if result.report is None: + fatal_msg = result.errors[0] if result.errors else "build failed" + result.report = BuildReport(fatal_error=fatal_msg) + + if result.warnings and not as_json: echo_warnings(result) - for info in result.info: - click.echo(f"✅ {info}") + # Final progress-file snapshot reflects the completed state. + if progress_file: + write_progress_file(progress_file, result.report) + + render_build_report(result, verbose=verbose, as_json=as_json, console=console) - click.echo("\n🔧 Built with sqlite-utils for robust schema detection") - click.echo("🚀 Ready for deployment with 'zeeker deploy'") + report = result.report + if report.fatal_error: + raise click.exceptions.Exit(2) + if report.failed or report.fts_error: + raise click.exceptions.Exit(1) + if fail_on_empty and report.skipped: + if not as_json: + console.print( + f"[red]Exiting non-zero: {len(report.skipped)} resource(s) returned no data " + "and --fail-on-empty is set.[/red]" + ) + raise click.exceptions.Exit(1) @cli.command("deploy") @click.option("--dry-run", is_flag=True, help="Show what would be uploaded without uploading") -def deploy_database(dry_run): +@click.option("-v", "--verbose", is_flag=True, help="Show full Python traceback on failure") +@click.option( + "--json", + "as_json", + is_flag=True, + help="Emit a single structured JSON deploy result to stdout", +) +def deploy_database(dry_run, verbose, as_json): """Deploy the project database to S3. Uploads the generated .db file to S3: s3://bucket/latest/{database_name}.db + + Exit codes: + 0 upload succeeded (or dry-run completed) + 1 upload failed + 2 configuration or setup error (missing project, env vars, db file) """ + import json as _json + manager = ZeekerProjectManager() project = require_project(manager) if not project: - return + if as_json: + click.echo(_json.dumps({"status": "fatal", "error": "not_a_zeeker_project"})) + raise click.exceptions.Exit(2) deployer = create_deployer() if not deployer: - return + if as_json: + click.echo(_json.dumps({"status": "fatal", "error": "missing_configuration"})) + raise click.exceptions.Exit(2) db_path = require_database(manager, project) if not db_path: - return + if as_json: + click.echo(_json.dumps({"status": "fatal", "error": "database_not_found"})) + raise click.exceptions.Exit(2) database_name = Path(project.database).stem result = deployer.upload_database(db_path, database_name, dry_run) + if as_json: + payload = { + "status": "success" if result.is_valid else "failed", + "dry_run": dry_run, + "database": database_name, + "destination": f"s3://{deployer.bucket_name}/latest/{database_name}.db", + "info": list(result.info), + "errors": list(result.errors), + "tracebacks": list(result.tracebacks), + } + click.echo(_json.dumps(payload, indent=2)) + if not result.is_valid: + raise click.exceptions.Exit(1) + return + if result.errors: echo_errors(result) - return + if verbose and result.tracebacks: + for tb in result.tracebacks: + click.echo(tb) + raise click.exceptions.Exit(1) for info in result.info: click.echo(f"✅ {info}") diff --git a/packages/zeeker/zeeker/commands/helpers.py b/packages/zeeker/zeeker/commands/helpers.py index b631f0e..e189cd6 100644 --- a/packages/zeeker/zeeker/commands/helpers.py +++ b/packages/zeeker/zeeker/commands/helpers.py @@ -2,12 +2,19 @@ Shared helper functions for CLI commands. """ +import json +import os +import re +from dataclasses import asdict from pathlib import Path import click from dotenv import load_dotenv +from rich.console import Console +from rich.panel import Panel +from rich.table import Table -from ..core.types import ValidationResult +from ..core.types import BuildReport, ResourceOutcome, ValidationResult def echo_errors(result: ValidationResult) -> None: @@ -116,3 +123,246 @@ def show_resource_metadata(resource_name: str, resource_config: dict): for field in metadata_fields: if field in resource_config: click.echo(f" {field.replace('_', ' ').title()}: {resource_config[field]}") + + +# ---------------------------------------------------------------------------- +# Build report rendering +# ---------------------------------------------------------------------------- + +# Matches `File ""` lines in Python tracebacks so we can rewrite absolute +# paths to be relative to CWD for agent consumption. +_TRACEBACK_PATH_RE = re.compile(r'File "([^"]+)"') + +_STATUS_GLYPHS = { + "success": ("[green]✓[/green]", "OK"), + "failed": ("[red]✗[/red]", "FAIL"), + "skipped": ("[yellow]−[/yellow]", "SKIP"), +} + + +def _relativize_traceback(tb: str) -> str: + """Rewrite absolute paths in a traceback to be CWD-relative where possible.""" + if not tb: + return tb + cwd = os.getcwd() + + def _sub(match: re.Match) -> str: + raw = match.group(1) + try: + rel = os.path.relpath(raw, start=cwd) + except ValueError: + return match.group(0) + # Prefer relative only when it doesn't escape too far upward + if rel.startswith(".." + os.sep + ".." + os.sep + ".."): + return match.group(0) + return f'File "{rel}"' + + return _TRACEBACK_PATH_RE.sub(_sub, tb) + + +def _report_overall_status(report: BuildReport) -> str: + if report.fatal_error: + return "fatal" + if report.failed or report.fts_error: + return "partial_failure" if report.succeeded else "failed" + return "success" + + +def render_resource_event( + name: str, + outcome: ResourceOutcome | None, + *, + console: Console, +) -> None: + """Stream a per-resource line during the build. + + Called by the CLI on both start (outcome=None) and finish events. On start, + emits a dim "building" hint in TTY mode only. On finish, emits a colored + glyph line in TTY mode or a stable ``[OK]/[FAIL]/[SKIP]`` prefix in non-TTY + mode for agent parsing. + """ + if outcome is None: + if console.is_terminal: + console.print(f"[dim]→ Building [cyan]{name}[/cyan]...[/dim]") + return + + if console.is_terminal: + glyph, _ = _STATUS_GLYPHS[outcome.status] + if outcome.status == "success": + detail = f"{outcome.records} records" + elif outcome.status == "failed": + detail = f"[red]{outcome.error_message or 'failed'}[/red]" + else: # skipped + detail = "[yellow]no data returned[/yellow]" + console.print( + f"{glyph} [bold]{name}[/bold] {detail} " f"[dim]({outcome.duration_s:.1f}s)[/dim]" + ) + else: + prefix = _STATUS_GLYPHS[outcome.status][1] + if outcome.status == "success": + note = f"{outcome.records} records" + elif outcome.status == "failed": + note = outcome.error_message or "failed" + else: # skipped + note = "no data returned" + console.print( + f"[{prefix:<4}] {name:<24} {note} ({outcome.duration_s:.1f}s)", + markup=False, + highlight=False, + ) + + +def render_build_report( + result: ValidationResult, + *, + verbose: bool, + as_json: bool, + console: Console, +) -> None: + """Emit the build outcome in the requested format. + + - ``as_json``: single JSON object to stdout. Tracebacks always included. + - TTY text mode: rich summary table + optional traceback panels when ``verbose``. + - Non-TTY text mode: one ``[OK]`` / ``[FAIL]`` / ``[SKIP]`` line per resource + (already streamed during the build) + a ``SUMMARY:`` footer. With ``verbose``, + tracebacks printed under failures. + """ + report = result.report or BuildReport() + + if as_json: + _emit_json(report, console=console) + return + + if console.is_terminal: + _emit_rich(report, verbose=verbose, console=console) + else: + _emit_plain(report, verbose=verbose, console=console) + + +def _build_report_payload(report: BuildReport) -> dict: + """Serialize a BuildReport to the stable JSON schema (used by both --json and + --progress-file so external watchers and CLI consumers see identical shape).""" + payload = { + "status": _report_overall_status(report), + "total_duration_s": round(report.total_duration_s, 3), + "resources": [asdict(r) for r in report.resources], + "fts_error": report.fts_error, + "fatal_error": report.fatal_error, + } + for item in payload["resources"]: + if item.get("traceback"): + item["traceback"] = _relativize_traceback(item["traceback"]) + return payload + + +def write_progress_file(path: str | os.PathLike, report: BuildReport) -> None: + """Atomically overwrite ``path`` with a JSON snapshot of ``report``. + + Writes to a sibling ``.tmp`` file then renames, so external watchers never + see a truncated file. Swallows I/O errors silently — a progress file failure + must never fail the build. + """ + try: + target = Path(path) + target.parent.mkdir(parents=True, exist_ok=True) + tmp = target.with_suffix(target.suffix + ".tmp") + tmp.write_text(json.dumps(_build_report_payload(report), indent=2)) + os.replace(tmp, target) + except Exception: + # Progress reporting is best-effort — never fail a build on it. + pass + + +def _emit_json(report: BuildReport, *, console: Console) -> None: + console.print( + json.dumps(_build_report_payload(report), indent=2), + markup=False, + highlight=False, + ) + + +def _emit_rich(report: BuildReport, *, verbose: bool, console: Console) -> None: + if report.fatal_error: + console.print(Panel(report.fatal_error, title="[red]Fatal error[/red]", border_style="red")) + return + + if not report.resources and not report.fts_error: + console.print("[yellow]No resources built.[/yellow]") + return + + table = Table(title="Build report", show_lines=False) + table.add_column("Resource", style="bold") + table.add_column("Status") + table.add_column("Records", justify="right") + table.add_column("Time", justify="right") + table.add_column("Notes") + + for r in report.resources: + glyph, _ = _STATUS_GLYPHS[r.status] + if r.status == "success": + notes = "" + if r.fragments_records is not None: + notes = f"+{r.fragments_records} fragments" + records = str(r.records) + elif r.status == "failed": + notes = r.error_message or "" + records = "" + else: # skipped + notes = "no data returned" + records = "" + table.add_row(r.name, glyph, records, f"{r.duration_s:.1f}s", notes) + + console.print(table) + + succeeded = len(report.succeeded) + failed = len(report.failed) + skipped = len(report.skipped) + total = len(report.resources) + console.print( + f"[bold]{succeeded} of {total} resources succeeded[/bold] " + f"({failed} failed, {skipped} skipped) in {report.total_duration_s:.1f}s" + ) + + if report.fts_error: + console.print(f"[red]FTS setup failed:[/red] {report.fts_error}") + + if verbose: + for r in report.failed: + if r.traceback: + console.print( + Panel( + _relativize_traceback(r.traceback), + title=f"[red]Traceback · {r.name}[/red]", + border_style="red", + ) + ) + + +def _emit_plain(report: BuildReport, *, verbose: bool, console: Console) -> None: + # Per-resource lines already streamed via render_resource_event during the build, + # so here we only emit the summary + verbose tracebacks + fatal/FTS errors. + if report.fatal_error: + console.print(f"FATAL: {report.fatal_error}", markup=False, highlight=False) + return + + succeeded = len(report.succeeded) + failed = len(report.failed) + skipped = len(report.skipped) + console.print( + f"SUMMARY: {succeeded} succeeded, {failed} failed, {skipped} skipped " + f"in {report.total_duration_s:.1f}s", + markup=False, + highlight=False, + ) + + if report.fts_error: + console.print(f"FTS_ERROR: {report.fts_error}", markup=False, highlight=False) + + if verbose: + for r in report.failed: + if r.traceback: + console.print( + f"TRACEBACK: {r.name}\n{_relativize_traceback(r.traceback)}", + markup=False, + highlight=False, + ) diff --git a/packages/zeeker/zeeker/core/database/builder.py b/packages/zeeker/zeeker/core/database/builder.py index 6dba536..b6a6fee 100644 --- a/packages/zeeker/zeeker/core/database/builder.py +++ b/packages/zeeker/zeeker/core/database/builder.py @@ -6,12 +6,20 @@ """ import time +import traceback from pathlib import Path +from typing import Callable import sqlite_utils from ..schema import SchemaManager -from ..types import ValidationResult, ZeekerProject, ZeekerSchemaConflictError +from ..types import ( + BuildReport, + ResourceOutcome, + ValidationResult, + ZeekerProject, + ZeekerSchemaConflictError, +) from .fts_processor import FTSProcessor from .processor import ResourceProcessor from .s3_sync import S3Synchronizer @@ -41,6 +49,7 @@ def build_database( sync_from_s3: bool = False, resources: list[str] = None, setup_fts: bool = False, + progress_callback: Callable[[str, ResourceOutcome | None], None] | None = None, ) -> ValidationResult: """Build the SQLite database from resources using sqlite-utils. @@ -55,11 +64,19 @@ def build_database( sync_from_s3: If True, download existing database from S3 before building resources: List of specific resource names to build. If None, builds all resources. setup_fts: If True, set up full-text search indexes on configured fields + progress_callback: Optional callable invoked as (resource_name, None) when a + resource starts and (resource_name, outcome) when it finishes. Allows the + CLI layer to drive progress bars or streaming output. Returns: - ValidationResult with build results + ValidationResult with build results. `result.report` holds a BuildReport + describing per-resource outcomes, timings, and fatal/FTS errors. """ result = ValidationResult(is_valid=True) + report = BuildReport() + result.report = report + build_started = time.perf_counter() + db_path = self.project_path / self.project.database # S3 Database Synchronization - Download existing DB if requested @@ -86,27 +103,50 @@ def build_database( # Process each specified resource for resource_name in resources_to_build: + if progress_callback: + progress_callback(resource_name, None) + + resource_started = time.perf_counter() resource_result = self._process_resource_with_schema_check( db, resource_name, force_schema_reset, build_id ) + duration = time.perf_counter() - resource_started + + outcome = self._build_resource_outcome(resource_name, resource_result, duration) if not resource_result.is_valid: result.errors.extend(resource_result.errors) + result.tracebacks.extend(resource_result.tracebacks) result.is_valid = False else: result.info.extend(resource_result.info) - # Process fragments if enabled - resource_config = self.project.resources.get(resource_name, {}) - is_fragments_enabled = resource_config.get("fragments", False) - - if is_fragments_enabled: - fragments_result = self._process_fragments_for_resource(db, resource_name) - if not fragments_result.is_valid: - result.errors.extend(fragments_result.errors) - result.is_valid = False - else: - result.info.extend(fragments_result.info) + # Process fragments if enabled (main must have succeeded) + if outcome.status == "success": + resource_config = self.project.resources.get(resource_name, {}) + if resource_config.get("fragments", False): + fragments_result = self._process_fragments_for_resource( + db, resource_name + ) + if not fragments_result.is_valid: + result.errors.extend(fragments_result.errors) + result.tracebacks.extend(fragments_result.tracebacks) + result.is_valid = False + outcome.status = "failed" + outcome.error_message = ( + fragments_result.errors[0] + if fragments_result.errors + else "fragments processing failed" + ) + if fragments_result.tracebacks: + outcome.traceback = fragments_result.tracebacks[0] + else: + result.info.extend(fragments_result.info) + outcome.fragments_records = fragments_result.records + + report.resources.append(outcome) + if progress_callback: + progress_callback(resource_name, outcome) # Set up FTS after all resources are processed (only if requested) if result.is_valid and setup_fts: @@ -114,6 +154,9 @@ def build_database( if not fts_result.is_valid: result.errors.extend(fts_result.errors) result.is_valid = False + report.fts_error = ( + fts_result.errors[0] if fts_result.errors else "FTS setup failed" + ) else: result.info.extend(fts_result.info) if fts_result.warnings: @@ -123,10 +166,38 @@ def build_database( except Exception as e: result.is_valid = False - result.errors.append(f"Database build failed: {e}") + msg = f"Database build failed: {e}" + result.errors.append(msg) + result.tracebacks.append(traceback.format_exc()) + report.fatal_error = msg + report.total_duration_s = time.perf_counter() - build_started return result + @staticmethod + def _build_resource_outcome( + resource_name: str, resource_result: ValidationResult, duration_s: float + ) -> ResourceOutcome: + """Construct a ResourceOutcome from a per-resource ValidationResult.""" + if not resource_result.is_valid: + return ResourceOutcome( + name=resource_name, + status="failed", + duration_s=duration_s, + error_message=( + resource_result.errors[0] if resource_result.errors else "unknown error" + ), + traceback=(resource_result.tracebacks[0] if resource_result.tracebacks else None), + ) + + is_skipped = any("No data returned" in msg for msg in resource_result.info) + return ResourceOutcome( + name=resource_name, + status="skipped" if is_skipped else "success", + records=resource_result.records or 0, + duration_s=duration_s, + ) + def _process_resource_with_schema_check( self, db: sqlite_utils.Database, resource_name: str, force_schema_reset: bool, build_id: str ) -> ValidationResult: @@ -186,9 +257,11 @@ def _process_resource_with_schema_check( resource_result = self.processor.process_resource(db, resource_name, module) if not resource_result.is_valid: result.errors.extend(resource_result.errors) + result.tracebacks.extend(resource_result.tracebacks) result.is_valid = False else: result.info.extend(resource_result.info) + result.records = resource_result.records # Update resource timestamps duration_ms = int((time.time() - start_time) * 1000) @@ -199,9 +272,11 @@ def _process_resource_with_schema_check( except ZeekerSchemaConflictError as e: result.is_valid = False result.errors.append(str(e)) + result.tracebacks.append(traceback.format_exc()) except Exception as e: result.is_valid = False result.errors.append(f"Failed to process resource '{resource_name}': {e}") + result.tracebacks.append(traceback.format_exc()) return result @@ -253,8 +328,10 @@ def _process_fragments_for_resource( ) if not fragments_result.is_valid: result.errors.extend(fragments_result.errors) + result.tracebacks.extend(fragments_result.tracebacks) result.is_valid = False else: result.info.extend(fragments_result.info) + result.records = fragments_result.records return result diff --git a/packages/zeeker/zeeker/core/database/processor.py b/packages/zeeker/zeeker/core/database/processor.py index 1a56472..cc15593 100644 --- a/packages/zeeker/zeeker/core/database/processor.py +++ b/packages/zeeker/zeeker/core/database/processor.py @@ -8,6 +8,7 @@ import importlib.util import inspect import sqlite3 +import traceback from pathlib import Path from typing import Any, Dict, List @@ -104,6 +105,7 @@ def process_resource( # Insert all data at once for better performance table.insert_all(transformed_data, replace=False) + result.records = len(transformed_data) result.info.append( f"Processed {len(transformed_data)} records for resource '{resource_name}'" ) @@ -111,9 +113,11 @@ def process_resource( except sqlite3.IntegrityError as e: result.is_valid = False result.errors.append(f"Database integrity error in '{resource_name}': {e}") + result.tracebacks.append(traceback.format_exc()) except Exception as e: result.is_valid = False result.errors.append(f"Failed to process resource '{resource_name}': {e}") + result.tracebacks.append(traceback.format_exc()) return result @@ -197,6 +201,7 @@ def process_fragments_data( # Insert all fragments at once for better performance fragments_table.insert_all(transformed_fragments, replace=False) + result.records = len(transformed_fragments) result.info.append( f"Processed {len(transformed_fragments)} fragments for resource '{resource_name}'" ) @@ -204,9 +209,11 @@ def process_fragments_data( except sqlite3.IntegrityError as e: result.is_valid = False result.errors.append(f"Database integrity error in '{resource_name}' fragments: {e}") + result.tracebacks.append(traceback.format_exc()) except Exception as e: result.is_valid = False result.errors.append(f"Failed to process fragments for '{resource_name}': {e}") + result.tracebacks.append(traceback.format_exc()) return result @@ -237,6 +244,7 @@ def _load_resource_module(self, resource_name: str) -> ValidationResult: except Exception as e: result.is_valid = False result.errors.append(f"Failed to load resource module '{resource_name}': {e}") + result.tracebacks.append(traceback.format_exc()) return result diff --git a/packages/zeeker/zeeker/core/deployer.py b/packages/zeeker/zeeker/core/deployer.py index c04bfb5..8d44c66 100644 --- a/packages/zeeker/zeeker/core/deployer.py +++ b/packages/zeeker/zeeker/core/deployer.py @@ -4,6 +4,7 @@ import hashlib import os +import traceback from pathlib import Path import boto3 @@ -69,6 +70,7 @@ def _upload_db_to_s3( except Exception as e: result.is_valid = False result.errors.append(f"Failed to {action_verb} database: {e}") + result.tracebacks.append(traceback.format_exc()) return result diff --git a/packages/zeeker/zeeker/core/project.py b/packages/zeeker/zeeker/core/project.py index 2bd2e40..4091530 100644 --- a/packages/zeeker/zeeker/core/project.py +++ b/packages/zeeker/zeeker/core/project.py @@ -3,11 +3,12 @@ """ from pathlib import Path +from typing import Callable from .database import DatabaseBuilder from .resources import ResourceManager from .scaffolding import ProjectScaffolder -from .types import ValidationResult, ZeekerProject +from .types import ResourceOutcome, ValidationResult, ZeekerProject class ZeekerProjectManager: @@ -74,6 +75,8 @@ def build_database( sync_from_s3: bool = False, resources: list[str] = None, setup_fts: bool = False, + *, + progress_callback: Callable[[str, ResourceOutcome | None], None] | None = None, ) -> ValidationResult: """Build the SQLite database from resources with optional S3 sync. @@ -82,9 +85,12 @@ def build_database( sync_from_s3: If True, download existing database from S3 before building resources: List of specific resource names to build. If None, builds all resources. setup_fts: If True, set up full-text search indexes on configured fields + progress_callback: Optional callable (resource_name, outcome|None) used by the + CLI layer to render progress. See DatabaseBuilder.build_database. Returns: - ValidationResult with build results + ValidationResult with build results; the BuildReport is attached as + ``result.report``. """ if not self.is_project_root(): result = ValidationResult(is_valid=False) @@ -104,4 +110,10 @@ def build_database( builder = DatabaseBuilder(self.project_path, project) - return builder.build_database(force_schema_reset, sync_from_s3, resources, setup_fts) + return builder.build_database( + force_schema_reset, + sync_from_s3, + resources, + setup_fts, + progress_callback=progress_callback, + ) diff --git a/packages/zeeker/zeeker/core/types.py b/packages/zeeker/zeeker/core/types.py index 6f05762..7617b7a 100644 --- a/packages/zeeker/zeeker/core/types.py +++ b/packages/zeeker/zeeker/core/types.py @@ -7,7 +7,7 @@ import tomllib from dataclasses import dataclass, field from pathlib import Path -from typing import Any +from typing import Any, Literal # Meta table constants META_TABLE_SCHEMAS = "_zeeker_schemas" @@ -57,6 +57,41 @@ def __init__(self, resource_name: str, old_schema: dict[str, str], new_schema: d super().__init__("\n".join(msg_parts)) +@dataclass +class ResourceOutcome: + """Per-resource outcome from a build, aggregated by DatabaseBuilder.""" + + name: str + status: Literal["success", "failed", "skipped"] + records: int = 0 + duration_s: float = 0.0 + error_message: str | None = None + traceback: str | None = None + fragments_records: int | None = None + + +@dataclass +class BuildReport: + """Structured report of a full database build, one entry per resource.""" + + resources: list[ResourceOutcome] = field(default_factory=list) + total_duration_s: float = 0.0 + fts_error: str | None = None + fatal_error: str | None = None + + @property + def failed(self) -> list[ResourceOutcome]: + return [r for r in self.resources if r.status == "failed"] + + @property + def succeeded(self) -> list[ResourceOutcome]: + return [r for r in self.resources if r.status == "success"] + + @property + def skipped(self) -> list[ResourceOutcome]: + return [r for r in self.resources if r.status == "skipped"] + + @dataclass class ValidationResult: """Result of validation operations.""" @@ -65,6 +100,9 @@ class ValidationResult: errors: list[str] = field(default_factory=list) warnings: list[str] = field(default_factory=list) info: list[str] = field(default_factory=list) + tracebacks: list[str] = field(default_factory=list) + report: "BuildReport | None" = None + records: int | None = None @dataclass diff --git a/uv.lock b/uv.lock index f26b28f..927a688 100644 --- a/uv.lock +++ b/uv.lock @@ -571,6 +571,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/31/b4/b9b800c45527aadd64d5b442f9b932b00648617eb5d63d2c7a6587b7cafc/jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980", size = 20256, upload-time = "2022-06-17T18:00:10.251Z" }, ] +[[package]] +name = "markdown-it-py" +version = "4.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "mdurl" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" }, +] + [[package]] name = "markupsafe" version = "3.0.2" @@ -609,6 +621,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/4f/65/6079a46068dfceaeabb5dcad6d674f5f5c61a6fa5673746f42a9f4c233b3/MarkupSafe-3.0.2-cp313-cp313t-win_amd64.whl", hash = "sha256:e444a31f8db13eb18ada366ab3cf45fd4b31e4db1236a4448f68778c1d1a5a2f", size = 15739, upload-time = "2024-10-18T15:21:42.784Z" }, ] +[[package]] +name = "mdurl" +version = "0.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" }, +] + [[package]] name = "mergedeep" version = "1.3.4" @@ -899,6 +920,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6", size = 64738, upload-time = "2025-08-18T20:46:00.542Z" }, ] +[[package]] +name = "rich" +version = "15.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py" }, + { name = "pygments" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c0/8f/0722ca900cc807c13a6a0c696dacf35430f72e0ec571c4275d2371fca3e9/rich-15.0.0.tar.gz", hash = "sha256:edd07a4824c6b40189fb7ac9bc4c52536e9780fbbfbddf6f1e2502c31b068c36", size = 230680, upload-time = "2026-04-12T08:24:00.75Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/82/3b/64d4899d73f91ba49a8c18a8ff3f0ea8f1c1d75481760df8c68ef5235bf5/rich-15.0.0-py3-none-any.whl", hash = "sha256:33bd4ef74232fb73fe9279a257718407f169c09b78a87ad3d296f548e27de0bb", size = 310654, upload-time = "2026-04-12T08:24:02.83Z" }, +] + [[package]] name = "ruff" version = "0.12.8" @@ -1081,6 +1115,7 @@ dependencies = [ { name = "jinja2" }, { name = "python-dotenv" }, { name = "pyyaml" }, + { name = "rich" }, { name = "sqlite-utils" }, ] @@ -1098,6 +1133,7 @@ requires-dist = [ { name = "jinja2", specifier = ">=3.1.6" }, { name = "python-dotenv", specifier = ">=1.0.0" }, { name = "pyyaml", specifier = ">=6.0.2" }, + { name = "rich", specifier = ">=13.0" }, { name = "sqlite-utils", specifier = ">=3.38" }, { name = "tenacity", marker = "extra == 'data'", specifier = ">=8.2.0" }, ] From bc977389fcb4c9dd255fb20844bfa2c520cc78c6 Mon Sep 17 00:00:00 2001 From: houfu Date: Fri, 17 Apr 2026 20:09:31 +0800 Subject: [PATCH 2/3] Parallel fetch, post-hook, force-sync, and transform traceback Targets four friction points from a week of real builds: 1. Cross-resource parallelism (--parallel N) DatabaseBuilder.build_database gains max_parallel. When > 1, fetch_data() calls for all resources are pre-warmed under asyncio.gather + Semaphore(N) before the existing sequential insert loop runs. Sync fetchers run in the default ThreadPoolExecutor so they participate alongside async ones. DB writes stay strictly sequential to avoid SQLite contention. A 4-resource project sleeping 0.3s per fetch drops from ~1.2s to <1.0s with --parallel 4 (test_builder_parallel.py). 2. Mid-pipeline hook (--post-hook CMD) New commands/post_hook.py runs a shell command after a successful build with ZEEKER_DB_PATH, ZEEKER_DB_NAME, ZEEKER_PROJECT_PATH, ZEEKER_BUILD_STATUS, and ZEEKER_BUILD_REPORT env vars set. Non-zero hook exit propagates to CLI exit 1. Outcome is part of BuildReport (and therefore JSON output and --progress-file). 3. Safer --sync-from-s3 (--force-sync) Sync refuses to download when a local DB already exists; the user must pass --force-sync to opt in. Byte-level hash comparisons were considered but every build writes to meta tables, so "diverged" detection is unreliable. Conservative existence check matches the user's ask ("making this explicit would make the workflow safer"). 4. Transform traceback hygiene _apply_transformation now returns (data, traceback_str). When a transform raises, the full Python stack flows into result.tracebacks and is surfaced via -v / --json. Also converts two silent except-pass blocks in builder.py (schema-sample fetch and fragments-context fetch) into warnings so users can see why those paths bailed. Four new test modules added; two existing tests updated for the tuple return shape and the force-sync default. Agent-team attempt note: four Wave-1 streams were planned to run as parallel worktree-isolated agents, but the worktrees fired from a pre-structured-reporting commit and hit a read-before-edit hook that wasn't working across their contexts. Executed all four streams plus Wave 2 integration inline from main instead. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../zeeker/tests/test_builder_parallel.py | 132 ++++++++++++++++++ packages/zeeker/tests/test_cli.py | 52 +++++++ .../zeeker/tests/test_database_components.py | 20 +-- packages/zeeker/tests/test_post_hook.py | 106 ++++++++++++++ .../zeeker/tests/test_processor_transform.py | 61 ++++++++ .../zeeker/tests/test_s3_sync_divergence.py | 86 ++++++++++++ packages/zeeker/zeeker/cli.py | 64 ++++++++- packages/zeeker/zeeker/commands/helpers.py | 32 +++++ packages/zeeker/zeeker/commands/post_hook.py | 84 +++++++++++ .../zeeker/core/database/async_executor.py | 69 ++++++++- .../zeeker/zeeker/core/database/builder.py | 99 ++++++++++++- .../zeeker/zeeker/core/database/processor.py | 26 ++-- .../zeeker/zeeker/core/database/s3_sync.py | 50 ++++++- packages/zeeker/zeeker/core/project.py | 7 + packages/zeeker/zeeker/core/types.py | 5 + 15 files changed, 854 insertions(+), 39 deletions(-) create mode 100644 packages/zeeker/tests/test_builder_parallel.py create mode 100644 packages/zeeker/tests/test_post_hook.py create mode 100644 packages/zeeker/tests/test_processor_transform.py create mode 100644 packages/zeeker/tests/test_s3_sync_divergence.py create mode 100644 packages/zeeker/zeeker/commands/post_hook.py diff --git a/packages/zeeker/tests/test_builder_parallel.py b/packages/zeeker/tests/test_builder_parallel.py new file mode 100644 index 0000000..60f3828 --- /dev/null +++ b/packages/zeeker/tests/test_builder_parallel.py @@ -0,0 +1,132 @@ +"""Tests for cross-resource parallel fetch phase in DatabaseBuilder.""" + +import asyncio +import time +from pathlib import Path + +import pytest + +from zeeker.core.database.builder import DatabaseBuilder +from zeeker.core.types import ZeekerProject + + +def _write_resource(resources_dir: Path, name: str, body: str) -> None: + resources_dir.mkdir(parents=True, exist_ok=True) + (resources_dir / f"{name}.py").write_text(body) + + +def _async_sleep_resource(name: str, sleep_s: float = 0.3) -> str: + """Module body that sleeps asynchronously then returns a tiny record.""" + return f""" +import asyncio + +async def fetch_data(existing_table): + await asyncio.sleep({sleep_s}) + return [{{"id": 1, "name": "{name}"}}] +""" + + +def _sync_sleep_resource(name: str, sleep_s: float = 0.3) -> str: + return f""" +import time + +def fetch_data(existing_table): + time.sleep({sleep_s}) + return [{{"id": 1, "name": "{name}"}}] +""" + + +def _build_project(tmp_path: Path, resource_names: list[str]) -> DatabaseBuilder: + project = ZeekerProject( + name="parallel_test", + database="parallel_test.db", + resources={name: {} for name in resource_names}, + root_path=tmp_path, + ) + return DatabaseBuilder(tmp_path, project) + + +def test_parallel_fetch_is_faster_than_sequential(tmp_path: Path): + names = ["a", "b", "c", "d"] + resources_dir = tmp_path / "resources" + for n in names: + _write_resource(resources_dir, n, _async_sleep_resource(n, sleep_s=0.3)) + + builder = _build_project(tmp_path, names) + + started = time.perf_counter() + result = builder.build_database(max_parallel=4) + elapsed = time.perf_counter() - started + + assert result.is_valid, f"Build failed: {result.errors}" + assert len(result.report.succeeded) == 4 + # 4 × 0.3s serial = 1.2s+ of pure sleep; parallel should finish well + # under that. Give a generous bound to avoid CI flake. + assert elapsed < 1.0, f"Parallel build took {elapsed:.2f}s — not parallel?" + + +def test_parallel_fetch_handles_mixed_sync_and_async(tmp_path: Path): + resources_dir = tmp_path / "resources" + _write_resource(resources_dir, "a_async", _async_sleep_resource("a_async", 0.3)) + _write_resource(resources_dir, "b_sync", _sync_sleep_resource("b_sync", 0.3)) + + builder = _build_project(tmp_path, ["a_async", "b_sync"]) + + started = time.perf_counter() + result = builder.build_database(max_parallel=4) + elapsed = time.perf_counter() - started + + assert result.is_valid, result.errors + assert len(result.report.succeeded) == 2 + # Two 0.3s fetches in parallel should be ~0.3s, definitely under 0.6s. + assert elapsed < 0.6 + + +def test_parallel_one_failure_doesnt_block_others(tmp_path: Path): + resources_dir = tmp_path / "resources" + _write_resource( + resources_dir, + "ok", + """ +def fetch_data(existing_table): + return [{"id": 1}] +""", + ) + _write_resource( + resources_dir, + "boom", + """ +def fetch_data(existing_table): + raise RuntimeError("boom") +""", + ) + + builder = _build_project(tmp_path, ["ok", "boom"]) + result = builder.build_database(max_parallel=2) + + # Build itself reports not-valid (one failed), but the succeeded resource + # must still appear in the report. + names = {r.name: r.status for r in result.report.resources} + assert names == {"ok": "success", "boom": "failed"} + failed = next(r for r in result.report.resources if r.name == "boom") + assert failed.error_message and "boom" in failed.error_message + assert failed.traceback and "RuntimeError" in failed.traceback + + +def test_sequential_default_still_works(tmp_path: Path): + """max_parallel=1 (the default) must preserve existing behaviour.""" + resources_dir = tmp_path / "resources" + _write_resource( + resources_dir, + "a", + """ +def fetch_data(existing_table): + return [{"id": 1, "name": "alpha"}] +""", + ) + + builder = _build_project(tmp_path, ["a"]) + result = builder.build_database() # default max_parallel=1 + + assert result.is_valid + assert len(result.report.succeeded) == 1 diff --git a/packages/zeeker/tests/test_cli.py b/packages/zeeker/tests/test_cli.py index eb9ed4b..2380ccf 100644 --- a/packages/zeeker/tests/test_cli.py +++ b/packages/zeeker/tests/test_cli.py @@ -455,6 +455,58 @@ def test_build_progress_file_written(self, runner, mock_manager, tmp_path): assert payload["status"] == "success" assert payload["resources"][0]["name"] == "users" + def test_build_parallel_flag_forwarded(self, runner, mock_manager): + """--parallel N reaches the manager as max_parallel.""" + mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport(total_duration_s=0.1) + mock_manager.build_database.return_value = mock_result + + result = runner.invoke(cli, ["build", "--parallel", "4"]) + assert result.exit_code == 0 + self._assert_build_kwargs(mock_manager, max_parallel=4) + + def test_build_post_hook_success_exits_zero(self, runner, mock_manager, tmp_path): + """--post-hook runs after a successful build; zero exit keeps code 0.""" + mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=1)], + total_duration_s=0.1, + ) + mock_manager.build_database.return_value = mock_result + mock_project = MagicMock() + mock_project.database = "my.db" + mock_manager.load_project.return_value = mock_project + mock_manager.project_path = tmp_path + + result = runner.invoke(cli, ["build", "--post-hook", "exit 0"]) + assert result.exit_code == 0 + + def test_build_post_hook_nonzero_exits_one(self, runner, mock_manager, tmp_path): + """--post-hook non-zero exit propagates to exit code 1.""" + mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=1)], + total_duration_s=0.1, + ) + mock_manager.build_database.return_value = mock_result + mock_project = MagicMock() + mock_project.database = "my.db" + mock_manager.load_project.return_value = mock_project + mock_manager.project_path = tmp_path + + result = runner.invoke(cli, ["build", "--post-hook", "exit 5"]) + assert result.exit_code == 1 + + def test_build_force_sync_flag_forwarded(self, runner, mock_manager): + """--force-sync reaches the manager.""" + mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport(total_duration_s=0.0) + mock_manager.build_database.return_value = mock_result + + result = runner.invoke(cli, ["build", "--sync-from-s3", "--force-sync"]) + assert result.exit_code == 0 + self._assert_build_kwargs(mock_manager, force_sync=True, sync_from_s3=True) + class TestCLIDeploy: """Test 'zeeker deploy' command.""" diff --git a/packages/zeeker/tests/test_database_components.py b/packages/zeeker/tests/test_database_components.py index 4023da2..7d86d55 100644 --- a/packages/zeeker/tests/test_database_components.py +++ b/packages/zeeker/tests/test_database_components.py @@ -267,9 +267,10 @@ def test_apply_transformation_success(self, processor): mock_module.transform_data.return_value = [{"id": 1, "transformed": True}] raw_data = [{"id": 1, "raw": True}] - result = processor._apply_transformation(mock_module, raw_data, "test", "transform_data") + data, tb = processor._apply_transformation(mock_module, raw_data, "test", "transform_data") - assert result == [{"id": 1, "transformed": True}] + assert data == [{"id": 1, "transformed": True}] + assert tb is None mock_module.transform_data.assert_called_once_with(raw_data) def test_apply_transformation_missing_function(self, processor): @@ -280,10 +281,11 @@ def test_apply_transformation_missing_function(self, processor): delattr(mock_module, "transform_data") raw_data = [{"id": 1, "raw": True}] - result = processor._apply_transformation(mock_module, raw_data, "test", "transform_data") + data, tb = processor._apply_transformation(mock_module, raw_data, "test", "transform_data") - # Should return original data when function is missing - assert result == raw_data + # Should return original data when function is missing, no traceback. + assert data == raw_data + assert tb is None def test_apply_transformation_function_error(self, processor): """Test transformation with function that throws error.""" @@ -291,10 +293,12 @@ def test_apply_transformation_function_error(self, processor): mock_module.transform_data = MagicMock(side_effect=Exception("Transform failed")) raw_data = [{"id": 1, "raw": True}] - result = processor._apply_transformation(mock_module, raw_data, "test", "transform_data") + data, tb = processor._apply_transformation(mock_module, raw_data, "test", "transform_data") - # Should return None when function throws exception - assert result is None + # Data should be None when function throws; traceback should be captured. + assert data is None + assert tb is not None + assert "Transform failed" in tb class TestDatabaseBuilder: diff --git a/packages/zeeker/tests/test_post_hook.py b/packages/zeeker/tests/test_post_hook.py new file mode 100644 index 0000000..df882d1 --- /dev/null +++ b/packages/zeeker/tests/test_post_hook.py @@ -0,0 +1,106 @@ +"""Tests for the post-build shell hook runner.""" + +import json +import os +import subprocess +from pathlib import Path +from unittest.mock import patch + +import pytest + +from zeeker.commands.post_hook import PostHookResult, run_post_hook +from zeeker.core.types import BuildReport, ResourceOutcome + + +@pytest.fixture +def sample_report() -> BuildReport: + return BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=3, duration_s=0.1)], + total_duration_s=0.1, + ) + + +class _FakeCompleted: + def __init__(self, returncode: int, stdout: str = "", stderr: str = ""): + self.returncode = returncode + self.stdout = stdout + self.stderr = stderr + + +def test_run_post_hook_success_populates_result(tmp_path: Path, sample_report): + with patch("subprocess.run", return_value=_FakeCompleted(0, "ok\n", "")) as mock_run: + result = run_post_hook( + "echo ok", + project_path=tmp_path, + db_path=tmp_path / "mydb.db", + db_name="mydb", + report=sample_report, + ) + + assert isinstance(result, PostHookResult) + assert result.exit_code == 0 + assert result.stdout == "ok\n" + assert result.command == "echo ok" + + call = mock_run.call_args + assert call.args[0] == "echo ok" + assert call.kwargs["shell"] is True + assert call.kwargs["cwd"] == str(tmp_path) + assert call.kwargs["capture_output"] is True + assert call.kwargs["text"] is True + assert call.kwargs["check"] is False + + env = call.kwargs["env"] + assert env["ZEEKER_DB_PATH"] == str(tmp_path / "mydb.db") + assert env["ZEEKER_DB_NAME"] == "mydb" + assert env["ZEEKER_PROJECT_PATH"] == str(tmp_path) + assert env["ZEEKER_BUILD_STATUS"] == "success" + + report_path = env["ZEEKER_BUILD_REPORT"] + assert os.path.exists(report_path) + payload = json.loads(Path(report_path).read_text()) + assert payload["status"] == "success" + assert payload["resources"][0]["name"] == "users" + + +def test_run_post_hook_non_zero_propagates(tmp_path: Path, sample_report): + with patch("subprocess.run", return_value=_FakeCompleted(5, "", "boom")): + result = run_post_hook( + "false", + project_path=tmp_path, + db_path=tmp_path / "x.db", + db_name="x", + report=sample_report, + ) + assert result.exit_code == 5 + assert result.stderr == "boom" + + +def test_run_post_hook_marks_partial_failure_status(tmp_path: Path): + partial = BuildReport( + resources=[ResourceOutcome(name="users", status="failed", error_message="x")], + total_duration_s=0.0, + ) + with patch("subprocess.run", return_value=_FakeCompleted(0)) as mock_run: + run_post_hook( + "echo", + project_path=tmp_path, + db_path=tmp_path / "x.db", + db_name="x", + report=partial, + ) + assert mock_run.call_args.kwargs["env"]["ZEEKER_BUILD_STATUS"] == "partial_failure" + + +def test_run_post_hook_real_subprocess(tmp_path: Path, sample_report): + """Smoke test that the real subprocess picks up env vars.""" + # Use a portable one-liner: print an env var and exit 0. + result = run_post_hook( + 'printf "%s" "$ZEEKER_DB_NAME"', + project_path=tmp_path, + db_path=tmp_path / "testdb.db", + db_name="testdb", + report=sample_report, + ) + assert result.exit_code == 0 + assert result.stdout == "testdb" diff --git a/packages/zeeker/tests/test_processor_transform.py b/packages/zeeker/tests/test_processor_transform.py new file mode 100644 index 0000000..4811f0b --- /dev/null +++ b/packages/zeeker/tests/test_processor_transform.py @@ -0,0 +1,61 @@ +"""Tests for transform_data traceback preservation in ResourceProcessor.""" + +from pathlib import Path +from types import SimpleNamespace + +import sqlite_utils + +from zeeker.core.database.processor import ResourceProcessor +from zeeker.core.schema import SchemaManager + + +def _build_module(fetch_return, transform_func=None): + mod = SimpleNamespace() + mod.fetch_data = lambda existing_table: fetch_return + if transform_func is not None: + mod.transform_data = transform_func + return mod + + +def test_transform_raise_is_preserved_in_tracebacks(tmp_path: Path): + db = sqlite_utils.Database(str(tmp_path / "test.db")) + processor = ResourceProcessor(tmp_path, SchemaManager()) + + def bad_transform(data): + raise ValueError("boom") + + module = _build_module(fetch_return=[{"id": 1, "name": "a"}], transform_func=bad_transform) + result = processor.process_resource(db, "users", module) + + assert not result.is_valid + assert result.errors + assert "Data transformation failed" in result.errors[0] + assert result.tracebacks, "expected tracebacks to be populated" + assert "ValueError: boom" in result.tracebacks[0] + + +def test_transform_success_has_no_traceback(tmp_path: Path): + db = sqlite_utils.Database(str(tmp_path / "test.db")) + processor = ResourceProcessor(tmp_path, SchemaManager()) + processor.schema_manager.ensure_meta_tables(db) + + module = _build_module( + fetch_return=[{"id": 1, "name": "a"}], + transform_func=lambda data: [{**d, "extra": True} for d in data], + ) + result = processor.process_resource(db, "users", module) + + assert result.is_valid + assert not result.tracebacks + + +def test_no_transform_function_no_traceback(tmp_path: Path): + db = sqlite_utils.Database(str(tmp_path / "test.db")) + processor = ResourceProcessor(tmp_path, SchemaManager()) + processor.schema_manager.ensure_meta_tables(db) + + module = _build_module(fetch_return=[{"id": 1}]) + result = processor.process_resource(db, "widgets", module) + + assert result.is_valid + assert not result.tracebacks diff --git a/packages/zeeker/tests/test_s3_sync_divergence.py b/packages/zeeker/tests/test_s3_sync_divergence.py new file mode 100644 index 0000000..44ef4de --- /dev/null +++ b/packages/zeeker/tests/test_s3_sync_divergence.py @@ -0,0 +1,86 @@ +"""Tests for the S3 sync preserve-local guard.""" + +from pathlib import Path +from unittest.mock import MagicMock, patch + +from zeeker.core.database.s3_sync import S3Synchronizer + + +def test_no_local_db_is_clean(tmp_path: Path): + sync = S3Synchronizer() + result = sync.check_local_divergence(tmp_path / "missing.db") + assert result.is_valid + + +def test_existing_local_db_is_protected(tmp_path: Path): + db_path = tmp_path / "my.db" + db_path.write_bytes(b"anything") # any existing file triggers the guard + + sync = S3Synchronizer() + result = sync.check_local_divergence(db_path) + assert not result.is_valid + assert any("--force-sync" in e for e in result.errors) + assert any("Local database already exists" in e for e in result.errors) + + +def test_sync_from_s3_refuses_when_local_exists(tmp_path: Path): + db_path = tmp_path / "my.db" + db_path.write_bytes(b"local data") + + with patch("zeeker.core.deployer.ZeekerDeployer") as deployer_cls: + sync = S3Synchronizer() + result = sync.sync_from_s3("my.db", db_path) + # Divergence check short-circuits before the deployer is constructed. + deployer_cls.assert_not_called() + + assert not result.is_valid + assert any("--force-sync" in e for e in result.errors) + + +def test_sync_from_s3_force_proceeds_and_warns(tmp_path: Path): + db_path = tmp_path / "my.db" + db_path.write_bytes(b"local data") + + fake_client = MagicMock() + fake_client.head_object.return_value = {"ContentLength": 10} + + def _fake_download(Bucket, Key, Filename): + Path(Filename).write_bytes(b"downloaded-from-s3") + + fake_client.download_file.side_effect = _fake_download + + with patch("zeeker.core.deployer.ZeekerDeployer") as deployer_cls: + deployer_cls.return_value.s3_client = fake_client + deployer_cls.return_value.bucket_name = "test-bucket" + + sync = S3Synchronizer() + result = sync.sync_from_s3("my.db", db_path, force=True) + + assert result.is_valid + assert fake_client.download_file.called + assert any("overwriting local database" in w for w in result.warnings) + assert db_path.read_bytes() == b"downloaded-from-s3" + + +def test_sync_from_s3_no_local_db_downloads_cleanly(tmp_path: Path): + db_path = tmp_path / "my.db" # does not exist + + fake_client = MagicMock() + fake_client.head_object.return_value = {"ContentLength": 10} + + def _fake_download(Bucket, Key, Filename): + Path(Filename).write_bytes(b"downloaded") + + fake_client.download_file.side_effect = _fake_download + + with patch("zeeker.core.deployer.ZeekerDeployer") as deployer_cls: + deployer_cls.return_value.s3_client = fake_client + deployer_cls.return_value.bucket_name = "test-bucket" + + sync = S3Synchronizer() + result = sync.sync_from_s3("my.db", db_path) + + assert result.is_valid + assert fake_client.download_file.called + # No warnings since there was nothing to overwrite. + assert not result.warnings diff --git a/packages/zeeker/zeeker/cli.py b/packages/zeeker/zeeker/cli.py index 73235a8..81d0a82 100644 --- a/packages/zeeker/zeeker/cli.py +++ b/packages/zeeker/zeeker/cli.py @@ -197,6 +197,27 @@ def add( help="Write a JSON BuildReport snapshot to this path after each resource (atomic overwrite). " "Useful for trigger-and-wait callers that poll externally.", ) +@click.option( + "--parallel", + type=int, + default=1, + show_default=True, + metavar="N", + help="Run up to N resource fetches concurrently (I/O only; DB writes stay sequential).", +) +@click.option( + "--post-hook", + "post_hook", + type=str, + default=None, + metavar="CMD", + help="Shell command to run after a successful build. See `zeeker build --help` for env vars.", +) +@click.option( + "--force-sync", + is_flag=True, + help="With --sync-from-s3, overwrite an existing local DB instead of refusing.", +) def build( resources, force_schema_reset, @@ -206,6 +227,9 @@ def build( as_json, fail_on_empty, progress_file, + parallel, + post_hook, + force_sync, ): """Build database from resources using sqlite-utils. @@ -214,18 +238,24 @@ def build( Exit codes: 0 all resources succeeded - 1 one or more resources failed, or FTS setup failed - 2 fatal error (schema conflict, DB open failure, config error) + 1 one or more resources failed, FTS setup failed, or post-hook exited non-zero + 2 fatal error (schema conflict, DB open failure, config error, local-diverged sync) + + The --post-hook command receives these env vars: + ZEEKER_DB_PATH, ZEEKER_DB_NAME, ZEEKER_PROJECT_PATH, + ZEEKER_BUILD_STATUS (success|partial_failure), ZEEKER_BUILD_REPORT (JSON tempfile) Examples: zeeker build # Build all resources - zeeker build --setup-fts # Build + FTS indexes + zeeker build --parallel 4 # Fetch 4 resources concurrently zeeker build users posts # Build specific resources zeeker build --json | jq # Machine-readable output zeeker build --fail-on-empty # Empty fetch_data() -> exit 1 zeeker build --progress-file build.json # Watchable progress from outside + zeeker build --post-hook 'sqlite3 mydb.db < patch.sql' """ from .commands.helpers import write_progress_file + from .commands.post_hook import run_post_hook load_env() @@ -264,6 +294,8 @@ def _callback(name, outcome): resources=resource_list, setup_fts=setup_fts, progress_callback=progress_callback, + max_parallel=parallel, + force_sync=force_sync, ) except ZeekerSchemaConflictError as e: result = ValidationResult(is_valid=False) @@ -293,17 +325,37 @@ def _callback(name, outcome): if result.warnings and not as_json: echo_warnings(result) - # Final progress-file snapshot reflects the completed state. + report = result.report + + # Run the post-hook after the build settles but before the final render, + # so its outcome is part of the reported payload. Skip on fatal state — + # there's no coherent DB to patch. + if post_hook and not report.fatal_error: + project = manager.load_project() + db_path = manager.project_path / project.database + db_name = Path(project.database).stem + hook_outcome = run_post_hook( + post_hook, + project_path=manager.project_path, + db_path=db_path, + db_name=db_name, + report=report, + ) + report.post_hook = hook_outcome + + # Final progress-file snapshot reflects the completed state (including + # any post-hook outcome). if progress_file: - write_progress_file(progress_file, result.report) + write_progress_file(progress_file, report) render_build_report(result, verbose=verbose, as_json=as_json, console=console) - report = result.report if report.fatal_error: raise click.exceptions.Exit(2) if report.failed or report.fts_error: raise click.exceptions.Exit(1) + if report.post_hook is not None and report.post_hook.exit_code != 0: + raise click.exceptions.Exit(1) if fail_on_empty and report.skipped: if not as_json: console.print( diff --git a/packages/zeeker/zeeker/commands/helpers.py b/packages/zeeker/zeeker/commands/helpers.py index e189cd6..8a1b2f9 100644 --- a/packages/zeeker/zeeker/commands/helpers.py +++ b/packages/zeeker/zeeker/commands/helpers.py @@ -248,6 +248,7 @@ def _build_report_payload(report: BuildReport) -> dict: "resources": [asdict(r) for r in report.resources], "fts_error": report.fts_error, "fatal_error": report.fatal_error, + "post_hook": asdict(report.post_hook) if report.post_hook is not None else None, } for item in payload["resources"]: if item.get("traceback"): @@ -326,6 +327,24 @@ def _emit_rich(report: BuildReport, *, verbose: bool, console: Console) -> None: if report.fts_error: console.print(f"[red]FTS setup failed:[/red] {report.fts_error}") + if report.post_hook is not None: + hook = report.post_hook + colour = "green" if hook.exit_code == 0 else "red" + console.print( + f"[bold]post-hook:[/bold] [dim]{hook.command}[/dim] " + f"→ [{colour}]exit {hook.exit_code}[/{colour}]" + ) + if verbose and hook.exit_code != 0: + body = "" + if hook.stdout: + body += f"[bold]stdout[/bold]\n{hook.stdout}" + if hook.stderr: + if body: + body += "\n" + body += f"[bold]stderr[/bold]\n{hook.stderr}" + if body: + console.print(Panel(body, title="[red]post-hook output[/red]", border_style="red")) + if verbose: for r in report.failed: if r.traceback: @@ -358,6 +377,19 @@ def _emit_plain(report: BuildReport, *, verbose: bool, console: Console) -> None if report.fts_error: console.print(f"FTS_ERROR: {report.fts_error}", markup=False, highlight=False) + if report.post_hook is not None: + hook = report.post_hook + console.print( + f"POST_HOOK: {hook.command} exit={hook.exit_code}", + markup=False, + highlight=False, + ) + if verbose and hook.exit_code != 0: + if hook.stdout: + console.print(f"POST_HOOK_STDOUT:\n{hook.stdout}", markup=False, highlight=False) + if hook.stderr: + console.print(f"POST_HOOK_STDERR:\n{hook.stderr}", markup=False, highlight=False) + if verbose: for r in report.failed: if r.traceback: diff --git a/packages/zeeker/zeeker/commands/post_hook.py b/packages/zeeker/zeeker/commands/post_hook.py new file mode 100644 index 0000000..3bf5efb --- /dev/null +++ b/packages/zeeker/zeeker/commands/post_hook.py @@ -0,0 +1,84 @@ +"""Post-build shell hook. + +Runs a user-provided shell command after a successful build with zeeker +context injected as environment variables. The CLI wires this up via the +``--post-hook`` flag on ``zeeker build``. +""" + +import json +import os +import subprocess +import tempfile +from dataclasses import dataclass +from pathlib import Path + +from ..core.types import BuildReport + + +@dataclass +class PostHookResult: + """Outcome of running a post-build hook command.""" + + command: str + exit_code: int + stdout: str + stderr: str + + +def run_post_hook( + command: str, + *, + project_path: Path, + db_path: Path, + db_name: str, + report: BuildReport, +) -> PostHookResult: + """Run ``command`` as a shell command with zeeker env vars set. + + Writes the current BuildReport to a tempfile and exposes its path via + ``ZEEKER_BUILD_REPORT`` so the hook can read it (e.g. to skip patching + on partial failures). + + Env vars exposed to the hook: + ZEEKER_DB_PATH absolute path to the .db file + ZEEKER_DB_NAME database stem (e.g. my_project) + ZEEKER_PROJECT_PATH project root + ZEEKER_BUILD_STATUS "success" or "partial_failure" + ZEEKER_BUILD_REPORT path to a tempfile with the JSON BuildReport + """ + # Local import to avoid circular dependency at module load time + # (helpers imports types; post_hook imports types; cli imports both). + from .helpers import _build_report_payload + + report_fd = tempfile.NamedTemporaryFile( + mode="w", suffix=".json", delete=False, encoding="utf-8" + ) + try: + json.dump(_build_report_payload(report), report_fd) + finally: + report_fd.close() + + env = { + **os.environ, + "ZEEKER_DB_PATH": str(db_path), + "ZEEKER_DB_NAME": db_name, + "ZEEKER_PROJECT_PATH": str(project_path), + "ZEEKER_BUILD_STATUS": "success" if not report.failed else "partial_failure", + "ZEEKER_BUILD_REPORT": report_fd.name, + } + + proc = subprocess.run( + command, + shell=True, + cwd=str(project_path), + env=env, + capture_output=True, + text=True, + check=False, + ) + return PostHookResult( + command=command, + exit_code=proc.returncode, + stdout=proc.stdout, + stderr=proc.stderr, + ) diff --git a/packages/zeeker/zeeker/core/database/async_executor.py b/packages/zeeker/zeeker/core/database/async_executor.py index 868b9a0..713920b 100644 --- a/packages/zeeker/zeeker/core/database/async_executor.py +++ b/packages/zeeker/zeeker/core/database/async_executor.py @@ -15,9 +15,30 @@ class AsyncExecutor: """Handles execution of both sync and async resource functions.""" - def __init__(self): - """Initialize AsyncExecutor with fetch_data cache.""" + def __init__(self, cache_enabled: bool = True): + """Initialize AsyncExecutor with optional fetch_data cache. + + Args: + cache_enabled: When False, skip the shared fetch cache. Disable + this in parallel-fetch contexts where concurrent access to + the cache dict would race. + """ self._fetch_cache: Dict[str, List[Dict[str, Any]]] = {} + self._cache_enabled = cache_enabled + # One-shot overrides keyed only on resource_name, populated by the + # builder's parallel pre-warm step. Stable across table-state changes + # during the build so a single fetch serves schema check + insert + + # fragments-context without re-running the user's fetch_data(). + self._prewarmed: Dict[str, List[Dict[str, Any]]] = {} + + def set_prewarmed(self, resource_name: str, data: List[Dict[str, Any]]) -> None: + """Register a fetch result to be returned by subsequent ``call_fetch_data`` + calls for ``resource_name`` within this build.""" + self._prewarmed[resource_name] = data + + def clear_prewarmed(self) -> None: + """Drop all pre-warmed overrides. Call between builds.""" + self._prewarmed.clear() def call_fetch_data( self, fetch_data_func: Callable, existing_table: Optional[Table], resource_name: str = None @@ -32,9 +53,15 @@ def call_fetch_data( Returns: List[Dict[str, Any]]: The data returned by fetch_data """ - # Check cache if resource_name is provided + # Serve from the parallel pre-warm override if present. + if resource_name and resource_name in self._prewarmed: + return self._prewarmed[resource_name] + + # Check cache if resource_name is provided and caching is enabled. cache_key = ( - self._generate_cache_key(resource_name, existing_table) if resource_name else None + self._generate_cache_key(resource_name, existing_table) + if resource_name and self._cache_enabled + else None ) if cache_key and cache_key in self._fetch_cache: return self._fetch_cache[cache_key] @@ -45,7 +72,39 @@ def call_fetch_data( else: result = fetch_data_func(existing_table) - # Cache the result if resource_name is provided + # Cache the result if caching is active. + if cache_key: + self._fetch_cache[cache_key] = result + + return result + + async def acall_fetch_data( + self, + fetch_data_func: Callable, + existing_table: Optional[Table], + resource_name: str = None, + ) -> List[Dict[str, Any]]: + """Async variant of :meth:`call_fetch_data` for use under + ``asyncio.gather``. Returns the data produced by ``fetch_data_func``. + + Async fetchers are awaited directly. Sync fetchers are run in the + default ThreadPoolExecutor so they don't block the event loop and + can run concurrently with other fetches. + """ + cache_key = ( + self._generate_cache_key(resource_name, existing_table) + if resource_name and self._cache_enabled + else None + ) + if cache_key and cache_key in self._fetch_cache: + return self._fetch_cache[cache_key] + + if inspect.iscoroutinefunction(fetch_data_func): + result = await fetch_data_func(existing_table) + else: + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, fetch_data_func, existing_table) + if cache_key: self._fetch_cache[cache_key] = result diff --git a/packages/zeeker/zeeker/core/database/builder.py b/packages/zeeker/zeeker/core/database/builder.py index b6a6fee..f6dfe45 100644 --- a/packages/zeeker/zeeker/core/database/builder.py +++ b/packages/zeeker/zeeker/core/database/builder.py @@ -5,6 +5,7 @@ resource processing, schema management, and S3 synchronization. """ +import asyncio import time import traceback from pathlib import Path @@ -20,6 +21,7 @@ ZeekerProject, ZeekerSchemaConflictError, ) +from .async_executor import AsyncExecutor from .fts_processor import FTSProcessor from .processor import ResourceProcessor from .s3_sync import S3Synchronizer @@ -50,6 +52,8 @@ def build_database( resources: list[str] = None, setup_fts: bool = False, progress_callback: Callable[[str, ResourceOutcome | None], None] | None = None, + max_parallel: int = 1, + force_sync: bool = False, ) -> ValidationResult: """Build the SQLite database from resources using sqlite-utils. @@ -81,13 +85,17 @@ def build_database( # S3 Database Synchronization - Download existing DB if requested if sync_from_s3: - sync_result = self.s3_sync.sync_from_s3(self.project.database, db_path) + sync_result = self.s3_sync.sync_from_s3( + self.project.database, db_path, force=force_sync + ) if not sync_result.is_valid: result.errors.extend(sync_result.errors) # Don't fail build if S3 sync fails - just warn result.warnings.append("S3 sync failed but continuing with local build") else: result.info.extend(sync_result.info) + if sync_result.warnings: + result.warnings.extend(sync_result.warnings) # Open existing database or create new one using sqlite-utils # Don't delete existing database - let resources check existing data for duplicates @@ -101,6 +109,13 @@ def build_database( # Determine which resources to process resources_to_build = resources if resources else list(self.project.resources.keys()) + # Pre-warm fetches concurrently when requested. The per-resource + # sequential loop below will then hit pre-warmed data instead of + # re-running each fetch_data() serially. + if max_parallel > 1 and len(resources_to_build) > 1: + prewarm_warnings = self._prewarm_fetches(db, resources_to_build, max_parallel) + result.warnings.extend(prewarm_warnings) + # Process each specified resource for resource_name in resources_to_build: if progress_callback: @@ -171,6 +186,10 @@ def build_database( result.tracebacks.append(traceback.format_exc()) report.fatal_error = msg + finally: + # Prevent stale pre-warmed data from leaking into a reused builder. + self.processor.async_executor.clear_prewarmed() + report.total_duration_s = time.perf_counter() - build_started return result @@ -250,8 +269,13 @@ def _process_resource_with_schema_check( except Exception as e: if isinstance(e, ZeekerSchemaConflictError): raise - # If we can't get sample data, proceed with build - pass + # Couldn't fetch a sample — surface this as a warning so + # the user knows the schema-conflict check ran blind, + # instead of a completely silent pass. + result.warnings.append( + f"Schema sample fetch failed for '{resource_name}' " + f"(continuing with build): {e}" + ) # Process the resource (pass pre-loaded module to avoid redundant load) resource_result = self.processor.process_resource(db, resource_name, module) @@ -318,8 +342,13 @@ def _process_fragments_for_resource( main_data_context = self.processor.async_executor.call_fetch_data( fetch_data, existing_table, resource_name=resource_name ) - except Exception: - # If we can't get context, fragments will work without it + except Exception as e: + # If we can't get context, fragments will work without it — but + # surface the reason rather than swallowing the exception silently. + result.warnings.append( + f"Fragments context fetch failed for '{resource_name}' " + f"(fragments will run without main-data context): {e}" + ) main_data_context = None # Process fragments @@ -335,3 +364,63 @@ def _process_fragments_for_resource( result.records = fragments_result.records return result + + def _prewarm_fetches( + self, + db: sqlite_utils.Database, + resources_to_build: list[str], + max_parallel: int, + ) -> list[str]: + """Run ``fetch_data()`` for all resources concurrently and populate + the processor's executor with pre-warmed results. + + Returns a list of warning strings (one per resource whose pre-fetch + failed; the sequential loop will re-attempt those and record the + definitive error). + """ + # Synchronous prep: module loading and existing-table lookup. + # Doing this here (not inside the coroutines) avoids racy imports + # and lets us skip resources that can't even be loaded. + specs: list[tuple[str, object, object]] = [] + warnings: list[str] = [] + for name in resources_to_build: + mod_result = self.processor._load_resource_module(name) + if not mod_result.is_valid: + # Let the sequential loop produce the canonical error. + continue + module = mod_result.data + if not hasattr(module, "fetch_data"): + continue + fetch_data = getattr(module, "fetch_data") + existing = db[name] if db[name].exists() else None + specs.append((name, fetch_data, existing)) + + if not specs: + return warnings + + fresh = AsyncExecutor(cache_enabled=False) + sem = asyncio.Semaphore(max_parallel) + + async def run_one(name: str, fetch_data, existing): + async with sem: + try: + data = await fresh.acall_fetch_data(fetch_data, existing, name) + return name, data, None + except Exception as e: + return name, None, e + + async def run_all(): + return await asyncio.gather(*(run_one(*s) for s in specs)) + + results = asyncio.run(run_all()) + for name, data, err in results: + if err is not None: + warnings.append( + f"Parallel pre-fetch failed for '{name}' " + f"(sequential loop will retry): {err}" + ) + continue + if data is not None: + self.processor.async_executor.set_prewarmed(name, data) + + return warnings diff --git a/packages/zeeker/zeeker/core/database/processor.py b/packages/zeeker/zeeker/core/database/processor.py index cc15593..c24659a 100644 --- a/packages/zeeker/zeeker/core/database/processor.py +++ b/packages/zeeker/zeeker/core/database/processor.py @@ -82,12 +82,14 @@ def process_resource( return result # Apply transformation if available - transformed_data = self._apply_transformation( + transformed_data, transform_tb = self._apply_transformation( module, raw_data, resource_name, "transform_data" ) if transformed_data is None: result.is_valid = False result.errors.append(f"Data transformation failed for '{resource_name}'") + if transform_tb: + result.tracebacks.append(transform_tb) return result # Validate transformed data structure @@ -174,12 +176,14 @@ def process_fragments_data( return result # Apply transformation if available - transformed_fragments = self._apply_transformation( + transformed_fragments, transform_tb = self._apply_transformation( module, raw_fragments, resource_name, "transform_fragments_data" ) if transformed_fragments is None: result.is_valid = False result.errors.append(f"Fragment transformation failed for '{resource_name}'") + if transform_tb: + result.tracebacks.append(transform_tb) return result # Validate fragments data structure @@ -276,26 +280,22 @@ def _call_fragments_function( def _apply_transformation( self, module: Any, data: List[Dict[str, Any]], resource_name: str, transform_func_name: str - ) -> List[Dict[str, Any]] | None: + ) -> tuple[List[Dict[str, Any]] | None, str | None]: """Apply transformation function if available. - Args: - module: The resource module - data: Data to transform - resource_name: Name of resource for error messages - transform_func_name: Name of transformation function - Returns: - Transformed data or None if transformation failed + A tuple ``(transformed_data, traceback_str)``: + - ``(data, None)`` on success (or when no transform function exists) + - ``(None, traceback_str)`` when the transform raised """ if hasattr(module, transform_func_name): try: transform_func = getattr(module, transform_func_name) - return transform_func(data) + return transform_func(data), None except Exception: - return None + return None, traceback.format_exc() else: - return data + return data, None def _validate_data_structure( self, data: List[Dict[str, Any]], context: str diff --git a/packages/zeeker/zeeker/core/database/s3_sync.py b/packages/zeeker/zeeker/core/database/s3_sync.py index 564a723..a2513cd 100644 --- a/packages/zeeker/zeeker/core/database/s3_sync.py +++ b/packages/zeeker/zeeker/core/database/s3_sync.py @@ -2,7 +2,10 @@ S3 synchronization for Zeeker databases. This module handles downloading existing databases from S3 before building -to enable multi-machine workflows and incremental updates. +to enable multi-machine workflows and incremental updates. It refuses to +overwrite an existing local DB by default (the user must opt in with +``force=True``) because any local patches/fixes would otherwise be silently +discarded. """ from pathlib import Path @@ -13,22 +16,65 @@ class S3Synchronizer: """Handles S3 database synchronization operations.""" - def sync_from_s3(self, database_name: str, local_db_path: Path) -> ValidationResult: + def check_local_divergence(self, local_db_path) -> ValidationResult: + """Return ``is_valid=False`` when a local DB already exists. + + Deliberately conservative: we don't try to auto-detect whether the + local file is "safe to overwrite" (every build writes to meta tables + anyway, so a byte-level hash comparison is unreliable). Instead we + treat any existing local DB as potentially containing work the user + doesn't want to lose and require an explicit ``--force-sync``. + """ + result = ValidationResult(is_valid=True) + path = Path(local_db_path) + if not path.exists(): + return result + + result.is_valid = False + result.errors.append( + f"Local database already exists at {path}. Syncing from " + "S3 would overwrite any local changes." + ) + result.errors.append( + "Use --force-sync to overwrite, or deploy first to push your " "local changes to S3." + ) + return result + + def sync_from_s3( + self, + database_name: str, + local_db_path: Path, + *, + force: bool = False, + ) -> ValidationResult: """Download existing database from S3 if available. Args: database_name: Name of the database file local_db_path: Local path where database should be saved + force: If True, overwrite an existing local DB without complaint. Returns: ValidationResult with sync results """ result = ValidationResult(is_valid=True) + path = Path(local_db_path) try: # Import here to avoid making boto3 a hard dependency from ..deployer import ZeekerDeployer + # Guard against wiping out local changes unless explicitly opted in. + if not force: + divergence = self.check_local_divergence(path) + if not divergence.is_valid: + return divergence + elif path.exists(): + result.warnings.append( + "--force-sync: overwriting local database (any unsynced " + "changes will be lost)." + ) + deployer = ZeekerDeployer() s3_key = f"latest/{database_name}" diff --git a/packages/zeeker/zeeker/core/project.py b/packages/zeeker/zeeker/core/project.py index 4091530..920cc3f 100644 --- a/packages/zeeker/zeeker/core/project.py +++ b/packages/zeeker/zeeker/core/project.py @@ -77,6 +77,8 @@ def build_database( setup_fts: bool = False, *, progress_callback: Callable[[str, ResourceOutcome | None], None] | None = None, + max_parallel: int = 1, + force_sync: bool = False, ) -> ValidationResult: """Build the SQLite database from resources with optional S3 sync. @@ -87,6 +89,9 @@ def build_database( setup_fts: If True, set up full-text search indexes on configured fields progress_callback: Optional callable (resource_name, outcome|None) used by the CLI layer to render progress. See DatabaseBuilder.build_database. + max_parallel: Max concurrent fetch_data() calls (default 1 = sequential). + force_sync: With sync_from_s3, overwrite an existing local DB instead of + refusing. Returns: ValidationResult with build results; the BuildReport is attached as @@ -116,4 +121,6 @@ def build_database( resources, setup_fts, progress_callback=progress_callback, + max_parallel=max_parallel, + force_sync=force_sync, ) diff --git a/packages/zeeker/zeeker/core/types.py b/packages/zeeker/zeeker/core/types.py index 7617b7a..b109805 100644 --- a/packages/zeeker/zeeker/core/types.py +++ b/packages/zeeker/zeeker/core/types.py @@ -78,6 +78,11 @@ class BuildReport: total_duration_s: float = 0.0 fts_error: str | None = None fatal_error: str | None = None + # Set to a PostHookResult dataclass when --post-hook ran. Annotated as + # ``object`` to avoid a circular import between this module and + # commands/post_hook.py; dataclasses.asdict() still recurses correctly + # when the value is itself a dataclass instance. + post_hook: object | None = None @property def failed(self) -> list[ResourceOutcome]: From b04f51eb7bfec37e9830a66b7520e07bd08a546a Mon Sep 17 00:00:00 2001 From: houfu Date: Fri, 17 Apr 2026 20:21:04 +0800 Subject: [PATCH 3/3] Bump zeeker to 0.7.0 Minor release covering: - Structured error reporting for build/deploy (BuildReport, --json, --verbose, --progress-file, --fail-on-empty) - Cross-resource parallel fetch phase (--parallel N) - Post-build shell hook (--post-hook CMD) with ZEEKER_* env vars - Safer --sync-from-s3 via explicit --force-sync opt-in - Transform traceback preservation and quieter silent-failure surfaces Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/zeeker/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/zeeker/pyproject.toml b/packages/zeeker/pyproject.toml index aa97e19..175ad13 100644 --- a/packages/zeeker/pyproject.toml +++ b/packages/zeeker/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "zeeker" -version = "0.6.0" +version = "0.7.0" description = "Database management tool for Datasette-based systems" readme = "README.md" requires-python = ">=3.12"