diff --git a/packages/zeeker/pyproject.toml b/packages/zeeker/pyproject.toml index fb18d6e..175ad13 100644 --- a/packages/zeeker/pyproject.toml +++ b/packages/zeeker/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "zeeker" -version = "0.6.0" +version = "0.7.0" description = "Database management tool for Datasette-based systems" readme = "README.md" requires-python = ">=3.12" @@ -21,6 +21,7 @@ dependencies = [ "jinja2>=3.1.6", "python-dotenv>=1.0.0", "pyyaml>=6.0.2", + "rich>=13.0", "sqlite-utils>=3.38", ] diff --git a/packages/zeeker/tests/test_builder_parallel.py b/packages/zeeker/tests/test_builder_parallel.py new file mode 100644 index 0000000..60f3828 --- /dev/null +++ b/packages/zeeker/tests/test_builder_parallel.py @@ -0,0 +1,132 @@ +"""Tests for cross-resource parallel fetch phase in DatabaseBuilder.""" + +import asyncio +import time +from pathlib import Path + +import pytest + +from zeeker.core.database.builder import DatabaseBuilder +from zeeker.core.types import ZeekerProject + + +def _write_resource(resources_dir: Path, name: str, body: str) -> None: + resources_dir.mkdir(parents=True, exist_ok=True) + (resources_dir / f"{name}.py").write_text(body) + + +def _async_sleep_resource(name: str, sleep_s: float = 0.3) -> str: + """Module body that sleeps asynchronously then returns a tiny record.""" + return f""" +import asyncio + +async def fetch_data(existing_table): + await asyncio.sleep({sleep_s}) + return [{{"id": 1, "name": "{name}"}}] +""" + + +def _sync_sleep_resource(name: str, sleep_s: float = 0.3) -> str: + return f""" +import time + +def fetch_data(existing_table): + time.sleep({sleep_s}) + return [{{"id": 1, "name": "{name}"}}] +""" + + +def _build_project(tmp_path: Path, resource_names: list[str]) -> DatabaseBuilder: + project = ZeekerProject( + name="parallel_test", + database="parallel_test.db", + resources={name: {} for name in resource_names}, + root_path=tmp_path, + ) + return DatabaseBuilder(tmp_path, project) + + +def 
test_parallel_fetch_is_faster_than_sequential(tmp_path: Path): + names = ["a", "b", "c", "d"] + resources_dir = tmp_path / "resources" + for n in names: + _write_resource(resources_dir, n, _async_sleep_resource(n, sleep_s=0.3)) + + builder = _build_project(tmp_path, names) + + started = time.perf_counter() + result = builder.build_database(max_parallel=4) + elapsed = time.perf_counter() - started + + assert result.is_valid, f"Build failed: {result.errors}" + assert len(result.report.succeeded) == 4 + # 4 × 0.3s serial = 1.2s+ of pure sleep; parallel should finish well + # under that. Give a generous bound to avoid CI flake. + assert elapsed < 1.0, f"Parallel build took {elapsed:.2f}s — not parallel?" + + +def test_parallel_fetch_handles_mixed_sync_and_async(tmp_path: Path): + resources_dir = tmp_path / "resources" + _write_resource(resources_dir, "a_async", _async_sleep_resource("a_async", 0.3)) + _write_resource(resources_dir, "b_sync", _sync_sleep_resource("b_sync", 0.3)) + + builder = _build_project(tmp_path, ["a_async", "b_sync"]) + + started = time.perf_counter() + result = builder.build_database(max_parallel=4) + elapsed = time.perf_counter() - started + + assert result.is_valid, result.errors + assert len(result.report.succeeded) == 2 + # Two 0.3s fetches in parallel should be ~0.3s, definitely under 0.6s. + assert elapsed < 0.6 + + +def test_parallel_one_failure_doesnt_block_others(tmp_path: Path): + resources_dir = tmp_path / "resources" + _write_resource( + resources_dir, + "ok", + """ +def fetch_data(existing_table): + return [{"id": 1}] +""", + ) + _write_resource( + resources_dir, + "boom", + """ +def fetch_data(existing_table): + raise RuntimeError("boom") +""", + ) + + builder = _build_project(tmp_path, ["ok", "boom"]) + result = builder.build_database(max_parallel=2) + + # Build itself reports not-valid (one failed), but the succeeded resource + # must still appear in the report. 
+ names = {r.name: r.status for r in result.report.resources} + assert names == {"ok": "success", "boom": "failed"} + failed = next(r for r in result.report.resources if r.name == "boom") + assert failed.error_message and "boom" in failed.error_message + assert failed.traceback and "RuntimeError" in failed.traceback + + +def test_sequential_default_still_works(tmp_path: Path): + """max_parallel=1 (the default) must preserve existing behaviour.""" + resources_dir = tmp_path / "resources" + _write_resource( + resources_dir, + "a", + """ +def fetch_data(existing_table): + return [{"id": 1, "name": "alpha"}] +""", + ) + + builder = _build_project(tmp_path, ["a"]) + result = builder.build_database() # default max_parallel=1 + + assert result.is_valid + assert len(result.report.succeeded) == 1 diff --git a/packages/zeeker/tests/test_cli.py b/packages/zeeker/tests/test_cli.py index f08833e..2380ccf 100644 --- a/packages/zeeker/tests/test_cli.py +++ b/packages/zeeker/tests/test_cli.py @@ -7,7 +7,12 @@ from click.testing import CliRunner from zeeker.cli import cli -from zeeker.core.types import ValidationResult, ZeekerSchemaConflictError +from zeeker.core.types import ( + BuildReport, + ResourceOutcome, + ValidationResult, + ZeekerSchemaConflictError, +) class TestCLIInit: @@ -181,77 +186,107 @@ def mock_manager(self): mock_class.return_value = mock_instance yield mock_instance + @staticmethod + def _assert_build_kwargs(mock_manager, **expected): + """Assert build_database was called with the expected kwargs (ignoring the + progress_callback, which is a closure the CLI constructs internally).""" + assert mock_manager.build_database.call_count == 1 + kwargs = mock_manager.build_database.call_args.kwargs + for key, value in expected.items(): + assert kwargs[key] == value, f"{key}: expected {value!r}, got {kwargs[key]!r}" + def test_build_successful(self, runner, mock_manager): """Test successful database build.""" mock_result = ValidationResult(is_valid=True) - 
mock_result.info.extend( - [ - "Built table: users (150 records)", - "Built table: posts (200 records)", - "Generated metadata.json", - ] + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=150)], + total_duration_s=0.1, ) mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build"]) assert result.exit_code == 0 - for info in mock_result.info: - assert f"✅ {info}" in result.output - mock_manager.build_database.assert_called_once_with( - force_schema_reset=False, sync_from_s3=False, resources=None, setup_fts=False + # Plain-mode renderer emits a SUMMARY line + assert "SUMMARY: 1 succeeded" in result.output + self._assert_build_kwargs( + mock_manager, + force_schema_reset=False, + sync_from_s3=False, + resources=None, + setup_fts=False, ) def test_build_with_force_schema_reset(self, runner, mock_manager): """Test build with force schema reset flag.""" mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport() mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", "--force-schema-reset"]) assert result.exit_code == 0 - mock_manager.build_database.assert_called_once_with( - force_schema_reset=True, sync_from_s3=False, resources=None, setup_fts=False + self._assert_build_kwargs( + mock_manager, + force_schema_reset=True, + sync_from_s3=False, + resources=None, + setup_fts=False, ) def test_build_with_s3_sync(self, runner, mock_manager): """Test build with S3 sync flag.""" mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport() mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", "--sync-from-s3"]) assert result.exit_code == 0 - mock_manager.build_database.assert_called_once_with( - force_schema_reset=False, sync_from_s3=True, resources=None, setup_fts=False + self._assert_build_kwargs( + mock_manager, + force_schema_reset=False, + sync_from_s3=True, + 
resources=None, + setup_fts=False, ) def test_build_with_both_flags(self, runner, mock_manager): """Test build with both flags.""" mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport() mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", "--force-schema-reset", "--sync-from-s3"]) assert result.exit_code == 0 - mock_manager.build_database.assert_called_once_with( - force_schema_reset=True, sync_from_s3=True, resources=None, setup_fts=False + self._assert_build_kwargs( + mock_manager, + force_schema_reset=True, + sync_from_s3=True, + resources=None, + setup_fts=False, ) def test_build_with_setup_fts(self, runner, mock_manager): """Test build with setup-fts flag.""" mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport() mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", "--setup-fts"]) assert result.exit_code == 0 - mock_manager.build_database.assert_called_once_with( - force_schema_reset=False, sync_from_s3=False, resources=None, setup_fts=True + self._assert_build_kwargs( + mock_manager, + force_schema_reset=False, + sync_from_s3=False, + resources=None, + setup_fts=True, ) def test_build_schema_conflict_error(self, runner, mock_manager): - """Test build with schema conflict error.""" + """Schema conflict is a fatal error (exit code 2) routed through the renderer.""" mock_manager.build_database.side_effect = ZeekerSchemaConflictError( "users", {"id": "INTEGER", "name": "TEXT"}, @@ -260,44 +295,48 @@ def test_build_schema_conflict_error(self, runner, mock_manager): result = runner.invoke(cli, ["build"]) - assert result.exit_code == 0 # CLI doesn't exit with error code, just returns early - assert "❌ Schema conflict detected" in result.output + assert result.exit_code == 2 + # Plain-mode renderer emits FATAL: prefix with the conflict message + assert "FATAL:" in result.output assert "users" in result.output def 
test_build_general_error(self, runner, mock_manager): - """Test build with general error.""" + """A ValidationResult with errors but no report is treated as fatal.""" mock_result = ValidationResult(is_valid=False) mock_result.errors.append("Database file is locked") mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build"]) - assert result.exit_code == 1 # Build errors should exit with code 1 - assert "❌ Database build failed" in result.output + assert result.exit_code == 2 + assert "FATAL:" in result.output assert "Database file is locked" in result.output - def test_build_displays_completion_info(self, runner, mock_manager): - """Test that build completion info is displayed.""" + def test_build_displays_summary(self, runner, mock_manager): + """Build completion shows the structured SUMMARY line.""" mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport(total_duration_s=0.0) mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build"]) assert result.exit_code == 0 - assert "Built with sqlite-utils" in result.output - assert "Ready for deployment with 'zeeker deploy'" in result.output + assert "SUMMARY:" in result.output def test_build_with_specific_resources(self, runner, mock_manager): """Test build with specific resources.""" mock_result = ValidationResult(is_valid=True) - mock_result.info.extend(["Built table: users (150 records)"]) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=150)] + ) mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", "users", "posts"]) assert result.exit_code == 0 assert "Building specific resources: users, posts" in result.output - mock_manager.build_database.assert_called_once_with( + self._assert_build_kwargs( + mock_manager, force_schema_reset=False, sync_from_s3=False, resources=["users", "posts"], @@ -307,19 +346,25 @@ def 
test_build_with_specific_resources(self, runner, mock_manager): def test_build_with_single_resource(self, runner, mock_manager): """Test build with single resource.""" mock_result = ValidationResult(is_valid=True) - mock_result.info.extend(["Built table: users (150 records)"]) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=150)] + ) mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", "users"]) assert result.exit_code == 0 assert "Building specific resources: users" in result.output - mock_manager.build_database.assert_called_once_with( - force_schema_reset=False, sync_from_s3=False, resources=["users"], setup_fts=False + self._assert_build_kwargs( + mock_manager, + force_schema_reset=False, + sync_from_s3=False, + resources=["users"], + setup_fts=False, ) def test_build_with_invalid_resources(self, runner, mock_manager): - """Test build with invalid resource names.""" + """Unknown resources are pre-flight errors and exit as fatal (code 2).""" mock_result = ValidationResult(is_valid=False) mock_result.errors.extend( ["Unknown resources: invalid_resource", "Available resources: users, posts"] @@ -328,27 +373,140 @@ def test_build_with_invalid_resources(self, runner, mock_manager): result = runner.invoke(cli, ["build", "users", "invalid_resource"]) - assert result.exit_code == 1 + assert result.exit_code == 2 assert "Unknown resources: invalid_resource" in result.output - assert "Available resources: users, posts" in result.output def test_build_resources_with_options(self, runner, mock_manager): """Test build with resources and options combined.""" mock_result = ValidationResult(is_valid=True) - mock_result.info.extend(["Built table: users (150 records)"]) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=150)] + ) mock_manager.build_database.return_value = mock_result result = runner.invoke(cli, ["build", 
"--force-schema-reset", "users", "posts"]) assert result.exit_code == 0 assert "Building specific resources: users, posts" in result.output - mock_manager.build_database.assert_called_once_with( + self._assert_build_kwargs( + mock_manager, force_schema_reset=True, sync_from_s3=False, resources=["users", "posts"], setup_fts=False, ) + def test_build_json_output(self, runner, mock_manager): + """--json emits a single structured payload with fixed schema.""" + import json as _json + + mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=42, duration_s=0.5)], + total_duration_s=0.7, + ) + mock_manager.build_database.return_value = mock_result + + result = runner.invoke(cli, ["build", "--json"]) + + assert result.exit_code == 0 + payload = _json.loads(result.output) + assert payload["status"] == "success" + assert payload["resources"][0]["name"] == "users" + assert payload["resources"][0]["records"] == 42 + + def test_build_fail_on_empty_exits_one(self, runner, mock_manager): + """--fail-on-empty turns a skipped-only build into exit 1.""" + mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="skipped", records=0)], + total_duration_s=0.1, + ) + mock_manager.build_database.return_value = mock_result + + # Without the flag, skipped is OK + result = runner.invoke(cli, ["build"]) + assert result.exit_code == 0 + + # With the flag, skipped is a failure + mock_manager.build_database.reset_mock() + mock_manager.build_database.return_value = mock_result + result = runner.invoke(cli, ["build", "--fail-on-empty"]) + assert result.exit_code == 1 + + def test_build_progress_file_written(self, runner, mock_manager, tmp_path): + """--progress-file writes a JSON BuildReport snapshot to the given path.""" + import json as _json + + progress_path = tmp_path / "progress.json" + + mock_result = 
ValidationResult(is_valid=True) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=5)], + total_duration_s=0.1, + ) + mock_manager.build_database.return_value = mock_result + + result = runner.invoke(cli, ["build", "--progress-file", str(progress_path)]) + + assert result.exit_code == 0 + assert progress_path.exists() + payload = _json.loads(progress_path.read_text()) + assert payload["status"] == "success" + assert payload["resources"][0]["name"] == "users" + + def test_build_parallel_flag_forwarded(self, runner, mock_manager): + """--parallel N reaches the manager as max_parallel.""" + mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport(total_duration_s=0.1) + mock_manager.build_database.return_value = mock_result + + result = runner.invoke(cli, ["build", "--parallel", "4"]) + assert result.exit_code == 0 + self._assert_build_kwargs(mock_manager, max_parallel=4) + + def test_build_post_hook_success_exits_zero(self, runner, mock_manager, tmp_path): + """--post-hook runs after a successful build; zero exit keeps code 0.""" + mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=1)], + total_duration_s=0.1, + ) + mock_manager.build_database.return_value = mock_result + mock_project = MagicMock() + mock_project.database = "my.db" + mock_manager.load_project.return_value = mock_project + mock_manager.project_path = tmp_path + + result = runner.invoke(cli, ["build", "--post-hook", "exit 0"]) + assert result.exit_code == 0 + + def test_build_post_hook_nonzero_exits_one(self, runner, mock_manager, tmp_path): + """--post-hook non-zero exit propagates to exit code 1.""" + mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=1)], + total_duration_s=0.1, + ) + mock_manager.build_database.return_value = 
mock_result + mock_project = MagicMock() + mock_project.database = "my.db" + mock_manager.load_project.return_value = mock_project + mock_manager.project_path = tmp_path + + result = runner.invoke(cli, ["build", "--post-hook", "exit 5"]) + assert result.exit_code == 1 + + def test_build_force_sync_flag_forwarded(self, runner, mock_manager): + """--force-sync reaches the manager.""" + mock_result = ValidationResult(is_valid=True) + mock_result.report = BuildReport(total_duration_s=0.0) + mock_manager.build_database.return_value = mock_result + + result = runner.invoke(cli, ["build", "--sync-from-s3", "--force-sync"]) + assert result.exit_code == 0 + self._assert_build_kwargs(mock_manager, force_sync=True, sync_from_s3=True) + class TestCLIDeploy: """Test 'zeeker deploy' command.""" @@ -375,12 +533,12 @@ def mock_deployer(self): yield mock_instance def test_deploy_not_in_project(self, runner, mock_manager): - """Test deploy when not in a project directory.""" + """Deploy outside a project is a configuration/fatal error (exit 2).""" mock_manager.is_project_root.return_value = False result = runner.invoke(cli, ["deploy"]) - assert result.exit_code == 0 + assert result.exit_code == 2 assert "❌ Not in a Zeeker project directory" in result.output def test_deploy_successful(self, runner, mock_manager, mock_deployer): @@ -430,7 +588,7 @@ def test_deploy_dry_run(self, runner, mock_manager, mock_deployer): assert "Would upload" in result.output def test_deploy_error(self, runner, mock_manager, mock_deployer): - """Test deployment error.""" + """Missing database file is a setup error (exit 2).""" mock_manager.is_project_root.return_value = True # Mock project loading @@ -443,7 +601,7 @@ def test_deploy_error(self, runner, mock_manager, mock_deployer): with patch("pathlib.Path.exists", return_value=False): result = runner.invoke(cli, ["deploy"]) - assert result.exit_code == 0 + assert result.exit_code == 2 assert "❌ Database not found: nonexistent.db" in result.output assert 
"Run 'zeeker build' first" in result.output diff --git a/packages/zeeker/tests/test_database_components.py b/packages/zeeker/tests/test_database_components.py index 4023da2..7d86d55 100644 --- a/packages/zeeker/tests/test_database_components.py +++ b/packages/zeeker/tests/test_database_components.py @@ -267,9 +267,10 @@ def test_apply_transformation_success(self, processor): mock_module.transform_data.return_value = [{"id": 1, "transformed": True}] raw_data = [{"id": 1, "raw": True}] - result = processor._apply_transformation(mock_module, raw_data, "test", "transform_data") + data, tb = processor._apply_transformation(mock_module, raw_data, "test", "transform_data") - assert result == [{"id": 1, "transformed": True}] + assert data == [{"id": 1, "transformed": True}] + assert tb is None mock_module.transform_data.assert_called_once_with(raw_data) def test_apply_transformation_missing_function(self, processor): @@ -280,10 +281,11 @@ def test_apply_transformation_missing_function(self, processor): delattr(mock_module, "transform_data") raw_data = [{"id": 1, "raw": True}] - result = processor._apply_transformation(mock_module, raw_data, "test", "transform_data") + data, tb = processor._apply_transformation(mock_module, raw_data, "test", "transform_data") - # Should return original data when function is missing - assert result == raw_data + # Should return original data when function is missing, no traceback. 
+ assert data == raw_data + assert tb is None def test_apply_transformation_function_error(self, processor): """Test transformation with function that throws error.""" @@ -291,10 +293,12 @@ def test_apply_transformation_function_error(self, processor): mock_module.transform_data = MagicMock(side_effect=Exception("Transform failed")) raw_data = [{"id": 1, "raw": True}] - result = processor._apply_transformation(mock_module, raw_data, "test", "transform_data") + data, tb = processor._apply_transformation(mock_module, raw_data, "test", "transform_data") - # Should return None when function throws exception - assert result is None + # Data should be None when function throws; traceback should be captured. + assert data is None + assert tb is not None + assert "Transform failed" in tb class TestDatabaseBuilder: diff --git a/packages/zeeker/tests/test_post_hook.py b/packages/zeeker/tests/test_post_hook.py new file mode 100644 index 0000000..df882d1 --- /dev/null +++ b/packages/zeeker/tests/test_post_hook.py @@ -0,0 +1,106 @@ +"""Tests for the post-build shell hook runner.""" + +import json +import os +import subprocess +from pathlib import Path +from unittest.mock import patch + +import pytest + +from zeeker.commands.post_hook import PostHookResult, run_post_hook +from zeeker.core.types import BuildReport, ResourceOutcome + + +@pytest.fixture +def sample_report() -> BuildReport: + return BuildReport( + resources=[ResourceOutcome(name="users", status="success", records=3, duration_s=0.1)], + total_duration_s=0.1, + ) + + +class _FakeCompleted: + def __init__(self, returncode: int, stdout: str = "", stderr: str = ""): + self.returncode = returncode + self.stdout = stdout + self.stderr = stderr + + +def test_run_post_hook_success_populates_result(tmp_path: Path, sample_report): + with patch("subprocess.run", return_value=_FakeCompleted(0, "ok\n", "")) as mock_run: + result = run_post_hook( + "echo ok", + project_path=tmp_path, + db_path=tmp_path / "mydb.db", + 
db_name="mydb", + report=sample_report, + ) + + assert isinstance(result, PostHookResult) + assert result.exit_code == 0 + assert result.stdout == "ok\n" + assert result.command == "echo ok" + + call = mock_run.call_args + assert call.args[0] == "echo ok" + assert call.kwargs["shell"] is True + assert call.kwargs["cwd"] == str(tmp_path) + assert call.kwargs["capture_output"] is True + assert call.kwargs["text"] is True + assert call.kwargs["check"] is False + + env = call.kwargs["env"] + assert env["ZEEKER_DB_PATH"] == str(tmp_path / "mydb.db") + assert env["ZEEKER_DB_NAME"] == "mydb" + assert env["ZEEKER_PROJECT_PATH"] == str(tmp_path) + assert env["ZEEKER_BUILD_STATUS"] == "success" + + report_path = env["ZEEKER_BUILD_REPORT"] + assert os.path.exists(report_path) + payload = json.loads(Path(report_path).read_text()) + assert payload["status"] == "success" + assert payload["resources"][0]["name"] == "users" + + +def test_run_post_hook_non_zero_propagates(tmp_path: Path, sample_report): + with patch("subprocess.run", return_value=_FakeCompleted(5, "", "boom")): + result = run_post_hook( + "false", + project_path=tmp_path, + db_path=tmp_path / "x.db", + db_name="x", + report=sample_report, + ) + assert result.exit_code == 5 + assert result.stderr == "boom" + + +def test_run_post_hook_marks_partial_failure_status(tmp_path: Path): + partial = BuildReport( + resources=[ResourceOutcome(name="users", status="failed", error_message="x")], + total_duration_s=0.0, + ) + with patch("subprocess.run", return_value=_FakeCompleted(0)) as mock_run: + run_post_hook( + "echo", + project_path=tmp_path, + db_path=tmp_path / "x.db", + db_name="x", + report=partial, + ) + assert mock_run.call_args.kwargs["env"]["ZEEKER_BUILD_STATUS"] == "partial_failure" + + +def test_run_post_hook_real_subprocess(tmp_path: Path, sample_report): + """Smoke test that the real subprocess picks up env vars.""" + # Use a portable one-liner: print an env var and exit 0. 
+ result = run_post_hook( + 'printf "%s" "$ZEEKER_DB_NAME"', + project_path=tmp_path, + db_path=tmp_path / "testdb.db", + db_name="testdb", + report=sample_report, + ) + assert result.exit_code == 0 + assert result.stdout == "testdb" diff --git a/packages/zeeker/tests/test_processor_transform.py b/packages/zeeker/tests/test_processor_transform.py new file mode 100644 index 0000000..4811f0b --- /dev/null +++ b/packages/zeeker/tests/test_processor_transform.py @@ -0,0 +1,61 @@ +"""Tests for transform_data traceback preservation in ResourceProcessor.""" + +from pathlib import Path +from types import SimpleNamespace + +import sqlite_utils + +from zeeker.core.database.processor import ResourceProcessor +from zeeker.core.schema import SchemaManager + + +def _build_module(fetch_return, transform_func=None): + mod = SimpleNamespace() + mod.fetch_data = lambda existing_table: fetch_return + if transform_func is not None: + mod.transform_data = transform_func + return mod + + +def test_transform_raise_is_preserved_in_tracebacks(tmp_path: Path): + db = sqlite_utils.Database(str(tmp_path / "test.db")) + processor = ResourceProcessor(tmp_path, SchemaManager()) + + def bad_transform(data): + raise ValueError("boom") + + module = _build_module(fetch_return=[{"id": 1, "name": "a"}], transform_func=bad_transform) + result = processor.process_resource(db, "users", module) + + assert not result.is_valid + assert result.errors + assert "Data transformation failed" in result.errors[0] + assert result.tracebacks, "expected tracebacks to be populated" + assert "ValueError: boom" in result.tracebacks[0] + + +def test_transform_success_has_no_traceback(tmp_path: Path): + db = sqlite_utils.Database(str(tmp_path / "test.db")) + processor = ResourceProcessor(tmp_path, SchemaManager()) + processor.schema_manager.ensure_meta_tables(db) + + module = _build_module( + fetch_return=[{"id": 1, "name": "a"}], + transform_func=lambda data: [{**d, "extra": True} for d in data], + ) + result = 
processor.process_resource(db, "users", module) + + assert result.is_valid + assert not result.tracebacks + + +def test_no_transform_function_no_traceback(tmp_path: Path): + db = sqlite_utils.Database(str(tmp_path / "test.db")) + processor = ResourceProcessor(tmp_path, SchemaManager()) + processor.schema_manager.ensure_meta_tables(db) + + module = _build_module(fetch_return=[{"id": 1}]) + result = processor.process_resource(db, "widgets", module) + + assert result.is_valid + assert not result.tracebacks diff --git a/packages/zeeker/tests/test_s3_sync_divergence.py b/packages/zeeker/tests/test_s3_sync_divergence.py new file mode 100644 index 0000000..44ef4de --- /dev/null +++ b/packages/zeeker/tests/test_s3_sync_divergence.py @@ -0,0 +1,86 @@ +"""Tests for the S3 sync preserve-local guard.""" + +from pathlib import Path +from unittest.mock import MagicMock, patch + +from zeeker.core.database.s3_sync import S3Synchronizer + + +def test_no_local_db_is_clean(tmp_path: Path): + sync = S3Synchronizer() + result = sync.check_local_divergence(tmp_path / "missing.db") + assert result.is_valid + + +def test_existing_local_db_is_protected(tmp_path: Path): + db_path = tmp_path / "my.db" + db_path.write_bytes(b"anything") # any existing file triggers the guard + + sync = S3Synchronizer() + result = sync.check_local_divergence(db_path) + assert not result.is_valid + assert any("--force-sync" in e for e in result.errors) + assert any("Local database already exists" in e for e in result.errors) + + +def test_sync_from_s3_refuses_when_local_exists(tmp_path: Path): + db_path = tmp_path / "my.db" + db_path.write_bytes(b"local data") + + with patch("zeeker.core.deployer.ZeekerDeployer") as deployer_cls: + sync = S3Synchronizer() + result = sync.sync_from_s3("my.db", db_path) + # Divergence check short-circuits before the deployer is constructed. 
+ deployer_cls.assert_not_called() + + assert not result.is_valid + assert any("--force-sync" in e for e in result.errors) + + +def test_sync_from_s3_force_proceeds_and_warns(tmp_path: Path): + db_path = tmp_path / "my.db" + db_path.write_bytes(b"local data") + + fake_client = MagicMock() + fake_client.head_object.return_value = {"ContentLength": 10} + + def _fake_download(Bucket, Key, Filename): + Path(Filename).write_bytes(b"downloaded-from-s3") + + fake_client.download_file.side_effect = _fake_download + + with patch("zeeker.core.deployer.ZeekerDeployer") as deployer_cls: + deployer_cls.return_value.s3_client = fake_client + deployer_cls.return_value.bucket_name = "test-bucket" + + sync = S3Synchronizer() + result = sync.sync_from_s3("my.db", db_path, force=True) + + assert result.is_valid + assert fake_client.download_file.called + assert any("overwriting local database" in w for w in result.warnings) + assert db_path.read_bytes() == b"downloaded-from-s3" + + +def test_sync_from_s3_no_local_db_downloads_cleanly(tmp_path: Path): + db_path = tmp_path / "my.db" # does not exist + + fake_client = MagicMock() + fake_client.head_object.return_value = {"ContentLength": 10} + + def _fake_download(Bucket, Key, Filename): + Path(Filename).write_bytes(b"downloaded") + + fake_client.download_file.side_effect = _fake_download + + with patch("zeeker.core.deployer.ZeekerDeployer") as deployer_cls: + deployer_cls.return_value.s3_client = fake_client + deployer_cls.return_value.bucket_name = "test-bucket" + + sync = S3Synchronizer() + result = sync.sync_from_s3("my.db", db_path) + + assert result.is_valid + assert fake_client.download_file.called + # No warnings since there was nothing to overwrite. 
+ assert not result.warnings diff --git a/packages/zeeker/zeeker/cli.py b/packages/zeeker/zeeker/cli.py index 6d3b88d..81d0a82 100644 --- a/packages/zeeker/zeeker/cli.py +++ b/packages/zeeker/zeeker/cli.py @@ -5,9 +5,11 @@ """ import subprocess +import traceback as tb_mod from pathlib import Path import click +from rich.console import Console from .commands.assets import assets from .commands.backup import backup @@ -16,12 +18,14 @@ echo_errors, echo_warnings, load_env, + render_build_report, + render_resource_event, require_database, require_project, ) from .commands.metadata import metadata from .core.project import ZeekerProjectManager -from .core.types import ZeekerSchemaConflictError +from .core.types import BuildReport, ValidationResult, ZeekerSchemaConflictError # Main CLI group @@ -175,83 +179,257 @@ def add( @click.option( "--setup-fts", is_flag=True, help="Set up full-text search (FTS) indexes on configured fields" ) -def build(resources, force_schema_reset, sync_from_s3, setup_fts): +@click.option("-v", "--verbose", is_flag=True, help="Show full Python tracebacks for failures") +@click.option( + "--json", + "as_json", + is_flag=True, + help="Emit a single structured JSON BuildReport to stdout (tracebacks always included)", +) +@click.option( + "--fail-on-empty", + is_flag=True, + help="Treat resources that returned no data as failures (exit 1 instead of passing)", +) +@click.option( + "--progress-file", + type=click.Path(), + help="Write a JSON BuildReport snapshot to this path after each resource (atomic overwrite). " + "Useful for trigger-and-wait callers that poll externally.", +) +@click.option( + "--parallel", + type=int, + default=1, + show_default=True, + metavar="N", + help="Run up to N resource fetches concurrently (I/O only; DB writes stay sequential).", +) +@click.option( + "--post-hook", + "post_hook", + type=str, + default=None, + metavar="CMD", + help="Shell command to run after a successful build. 
See `zeeker build --help` for env vars.", +) +@click.option( + "--force-sync", + is_flag=True, + help="With --sync-from-s3, overwrite an existing local DB instead of refusing.", +) +def build( + resources, + force_schema_reset, + sync_from_s3, + setup_fts, + verbose, + as_json, + fail_on_empty, + progress_file, + parallel, + post_hook, + force_sync, +): """Build database from resources using sqlite-utils. Runs fetch_data() for specified resources and creates/updates the SQLite database. If no resources are specified, builds all resources in the project. + Exit codes: + 0 all resources succeeded + 1 one or more resources failed, FTS setup failed, or post-hook exited non-zero + 2 fatal error (schema conflict, DB open failure, config error, local-diverged sync) + + The --post-hook command receives these env vars: + ZEEKER_DB_PATH, ZEEKER_DB_NAME, ZEEKER_PROJECT_PATH, + ZEEKER_BUILD_STATUS (success|partial_failure), ZEEKER_BUILD_REPORT (JSON tempfile) + Examples: - zeeker build # Build all resources - zeeker build --setup-fts # Build all resources and set up FTS indexes - zeeker build users posts # Build specific resources + zeeker build # Build all resources + zeeker build --parallel 4 # Fetch 4 resources concurrently + zeeker build users posts # Build specific resources + zeeker build --json | jq # Machine-readable output + zeeker build --fail-on-empty # Empty fetch_data() -> exit 1 + zeeker build --progress-file build.json # Watchable progress from outside + zeeker build --post-hook 'sqlite3 mydb.db < patch.sql' """ + from .commands.helpers import write_progress_file + from .commands.post_hook import run_post_hook + load_env() resource_list = list(resources) if resources else None - if resource_list: - click.echo(f"Building specific resources: {', '.join(resource_list)}") + + # Route any non-JSON chatter through the console; JSON mode keeps stdout clean. 
+ console = Console() + if resource_list and not as_json: + console.print(f"Building specific resources: {', '.join(resource_list)}") manager = ZeekerProjectManager() + # A BuildReport we can mutate progressively for --progress-file watchers. + progress_report = BuildReport() if progress_file else None + + def _callback(name, outcome): + # Stream per-resource line unless emitting JSON. + if not as_json: + render_resource_event(name, outcome, console=console) + + # Update the progress file atomically on finish events. + if progress_file and outcome is not None and progress_report is not None: + progress_report.resources.append(outcome) + write_progress_file(progress_file, progress_report) + + # Write an initial (empty) snapshot so watchers see the file exist immediately. + if progress_file and progress_report is not None: + write_progress_file(progress_file, progress_report) + + progress_callback = None if (as_json and not progress_file) else _callback + try: result = manager.build_database( force_schema_reset=force_schema_reset, sync_from_s3=sync_from_s3, resources=resource_list, setup_fts=setup_fts, + progress_callback=progress_callback, + max_parallel=parallel, + force_sync=force_sync, ) except ZeekerSchemaConflictError as e: - click.echo("❌ Schema conflict detected:") - click.echo(str(e)) - click.echo("\n💡 To resolve this, you can:") - click.echo(" • Use --force-schema-reset flag to ignore conflicts") - click.echo(" • Add a migrate_schema() function to handle the change") - click.echo(" • Delete the database file to rebuild from scratch") - return - - if result.errors: - click.echo("❌ Database build failed:") - echo_errors(result) - raise click.ClickException("Build failed") - - if result.warnings: + result = ValidationResult(is_valid=False) + result.errors.append(str(e)) + result.tracebacks.append(tb_mod.format_exc()) + result.report = BuildReport(fatal_error=str(e)) + if progress_file: + write_progress_file(progress_file, result.report) + 
render_build_report(result, verbose=verbose, as_json=as_json, console=console) + raise click.exceptions.Exit(2) + except Exception as e: + result = ValidationResult(is_valid=False) + result.errors.append(f"Build failed: {e}") + result.tracebacks.append(tb_mod.format_exc()) + result.report = BuildReport(fatal_error=f"Build failed: {e}") + if progress_file: + write_progress_file(progress_file, result.report) + render_build_report(result, verbose=verbose, as_json=as_json, console=console) + raise click.exceptions.Exit(2) + + # Pre-flight failures (e.g., "Unknown resources") return a ValidationResult with no + # report — treat them as fatal so agents get a structured fatal_error message. + if result.report is None: + fatal_msg = result.errors[0] if result.errors else "build failed" + result.report = BuildReport(fatal_error=fatal_msg) + + if result.warnings and not as_json: echo_warnings(result) - for info in result.info: - click.echo(f"✅ {info}") - - click.echo("\n🔧 Built with sqlite-utils for robust schema detection") - click.echo("🚀 Ready for deployment with 'zeeker deploy'") + report = result.report + + # Run the post-hook after the build settles but before the final render, + # so its outcome is part of the reported payload. Skip on fatal state — + # there's no coherent DB to patch. + if post_hook and not report.fatal_error: + project = manager.load_project() + db_path = manager.project_path / project.database + db_name = Path(project.database).stem + hook_outcome = run_post_hook( + post_hook, + project_path=manager.project_path, + db_path=db_path, + db_name=db_name, + report=report, + ) + report.post_hook = hook_outcome + + # Final progress-file snapshot reflects the completed state (including + # any post-hook outcome). 
+ if progress_file: + write_progress_file(progress_file, report) + + render_build_report(result, verbose=verbose, as_json=as_json, console=console) + + if report.fatal_error: + raise click.exceptions.Exit(2) + if report.failed or report.fts_error: + raise click.exceptions.Exit(1) + if report.post_hook is not None and report.post_hook.exit_code != 0: + raise click.exceptions.Exit(1) + if fail_on_empty and report.skipped: + if not as_json: + console.print( + f"[red]Exiting non-zero: {len(report.skipped)} resource(s) returned no data " + "and --fail-on-empty is set.[/red]" + ) + raise click.exceptions.Exit(1) @cli.command("deploy") @click.option("--dry-run", is_flag=True, help="Show what would be uploaded without uploading") -def deploy_database(dry_run): +@click.option("-v", "--verbose", is_flag=True, help="Show full Python traceback on failure") +@click.option( + "--json", + "as_json", + is_flag=True, + help="Emit a single structured JSON deploy result to stdout", +) +def deploy_database(dry_run, verbose, as_json): """Deploy the project database to S3. 
Uploads the generated .db file to S3: s3://bucket/latest/{database_name}.db + + Exit codes: + 0 upload succeeded (or dry-run completed) + 1 upload failed + 2 configuration or setup error (missing project, env vars, db file) """ + import json as _json + manager = ZeekerProjectManager() project = require_project(manager) if not project: - return + if as_json: + click.echo(_json.dumps({"status": "fatal", "error": "not_a_zeeker_project"})) + raise click.exceptions.Exit(2) deployer = create_deployer() if not deployer: - return + if as_json: + click.echo(_json.dumps({"status": "fatal", "error": "missing_configuration"})) + raise click.exceptions.Exit(2) db_path = require_database(manager, project) if not db_path: - return + if as_json: + click.echo(_json.dumps({"status": "fatal", "error": "database_not_found"})) + raise click.exceptions.Exit(2) database_name = Path(project.database).stem result = deployer.upload_database(db_path, database_name, dry_run) + if as_json: + payload = { + "status": "success" if result.is_valid else "failed", + "dry_run": dry_run, + "database": database_name, + "destination": f"s3://{deployer.bucket_name}/latest/{database_name}.db", + "info": list(result.info), + "errors": list(result.errors), + "tracebacks": list(result.tracebacks), + } + click.echo(_json.dumps(payload, indent=2)) + if not result.is_valid: + raise click.exceptions.Exit(1) + return + if result.errors: echo_errors(result) - return + if verbose and result.tracebacks: + for tb in result.tracebacks: + click.echo(tb) + raise click.exceptions.Exit(1) for info in result.info: click.echo(f"✅ {info}") diff --git a/packages/zeeker/zeeker/commands/helpers.py b/packages/zeeker/zeeker/commands/helpers.py index b631f0e..8a1b2f9 100644 --- a/packages/zeeker/zeeker/commands/helpers.py +++ b/packages/zeeker/zeeker/commands/helpers.py @@ -2,12 +2,19 @@ Shared helper functions for CLI commands. 
""" +import json +import os +import re +from dataclasses import asdict from pathlib import Path import click from dotenv import load_dotenv +from rich.console import Console +from rich.panel import Panel +from rich.table import Table -from ..core.types import ValidationResult +from ..core.types import BuildReport, ResourceOutcome, ValidationResult def echo_errors(result: ValidationResult) -> None: @@ -116,3 +123,278 @@ def show_resource_metadata(resource_name: str, resource_config: dict): for field in metadata_fields: if field in resource_config: click.echo(f" {field.replace('_', ' ').title()}: {resource_config[field]}") + + +# ---------------------------------------------------------------------------- +# Build report rendering +# ---------------------------------------------------------------------------- + +# Matches `File ""` lines in Python tracebacks so we can rewrite absolute +# paths to be relative to CWD for agent consumption. +_TRACEBACK_PATH_RE = re.compile(r'File "([^"]+)"') + +_STATUS_GLYPHS = { + "success": ("[green]✓[/green]", "OK"), + "failed": ("[red]✗[/red]", "FAIL"), + "skipped": ("[yellow]−[/yellow]", "SKIP"), +} + + +def _relativize_traceback(tb: str) -> str: + """Rewrite absolute paths in a traceback to be CWD-relative where possible.""" + if not tb: + return tb + cwd = os.getcwd() + + def _sub(match: re.Match) -> str: + raw = match.group(1) + try: + rel = os.path.relpath(raw, start=cwd) + except ValueError: + return match.group(0) + # Prefer relative only when it doesn't escape too far upward + if rel.startswith(".." + os.sep + ".." 
def render_resource_event(
    name: str,
    outcome: ResourceOutcome | None,
    *,
    console: Console,
) -> None:
    """Stream a per-resource line during the build.

    Called on both start (``outcome=None``) and finish events.

    - Start: prints a dim "Building" hint, in TTY mode only.
    - Finish, TTY: prints a coloured glyph line with the record count, the
      error message, or a "no data returned" note.
    - Finish, non-TTY: prints a stable ``[OK]`` / ``[FAIL]`` / ``[SKIP]``
      prefixed line intended for machine parsing.

    Args:
        name: Resource name as configured in the project.
        outcome: ``None`` for a start event, otherwise the finished outcome.
        console: Rich console to print to.
    """
    # Function-scope import keeps this block self-contained; rich is already
    # a dependency of this module.
    from rich.markup import escape

    if outcome is None:
        if console.is_terminal:
            # The name is user-controlled text; escape it so brackets in it
            # are printed literally instead of being parsed as markup tags.
            console.print(f"[dim]→ Building [cyan]{escape(name)}[/cyan]...[/dim]")
        return

    if console.is_terminal:
        glyph, _ = _STATUS_GLYPHS[outcome.status]
        if outcome.status == "success":
            detail = f"{outcome.records} records"
        elif outcome.status == "failed":
            # Exception text frequently contains brackets ("[Errno 2] ...").
            # Unescaped, Rich would swallow them as style tags or raise
            # MarkupError on a stray closing tag.
            detail = f"[red]{escape(outcome.error_message or 'failed')}[/red]"
        else:  # skipped
            detail = "[yellow]no data returned[/yellow]"
        console.print(
            f"{glyph} [bold]{escape(name)}[/bold] {detail} "
            f"[dim]({outcome.duration_s:.1f}s)[/dim]"
        )
    else:
        prefix = _STATUS_GLYPHS[outcome.status][1]
        if outcome.status == "success":
            note = f"{outcome.records} records"
        elif outcome.status == "failed":
            note = outcome.error_message or "failed"
        else:  # skipped
            note = "no data returned"
        # soft_wrap=True stops Rich from word-wrapping at the console width
        # (80 columns when piped), which would split one resource across
        # several lines. emoji=False keeps ":code:" sequences in error text
        # verbatim.
        console.print(
            f"[{prefix:<4}] {name:<24} {note} ({outcome.duration_s:.1f}s)",
            markup=False,
            highlight=False,
            emoji=False,
            soft_wrap=True,
        )
def _build_report_payload(report: BuildReport) -> dict:
    """Turn a BuildReport into the plain dict that is dumped as JSON.

    The same shape is used for ``--json`` output and for ``--progress-file``
    snapshots. Each resource outcome is converted with ``asdict``; a non-empty
    ``traceback`` field is passed through ``_relativize_traceback`` before it
    is stored.
    """
    resource_rows = []
    for resource in report.resources:
        row = asdict(resource)
        if row.get("traceback"):
            row["traceback"] = _relativize_traceback(row["traceback"])
        resource_rows.append(row)

    hook = report.post_hook
    return {
        "status": _report_overall_status(report),
        "total_duration_s": round(report.total_duration_s, 3),
        "resources": resource_rows,
        "fts_error": report.fts_error,
        "fatal_error": report.fatal_error,
        "post_hook": None if hook is None else asdict(hook),
    }
def _emit_json(report: BuildReport, *, console: Console) -> None:
    """Print the build report as a single indented JSON document.

    The text must reach the stream byte-for-byte, so every Rich text
    transformation is switched off:

    - ``soft_wrap=True`` disables word wrapping and cropping. Without it Rich
      wraps at the console width (80 columns when output is piped), inserting
      raw newlines inside long JSON string values such as tracebacks and
      producing invalid JSON.
    - ``emoji=False`` stops ``:name:`` sequences inside strings from being
      replaced with emoji characters.
    - ``markup=False`` / ``highlight=False`` keep brackets literal and avoid
      colour codes.
    """
    console.print(
        json.dumps(_build_report_payload(report), indent=2),
        markup=False,
        highlight=False,
        emoji=False,
        soft_wrap=True,
    )
def _emit_plain(report: BuildReport, *, verbose: bool, console: Console) -> None:
    """Print the plain-text (non-TTY) footer for a finished build.

    This function does not print per-resource lines. It emits, in order:

    - ``FATAL: ...`` and nothing else when the report carries a fatal error;
    - a ``SUMMARY:`` line with succeeded / failed / skipped counts and timing;
    - ``FTS_ERROR: ...`` when FTS setup failed;
    - ``POST_HOOK: <command> exit=<code>`` when a post-hook ran, followed by
      its stdout/stderr when ``verbose`` is set and the hook exited non-zero;
    - one ``TRACEBACK: <name>`` section per failed resource when ``verbose``.

    Args:
        report: The build report to summarise.
        verbose: Include hook output and tracebacks for failures.
        console: Rich console to print to.
    """

    def emit(text: str) -> None:
        # These lines are meant to be parsed by tools. soft_wrap=True stops
        # Rich from re-flowing them at the console width (80 columns when
        # piped), and emoji=False keeps ":code:" sequences in commands and
        # tracebacks untouched.
        console.print(text, markup=False, highlight=False, emoji=False, soft_wrap=True)

    if report.fatal_error:
        emit(f"FATAL: {report.fatal_error}")
        return

    succeeded = len(report.succeeded)
    failed = len(report.failed)
    skipped = len(report.skipped)
    emit(
        f"SUMMARY: {succeeded} succeeded, {failed} failed, {skipped} skipped "
        f"in {report.total_duration_s:.1f}s"
    )

    if report.fts_error:
        emit(f"FTS_ERROR: {report.fts_error}")

    hook = report.post_hook
    if hook is not None:
        emit(f"POST_HOOK: {hook.command} exit={hook.exit_code}")
        if verbose and hook.exit_code != 0:
            if hook.stdout:
                emit(f"POST_HOOK_STDOUT:\n{hook.stdout}")
            if hook.stderr:
                emit(f"POST_HOOK_STDERR:\n{hook.stderr}")

    if verbose:
        for r in report.failed:
            if r.traceback:
                emit(f"TRACEBACK: {r.name}\n{_relativize_traceback(r.traceback)}")
def run_post_hook(
    command: str,
    *,
    project_path: Path,
    db_path: Path,
    db_name: str,
    report: BuildReport,
) -> PostHookResult:
    """Run ``command`` through the shell with zeeker context in its environment.

    The current BuildReport is serialized to a temporary JSON file whose path
    is exposed as ``ZEEKER_BUILD_REPORT``. The file exists only while the
    hook runs and is removed once the command has returned.

    Env vars exposed to the hook (added on top of the current environment):
        ZEEKER_DB_PATH       path to the .db file, as passed in ``db_path``
        ZEEKER_DB_NAME       ``db_name`` (e.g. my_project)
        ZEEKER_PROJECT_PATH  ``project_path``; also used as the working directory
        ZEEKER_BUILD_STATUS  "success" or "partial_failure"
        ZEEKER_BUILD_REPORT  path to the temporary JSON BuildReport

    Args:
        command: Shell command line supplied by the user.
        project_path: Project root; the hook runs with this as its cwd.
        db_path: Location of the built database.
        db_name: Database stem.
        report: Build report to hand to the hook.

    Returns:
        PostHookResult carrying the command, its exit code and the captured
        stdout/stderr. A non-zero exit is reported, not raised.
    """
    # Local import to avoid a circular dependency at module load time
    # (helpers imports types; post_hook imports types; cli imports both).
    from .helpers import _build_report_payload

    report_file = tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", delete=False, encoding="utf-8"
    )
    try:
        # Close the handle before the hook starts so the child process can
        # open the file itself (delete=False keeps it on disk after close).
        with report_file:
            json.dump(_build_report_payload(report), report_file)

        # An FTS setup failure counts as a failed build too, so the hook
        # must not be told "success" in that case.
        build_failed = bool(report.failed or report.fts_error)
        env = {
            **os.environ,
            "ZEEKER_DB_PATH": str(db_path),
            "ZEEKER_DB_NAME": db_name,
            "ZEEKER_PROJECT_PATH": str(project_path),
            "ZEEKER_BUILD_STATUS": "partial_failure" if build_failed else "success",
            "ZEEKER_BUILD_REPORT": report_file.name,
        }

        # shell=True is deliberate: the command is an operator-supplied shell
        # line (pipes, redirects). Never pass untrusted input here.
        proc = subprocess.run(
            command,
            shell=True,
            cwd=str(project_path),
            env=env,
            capture_output=True,
            text=True,
            check=False,
        )
    finally:
        # Without this, every build leaves one report file behind in the
        # temp directory.
        try:
            os.unlink(report_file.name)
        except OSError:
            # Best-effort cleanup; a leftover temp file must not mask the
            # hook result or the original exception.
            pass

    return PostHookResult(
        command=command,
        exit_code=proc.returncode,
        stdout=proc.stdout,
        stderr=proc.stderr,
    )
+ self._prewarmed: Dict[str, List[Dict[str, Any]]] = {} + + def set_prewarmed(self, resource_name: str, data: List[Dict[str, Any]]) -> None: + """Register a fetch result to be returned by subsequent ``call_fetch_data`` + calls for ``resource_name`` within this build.""" + self._prewarmed[resource_name] = data + + def clear_prewarmed(self) -> None: + """Drop all pre-warmed overrides. Call between builds.""" + self._prewarmed.clear() def call_fetch_data( self, fetch_data_func: Callable, existing_table: Optional[Table], resource_name: str = None @@ -32,9 +53,15 @@ def call_fetch_data( Returns: List[Dict[str, Any]]: The data returned by fetch_data """ - # Check cache if resource_name is provided + # Serve from the parallel pre-warm override if present. + if resource_name and resource_name in self._prewarmed: + return self._prewarmed[resource_name] + + # Check cache if resource_name is provided and caching is enabled. cache_key = ( - self._generate_cache_key(resource_name, existing_table) if resource_name else None + self._generate_cache_key(resource_name, existing_table) + if resource_name and self._cache_enabled + else None ) if cache_key and cache_key in self._fetch_cache: return self._fetch_cache[cache_key] @@ -45,7 +72,39 @@ def call_fetch_data( else: result = fetch_data_func(existing_table) - # Cache the result if resource_name is provided + # Cache the result if caching is active. + if cache_key: + self._fetch_cache[cache_key] = result + + return result + + async def acall_fetch_data( + self, + fetch_data_func: Callable, + existing_table: Optional[Table], + resource_name: str = None, + ) -> List[Dict[str, Any]]: + """Async variant of :meth:`call_fetch_data` for use under + ``asyncio.gather``. Returns the data produced by ``fetch_data_func``. + + Async fetchers are awaited directly. Sync fetchers are run in the + default ThreadPoolExecutor so they don't block the event loop and + can run concurrently with other fetches. 
+ """ + cache_key = ( + self._generate_cache_key(resource_name, existing_table) + if resource_name and self._cache_enabled + else None + ) + if cache_key and cache_key in self._fetch_cache: + return self._fetch_cache[cache_key] + + if inspect.iscoroutinefunction(fetch_data_func): + result = await fetch_data_func(existing_table) + else: + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, fetch_data_func, existing_table) + if cache_key: self._fetch_cache[cache_key] = result diff --git a/packages/zeeker/zeeker/core/database/builder.py b/packages/zeeker/zeeker/core/database/builder.py index 6dba536..f6dfe45 100644 --- a/packages/zeeker/zeeker/core/database/builder.py +++ b/packages/zeeker/zeeker/core/database/builder.py @@ -5,13 +5,23 @@ resource processing, schema management, and S3 synchronization. """ +import asyncio import time +import traceback from pathlib import Path +from typing import Callable import sqlite_utils from ..schema import SchemaManager -from ..types import ValidationResult, ZeekerProject, ZeekerSchemaConflictError +from ..types import ( + BuildReport, + ResourceOutcome, + ValidationResult, + ZeekerProject, + ZeekerSchemaConflictError, +) +from .async_executor import AsyncExecutor from .fts_processor import FTSProcessor from .processor import ResourceProcessor from .s3_sync import S3Synchronizer @@ -41,6 +51,9 @@ def build_database( sync_from_s3: bool = False, resources: list[str] = None, setup_fts: bool = False, + progress_callback: Callable[[str, ResourceOutcome | None], None] | None = None, + max_parallel: int = 1, + force_sync: bool = False, ) -> ValidationResult: """Build the SQLite database from resources using sqlite-utils. @@ -55,22 +68,34 @@ def build_database( sync_from_s3: If True, download existing database from S3 before building resources: List of specific resource names to build. If None, builds all resources. 
setup_fts: If True, set up full-text search indexes on configured fields + progress_callback: Optional callable invoked as (resource_name, None) when a + resource starts and (resource_name, outcome) when it finishes. Allows the + CLI layer to drive progress bars or streaming output. Returns: - ValidationResult with build results + ValidationResult with build results. `result.report` holds a BuildReport + describing per-resource outcomes, timings, and fatal/FTS errors. """ result = ValidationResult(is_valid=True) + report = BuildReport() + result.report = report + build_started = time.perf_counter() + db_path = self.project_path / self.project.database # S3 Database Synchronization - Download existing DB if requested if sync_from_s3: - sync_result = self.s3_sync.sync_from_s3(self.project.database, db_path) + sync_result = self.s3_sync.sync_from_s3( + self.project.database, db_path, force=force_sync + ) if not sync_result.is_valid: result.errors.extend(sync_result.errors) # Don't fail build if S3 sync fails - just warn result.warnings.append("S3 sync failed but continuing with local build") else: result.info.extend(sync_result.info) + if sync_result.warnings: + result.warnings.extend(sync_result.warnings) # Open existing database or create new one using sqlite-utils # Don't delete existing database - let resources check existing data for duplicates @@ -84,29 +109,59 @@ def build_database( # Determine which resources to process resources_to_build = resources if resources else list(self.project.resources.keys()) + # Pre-warm fetches concurrently when requested. The per-resource + # sequential loop below will then hit pre-warmed data instead of + # re-running each fetch_data() serially. 
+ if max_parallel > 1 and len(resources_to_build) > 1: + prewarm_warnings = self._prewarm_fetches(db, resources_to_build, max_parallel) + result.warnings.extend(prewarm_warnings) + # Process each specified resource for resource_name in resources_to_build: + if progress_callback: + progress_callback(resource_name, None) + + resource_started = time.perf_counter() resource_result = self._process_resource_with_schema_check( db, resource_name, force_schema_reset, build_id ) + duration = time.perf_counter() - resource_started + + outcome = self._build_resource_outcome(resource_name, resource_result, duration) if not resource_result.is_valid: result.errors.extend(resource_result.errors) + result.tracebacks.extend(resource_result.tracebacks) result.is_valid = False else: result.info.extend(resource_result.info) - # Process fragments if enabled - resource_config = self.project.resources.get(resource_name, {}) - is_fragments_enabled = resource_config.get("fragments", False) - - if is_fragments_enabled: - fragments_result = self._process_fragments_for_resource(db, resource_name) - if not fragments_result.is_valid: - result.errors.extend(fragments_result.errors) - result.is_valid = False - else: - result.info.extend(fragments_result.info) + # Process fragments if enabled (main must have succeeded) + if outcome.status == "success": + resource_config = self.project.resources.get(resource_name, {}) + if resource_config.get("fragments", False): + fragments_result = self._process_fragments_for_resource( + db, resource_name + ) + if not fragments_result.is_valid: + result.errors.extend(fragments_result.errors) + result.tracebacks.extend(fragments_result.tracebacks) + result.is_valid = False + outcome.status = "failed" + outcome.error_message = ( + fragments_result.errors[0] + if fragments_result.errors + else "fragments processing failed" + ) + if fragments_result.tracebacks: + outcome.traceback = fragments_result.tracebacks[0] + else: + result.info.extend(fragments_result.info) + 
outcome.fragments_records = fragments_result.records + + report.resources.append(outcome) + if progress_callback: + progress_callback(resource_name, outcome) # Set up FTS after all resources are processed (only if requested) if result.is_valid and setup_fts: @@ -114,6 +169,9 @@ def build_database( if not fts_result.is_valid: result.errors.extend(fts_result.errors) result.is_valid = False + report.fts_error = ( + fts_result.errors[0] if fts_result.errors else "FTS setup failed" + ) else: result.info.extend(fts_result.info) if fts_result.warnings: @@ -123,10 +181,42 @@ def build_database( except Exception as e: result.is_valid = False - result.errors.append(f"Database build failed: {e}") + msg = f"Database build failed: {e}" + result.errors.append(msg) + result.tracebacks.append(traceback.format_exc()) + report.fatal_error = msg + + finally: + # Prevent stale pre-warmed data from leaking into a reused builder. + self.processor.async_executor.clear_prewarmed() + report.total_duration_s = time.perf_counter() - build_started return result + @staticmethod + def _build_resource_outcome( + resource_name: str, resource_result: ValidationResult, duration_s: float + ) -> ResourceOutcome: + """Construct a ResourceOutcome from a per-resource ValidationResult.""" + if not resource_result.is_valid: + return ResourceOutcome( + name=resource_name, + status="failed", + duration_s=duration_s, + error_message=( + resource_result.errors[0] if resource_result.errors else "unknown error" + ), + traceback=(resource_result.tracebacks[0] if resource_result.tracebacks else None), + ) + + is_skipped = any("No data returned" in msg for msg in resource_result.info) + return ResourceOutcome( + name=resource_name, + status="skipped" if is_skipped else "success", + records=resource_result.records or 0, + duration_s=duration_s, + ) + def _process_resource_with_schema_check( self, db: sqlite_utils.Database, resource_name: str, force_schema_reset: bool, build_id: str ) -> ValidationResult: @@ -179,16 
+269,23 @@ def _process_resource_with_schema_check( except Exception as e: if isinstance(e, ZeekerSchemaConflictError): raise - # If we can't get sample data, proceed with build - pass + # Couldn't fetch a sample — surface this as a warning so + # the user knows the schema-conflict check ran blind, + # instead of a completely silent pass. + result.warnings.append( + f"Schema sample fetch failed for '{resource_name}' " + f"(continuing with build): {e}" + ) # Process the resource (pass pre-loaded module to avoid redundant load) resource_result = self.processor.process_resource(db, resource_name, module) if not resource_result.is_valid: result.errors.extend(resource_result.errors) + result.tracebacks.extend(resource_result.tracebacks) result.is_valid = False else: result.info.extend(resource_result.info) + result.records = resource_result.records # Update resource timestamps duration_ms = int((time.time() - start_time) * 1000) @@ -199,9 +296,11 @@ def _process_resource_with_schema_check( except ZeekerSchemaConflictError as e: result.is_valid = False result.errors.append(str(e)) + result.tracebacks.append(traceback.format_exc()) except Exception as e: result.is_valid = False result.errors.append(f"Failed to process resource '{resource_name}': {e}") + result.tracebacks.append(traceback.format_exc()) return result @@ -243,8 +342,13 @@ def _process_fragments_for_resource( main_data_context = self.processor.async_executor.call_fetch_data( fetch_data, existing_table, resource_name=resource_name ) - except Exception: - # If we can't get context, fragments will work without it + except Exception as e: + # If we can't get context, fragments will work without it — but + # surface the reason rather than swallowing the exception silently. 
+ result.warnings.append( + f"Fragments context fetch failed for '{resource_name}' " + f"(fragments will run without main-data context): {e}" + ) main_data_context = None # Process fragments @@ -253,8 +357,70 @@ def _process_fragments_for_resource( ) if not fragments_result.is_valid: result.errors.extend(fragments_result.errors) + result.tracebacks.extend(fragments_result.tracebacks) result.is_valid = False else: result.info.extend(fragments_result.info) + result.records = fragments_result.records return result + + def _prewarm_fetches( + self, + db: sqlite_utils.Database, + resources_to_build: list[str], + max_parallel: int, + ) -> list[str]: + """Run ``fetch_data()`` for all resources concurrently and populate + the processor's executor with pre-warmed results. + + Returns a list of warning strings (one per resource whose pre-fetch + failed; the sequential loop will re-attempt those and record the + definitive error). + """ + # Synchronous prep: module loading and existing-table lookup. + # Doing this here (not inside the coroutines) avoids racy imports + # and lets us skip resources that can't even be loaded. + specs: list[tuple[str, object, object]] = [] + warnings: list[str] = [] + for name in resources_to_build: + mod_result = self.processor._load_resource_module(name) + if not mod_result.is_valid: + # Let the sequential loop produce the canonical error. 
+ continue + module = mod_result.data + if not hasattr(module, "fetch_data"): + continue + fetch_data = getattr(module, "fetch_data") + existing = db[name] if db[name].exists() else None + specs.append((name, fetch_data, existing)) + + if not specs: + return warnings + + fresh = AsyncExecutor(cache_enabled=False) + sem = asyncio.Semaphore(max_parallel) + + async def run_one(name: str, fetch_data, existing): + async with sem: + try: + data = await fresh.acall_fetch_data(fetch_data, existing, name) + return name, data, None + except Exception as e: + return name, None, e + + async def run_all(): + return await asyncio.gather(*(run_one(*s) for s in specs)) + + results = asyncio.run(run_all()) + for name, data, err in results: + if err is not None: + warnings.append( + f"Parallel pre-fetch failed for '{name}' " + f"(sequential loop will retry): {err}" + ) + continue + if data is not None: + self.processor.async_executor.set_prewarmed(name, data) + + return warnings diff --git a/packages/zeeker/zeeker/core/database/processor.py b/packages/zeeker/zeeker/core/database/processor.py index 1a56472..c24659a 100644 --- a/packages/zeeker/zeeker/core/database/processor.py +++ b/packages/zeeker/zeeker/core/database/processor.py @@ -8,6 +8,7 @@ import importlib.util import inspect import sqlite3 +import traceback from pathlib import Path from typing import Any, Dict, List @@ -81,12 +82,14 @@ def process_resource( return result # Apply transformation if available - transformed_data = self._apply_transformation( + transformed_data, transform_tb = self._apply_transformation( module, raw_data, resource_name, "transform_data" ) if transformed_data is None: result.is_valid = False result.errors.append(f"Data transformation failed for '{resource_name}'") + if transform_tb: + result.tracebacks.append(transform_tb) return result # Validate transformed data structure @@ -104,6 +107,7 @@ def process_resource( # Insert all data at once for better performance table.insert_all(transformed_data, 
replace=False) + result.records = len(transformed_data) result.info.append( f"Processed {len(transformed_data)} records for resource '{resource_name}'" ) @@ -111,9 +115,11 @@ def process_resource( except sqlite3.IntegrityError as e: result.is_valid = False result.errors.append(f"Database integrity error in '{resource_name}': {e}") + result.tracebacks.append(traceback.format_exc()) except Exception as e: result.is_valid = False result.errors.append(f"Failed to process resource '{resource_name}': {e}") + result.tracebacks.append(traceback.format_exc()) return result @@ -170,12 +176,14 @@ def process_fragments_data( return result # Apply transformation if available - transformed_fragments = self._apply_transformation( + transformed_fragments, transform_tb = self._apply_transformation( module, raw_fragments, resource_name, "transform_fragments_data" ) if transformed_fragments is None: result.is_valid = False result.errors.append(f"Fragment transformation failed for '{resource_name}'") + if transform_tb: + result.tracebacks.append(transform_tb) return result # Validate fragments data structure @@ -197,6 +205,7 @@ def process_fragments_data( # Insert all fragments at once for better performance fragments_table.insert_all(transformed_fragments, replace=False) + result.records = len(transformed_fragments) result.info.append( f"Processed {len(transformed_fragments)} fragments for resource '{resource_name}'" ) @@ -204,9 +213,11 @@ def process_fragments_data( except sqlite3.IntegrityError as e: result.is_valid = False result.errors.append(f"Database integrity error in '{resource_name}' fragments: {e}") + result.tracebacks.append(traceback.format_exc()) except Exception as e: result.is_valid = False result.errors.append(f"Failed to process fragments for '{resource_name}': {e}") + result.tracebacks.append(traceback.format_exc()) return result @@ -237,6 +248,7 @@ def _load_resource_module(self, resource_name: str) -> ValidationResult: except Exception as e: result.is_valid = 
False result.errors.append(f"Failed to load resource module '{resource_name}': {e}") + result.tracebacks.append(traceback.format_exc()) return result @@ -268,26 +280,22 @@ def _call_fragments_function( def _apply_transformation( self, module: Any, data: List[Dict[str, Any]], resource_name: str, transform_func_name: str - ) -> List[Dict[str, Any]] | None: + ) -> tuple[List[Dict[str, Any]] | None, str | None]: """Apply transformation function if available. - Args: - module: The resource module - data: Data to transform - resource_name: Name of resource for error messages - transform_func_name: Name of transformation function - Returns: - Transformed data or None if transformation failed + A tuple ``(transformed_data, traceback_str)``: + - ``(data, None)`` on success (or when no transform function exists) + - ``(None, traceback_str)`` when the transform raised """ if hasattr(module, transform_func_name): try: transform_func = getattr(module, transform_func_name) - return transform_func(data) + return transform_func(data), None except Exception: - return None + return None, traceback.format_exc() else: - return data + return data, None def _validate_data_structure( self, data: List[Dict[str, Any]], context: str diff --git a/packages/zeeker/zeeker/core/database/s3_sync.py b/packages/zeeker/zeeker/core/database/s3_sync.py index 564a723..a2513cd 100644 --- a/packages/zeeker/zeeker/core/database/s3_sync.py +++ b/packages/zeeker/zeeker/core/database/s3_sync.py @@ -2,7 +2,10 @@ S3 synchronization for Zeeker databases. This module handles downloading existing databases from S3 before building -to enable multi-machine workflows and incremental updates. +to enable multi-machine workflows and incremental updates. It refuses to +overwrite an existing local DB by default (the user must opt in with +``force=True``) because any local patches/fixes would otherwise be silently +discarded. 
""" from pathlib import Path @@ -13,22 +16,65 @@ class S3Synchronizer: """Handles S3 database synchronization operations.""" - def sync_from_s3(self, database_name: str, local_db_path: Path) -> ValidationResult: + def check_local_divergence(self, local_db_path) -> ValidationResult: + """Return ``is_valid=False`` when a local DB already exists. + + Deliberately conservative: we don't try to auto-detect whether the + local file is "safe to overwrite" (every build writes to meta tables + anyway, so a byte-level hash comparison is unreliable). Instead we + treat any existing local DB as potentially containing work the user + doesn't want to lose and require an explicit ``--force-sync``. + """ + result = ValidationResult(is_valid=True) + path = Path(local_db_path) + if not path.exists(): + return result + + result.is_valid = False + result.errors.append( + f"Local database already exists at {path}. Syncing from " + "S3 would overwrite any local changes." + ) + result.errors.append( + "Use --force-sync to overwrite, or deploy first to push your " "local changes to S3." + ) + return result + + def sync_from_s3( + self, + database_name: str, + local_db_path: Path, + *, + force: bool = False, + ) -> ValidationResult: """Download existing database from S3 if available. Args: database_name: Name of the database file local_db_path: Local path where database should be saved + force: If True, overwrite an existing local DB without complaint. Returns: ValidationResult with sync results """ result = ValidationResult(is_valid=True) + path = Path(local_db_path) try: # Import here to avoid making boto3 a hard dependency from ..deployer import ZeekerDeployer + # Guard against wiping out local changes unless explicitly opted in. + if not force: + divergence = self.check_local_divergence(path) + if not divergence.is_valid: + return divergence + elif path.exists(): + result.warnings.append( + "--force-sync: overwriting local database (any unsynced " + "changes will be lost)." 
+ ) + deployer = ZeekerDeployer() s3_key = f"latest/{database_name}" diff --git a/packages/zeeker/zeeker/core/deployer.py b/packages/zeeker/zeeker/core/deployer.py index c04bfb5..8d44c66 100644 --- a/packages/zeeker/zeeker/core/deployer.py +++ b/packages/zeeker/zeeker/core/deployer.py @@ -4,6 +4,7 @@ import hashlib import os +import traceback from pathlib import Path import boto3 @@ -69,6 +70,7 @@ def _upload_db_to_s3( except Exception as e: result.is_valid = False result.errors.append(f"Failed to {action_verb} database: {e}") + result.tracebacks.append(traceback.format_exc()) return result diff --git a/packages/zeeker/zeeker/core/project.py b/packages/zeeker/zeeker/core/project.py index 2bd2e40..920cc3f 100644 --- a/packages/zeeker/zeeker/core/project.py +++ b/packages/zeeker/zeeker/core/project.py @@ -3,11 +3,12 @@ """ from pathlib import Path +from typing import Callable from .database import DatabaseBuilder from .resources import ResourceManager from .scaffolding import ProjectScaffolder -from .types import ValidationResult, ZeekerProject +from .types import ResourceOutcome, ValidationResult, ZeekerProject class ZeekerProjectManager: @@ -74,6 +75,10 @@ def build_database( sync_from_s3: bool = False, resources: list[str] = None, setup_fts: bool = False, + *, + progress_callback: Callable[[str, ResourceOutcome | None], None] | None = None, + max_parallel: int = 1, + force_sync: bool = False, ) -> ValidationResult: """Build the SQLite database from resources with optional S3 sync. @@ -82,9 +87,15 @@ def build_database( sync_from_s3: If True, download existing database from S3 before building resources: List of specific resource names to build. If None, builds all resources. setup_fts: If True, set up full-text search indexes on configured fields + progress_callback: Optional callable (resource_name, outcome|None) used by the + CLI layer to render progress. See DatabaseBuilder.build_database. 
+ max_parallel: Max concurrent fetch_data() calls (default 1 = sequential). + force_sync: With sync_from_s3, overwrite an existing local DB instead of + refusing. Returns: - ValidationResult with build results + ValidationResult with build results; the BuildReport is attached as + ``result.report``. """ if not self.is_project_root(): result = ValidationResult(is_valid=False) @@ -104,4 +115,12 @@ def build_database( builder = DatabaseBuilder(self.project_path, project) - return builder.build_database(force_schema_reset, sync_from_s3, resources, setup_fts) + return builder.build_database( + force_schema_reset, + sync_from_s3, + resources, + setup_fts, + progress_callback=progress_callback, + max_parallel=max_parallel, + force_sync=force_sync, + ) diff --git a/packages/zeeker/zeeker/core/types.py b/packages/zeeker/zeeker/core/types.py index 6f05762..b109805 100644 --- a/packages/zeeker/zeeker/core/types.py +++ b/packages/zeeker/zeeker/core/types.py @@ -7,7 +7,7 @@ import tomllib from dataclasses import dataclass, field from pathlib import Path -from typing import Any +from typing import Any, Literal # Meta table constants META_TABLE_SCHEMAS = "_zeeker_schemas" @@ -57,6 +57,46 @@ def __init__(self, resource_name: str, old_schema: dict[str, str], new_schema: d super().__init__("\n".join(msg_parts)) +@dataclass +class ResourceOutcome: + """Per-resource outcome from a build, aggregated by DatabaseBuilder.""" + + name: str + status: Literal["success", "failed", "skipped"] + records: int = 0 + duration_s: float = 0.0 + error_message: str | None = None + traceback: str | None = None + fragments_records: int | None = None + + +@dataclass +class BuildReport: + """Structured report of a full database build, one entry per resource.""" + + resources: list[ResourceOutcome] = field(default_factory=list) + total_duration_s: float = 0.0 + fts_error: str | None = None + fatal_error: str | None = None + # Set to a PostHookResult dataclass when --post-hook ran. 
Annotated as + # ``object`` to avoid a circular import between this module and + # commands/post_hook.py; dataclasses.asdict() still recurses correctly + # when the value is itself a dataclass instance. + post_hook: object | None = None + + @property + def failed(self) -> list[ResourceOutcome]: + return [r for r in self.resources if r.status == "failed"] + + @property + def succeeded(self) -> list[ResourceOutcome]: + return [r for r in self.resources if r.status == "success"] + + @property + def skipped(self) -> list[ResourceOutcome]: + return [r for r in self.resources if r.status == "skipped"] + + @dataclass class ValidationResult: """Result of validation operations.""" @@ -65,6 +105,9 @@ class ValidationResult: errors: list[str] = field(default_factory=list) warnings: list[str] = field(default_factory=list) info: list[str] = field(default_factory=list) + tracebacks: list[str] = field(default_factory=list) + report: "BuildReport | None" = None + records: int | None = None @dataclass diff --git a/uv.lock b/uv.lock index f26b28f..927a688 100644 --- a/uv.lock +++ b/uv.lock @@ -571,6 +571,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/31/b4/b9b800c45527aadd64d5b442f9b932b00648617eb5d63d2c7a6587b7cafc/jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980", size = 20256, upload-time = "2022-06-17T18:00:10.251Z" }, ] +[[package]] +name = "markdown-it-py" +version = "4.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "mdurl" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" }, +] + [[package]] name = "markupsafe" version = "3.0.2" @@ -609,6 +621,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/4f/65/6079a46068dfceaeabb5dcad6d674f5f5c61a6fa5673746f42a9f4c233b3/MarkupSafe-3.0.2-cp313-cp313t-win_amd64.whl", hash = "sha256:e444a31f8db13eb18ada366ab3cf45fd4b31e4db1236a4448f68778c1d1a5a2f", size = 15739, upload-time = "2024-10-18T15:21:42.784Z" }, ] +[[package]] +name = "mdurl" +version = "0.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" }, +] + [[package]] name = "mergedeep" version = "1.3.4" @@ -899,6 +920,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6", size = 64738, upload-time = "2025-08-18T20:46:00.542Z" }, ] +[[package]] +name = "rich" +version = "15.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py" }, + { name = "pygments" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/c0/8f/0722ca900cc807c13a6a0c696dacf35430f72e0ec571c4275d2371fca3e9/rich-15.0.0.tar.gz", hash = "sha256:edd07a4824c6b40189fb7ac9bc4c52536e9780fbbfbddf6f1e2502c31b068c36", size = 230680, upload-time = "2026-04-12T08:24:00.75Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/82/3b/64d4899d73f91ba49a8c18a8ff3f0ea8f1c1d75481760df8c68ef5235bf5/rich-15.0.0-py3-none-any.whl", hash = "sha256:33bd4ef74232fb73fe9279a257718407f169c09b78a87ad3d296f548e27de0bb", size = 310654, upload-time = "2026-04-12T08:24:02.83Z" }, +] + [[package]] name = "ruff" version = "0.12.8" @@ -1081,6 +1115,7 @@ dependencies = [ { name = "jinja2" }, { name = "python-dotenv" }, { name = "pyyaml" }, + { name = "rich" }, { name = "sqlite-utils" }, ] @@ -1098,6 +1133,7 @@ requires-dist = [ { name = "jinja2", specifier = ">=3.1.6" }, { name = "python-dotenv", specifier = ">=1.0.0" }, { name = "pyyaml", specifier = ">=6.0.2" }, + { name = "rich", specifier = ">=13.0" }, { name = "sqlite-utils", specifier = ">=3.38" }, { name = "tenacity", marker = "extra == 'data'", specifier = ">=8.2.0" }, ]