From 37111a75bda974ee031390ff7cdb7c7c41084e3f Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 12 May 2026 19:26:01 +0200 Subject: [PATCH 1/8] Add pipeline status error reporting --- .github/copilot-instructions.md | 3 + AGENTS.md | 3 + CLAUDE.md | 3 + .../pipeline-status-reporting.added.md | 1 + docs/engineering/skills/README.md | 2 + .../engineering/skills/pipeline_operations.md | 94 +++++ modal_app/pipeline.py | 189 ++++++++- modal_app/step_manifests/errors.py | 359 ++++++++++++++++++ modal_app/step_manifests/status.py | 206 ++++++++++ modal_app/step_manifests/store.py | 11 +- policyengine_us_data/utils/error_redaction.py | 94 +++++ policyengine_us_data/utils/step_manifest.py | 13 +- .../integration/test_modal_pipeline_seams.py | 60 +++ tests/unit/test_pipeline_source_contracts.py | 22 +- tests/unit/test_pipeline_status.py | 314 +++++++++++++++ 15 files changed, 1349 insertions(+), 25 deletions(-) create mode 100644 changelog.d/pipeline-status-reporting.added.md create mode 100644 docs/engineering/skills/pipeline_operations.md create mode 100644 modal_app/step_manifests/errors.py create mode 100644 modal_app/step_manifests/status.py create mode 100644 policyengine_us_data/utils/error_redaction.py create mode 100644 tests/unit/test_pipeline_status.py diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 69f545da5..521626ee6 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -13,3 +13,6 @@ from branches in `PolicyEngine/policyengine-us-data`; never create fork PRs. For PRs that change pipeline behavior, stage boundaries, generated artifacts, or public library functions, read `docs/engineering/skills/documentation_review.md` during review. + +For deployed Modal pipeline run status or failure diagnosis, read +`docs/engineering/skills/pipeline_operations.md`. 
diff --git a/AGENTS.md b/AGENTS.md index bdc774017..e2714a388 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -17,6 +17,9 @@ When reviewing PRs that change pipeline behavior, stage boundaries, generated artifacts, or public library functions, read `docs/engineering/skills/documentation_review.md`. +When diagnosing a deployed Modal pipeline run or a failed publication pipeline, +read `docs/engineering/skills/pipeline_operations.md`. + ## GitHub PRs Read `docs/engineering/skills/github-prs.md` before opening, replacing, or diff --git a/CLAUDE.md b/CLAUDE.md index 536d56146..78c54b144 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -25,6 +25,9 @@ When reviewing PRs that change pipeline behavior, stage boundaries, generated artifacts, or public library functions, read `docs/engineering/skills/documentation_review.md`. +When diagnosing a deployed Modal pipeline run or a failed publication pipeline, +read `docs/engineering/skills/pipeline_operations.md`. + ## Safety boundaries Do not fabricate data, validation metrics, academic results, or performance diff --git a/changelog.d/pipeline-status-reporting.added.md b/changelog.d/pipeline-status-reporting.added.md new file mode 100644 index 000000000..93af8d375 --- /dev/null +++ b/changelog.d/pipeline-status-reporting.added.md @@ -0,0 +1 @@ +Add structured Modal pipeline status reporting with durable run-scoped error records. diff --git a/docs/engineering/skills/README.md b/docs/engineering/skills/README.md index eb80afaba..48b938772 100644 --- a/docs/engineering/skills/README.md +++ b/docs/engineering/skills/README.md @@ -16,5 +16,7 @@ Current skills: conventions. - `pipeline_docs.md`: decorator-backed pipeline map maintenance and generated pydoc-style artifacts. +- `pipeline_operations.md`: model-neutral workflow for diagnosing deployed Modal + pipeline status and durable error records. - `testing.md`: test layout, fixture scope, helper placement, and quality guard expectations. 
diff --git a/docs/engineering/skills/pipeline_operations.md b/docs/engineering/skills/pipeline_operations.md new file mode 100644 index 000000000..25cc4c6af --- /dev/null +++ b/docs/engineering/skills/pipeline_operations.md @@ -0,0 +1,94 @@ +# Pipeline Operations + +Use this skill when diagnosing a deployed Modal pipeline run, especially when a +GitHub Actions pipeline launch fails or a user asks for the status of a run. + +## Source Of Truth + +Treat the pipeline status endpoint and run-scoped error records as the first +diagnostic source. Modal dashboard logs are useful supporting evidence, but they +are not the durable error record for this repo. + +The status system reports: + +- the run-level manifest; +- all stage and substage manifests present for that run; +- missing expected runtime manifest IDs; +- the latest durable error record, when one exists; +- a redacted, bounded traceback when one exists. + +## Fetch Status + +First identify the run context from the GitHub Actions summary, workflow logs, or +run-context output: + +- `run_id` +- Modal app name +- Modal environment + +For agent or CLI diagnosis, call the deployed Modal function: + +```bash +uv run python - <<'PY' +import json +import modal + +app_name = "POLICYENGINE_US_DATA_MODAL_APP" +environment_name = "main" +run_id = "US_DATA_RUN_ID" + +fn = modal.Function.from_name( + app_name, + "get_pipeline_status", + environment_name=environment_name, +) +print(json.dumps(fn.remote(run_id), indent=2)) +PY +``` + +The status payload includes a traceback when one is available. Tracebacks are +redacted and bounded by keeping the newest text if they are very long. + +If the local environment cannot sync the full project environment, use the same +snippet with a Modal-only temporary environment by replacing `uv run python` +with `uv run --no-sync --with modal python`. + +If using the HTTP endpoint, authenticate with Modal proxy auth headers. 
Do not +publish or paste proxy auth values into PRs, issues, logs, or docs. + +```bash +curl \ + -H "Modal-Key: $MODAL_PROXY_TOKEN_ID" \ + -H "Modal-Secret: $MODAL_PROXY_TOKEN_SECRET" \ + "https://<pipeline-status-endpoint-host>.modal.run?run_id=<run_id>" +``` + +## Interpret Results + +Use `status` and `message` for the short answer. Then inspect: + +- `error.stage_id`: canonical top-level stage, such as `3_fit_weights`; +- `error.substage_id`: narrower substage, such as + `3a_weight_fitting_regional`; +- `error.record_path`: immutable error record path in the pipeline volume; +- `error.latest_path`: latest error pointer for the run; +- `stage_manifests[].manifest.error`: manifest-local failure details; +- `missing_expected_manifest_ids`: expected runtime manifests that have not yet + been written. + +When reporting back, name the failing stage and substage, summarize the exception +type and message, and cite whether the traceback came from the status endpoint or +from Modal dashboard logs. + +## Safety Rules + +- Do not paste tracebacks into PRs, issues, or chat unless the user needs that + detail. +- Redact secrets before sharing command output, even though the status endpoint + already applies obvious redaction. +- Do not infer that a missing later-stage manifest is a failure if the run is + still running. +- If the run was hard-killed before Python exception handling ran, the endpoint + may show a running run with no durable error. In that case, report the last + completed/running manifest and then use Modal dashboard logs as secondary + evidence. 
diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index 12b42d2c8..095436da9 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -52,6 +52,10 @@ from modal_app.images import cpu_image as image # noqa: E402 from modal_app.resilience import ensure_resume_sha_compatible # noqa: E402 +from modal_app.step_manifests.errors import ( # noqa: E402 + clear_latest_pipeline_error as _clear_latest_pipeline_error, + record_pipeline_error as _record_pipeline_error, +) from modal_app.step_manifests.specs import ( # noqa: E402 BUILD_CALIBRATION_PACKAGE, BUILD_DATASETS, @@ -87,7 +91,14 @@ step_reusable as _step_reusable, write_run_meta, ) +from modal_app.step_manifests.status import ( # noqa: E402 + build_pipeline_status_payload as _build_pipeline_status_payload, +) from policyengine_us_data.utils.run_context import RunContext, resolve_run_id # noqa: E402 +from policyengine_us_data.utils.error_redaction import ( # noqa: E402 + redacted_bounded_error_text, + redact_error_text, +) from policyengine_us_data.utils.step_manifest import ( # noqa: E402 ArtifactReference, ReuseMeasurement, @@ -125,6 +136,7 @@ ) REPO_URL = "https://github.com/PolicyEngine/policyengine-us-data.git" +status_image = image.pip_install("fastapi") def _python_cmd(*args: str) -> list[str]: @@ -158,6 +170,56 @@ def _calibration_package_parameters( return {key: value for key, value in params.items() if value is not None} +def _record_pipeline_failure( + exc: BaseException, + *, + run_id: str, + manifest: StepManifest | None, + meta: RunMetadata, + surface: str, + traceback_text: str, +) -> ArtifactReference | None: + """Write durable error details without masking the original exception.""" + + try: + error_write = _record_pipeline_error( + exc, + run_id=run_id, + manifest=manifest, + meta=meta, + surface=surface, + traceback_text=traceback_text, + vol=pipeline_volume, + ) + return error_write.record_ref + except Exception as record_exc: + print(f"WARNING: failed to write durable pipeline error 
record: {record_exc}") + return None + + +def _pipeline_error_summary( + exc: BaseException, + *, + traceback_ref: ArtifactReference | None = None, + traceback_text: str | None = None, +) -> str: + summary = redact_error_text(f"{type(exc).__name__}: {exc}") + if traceback_ref is None: + if traceback_text: + return redacted_bounded_error_text(f"{summary}\n{traceback_text}").text + return summary + return f"{summary}; traceback_ref={traceback_ref.path}" + + +def _clear_pipeline_error_pointer(run_id: str) -> None: + cleared = _clear_latest_pipeline_error(_run_dir(run_id), vol=pipeline_volume) + if not cleared: + print( + "WARNING: failed to clear durable pipeline error pointer " + f"for run {run_id}; continuing after successful work." + ) + + def get_pinned_sha(branch: str) -> str: """Get the current tip SHA for a branch from GitHub.""" result = subprocess.run( @@ -461,6 +523,8 @@ def verify_runtime_seams() -> dict: "modal_app/step_manifests/specs.py", "modal_app/step_manifests/state.py", "modal_app/step_manifests/store.py", + "modal_app/step_manifests/errors.py", + "modal_app/step_manifests/status.py", "modal_app/fixtures/h5_cases.py", "tests/integration/test_fixture_50hh.h5", "policyengine_us_data/calibration/target_config.yaml", @@ -502,6 +566,8 @@ def verify_runtime_seams() -> dict: "modal_app.step_manifests.specs", "modal_app.step_manifests.state", "modal_app.step_manifests.store", + "modal_app.step_manifests.errors", + "modal_app.step_manifests.status", "modal_app.worker_script", "numpy", "pandas", @@ -1635,6 +1701,8 @@ def run_pipeline( # ── Step 5: Finalize ── print("\n[Step 5/5] Finalizing run...") meta.status = "completed" + meta.error = None + _clear_pipeline_error_pointer(run_id) write_run_meta(meta, pipeline_volume) print("\n" + "=" * 60) @@ -1654,9 +1722,27 @@ def run_pipeline( return run_id except Exception as e: - _fail_step_manifest(active_step_manifest, e, pipeline_volume) + traceback_text = traceback.format_exc() + traceback_ref = 
_record_pipeline_failure( + e, + run_id=run_id, + manifest=active_step_manifest, + meta=meta, + surface="run_pipeline", + traceback_text=traceback_text, + ) + _fail_step_manifest( + active_step_manifest, + e, + pipeline_volume, + traceback_ref=traceback_ref, + ) meta.status = "failed" - meta.error = f"{type(e).__name__}: {e}\n{traceback.format_exc()}" + meta.error = _pipeline_error_summary( + e, + traceback_ref=traceback_ref, + traceback_text=traceback_text, + ) write_run_meta(meta, pipeline_volume) print(f"\nPIPELINE FAILED: {e}") print(f"Resume with: --resume-run-id {run_id}") @@ -1835,6 +1921,8 @@ def promote_run( # Update run status only after all required promotion work succeeds. meta.status = "promoted" + meta.error = None + _clear_pipeline_error_pointer(run_id) _complete_step_manifest( promote_manifest, outputs=[ @@ -1846,7 +1934,27 @@ def promote_run( ) write_run_meta(meta, pipeline_volume) except Exception as exc: - _fail_step_manifest(promote_manifest, exc, pipeline_volume) + traceback_text = traceback.format_exc() + traceback_ref = _record_pipeline_failure( + exc, + run_id=run_id, + manifest=promote_manifest, + meta=meta, + surface="promote_run", + traceback_text=traceback_text, + ) + _fail_step_manifest( + promote_manifest, + exc, + pipeline_volume, + traceback_ref=traceback_ref, + ) + meta.error = _pipeline_error_summary( + exc, + traceback_ref=traceback_ref, + traceback_text=traceback_text, + ) + write_run_meta(meta, pipeline_volume) raise print("\n" + "=" * 60) @@ -1861,6 +1969,39 @@ def promote_run( # ── Status ─────────────────────────────────────────────────────── +@app.function( + image=image, + timeout=60, + volumes={PIPELINE_MOUNT: pipeline_volume}, +) +def get_pipeline_status( + run_id: str, +) -> dict: + """Get structured JSON status for a pipeline run.""" + + pipeline_volume.reload() + return _build_pipeline_status_payload(run_id) + + +@app.function( + image=status_image, + timeout=60, + volumes={PIPELINE_MOUNT: pipeline_volume}, +) 
+@modal.fastapi_endpoint( + method="GET", + docs=False, + requires_proxy_auth=True, +) +def pipeline_status_endpoint( + run_id: str, +) -> dict: + """Protected HTTP endpoint for structured pipeline status.""" + + pipeline_volume.reload() + return _build_pipeline_status_payload(run_id) + + @app.function( image=image, timeout=60, @@ -1881,28 +2022,38 @@ def pipeline_status( return "No pipeline runs found." if run_id: - meta = read_run_meta(run_id, pipeline_volume) - steps_dir = _run_dir(run_id) / "steps" + payload = _build_pipeline_status_payload(run_id) + if payload["status"] == "not_found": + return payload["message"] + run_manifest = payload["run_manifest"] lines = [ - f"Run: {meta.run_id}", - f" Branch: {meta.branch}", - f" SHA: {meta.sha[:12]}", - f" Version: {meta.version}", - f" Status: {meta.status}", - f" Started: {meta.start_time}", + f"Run: {payload['run_id']}", + f" Branch: {run_manifest['branch']}", + f" SHA: {run_manifest['sha'][:12]}", + f" Version: {run_manifest['version']}", + f" Status: {payload['status']}", + f" Started: {run_manifest['started_at']}", ] - if meta.error: - lines.append(f" Error: {meta.error[:200]}") - if steps_dir.exists(): + if payload["error"]: + error = payload["error"] + lines.append( + f" Error: {error['error_type']}: {error.get('message', '')[:200]}" + ) + if error.get("record_path"): + lines.append(f" Error record: {error['record_path']}") + if payload["stage_manifests"]: lines.append(" Step manifests:") - for manifest_path in sorted(steps_dir.glob("*.json")): - manifest = read_step_manifest(manifest_path) + for item in payload["stage_manifests"]: + manifest = item["manifest"] duration = ( - manifest.duration_s if manifest.duration_s is not None else "?" + manifest["duration_s"] + if manifest.get("duration_s") is not None + else "?" 
) - reuse = manifest.reuse_decision + reuse = manifest.get("reuse_decision", "not_applicable") lines.append( - f" {manifest.step_id}: {duration}s ({manifest.status}, {reuse})" + f" {manifest['step_id']}: {duration}s " + f"({manifest['status']}, {reuse})" ) return "\n".join(lines) diff --git a/modal_app/step_manifests/errors.py b/modal_app/step_manifests/errors.py new file mode 100644 index 000000000..d86211177 --- /dev/null +++ b/modal_app/step_manifests/errors.py @@ -0,0 +1,359 @@ +"""Durable error records for Modal pipeline runs.""" + +from __future__ import annotations + +import re +import traceback as traceback_module +from dataclasses import asdict, dataclass, replace +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Mapping + +from policyengine_us_data.utils.canonical_json import ( + canonical_json_dumps, + canonical_json_loads, +) +from policyengine_us_data.utils.error_redaction import ( + DEFAULT_ERROR_TEXT_MAX_CHARS, + bound_error_text, + redact_error_text, +) +from policyengine_us_data.utils.step_manifest import ArtifactReference, StepManifest +from policyengine_us_data.stage_contracts.stages import ( + CANONICAL_STAGE_IDS, + SUBSTAGE_IDS_BY_STAGE_ID, + is_canonical_stage_id, + is_canonical_substage_id, +) + +from modal_app.step_manifests import state as pipeline_state +from modal_app.step_manifests.specs import parent_step_id + +PIPELINE_ERROR_RECORD_SCHEMA_VERSION = "1" +ERRORS_DIR_NAME = "errors" +LATEST_ERROR_FILENAME = "latest_error.json" + + +def _drop_none(value: Any) -> Any: + if isinstance(value, dict): + return { + key: _drop_none(item) for key, item in value.items() if item is not None + } + if isinstance(value, list): + return [_drop_none(item) for item in value] + return value + + +def stage_ids_for_manifest( + manifest: StepManifest | None, +) -> tuple[str | None, str | None]: + """Return public stage/substage IDs for an internal step manifest.""" + + if manifest is None: + return None, None + if 
is_canonical_stage_id(manifest.step_id): + return manifest.step_id, None + explicit_parent = manifest.parent_step_id or parent_step_id(manifest.step_id) + inferred_parent = ( + explicit_parent + if explicit_parent + and is_canonical_substage_id(explicit_parent, manifest.step_id) + else _canonical_stage_for_substage(manifest.step_id) + ) + if inferred_parent and is_canonical_substage_id(inferred_parent, manifest.step_id): + return inferred_parent, manifest.step_id + return None, None + + +def _canonical_stage_for_substage(substage_id: str) -> str | None: + for stage_id, substage_ids in SUBSTAGE_IDS_BY_STAGE_ID.items(): + if substage_id in substage_ids: + return stage_id + return None + + +def _validate_explicit_stage_ids( + *, + stage_id: str | None, + substage_id: str | None, +) -> None: + if stage_id is None: + if substage_id is not None: + raise ValueError("substage_id cannot be set without stage_id") + return + if stage_id not in CANONICAL_STAGE_IDS: + raise ValueError(f"Invalid canonical stage_id: {stage_id!r}") + if substage_id is not None and not is_canonical_substage_id(stage_id, substage_id): + raise ValueError( + f"substage_id {substage_id!r} does not belong to stage_id {stage_id!r}" + ) + + +@dataclass(frozen=True) +class PipelineErrorRecord: + """Durable traceback record for one pipeline failure surface.""" + + run_id: str + stage_id: str | None + substage_id: str | None + surface: str + occurred_at: str + error_type: str + message: str + traceback: str + branch: str | None = None + sha: str | None = None + version: str | None = None + modal_app_name: str | None = None + modal_environment: str | None = None + modal_call_id: str | None = None + record_path: str | None = None + latest_path: str | None = None + traceback_format: str = "python" + schema_version: str = PIPELINE_ERROR_RECORD_SCHEMA_VERSION + + def to_dict( + self, + *, + traceback_max_chars: int | None = DEFAULT_ERROR_TEXT_MAX_CHARS, + ) -> dict[str, Any]: + payload = asdict(self) + 
bounded_traceback = bound_error_text( + self.traceback, + max_chars=traceback_max_chars, + ) + payload["traceback"] = bounded_traceback.text + payload["traceback_available"] = bool(self.traceback) + payload["traceback_truncated"] = bounded_traceback.truncated + if bounded_traceback.truncated: + payload["traceback_omitted_chars"] = bounded_traceback.omitted_chars + return _drop_none(payload) + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "PipelineErrorRecord": + stage_id = data.get("stage_id") + substage_id = data.get("substage_id") + resolved_stage_id = str(stage_id) if stage_id is not None else None + resolved_substage_id = str(substage_id) if substage_id is not None else None + _validate_explicit_stage_ids( + stage_id=resolved_stage_id, + substage_id=resolved_substage_id, + ) + return cls( + schema_version=str( + data.get("schema_version", PIPELINE_ERROR_RECORD_SCHEMA_VERSION) + ), + run_id=str(data["run_id"]), + stage_id=resolved_stage_id, + substage_id=resolved_substage_id, + surface=str(data.get("surface", "pipeline")), + occurred_at=str(data["occurred_at"]), + error_type=str(data["error_type"]), + message=str(data["message"]), + traceback=str(data.get("traceback", "")), + branch=data.get("branch"), + sha=data.get("sha"), + version=data.get("version"), + modal_app_name=data.get("modal_app_name"), + modal_environment=data.get("modal_environment"), + modal_call_id=data.get("modal_call_id"), + record_path=data.get("record_path"), + latest_path=data.get("latest_path"), + traceback_format=str(data.get("traceback_format", "python")), + ) + + +@dataclass(frozen=True) +class PipelineErrorWriteResult: + """File references written for one durable pipeline error record.""" + + record: PipelineErrorRecord + record_ref: ArtifactReference + latest_ref: ArtifactReference + + +def build_pipeline_error_record( + exc: BaseException, + *, + run_id: str, + manifest: StepManifest | None = None, + meta: Any | None = None, + stage_id: str | None = None, + 
substage_id: str | None = None, + surface: str = "pipeline", + traceback_text: str | None = None, + occurred_at: str | None = None, + env: Mapping[str, str] | None = None, +) -> PipelineErrorRecord: + """Build a redacted error record from an exception and run context.""" + + inferred_stage_id, inferred_substage_id = stage_ids_for_manifest(manifest) + resolved_stage_id = stage_id or inferred_stage_id + resolved_substage_id = ( + substage_id if substage_id is not None else inferred_substage_id + ) + _validate_explicit_stage_ids( + stage_id=resolved_stage_id, + substage_id=resolved_substage_id, + ) + trace = traceback_text + if trace is None: + trace = "".join( + traceback_module.format_exception(type(exc), exc, exc.__traceback__) + ) + return PipelineErrorRecord( + run_id=run_id, + stage_id=resolved_stage_id, + substage_id=resolved_substage_id, + surface=surface, + occurred_at=occurred_at or datetime.now(timezone.utc).isoformat(), + error_type=type(exc).__name__, + message=redact_error_text(str(exc), env=env), + traceback=redact_error_text(trace, env=env), + branch=getattr(meta, "branch", None) or getattr(manifest, "branch", None), + sha=getattr(meta, "sha", None) or getattr(manifest, "sha", None), + version=getattr(meta, "version", None) or getattr(manifest, "version", None), + modal_app_name=getattr(meta, "modal_app_name", None) + or getattr(manifest, "modal_app_name", None), + modal_environment=getattr(meta, "modal_environment", None) + or getattr(manifest, "modal_environment", None), + modal_call_id=getattr(manifest, "modal_call_id", None), + ) + + +def error_records_dir(run_dir: str | Path) -> Path: + return Path(run_dir) / ERRORS_DIR_NAME + + +def latest_error_record_path(run_dir: str | Path) -> Path: + return error_records_dir(run_dir) / LATEST_ERROR_FILENAME + + +def _safe_filename(value: str) -> str: + safe = re.sub(r"[^A-Za-z0-9_.-]+", "-", value).strip("-") + return safe or "pipeline" + + +def _timestamp_for_filename(value: str) -> str: + parsed = 
datetime.fromisoformat(value.replace("Z", "+00:00")) + return parsed.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%S%fZ") + + +def _volume_root_for_run_dir(run_dir: Path) -> Path: + try: + return run_dir.parent.parent + except IndexError: + return Path(".") + + +def _relative_path(path: Path, root: Path) -> str: + try: + return str(path.relative_to(root)) + except ValueError: + return str(path) + + +def write_pipeline_error_record( + record: PipelineErrorRecord, + *, + run_dir: str | Path, + volume_root: str | Path | None = None, +) -> PipelineErrorWriteResult: + """Write timestamped and latest error records for a run.""" + + resolved_run_dir = Path(run_dir) + root = ( + Path(volume_root) + if volume_root is not None + else _volume_root_for_run_dir(resolved_run_dir) + ) + errors_dir = error_records_dir(resolved_run_dir) + errors_dir.mkdir(parents=True, exist_ok=True) + + stage_label = record.substage_id or record.stage_id or record.surface + filename = f"{_timestamp_for_filename(record.occurred_at)}-{_safe_filename(stage_label)}.json" + record_path = errors_dir / filename + latest_path = errors_dir / LATEST_ERROR_FILENAME + record_with_paths = replace( + record, + record_path=_relative_path(record_path, root), + latest_path=_relative_path(latest_path, root), + ) + payload = canonical_json_dumps(record_with_paths.to_dict(traceback_max_chars=None)) + record_path.write_text(payload) + latest_path.write_text(payload) + return PipelineErrorWriteResult( + record=record_with_paths, + record_ref=ArtifactReference.from_path( + record_path, + role="error", + base_dir=root, + media_type="application/json", + ), + latest_ref=ArtifactReference.from_path( + latest_path, + role="error", + base_dir=root, + media_type="application/json", + ), + ) + + +def read_latest_pipeline_error( + run_dir: str | Path, +) -> PipelineErrorRecord | None: + path = latest_error_record_path(run_dir) + if not path.exists(): + return None + return 
PipelineErrorRecord.from_dict(canonical_json_loads(path.read_text())) + + +def clear_latest_pipeline_error( + run_dir: str | Path, + *, + vol: Any | None = None, + strict: bool = False, +) -> bool: + """Clear the mutable latest-error pointer after a successful retry.""" + + try: + path = latest_error_record_path(run_dir) + if path.exists(): + path.unlink() + if vol is not None: + vol.commit() + return True + except Exception: + if strict: + raise + return False + + +def record_pipeline_error( + exc: BaseException, + *, + run_id: str, + manifest: StepManifest | None = None, + meta: Any | None = None, + surface: str = "pipeline", + traceback_text: str | None = None, + vol: Any | None = None, +) -> PipelineErrorWriteResult: + """Persist an exception to the run error ledger and optionally commit volume.""" + + record = build_pipeline_error_record( + exc, + run_id=run_id, + manifest=manifest, + meta=meta, + surface=surface, + traceback_text=traceback_text, + ) + result = write_pipeline_error_record( + record, + run_dir=pipeline_state.run_dir(run_id), + volume_root=pipeline_state.PIPELINE_MOUNT, + ) + if vol is not None: + vol.commit() + return result diff --git a/modal_app/step_manifests/status.py b/modal_app/step_manifests/status.py new file mode 100644 index 000000000..089e87f68 --- /dev/null +++ b/modal_app/step_manifests/status.py @@ -0,0 +1,206 @@ +"""Structured status payloads for Modal pipeline runs.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from policyengine_us_data.utils.step_manifest import ( + read_run_manifest, + read_step_manifest, + run_manifest_path, + step_manifest_dir, +) +from policyengine_us_data.utils.error_redaction import ( + bound_error_text, + redacted_bounded_error_text, + redact_error_text, +) + +from modal_app.step_manifests import state as pipeline_state +from modal_app.step_manifests.errors import ( + PipelineErrorRecord, + read_latest_pipeline_error, + stage_ids_for_manifest, +) +from 
modal_app.step_manifests.specs import RUN_MANIFEST_STEP_IDS, step_title + +PIPELINE_STATUS_SCHEMA_VERSION = "1" + + +def _run_dir(run_id: str, runs_dir: str | Path | None = None) -> Path: + return ( + Path(runs_dir) if runs_dir is not None else Path(pipeline_state.RUNS_DIR) + ) / run_id + + +def _step_sort_key(step_id: str, expected_ids: list[str]) -> tuple[int, str]: + try: + return expected_ids.index(step_id), step_id + except ValueError: + return len(expected_ids), step_id + + +def _error_payload( + error_record: PipelineErrorRecord | None, +) -> dict[str, Any] | None: + if error_record is None: + return None + return error_record.to_dict() + + +def _legacy_error_payload(error_text: str | None) -> dict[str, Any] | None: + if not error_text: + return None + redacted = redact_error_text(error_text) + bounded = bound_error_text(redacted) + first_line = redacted.splitlines()[0] if redacted else "Pipeline error" + error_type = "RuntimeError" + message = first_line + if ":" in first_line: + maybe_type, maybe_message = first_line.split(":", 1) + if maybe_type.strip(): + error_type = maybe_type.strip() + message = maybe_message.strip() or first_line + payload = { + "source": "run_manifest.error", + "surface": "run_manifest", + "stage_id": None, + "substage_id": None, + "error_type": error_type, + "message": message, + "traceback": bounded.text, + "traceback_available": bool(bounded.text), + "traceback_truncated": bounded.truncated, + } + if bounded.truncated: + payload["traceback_omitted_chars"] = bounded.omitted_chars + return payload + + +def _message( + *, + status: str, + stage_manifests: list[dict[str, Any]], + error: dict[str, Any] | None, +) -> str: + if error: + location = ( + error.get("substage_id") or error.get("stage_id") or error.get("surface") + ) + detail = error.get("message") or error.get("error_type") or "unknown error" + if status == "failed": + return f"Pipeline failed in {location}: {detail}" + return f"Pipeline has a recorded error in {location}: 
{detail}" + if status == "not_found": + return "Pipeline run not found." + if stage_manifests: + latest = stage_manifests[-1] + return ( + f"Pipeline {status}; latest manifest " + f"{latest['substage_id'] or latest['stage_id']} is {latest['status']}." + ) + return f"Pipeline {status}." + + +def _sanitize_error_value(value: Any) -> Any: + if isinstance(value, str): + return redacted_bounded_error_text(value).text + if isinstance(value, dict): + return {key: _sanitize_error_value(item) for key, item in value.items()} + if isinstance(value, list): + return [_sanitize_error_value(item) for item in value] + return value + + +def _run_manifest_payload(run_manifest) -> dict[str, Any]: + payload = run_manifest.to_dict() + if payload.get("error"): + payload["error"] = redacted_bounded_error_text(payload["error"]).text + return payload + + +def _manifest_payload(manifest) -> dict[str, Any]: + stage_id, substage_id = stage_ids_for_manifest(manifest) + manifest_payload = manifest.to_dict() + if manifest_payload.get("error"): + manifest_payload["error"] = _sanitize_error_value(manifest_payload["error"]) + return { + "stage_id": stage_id, + "substage_id": substage_id, + "step_id": manifest.step_id, + "title": step_title(manifest.step_id), + "status": manifest.status, + "manifest": manifest_payload, + } + + +def build_pipeline_status_payload( + run_id: str, + *, + runs_dir: str | Path | None = None, +) -> dict[str, Any]: + """Build a JSON-serializable status payload for a pipeline run.""" + + if not run_id: + return { + "schema_version": PIPELINE_STATUS_SCHEMA_VERSION, + "run_id": run_id, + "status": "invalid_request", + "message": "run_id is required.", + "run_manifest": None, + "stage_manifests": [], + "missing_expected_manifest_ids": [], + "error": None, + } + + run_dir = _run_dir(run_id, runs_dir) + manifest_path = run_manifest_path(run_dir) + if not manifest_path.exists(): + return { + "schema_version": PIPELINE_STATUS_SCHEMA_VERSION, + "run_id": run_id, + "status": 
"not_found", + "message": f"Pipeline run {run_id} not found.", + "run_manifest": None, + "stage_manifests": [], + "missing_expected_manifest_ids": list(RUN_MANIFEST_STEP_IDS), + "error": None, + } + + run_manifest = read_run_manifest(manifest_path) + expected_ids = list(run_manifest.known_step_ids or RUN_MANIFEST_STEP_IDS) + manifests = [] + steps_dir = step_manifest_dir(run_dir) + if steps_dir.exists(): + manifests = [ + read_step_manifest(path) for path in sorted(steps_dir.glob("*.json")) + ] + manifests.sort(key=lambda manifest: _step_sort_key(manifest.step_id, expected_ids)) + stage_manifests = [_manifest_payload(manifest) for manifest in manifests] + present_ids = {manifest.step_id for manifest in manifests} + missing_expected = [ + step_id for step_id in expected_ids if step_id not in present_ids + ] + error = _error_payload( + read_latest_pipeline_error(run_dir), + ) or _legacy_error_payload(run_manifest.error) + status = run_manifest.status + return { + "schema_version": PIPELINE_STATUS_SCHEMA_VERSION, + "run_id": run_id, + "status": status, + "message": _message( + status=status, + stage_manifests=stage_manifests, + error=error, + ), + "run_manifest": _run_manifest_payload(run_manifest), + "stage_manifests": stage_manifests, + "missing_expected_manifest_ids": missing_expected, + "error": error, + "updated_at": run_manifest.updated_at, + "modal_app_name": run_manifest.modal_app_name, + "modal_environment": run_manifest.modal_environment, + "pipeline_volume_name": run_manifest.run_context.get("pipeline_volume_name"), + } diff --git a/modal_app/step_manifests/store.py b/modal_app/step_manifests/store.py index a7b82b6ad..d8b7b21de 100644 --- a/modal_app/step_manifests/store.py +++ b/modal_app/step_manifests/store.py @@ -175,10 +175,19 @@ def fail_step_manifest( manifest: StepManifest | None, exc: BaseException, vol: Any, + *, + traceback_ref: ArtifactReference | dict[str, Any] | None = None, ) -> None: if manifest is None: return - failed = manifest.fail(exc) 
+ error_details = {} + if traceback_ref is not None: + error_details["traceback_ref"] = ( + traceback_ref.to_dict() + if isinstance(traceback_ref, ArtifactReference) + else dict(traceback_ref) + ) + failed = manifest.fail(exc, error_details=error_details) write_step_manifest( step_manifest_path(run_dir(failed.run_id), failed.step_id), failed ) diff --git a/policyengine_us_data/utils/error_redaction.py b/policyengine_us_data/utils/error_redaction.py new file mode 100644 index 000000000..a8a2cc1b9 --- /dev/null +++ b/policyengine_us_data/utils/error_redaction.py @@ -0,0 +1,94 @@ +"""Helpers for redacting and bounding error text.""" + +from __future__ import annotations + +import os +import re +from dataclasses import dataclass +from typing import Mapping + +DEFAULT_ERROR_TEXT_MAX_CHARS = 24_000 + +_SECRET_KEY_MARKERS = ( + "TOKEN", + "SECRET", + "PASSWORD", + "CREDENTIAL", + "PRIVATE_KEY", + "API_KEY", + "ACCESS_KEY", +) +_SECRET_ASSIGNMENT_RE = re.compile( + r"(?i)\b([A-Z0-9_]*(?:TOKEN|SECRET|PASSWORD|CREDENTIAL|PRIVATE_KEY|" + r"API_KEY|ACCESS_KEY)[A-Z0-9_]*)\s*=\s*([^\s,;]+)" +) + + +@dataclass(frozen=True) +class BoundedErrorText: + """Error text bounded for status payloads and logs.""" + + text: str + truncated: bool + omitted_chars: int + + +def _is_secret_key(key: str) -> bool: + upper = key.upper() + return any(marker in upper for marker in _SECRET_KEY_MARKERS) + + +def redact_error_text(text: str | None, *, env: Mapping[str, str] | None = None) -> str: + """Redact obvious secret values from error text.""" + + redacted = text or "" + source_env = env or os.environ + for key, value in source_env.items(): + if not value or len(value) < 8 or not _is_secret_key(key): + continue + redacted = redacted.replace(value, f"") + return _SECRET_ASSIGNMENT_RE.sub(r"\1=", redacted) + + +def bound_error_text( + text: str | None, + *, + max_chars: int | None = DEFAULT_ERROR_TEXT_MAX_CHARS, +) -> BoundedErrorText: + """Keep the newest error text when a traceback is too 
long.""" + + value = text or "" + if max_chars is None or len(value) <= max_chars: + return BoundedErrorText(text=value, truncated=False, omitted_chars=0) + if max_chars <= 0: + return BoundedErrorText(text="", truncated=True, omitted_chars=len(value)) + + marker = "\n[truncated older error text; omitted {omitted} chars]\n" + omitted = len(value) - max_chars + rendered_marker = marker.format(omitted=omitted) + if len(rendered_marker) >= max_chars: + return BoundedErrorText( + text=value[-max_chars:], + truncated=True, + omitted_chars=omitted, + ) + + tail_chars = max_chars - len(rendered_marker) + omitted = len(value) - tail_chars + rendered_marker = marker.format(omitted=omitted) + return BoundedErrorText( + text=f"{rendered_marker}{value[-tail_chars:]}", + truncated=True, + omitted_chars=omitted, + ) + + +def redacted_bounded_error_text( + text: str | None, + *, + env: Mapping[str, str] | None = None, + max_chars: int | None = DEFAULT_ERROR_TEXT_MAX_CHARS, +) -> BoundedErrorText: + """Redact error text, then bound it by keeping the newest content.""" + + return bound_error_text(redact_error_text(text, env=env), max_chars=max_chars) diff --git a/policyengine_us_data/utils/step_manifest.py b/policyengine_us_data/utils/step_manifest.py index a9c83d04e..5870a7f6d 100644 --- a/policyengine_us_data/utils/step_manifest.py +++ b/policyengine_us_data/utils/step_manifest.py @@ -17,6 +17,7 @@ from policyengine_us_data.utils.canonical_json import ( canonical_json_dumps as _canonical_json_dumps, ) +from policyengine_us_data.utils.error_redaction import redact_error_text STEP_MANIFEST_SCHEMA_VERSION = "1" @@ -261,10 +262,17 @@ def fail( exc: BaseException, *, completed_at: str | None = None, + error_details: Mapping[str, Any] | None = None, ) -> "StepManifest": completed = completed_at or utc_now() started = datetime.fromisoformat(self.started_at) ended = datetime.fromisoformat(completed) + error = { + "type": type(exc).__name__, + "message": redact_error_text(str(exc)), + } + 
if error_details: + error.update(dict(error_details)) return StepManifest( run_id=self.run_id, step_id=self.step_id, @@ -290,10 +298,7 @@ def fail( reuse_decision="failed", reuse_reason="step_failed", reuse_measurement=self.reuse_measurement, - error={ - "type": type(exc).__name__, - "message": str(exc), - }, + error=error, schema_version=self.schema_version, ) diff --git a/tests/integration/test_modal_pipeline_seams.py b/tests/integration/test_modal_pipeline_seams.py index 597d033c7..5906c5f42 100644 --- a/tests/integration/test_modal_pipeline_seams.py +++ b/tests/integration/test_modal_pipeline_seams.py @@ -9,6 +9,7 @@ import os import pytest +import requests modal = pytest.importorskip("modal") @@ -23,6 +24,19 @@ def _require_modal_tokens() -> None: pytest.skip("Modal credentials are required for deployed-image seam tests") +def _modal_proxy_auth_headers() -> dict[str, str]: + key = os.environ.get("MODAL_PROXY_TOKEN_ID") or os.environ.get("MODAL_TOKEN_ID") + secret = os.environ.get("MODAL_PROXY_TOKEN_SECRET") or os.environ.get( + "MODAL_TOKEN_SECRET" + ) + if not (key and secret): + pytest.skip("Modal proxy auth credentials are required for HTTP seam tests") + return { + "Modal-Key": key, + "Modal-Secret": secret, + } + + def test_pipeline_image_runtime_seams(): _require_modal_tokens() @@ -46,6 +60,8 @@ def test_pipeline_image_runtime_seams(): "modal_app/step_manifests/specs.py": True, "modal_app/step_manifests/state.py": True, "modal_app/step_manifests/store.py": True, + "modal_app/step_manifests/errors.py": True, + "modal_app/step_manifests/status.py": True, "modal_app/fixtures/h5_cases.py": True, "tests/integration/test_fixture_50hh.h5": True, "policyengine_us_data/calibration/target_config.yaml": True, @@ -66,6 +82,8 @@ def test_pipeline_image_runtime_seams(): "modal_app.step_manifests.specs", "modal_app.step_manifests.state", "modal_app.step_manifests.store", + "modal_app.step_manifests.errors", + "modal_app.step_manifests.status", "numpy", 
"policyengine_us", "policyengine_us_data", @@ -87,3 +105,45 @@ def test_pipeline_image_runtime_seams(): assert checkpoint_policy["runner_exposes_checkpoint_name"] is False assert checkpoint_policy["runner_passes_checkpoint_output"] is False assert checkpoint_policy["runner_collects_checkpoint_path"] is False + + +def test_pipeline_status_callable_reports_missing_run(): + _require_modal_tokens() + + fn = modal.Function.from_name( + APP_NAME, + "get_pipeline_status", + environment_name=MODAL_ENVIRONMENT, + ) + result = fn.remote("missing-run-for-status-seam") + + assert result["status"] == "not_found" + assert result["run_id"] == "missing-run-for-status-seam" + assert result["stage_manifests"] == [] + + +def test_pipeline_status_http_endpoint_reports_missing_run(): + _require_modal_tokens() + headers = _modal_proxy_auth_headers() + + fn = modal.Function.from_name( + APP_NAME, + "pipeline_status_endpoint", + environment_name=MODAL_ENVIRONMENT, + ) + endpoint = fn.get_web_url() + assert endpoint + + response = requests.get( + endpoint, + params={"run_id": "missing-run-for-status-http-seam"}, + headers=headers, + timeout=30, + ) + + assert response.status_code == 200, response.text[:500] + result = response.json() + assert result["status"] == "not_found" + assert result["run_id"] == "missing-run-for-status-http-seam" + assert result["stage_manifests"] == [] + assert result["error"] is None diff --git a/tests/unit/test_pipeline_source_contracts.py b/tests/unit/test_pipeline_source_contracts.py index 89bd368e4..1311e8d8d 100644 --- a/tests/unit/test_pipeline_source_contracts.py +++ b/tests/unit/test_pipeline_source_contracts.py @@ -16,6 +16,10 @@ def _function_def(tree: ast.Module, name: str) -> ast.FunctionDef: raise AssertionError(f"Could not find function {name}") +def _name(node: ast.AST) -> str | None: + return node.id if isinstance(node, ast.Name) else None + + def test_promote_run_uses_single_full_release_promotion() -> None: tree = 
ast.parse(PIPELINE_SOURCE.read_text()) promote_run = _function_def(tree, "promote_run") @@ -56,8 +60,24 @@ def test_promote_run_fails_closed_for_required_promotion_steps() -> None: tree = ast.parse(PIPELINE_SOURCE.read_text()) promote_run = _function_def(tree, "promote_run") source = ast.get_source_segment(PIPELINE_SOURCE.read_text(), promote_run) + fail_step_calls = [ + node + for node in ast.walk(promote_run) + if isinstance(node, ast.Call) + and isinstance(node.func, ast.Name) + and node.func.id == "_fail_step_manifest" + ] - assert "_fail_step_manifest(promote_manifest, exc, pipeline_volume)" in source + assert any( + [_name(arg) for arg in call.args[:3]] + == ["promote_manifest", "exc", "pipeline_volume"] + for call in fail_step_calls + ) + assert any( + any(keyword.arg == "traceback_ref" for keyword in call.keywords) + for call in fail_step_calls + ) + assert "meta.error =" in source assert "WARNING: Base dataset promotion" not in source assert "WARNING: Regional promote" not in source assert "WARNING: National promote" not in source diff --git a/tests/unit/test_pipeline_status.py b/tests/unit/test_pipeline_status.py new file mode 100644 index 000000000..01d1e8d6c --- /dev/null +++ b/tests/unit/test_pipeline_status.py @@ -0,0 +1,314 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from modal_app.step_manifests.errors import ( + PipelineErrorRecord, + build_pipeline_error_record, + clear_latest_pipeline_error, + read_latest_pipeline_error, + write_pipeline_error_record, +) +from modal_app.step_manifests.specs import BUILD_DATASETS, WEIGHT_FITTING_REGIONAL +from modal_app.step_manifests.status import build_pipeline_status_payload +from modal_app.step_manifests.store import fail_step_manifest +from policyengine_us_data.utils.step_manifest import ( + RunManifest, + StepManifest, + read_step_manifest, + run_manifest_path, + step_manifest_path, + write_run_manifest, + write_step_manifest, +) + + +def _manifest(step_id: str, *, parent_step_id: str 
| None = None) -> StepManifest: + return StepManifest( + run_id="run-1", + step_id=step_id, + parent_step_id=parent_step_id, + status="running", + attempt=1, + started_at="2026-05-12T12:00:00+00:00", + branch="main", + sha="abc123", + version="1.0.0", + modal_app_name="policyengine-us-data-pipeline-run-1", + modal_environment="main", + ) + + +def test_pipeline_error_record_uses_stage_and_substage_ids(tmp_path): + exc = RuntimeError("failed with secret-value and API_TOKEN=secret-value") + manifest = _manifest( + WEIGHT_FITTING_REGIONAL.id, + parent_step_id=WEIGHT_FITTING_REGIONAL.parent_id, + ) + + record = build_pipeline_error_record( + exc, + run_id="run-1", + manifest=manifest, + surface="run_pipeline", + traceback_text="Traceback contains secret-value", + occurred_at="2026-05-12T12:00:00+00:00", + env={"API_TOKEN": "secret-value"}, + ) + result = write_pipeline_error_record(record, run_dir=tmp_path / "runs" / "run-1") + + latest = read_latest_pipeline_error(tmp_path / "runs" / "run-1") + + assert result.record.stage_id == "3_fit_weights" + assert result.record.substage_id == "3a_weight_fitting_regional" + assert result.record.record_path.endswith("3a_weight_fitting_regional.json") + assert result.record_ref.role == "error" + assert latest == result.record + assert "secret-value" not in result.record.message + assert "secret-value" not in result.record.traceback + assert result.record.message == ( + "failed with and API_TOKEN=" + ) + + +def test_pipeline_error_record_infers_canonical_stage_without_manifest_parent(): + manifest = _manifest(WEIGHT_FITTING_REGIONAL.id) + + record = build_pipeline_error_record( + RuntimeError("fit failed"), + run_id="run-1", + manifest=manifest, + traceback_text="traceback", + ) + + assert record.stage_id == "3_fit_weights" + assert record.substage_id == "3a_weight_fitting_regional" + + +def test_pipeline_error_record_rejects_invalid_explicit_stage_pair(): + with pytest.raises(ValueError, match="does not belong"): + 
build_pipeline_error_record( + RuntimeError("bad stage"), + run_id="run-1", + stage_id="2_build_calibration_package", + substage_id="3a_weight_fitting_regional", + traceback_text="traceback", + ) + + +def test_pipeline_error_record_rejects_invalid_stored_stage_pair(): + with pytest.raises(ValueError, match="does not belong"): + PipelineErrorRecord.from_dict( + { + "run_id": "run-1", + "stage_id": "2_build_calibration_package", + "substage_id": "3a_weight_fitting_regional", + "surface": "run_pipeline", + "occurred_at": "2026-05-12T12:00:00+00:00", + "error_type": "RuntimeError", + "message": "bad stage", + "traceback": "traceback", + } + ) + + +def test_failed_step_manifest_can_reference_durable_error_record(tmp_path): + manifest = _manifest(BUILD_DATASETS.id) + ref = { + "path": "runs/run-1/errors/error.json", + "size_bytes": 10, + "sha256": "abc", + "role": "error", + "media_type": "application/json", + } + volume = MagicMock() + runs_dir = tmp_path / "runs" + + with patch("modal_app.step_manifests.state.RUNS_DIR", str(runs_dir)): + fail_step_manifest(manifest, RuntimeError("boom"), volume, traceback_ref=ref) + + failed = read_step_manifest( + step_manifest_path(runs_dir / "run-1", BUILD_DATASETS.id) + ) + assert failed.status == "failed" + assert failed.error == { + "type": "RuntimeError", + "message": "boom", + "traceback_ref": ref, + } + volume.commit.assert_called_once() + + +def test_failed_step_manifest_redacts_error_message(tmp_path, monkeypatch): + monkeypatch.setenv("API_TOKEN", "secret-value") + manifest = _manifest(BUILD_DATASETS.id) + volume = MagicMock() + runs_dir = tmp_path / "runs" + + with patch("modal_app.step_manifests.state.RUNS_DIR", str(runs_dir)): + fail_step_manifest( + manifest, + RuntimeError("failed with secret-value and API_TOKEN=secret-value"), + volume, + ) + + failed = read_step_manifest( + step_manifest_path(runs_dir / "run-1", BUILD_DATASETS.id) + ) + assert failed.error["message"] == ( + "failed with and API_TOKEN=" + ) + + +def 
test_status_payload_orders_manifests_and_includes_bounded_traceback( + tmp_path, + monkeypatch, +): + monkeypatch.setenv("API_TOKEN", "secret-value") + runs_dir = tmp_path / "runs" + run_dir = runs_dir / "run-1" + write_run_manifest( + run_manifest_path(run_dir), + RunManifest( + run_id="run-1", + branch="main", + sha="abc123", + version="1.0.0", + status="failed", + started_at="2026-05-12T12:00:00+00:00", + known_step_ids=[ + BUILD_DATASETS.id, + WEIGHT_FITTING_REGIONAL.id, + "4a_local_area_h5_regional", + ], + run_context={"pipeline_volume_name": "pipeline-artifacts-run-1"}, + modal_app_name="policyengine-us-data-pipeline-run-1", + modal_environment="main", + error="RuntimeError: raw secret-value\nTraceback contains secret-value", + ), + ) + write_step_manifest( + step_manifest_path(run_dir, WEIGHT_FITTING_REGIONAL.id), + _manifest( + WEIGHT_FITTING_REGIONAL.id, + parent_step_id=WEIGHT_FITTING_REGIONAL.parent_id, + ).fail(RuntimeError("fit failed")), + ) + write_step_manifest( + step_manifest_path(run_dir, BUILD_DATASETS.id), + _manifest(BUILD_DATASETS.id).complete(), + ) + error_record = build_pipeline_error_record( + RuntimeError("fit failed with secret-value"), + run_id="run-1", + manifest=_manifest( + WEIGHT_FITTING_REGIONAL.id, + parent_step_id=WEIGHT_FITTING_REGIONAL.parent_id, + ), + traceback_text="full traceback with secret-value", + occurred_at="2026-05-12T12:00:01+00:00", + ) + write_pipeline_error_record(error_record, run_dir=run_dir, volume_root=tmp_path) + + payload = build_pipeline_status_payload("run-1", runs_dir=runs_dir) + + assert payload["status"] == "failed" + assert payload["stage_manifests"][0]["stage_id"] == "1_build_datasets" + assert payload["stage_manifests"][0]["substage_id"] is None + assert payload["stage_manifests"][1]["stage_id"] == "3_fit_weights" + assert payload["stage_manifests"][1]["substage_id"] == ( + "3a_weight_fitting_regional" + ) + assert payload["missing_expected_manifest_ids"] == ["4a_local_area_h5_regional"] + assert 
payload["error"]["stage_id"] == "3_fit_weights" + assert payload["error"]["substage_id"] == "3a_weight_fitting_regional" + assert payload["error"]["traceback_available"] is True + assert payload["error"]["traceback"] == ("full traceback with ") + assert payload["error"]["traceback_truncated"] is False + assert "secret-value" not in payload["run_manifest"]["error"] + assert payload["pipeline_volume_name"] == "pipeline-artifacts-run-1" + + +def test_status_payload_falls_back_to_run_manifest_error(tmp_path, monkeypatch): + monkeypatch.setenv("API_TOKEN", "secret-value") + runs_dir = tmp_path / "runs" + run_dir = runs_dir / "run-1" + write_run_manifest( + run_manifest_path(run_dir), + RunManifest( + run_id="run-1", + branch="main", + sha="abc123", + version="1.0.0", + status="failed", + started_at="2026-05-12T12:00:00+00:00", + known_step_ids=[BUILD_DATASETS.id], + error="RuntimeError: old failure\nTraceback contains secret-value", + ), + ) + + payload = build_pipeline_status_payload("run-1", runs_dir=runs_dir) + + assert payload["error"]["source"] == "run_manifest.error" + assert payload["error"]["error_type"] == "RuntimeError" + assert payload["error"]["message"] == "old failure" + assert payload["error"]["traceback_available"] is True + assert payload["error"]["traceback"].endswith( + "Traceback contains " + ) + assert "secret-value" not in payload["run_manifest"]["error"] + + +def test_status_payload_truncates_oldest_traceback_text(tmp_path): + runs_dir = tmp_path / "runs" + run_dir = runs_dir / "run-1" + write_run_manifest( + run_manifest_path(run_dir), + RunManifest( + run_id="run-1", + branch="main", + sha="abc123", + version="1.0.0", + status="failed", + started_at="2026-05-12T12:00:00+00:00", + known_step_ids=[BUILD_DATASETS.id], + ), + ) + traceback_text = "oldest\n" + ("x" * 30_000) + "\nnewest failure line" + error_record = build_pipeline_error_record( + RuntimeError("fit failed"), + run_id="run-1", + manifest=_manifest(BUILD_DATASETS.id), + 
traceback_text=traceback_text, + occurred_at="2026-05-12T12:00:01+00:00", + ) + write_pipeline_error_record(error_record, run_dir=run_dir, volume_root=tmp_path) + + latest = read_latest_pipeline_error(run_dir) + payload = build_pipeline_status_payload("run-1", runs_dir=runs_dir) + + assert latest.traceback == traceback_text + assert payload["error"]["traceback_truncated"] is True + assert payload["error"]["traceback"].startswith( + "\n[truncated older error text; omitted " + ) + assert payload["error"]["traceback"].endswith("newest failure line") + assert "oldest" not in payload["error"]["traceback"] + + +def test_clear_latest_pipeline_error_is_best_effort(tmp_path): + run_dir = tmp_path / "runs" / "run-1" + latest_path = run_dir / "errors" / "latest_error.json" + latest_path.mkdir(parents=True) + + assert clear_latest_pipeline_error(run_dir) is False + with pytest.raises(OSError): + clear_latest_pipeline_error(run_dir, strict=True) + + +def test_status_payload_reports_missing_run(tmp_path): + payload = build_pipeline_status_payload("missing-run", runs_dir=tmp_path / "runs") + + assert payload["status"] == "not_found" + assert payload["stage_manifests"] == [] + assert payload["run_manifest"] is None From 1fb1ebaf91e265ec41f6b1f9b920e136bb1f43e0 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 12 May 2026 19:57:49 +0200 Subject: [PATCH 2/8] Tighten pipeline status error payloads --- modal_app/step_manifests/errors.py | 19 ++++++- modal_app/step_manifests/status.py | 22 +++++-- policyengine_us_data/utils/error_redaction.py | 1 + tests/unit/test_pipeline.py | 35 ++++++++++++ tests/unit/test_pipeline_status.py | 57 +++++++++++++++++++ 5 files changed, 127 insertions(+), 7 deletions(-) diff --git a/modal_app/step_manifests/errors.py b/modal_app/step_manifests/errors.py index d86211177..0027afafb 100644 --- a/modal_app/step_manifests/errors.py +++ b/modal_app/step_manifests/errors.py @@ -14,6 +14,7 @@ canonical_json_loads, ) from 
policyengine_us_data.utils.error_redaction import ( + DEFAULT_ERROR_MESSAGE_MAX_CHARS, DEFAULT_ERROR_TEXT_MAX_CHARS, bound_error_text, redact_error_text, @@ -112,16 +113,28 @@ class PipelineErrorRecord: traceback_format: str = "python" schema_version: str = PIPELINE_ERROR_RECORD_SCHEMA_VERSION - def to_dict( + def to_dict(self) -> dict[str, Any]: + return _drop_none(asdict(self)) + + def to_status_dict( self, *, + message_max_chars: int | None = DEFAULT_ERROR_MESSAGE_MAX_CHARS, traceback_max_chars: int | None = DEFAULT_ERROR_TEXT_MAX_CHARS, ) -> dict[str, Any]: - payload = asdict(self) + payload = self.to_dict() + bounded_message = bound_error_text( + self.message, + max_chars=message_max_chars, + ) bounded_traceback = bound_error_text( self.traceback, max_chars=traceback_max_chars, ) + payload["message"] = bounded_message.text + payload["message_truncated"] = bounded_message.truncated + if bounded_message.truncated: + payload["message_omitted_chars"] = bounded_message.omitted_chars payload["traceback"] = bounded_traceback.text payload["traceback_available"] = bool(self.traceback) payload["traceback_truncated"] = bounded_traceback.truncated @@ -279,7 +292,7 @@ def write_pipeline_error_record( record_path=_relative_path(record_path, root), latest_path=_relative_path(latest_path, root), ) - payload = canonical_json_dumps(record_with_paths.to_dict(traceback_max_chars=None)) + payload = canonical_json_dumps(record_with_paths.to_dict()) record_path.write_text(payload) latest_path.write_text(payload) return PipelineErrorWriteResult( diff --git a/modal_app/step_manifests/status.py b/modal_app/step_manifests/status.py index 089e87f68..573811410 100644 --- a/modal_app/step_manifests/status.py +++ b/modal_app/step_manifests/status.py @@ -12,6 +12,7 @@ step_manifest_dir, ) from policyengine_us_data.utils.error_redaction import ( + DEFAULT_ERROR_MESSAGE_MAX_CHARS, bound_error_text, redacted_bounded_error_text, redact_error_text, @@ -46,7 +47,7 @@ def _error_payload( ) -> 
dict[str, Any] | None: if error_record is None: return None - return error_record.to_dict() + return error_record.to_status_dict() def _legacy_error_payload(error_text: str | None) -> dict[str, Any] | None: @@ -62,17 +63,24 @@ def _legacy_error_payload(error_text: str | None) -> dict[str, Any] | None: if maybe_type.strip(): error_type = maybe_type.strip() message = maybe_message.strip() or first_line + bounded_message = bound_error_text( + message, + max_chars=DEFAULT_ERROR_MESSAGE_MAX_CHARS, + ) payload = { "source": "run_manifest.error", "surface": "run_manifest", "stage_id": None, "substage_id": None, "error_type": error_type, - "message": message, + "message": bounded_message.text, + "message_truncated": bounded_message.truncated, "traceback": bounded.text, "traceback_available": bool(bounded.text), "traceback_truncated": bounded.truncated, } + if bounded_message.truncated: + payload["message_omitted_chars"] = bounded_message.omitted_chars if bounded.truncated: payload["traceback_omitted_chars"] = bounded.omitted_chars return payload @@ -105,7 +113,10 @@ def _message( def _sanitize_error_value(value: Any) -> Any: if isinstance(value, str): - return redacted_bounded_error_text(value).text + return redacted_bounded_error_text( + value, + max_chars=DEFAULT_ERROR_MESSAGE_MAX_CHARS, + ).text if isinstance(value, dict): return {key: _sanitize_error_value(item) for key, item in value.items()} if isinstance(value, list): @@ -116,7 +127,10 @@ def _sanitize_error_value(value: Any) -> Any: def _run_manifest_payload(run_manifest) -> dict[str, Any]: payload = run_manifest.to_dict() if payload.get("error"): - payload["error"] = redacted_bounded_error_text(payload["error"]).text + payload["error"] = redacted_bounded_error_text( + payload["error"], + max_chars=DEFAULT_ERROR_MESSAGE_MAX_CHARS, + ).text return payload diff --git a/policyengine_us_data/utils/error_redaction.py b/policyengine_us_data/utils/error_redaction.py index a8a2cc1b9..48a670047 100644 --- 
a/policyengine_us_data/utils/error_redaction.py +++ b/policyengine_us_data/utils/error_redaction.py @@ -8,6 +8,7 @@ from typing import Mapping DEFAULT_ERROR_TEXT_MAX_CHARS = 24_000 +DEFAULT_ERROR_MESSAGE_MAX_CHARS = 2_000 _SECRET_KEY_MARKERS = ( "TOKEN", diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py index 41a303aea..87a9ac506 100644 --- a/tests/unit/test_pipeline.py +++ b/tests/unit/test_pipeline.py @@ -13,6 +13,7 @@ NATIONAL_FIT_LAMBDA_L0, _build_diagnostics_upload_script, _calibration_package_parameters, + _pipeline_error_summary, _run_required_promotion_subprocess, ) from modal_app.step_manifests.state import RunMetadata # noqa: E402 @@ -20,6 +21,7 @@ read_run_meta, write_run_meta, ) +from policyengine_us_data.utils.step_manifest import ArtifactReference # noqa: E402 # -- RunMetadata tests ------------------------------------------ @@ -67,6 +69,39 @@ def test_national_fit_lambda_matches_national_preset(): assert NATIONAL_FIT_LAMBDA_L0 == pytest.approx(1e-4) +def test_pipeline_error_summary_uses_traceback_ref_when_available(): + ref = ArtifactReference( + path="runs/run-1/errors/error.json", + size_bytes=10, + sha256="abc", + role="error", + media_type="application/json", + ) + + summary = _pipeline_error_summary( + RuntimeError("boom"), + traceback_ref=ref, + traceback_text="full traceback should not be duplicated", + ) + + assert summary == "RuntimeError: boom; traceback_ref=runs/run-1/errors/error.json" + + +def test_pipeline_error_summary_falls_back_to_bounded_traceback(monkeypatch): + monkeypatch.setenv("API_TOKEN", "secret-value") + traceback_text = "old traceback\n" + ("x" * 30_000) + "\nnewest secret-value" + + summary = _pipeline_error_summary( + RuntimeError("failed with secret-value"), + traceback_text=traceback_text, + ) + + assert "secret-value" not in summary + assert summary.startswith("\n[truncated older error text; omitted ") + assert summary.endswith("newest ") + assert "old traceback" not in summary + + class 
TestRunMetadata: def test_to_dict(self): meta = RunMetadata( diff --git a/tests/unit/test_pipeline_status.py b/tests/unit/test_pipeline_status.py index 01d1e8d6c..653493019 100644 --- a/tests/unit/test_pipeline_status.py +++ b/tests/unit/test_pipeline_status.py @@ -1,3 +1,4 @@ +import json from unittest.mock import MagicMock, patch import pytest @@ -71,6 +72,36 @@ def test_pipeline_error_record_uses_stage_and_substage_ids(tmp_path): ) +def test_pipeline_error_record_persists_exact_schema_and_bounds_status_payload( + tmp_path, +): + long_message = "old message " + ("m" * 3_000) + " newest message" + long_traceback = "old traceback\n" + ("x" * 30_000) + "\nnewest traceback" + record = build_pipeline_error_record( + RuntimeError(long_message), + run_id="run-1", + manifest=_manifest(BUILD_DATASETS.id), + traceback_text=long_traceback, + occurred_at="2026-05-12T12:00:00+00:00", + ) + result = write_pipeline_error_record(record, run_dir=tmp_path / "runs" / "run-1") + + persisted = json.loads((tmp_path / result.record.record_path).read_text()) + status_payload = result.record.to_status_dict() + + assert persisted["message"] == long_message + assert persisted["traceback"] == long_traceback + assert "traceback_available" not in persisted + assert "traceback_truncated" not in persisted + assert "message_truncated" not in persisted + assert status_payload["message_truncated"] is True + assert status_payload["message"].endswith("newest message") + assert "old message" not in status_payload["message"] + assert status_payload["traceback_truncated"] is True + assert status_payload["traceback"].endswith("newest traceback") + assert "old traceback" not in status_payload["traceback"] + + def test_pipeline_error_record_infers_canonical_stage_without_manifest_parent(): manifest = _manifest(WEIGHT_FITTING_REGIONAL.id) @@ -259,6 +290,32 @@ def test_status_payload_falls_back_to_run_manifest_error(tmp_path, monkeypatch): assert "secret-value" not in payload["run_manifest"]["error"] +def 
test_status_payload_bounds_legacy_error_message(tmp_path): + runs_dir = tmp_path / "runs" + run_dir = runs_dir / "run-1" + write_run_manifest( + run_manifest_path(run_dir), + RunManifest( + run_id="run-1", + branch="main", + sha="abc123", + version="1.0.0", + status="failed", + started_at="2026-05-12T12:00:00+00:00", + known_step_ids=[BUILD_DATASETS.id], + error="RuntimeError: old message " + ("m" * 3_000) + " newest message", + ), + ) + + payload = build_pipeline_status_payload("run-1", runs_dir=runs_dir) + + assert payload["error"]["message_truncated"] is True + assert payload["error"]["message"].endswith("newest message") + assert "old message" not in payload["error"]["message"] + assert payload["run_manifest"]["error"].endswith("newest message") + assert "old message" not in payload["run_manifest"]["error"] + + def test_status_payload_truncates_oldest_traceback_text(tmp_path): runs_dir = tmp_path / "runs" run_dir = runs_dir / "run-1" From c3a4f098535f9e4f5dfb14085f5842ecb54c6d4f Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 12 May 2026 20:06:25 +0200 Subject: [PATCH 3/8] Use issue-numbered pipeline status changelog --- changelog.d/{pipeline-status-reporting.added.md => 962.added.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{pipeline-status-reporting.added.md => 962.added.md} (100%) diff --git a/changelog.d/pipeline-status-reporting.added.md b/changelog.d/962.added.md similarity index 100% rename from changelog.d/pipeline-status-reporting.added.md rename to changelog.d/962.added.md From 52e2e1048857effb1d0cc810a7b20836328f7e9f Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 12 May 2026 20:25:12 +0200 Subject: [PATCH 4/8] Clarify run manifest error fallback --- modal_app/step_manifests/status.py | 4 ++-- tests/unit/test_pipeline_status.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modal_app/step_manifests/status.py b/modal_app/step_manifests/status.py index 573811410..ec3e2aaae 100644 --- 
a/modal_app/step_manifests/status.py +++ b/modal_app/step_manifests/status.py @@ -50,7 +50,7 @@ def _error_payload( return error_record.to_status_dict() -def _legacy_error_payload(error_text: str | None) -> dict[str, Any] | None: +def _run_manifest_error_payload(error_text: str | None) -> dict[str, Any] | None: if not error_text: return None redacted = redact_error_text(error_text) @@ -198,7 +198,7 @@ def build_pipeline_status_payload( ] error = _error_payload( read_latest_pipeline_error(run_dir), - ) or _legacy_error_payload(run_manifest.error) + ) or _run_manifest_error_payload(run_manifest.error) status = run_manifest.status return { "schema_version": PIPELINE_STATUS_SCHEMA_VERSION, diff --git a/tests/unit/test_pipeline_status.py b/tests/unit/test_pipeline_status.py index 653493019..523b9fcf5 100644 --- a/tests/unit/test_pipeline_status.py +++ b/tests/unit/test_pipeline_status.py @@ -260,7 +260,7 @@ def test_status_payload_orders_manifests_and_includes_bounded_traceback( assert payload["pipeline_volume_name"] == "pipeline-artifacts-run-1" -def test_status_payload_falls_back_to_run_manifest_error(tmp_path, monkeypatch): +def test_status_payload_uses_run_manifest_error_as_last_resort(tmp_path, monkeypatch): monkeypatch.setenv("API_TOKEN", "secret-value") runs_dir = tmp_path / "runs" run_dir = runs_dir / "run-1" @@ -290,7 +290,7 @@ def test_status_payload_falls_back_to_run_manifest_error(tmp_path, monkeypatch): assert "secret-value" not in payload["run_manifest"]["error"] -def test_status_payload_bounds_legacy_error_message(tmp_path): +def test_status_payload_bounds_run_manifest_error_message(tmp_path): runs_dir = tmp_path / "runs" run_dir = runs_dir / "run-1" write_run_manifest( From 47e617d477899384ed43284d86187cf23584bd5f Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 12 May 2026 21:02:28 +0200 Subject: [PATCH 5/8] Move pipeline status into sub-app --- modal_app/pipeline.py | 122 +--------------- modal_app/pipeline_status.py | 136 ++++++++++++++++++ 
.../integration/test_modal_pipeline_seams.py | 2 + 3 files changed, 145 insertions(+), 115 deletions(-) create mode 100644 modal_app/pipeline_status.py diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index 095436da9..e4c51ce75 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -91,9 +91,6 @@ step_reusable as _step_reusable, write_run_meta, ) -from modal_app.step_manifests.status import ( # noqa: E402 - build_pipeline_status_payload as _build_pipeline_status_payload, -) from policyengine_us_data.utils.run_context import RunContext, resolve_run_id # noqa: E402 from policyengine_us_data.utils.error_redaction import ( # noqa: E402 redacted_bounded_error_text, @@ -136,7 +133,6 @@ ) REPO_URL = "https://github.com/PolicyEngine/policyengine-us-data.git" -status_image = image.pip_install("fastapi") def _python_cmd(*args: str) -> list[str]: @@ -315,6 +311,11 @@ def archive_diagnostics( app.include(_local_area_app) +from modal_app.pipeline_status import app as _pipeline_status_app # noqa: E402 +from modal_app.pipeline_status import pipeline_status # noqa: E402 + +app.include(_pipeline_status_app) + # ── Upload helpers ────────────────────────────────────────────── @@ -519,6 +520,7 @@ def verify_runtime_seams() -> dict: "uv.lock", "modal_app/worker_script.py", "modal_app/local_area.py", + "modal_app/pipeline_status.py", "modal_app/h5_test_harness.py", "modal_app/step_manifests/specs.py", "modal_app/step_manifests/state.py", @@ -562,6 +564,7 @@ def verify_runtime_seams() -> dict: "modal_app.fixtures.h5_cases", "modal_app.h5_test_harness", "modal_app.local_area", + "modal_app.pipeline_status", "modal_app.remote_calibration_runner", "modal_app.step_manifests.specs", "modal_app.step_manifests.state", @@ -1966,117 +1969,6 @@ def promote_run( return f"Promoted run {run_id} as version {version}" -# ── Status ─────────────────────────────────────────────────────── - - -@app.function( - image=image, - timeout=60, - volumes={PIPELINE_MOUNT: pipeline_volume}, -) 
-def get_pipeline_status( - run_id: str, -) -> dict: - """Get structured JSON status for a pipeline run.""" - - pipeline_volume.reload() - return _build_pipeline_status_payload(run_id) - - -@app.function( - image=status_image, - timeout=60, - volumes={PIPELINE_MOUNT: pipeline_volume}, -) -@modal.fastapi_endpoint( - method="GET", - docs=False, - requires_proxy_auth=True, -) -def pipeline_status_endpoint( - run_id: str, -) -> dict: - """Protected HTTP endpoint for structured pipeline status.""" - - pipeline_volume.reload() - return _build_pipeline_status_payload(run_id) - - -@app.function( - image=image, - timeout=60, - volumes={PIPELINE_MOUNT: pipeline_volume}, -) -def pipeline_status( - run_id: str = None, -) -> str: - """Get pipeline status. - - If run_id is provided, show that run's details. - Otherwise, list all runs. - """ - pipeline_volume.reload() - runs_dir = Path(RUNS_DIR) - - if not runs_dir.exists(): - return "No pipeline runs found." - - if run_id: - payload = _build_pipeline_status_payload(run_id) - if payload["status"] == "not_found": - return payload["message"] - run_manifest = payload["run_manifest"] - lines = [ - f"Run: {payload['run_id']}", - f" Branch: {run_manifest['branch']}", - f" SHA: {run_manifest['sha'][:12]}", - f" Version: {run_manifest['version']}", - f" Status: {payload['status']}", - f" Started: {run_manifest['started_at']}", - ] - if payload["error"]: - error = payload["error"] - lines.append( - f" Error: {error['error_type']}: {error.get('message', '')[:200]}" - ) - if error.get("record_path"): - lines.append(f" Error record: {error['record_path']}") - if payload["stage_manifests"]: - lines.append(" Step manifests:") - for item in payload["stage_manifests"]: - manifest = item["manifest"] - duration = ( - manifest["duration_s"] - if manifest.get("duration_s") is not None - else "?" 
- ) - reuse = manifest.get("reuse_decision", "not_applicable") - lines.append( - f" {manifest['step_id']}: {duration}s " - f"({manifest['status']}, {reuse})" - ) - return "\n".join(lines) - - # List all runs - runs = [] - for entry in sorted(runs_dir.iterdir()): - manifest_path = entry / "run_manifest.json" - if manifest_path.exists(): - with open(manifest_path) as f: - data = json.load(f) - runs.append( - f" {data['run_id']}: " - f"{data['status']} " - f"(branch={data['branch']}, " - f"v={data['version']})" - ) - - if not runs: - return "No pipeline runs found." - - return "Pipeline runs:\n" + "\n".join(runs) - - # ── Local entrypoint ───────────────────────────────────────────── diff --git a/modal_app/pipeline_status.py b/modal_app/pipeline_status.py new file mode 100644 index 000000000..e30543430 --- /dev/null +++ b/modal_app/pipeline_status.py @@ -0,0 +1,136 @@ +"""Modal status functions for deployed pipeline runs.""" + +from __future__ import annotations + +import os +import sys +from pathlib import Path + +import modal + +_baked = "/root/policyengine-us-data" +_local = str(Path(__file__).resolve().parent.parent) +for _p in (_baked, _local): + if _p not in sys.path: + sys.path.insert(0, _p) + +from modal_app.images import cpu_image as image # noqa: E402 +from modal_app.step_manifests.state import PIPELINE_MOUNT, RUNS_DIR # noqa: E402 +from modal_app.step_manifests.status import build_pipeline_status_payload # noqa: E402 + +app = modal.App( + os.environ.get("US_DATA_PIPELINE_STATUS_APP_NAME") + or os.environ.get("US_DATA_MODAL_APP_NAME") + or "policyengine-us-data-pipeline-status" +) + +pipeline_volume = modal.Volume.from_name( + os.environ.get("US_DATA_PIPELINE_VOLUME_NAME", "pipeline-artifacts"), + create_if_missing=True, + version=2, +) +status_image = image.pip_install("fastapi") + + +@app.function( + image=image, + timeout=60, + volumes={PIPELINE_MOUNT: pipeline_volume}, +) +def get_pipeline_status( + run_id: str, +) -> dict: + """Get structured JSON 
status for a pipeline run.""" + + pipeline_volume.reload() + return build_pipeline_status_payload(run_id) + + +@app.function( + image=status_image, + timeout=60, + volumes={PIPELINE_MOUNT: pipeline_volume}, +) +@modal.fastapi_endpoint( + method="GET", + docs=False, + requires_proxy_auth=True, +) +def pipeline_status_endpoint( + run_id: str, +) -> dict: + """Protected HTTP endpoint for structured pipeline status.""" + + pipeline_volume.reload() + return build_pipeline_status_payload(run_id) + + +@app.function( + image=image, + timeout=60, + volumes={PIPELINE_MOUNT: pipeline_volume}, +) +def pipeline_status( + run_id: str = None, +) -> str: + """Get human-readable pipeline status.""" + + pipeline_volume.reload() + runs_dir = Path(RUNS_DIR) + + if not runs_dir.exists(): + return "No pipeline runs found." + + if run_id: + payload = build_pipeline_status_payload(run_id) + if payload["status"] == "not_found": + return payload["message"] + run_manifest = payload["run_manifest"] + lines = [ + f"Run: {payload['run_id']}", + f" Branch: {run_manifest['branch']}", + f" SHA: {run_manifest['sha'][:12]}", + f" Version: {run_manifest['version']}", + f" Status: {payload['status']}", + f" Started: {run_manifest['started_at']}", + ] + if payload["error"]: + error = payload["error"] + lines.append( + f" Error: {error['error_type']}: {error.get('message', '')[:200]}" + ) + if error.get("record_path"): + lines.append(f" Error record: {error['record_path']}") + if payload["stage_manifests"]: + lines.append(" Step manifests:") + for item in payload["stage_manifests"]: + manifest = item["manifest"] + duration = ( + manifest["duration_s"] + if manifest.get("duration_s") is not None + else "?" 
+ ) + reuse = manifest.get("reuse_decision", "not_applicable") + lines.append( + f" {manifest['step_id']}: {duration}s " + f"({manifest['status']}, {reuse})" + ) + return "\n".join(lines) + + runs = [] + for entry in sorted(runs_dir.iterdir()): + manifest_path = entry / "run_manifest.json" + if manifest_path.exists(): + data = build_pipeline_status_payload(entry.name) + run_manifest = data["run_manifest"] + runs.append( + f" {data['run_id']}: " + f"{data['status']} " + f"(branch={run_manifest['branch']}, " + f"v={run_manifest['version']})" + ) + + if not runs: + return "No pipeline runs found." + + return "Pipeline runs:\n" + "\n".join(runs) diff --git a/tests/integration/test_modal_pipeline_seams.py b/tests/integration/test_modal_pipeline_seams.py index 5906c5f42..95497f2df 100644 --- a/tests/integration/test_modal_pipeline_seams.py +++ b/tests/integration/test_modal_pipeline_seams.py @@ -56,6 +56,7 @@ def test_pipeline_image_runtime_seams(): "uv.lock": True, "modal_app/worker_script.py": True, "modal_app/local_area.py": True, + "modal_app/pipeline_status.py": True, "modal_app/h5_test_harness.py": True, "modal_app/step_manifests/specs.py": True, "modal_app/step_manifests/state.py": True, @@ -78,6 +79,7 @@ def test_pipeline_image_runtime_seams(): "modal_app.fixtures.h5_cases", "modal_app.h5_test_harness", "modal_app.local_area", + "modal_app.pipeline_status", "modal_app.remote_calibration_runner", "modal_app.step_manifests.specs", "modal_app.step_manifests.state", From fc3f308d1c43d900c72cd42383dfa332be8470de Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 12 May 2026 21:17:31 +0200 Subject: [PATCH 6/8] Clarify pipeline status surfaces --- docs/engineering/skills/pipeline_operations.md | 13 +++++++++++++ modal_app/pipeline.py | 4 ++-- modal_app/pipeline_status.py | 2 +- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/docs/engineering/skills/pipeline_operations.md b/docs/engineering/skills/pipeline_operations.md index 25cc4c6af..ef070427a 
100644 --- a/docs/engineering/skills/pipeline_operations.md +++ b/docs/engineering/skills/pipeline_operations.md @@ -17,6 +17,19 @@ The status system reports: - the latest durable error record, when one exists; - a redacted, bounded traceback when one exists. +## Status Surfaces + +The structured status payload is canonical. The pipeline status sub-app exposes +three Modal functions: + +- `get_pipeline_status`: Python-callable structured JSON for agents, scripts, + dashboards, and tests. Prefer this for diagnosis and automation. +- `pipeline_status_endpoint`: protected HTTP endpoint returning the same + structured JSON for non-Python clients. Use Modal proxy auth headers. +- `pipeline_status_snippet`: human-readable text used by + `modal run modal_app/pipeline.py::main --action status`. This is for quick + terminal inspection only and must not be treated as a schema. + ## Fetch Status First identify the run context from the GitHub Actions summary, workflow logs, or diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index e4c51ce75..2b56fcafd 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -312,7 +312,7 @@ def archive_diagnostics( app.include(_local_area_app) from modal_app.pipeline_status import app as _pipeline_status_app # noqa: E402 -from modal_app.pipeline_status import pipeline_status # noqa: E402 +from modal_app.pipeline_status import pipeline_status_snippet # noqa: E402 app.include(_pipeline_status_app) @@ -2015,7 +2015,7 @@ def main( print(f"\nPipeline run complete: {result}") elif action == "status": - result = pipeline_status.remote( + result = pipeline_status_snippet.remote( run_id=run_id, ) print(result) diff --git a/modal_app/pipeline_status.py b/modal_app/pipeline_status.py index e30543430..1cb4821c7 100644 --- a/modal_app/pipeline_status.py +++ b/modal_app/pipeline_status.py @@ -70,7 +70,7 @@ def pipeline_status_endpoint( timeout=60, volumes={PIPELINE_MOUNT: pipeline_volume}, ) -def pipeline_status( +def 
pipeline_status_snippet( run_id: str = None, ) -> str: """Get human-readable pipeline status.""" From 6ba5a101ee5d689c53280da0b103d146d975be8e Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 12 May 2026 21:50:38 +0200 Subject: [PATCH 7/8] Add pipeline status CLI seam coverage --- modal_app/pipeline_status.py | 9 ++++----- tests/integration/test_modal_pipeline_seams.py | 13 +++++++++++++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/modal_app/pipeline_status.py b/modal_app/pipeline_status.py index 1cb4821c7..f09c0f9b7 100644 --- a/modal_app/pipeline_status.py +++ b/modal_app/pipeline_status.py @@ -76,11 +76,6 @@ def pipeline_status_snippet( """Get human-readable pipeline status.""" pipeline_volume.reload() - runs_dir = Path(RUNS_DIR) - - if not runs_dir.exists(): - return "No pipeline runs found." - if run_id: payload = build_pipeline_status_payload(run_id) if payload["status"] == "not_found": @@ -117,6 +112,10 @@ def pipeline_status_snippet( ) return "\n".join(lines) + runs_dir = Path(RUNS_DIR) + if not runs_dir.exists(): + return "No pipeline runs found." 
+ runs = [] for entry in sorted(runs_dir.iterdir()): manifest_path = entry / "run_manifest.json" diff --git a/tests/integration/test_modal_pipeline_seams.py b/tests/integration/test_modal_pipeline_seams.py index 95497f2df..c95f0c01a 100644 --- a/tests/integration/test_modal_pipeline_seams.py +++ b/tests/integration/test_modal_pipeline_seams.py @@ -149,3 +149,16 @@ def test_pipeline_status_http_endpoint_reports_missing_run(): assert result["run_id"] == "missing-run-for-status-http-seam" assert result["stage_manifests"] == [] assert result["error"] is None + + +def test_pipeline_status_cli_snippet_reports_missing_run(): + _require_modal_tokens() + + fn = modal.Function.from_name( + APP_NAME, + "pipeline_status_snippet", + environment_name=MODAL_ENVIRONMENT, + ) + result = fn.remote("missing-run-for-status-cli-seam") + + assert result == "Pipeline run missing-run-for-status-cli-seam not found." From 4fa414e5d7c6f2defc26d5dd7e4ea8d493624cd6 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 13 May 2026 15:00:40 +0200 Subject: [PATCH 8/8] Use proxy credentials in status endpoint CI --- .github/workflows/pr.yaml | 2 ++ tests/integration/test_modal_pipeline_seams.py | 6 ++---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index ac35ea1e3..5f17456e6 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -149,6 +149,8 @@ jobs: env: MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }} MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }} + MODAL_PROXY_TOKEN_ID: ${{ secrets.MODAL_PROXY_TOKEN_ID }} + MODAL_PROXY_TOKEN_SECRET: ${{ secrets.MODAL_PROXY_TOKEN_SECRET }} HUGGING_FACE_TOKEN: ${{ secrets.HUGGING_FACE_TOKEN }} GOOGLE_APPLICATION_CREDENTIALS: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS }} MODAL_ENVIRONMENT: staging-us-data-pr-${{ github.event.pull_request.number }} diff --git a/tests/integration/test_modal_pipeline_seams.py b/tests/integration/test_modal_pipeline_seams.py index 
c95f0c01a..d96bcfa54 100644 --- a/tests/integration/test_modal_pipeline_seams.py +++ b/tests/integration/test_modal_pipeline_seams.py @@ -25,10 +25,8 @@ def _require_modal_tokens() -> None: def _modal_proxy_auth_headers() -> dict[str, str]: - key = os.environ.get("MODAL_PROXY_TOKEN_ID") or os.environ.get("MODAL_TOKEN_ID") - secret = os.environ.get("MODAL_PROXY_TOKEN_SECRET") or os.environ.get( - "MODAL_TOKEN_SECRET" - ) + key = os.environ.get("MODAL_PROXY_TOKEN_ID") + secret = os.environ.get("MODAL_PROXY_TOKEN_SECRET") if not (key and secret): pytest.skip("Modal proxy auth credentials are required for HTTP seam tests") return {