Implement sdmx import harness using autoschematisation #1733
Open
rohitkumarbhagat wants to merge 66 commits into datacommonsorg:master from rohitkumarbhagat:sdmx-import-harness
66 commits
780074e Add agentic pipeline runner scaffold (rohitkumarbhagat)
c125d78 Add SDMX pipeline state callback and tests (rohitkumarbhagat)
5271fd3 Polish agentic import docstrings and format (rohitkumarbhagat)
d6441f6 Improve agentic import runner observability (rohitkumarbhagat)
593db87 Add interactive confirmation to SDMX runner (rohitkumarbhagat)
cb78e38 Log dry-run previews instead of returning text (rohitkumarbhagat)
746e130 Add SDMX pipeline planning (rohitkumarbhagat)
561d61c Generalize SDMX pipeline naming (rohitkumarbhagat)
0cfcc21 Use explicit SDMX registry builder (rohitkumarbhagat)
8d3525a feat(sdmx-import): Add timestamps and improve pipeline planning (rohitkumarbhagat)
d6b0c98 refactor(sdmx-import): Simplify phase factory lambda (rohitkumarbhagat)
7f15f17 Refactor(sdmx-import): Flatten pipeline steps and remove phases (rohitkumarbhagat)
475ba5b refactor(sdmx-import): Simplify pipeline step definition and construc… (rohitkumarbhagat)
24d0e0c feat(sdmx-import): add run_sdmx_pipeline orchestration (rohitkumarbhagat)
c41eb3f feat(sdmx-import): add CLI entrypoint for SDMX pipeline (rohitkumarbhagat)
4be87a7 refactor: centralize SDMX pipeline planning (rohitkumarbhagat)
55b5015 Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp… (rohitkumarbhagat)
69831c2 fix: add repo root to sys.path (rohitkumarbhagat)
95214cb chore: log SDMX pipeline step outcomes (rohitkumarbhagat)
d257f0c Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp… (rohitkumarbhagat)
acca4aa Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp… (rohitkumarbhagat)
7400c0d feat: implement metadata download step (rohitkumarbhagat)
3ff6bee Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp… (rohitkumarbhagat)
3ea7f21 Refactor SDMX pipeline to use shared subprocess wrapper and cached co… (rohitkumarbhagat)
c1936bd Implement DownloadDataStep in sdmx_import_pipeline (rohitkumarbhagat)
db4caa4 Refactor SDMX config resolution and align flag names (rohitkumarbhagat)
fc7c2e4 feat: Add early input file existence check to `CreateSampleStep` and … (rohitkumarbhagat)
e7c8db5 refactor: Replace `CommandPlan` dataclass with a nested `_StepContext… (rohitkumarbhagat)
9a7e38d refactor: move `dry_run` from the general `Step` interface to `SdmxSt… (rohitkumarbhagat)
8fed16a feat: Enable CreateSchemaMapStep to execute pvmap_generator.py and ad… (rohitkumarbhagat)
26800ad feat: implement `ProcessFullDataStep` using `stat_var_processor` and … (rohitkumarbhagat)
55360a9 feat: Implement `CreateDcConfigStep` to generate custom DC configurat… (rohitkumarbhagat)
76128ab Merge branch 'master' of https://github.com/datacommonsorg/data into … (rohitkumarbhagat)
1926be1 refactor: Rename dummy config to `_TEST_CONFIG` and centralize dummy … (rohitkumarbhagat)
503ea7d refactor: replace global flags with structured dataclass-based config… (rohitkumarbhagat)
58f5d51 feat: Reorder imports and adjust `PipelineBuilder` parameter indentation (rohitkumarbhagat)
6cc6172 feat: Add SDMX agentic import pipeline with new documentation, code, … (rohitkumarbhagat)
8e4e7a7 fix: Move input file existence checks to `run` methods, allowing dry … (rohitkumarbhagat)
7b4bb8d feat: Introduce `working_dir` flag and encapsulate flag definitions t… (rohitkumarbhagat)
5fd7e25 docs: clarify SDMX import pipeline prerequisites, usage, step names, … (rohitkumarbhagat)
eff8705 refactor: extract common SDMX test setup and helper methods into a ne… (rohitkumarbhagat)
0fe16c5 Merge branch 'master' of https://github.com/datacommonsorg/data into … (rohitkumarbhagat)
11abd36 feat: Resolve relative input and output paths against the working dir… (rohitkumarbhagat)
b08f8ca feat: Add working directory flag to SDMX import pipeline and ensure a… (rohitkumarbhagat)
cd9041a docs: Consolidate comments for output_path parsing logic. (rohitkumarbhagat)
650800b fix: enforce absolute generator paths (rohitkumarbhagat)
a97e212 Refactor run/dry-run test helpers (rohitkumarbhagat)
b98bbac Refactor cache plan tests (rohitkumarbhagat)
c178bfa Refactor missing-input test helpers (rohitkumarbhagat)
ef361a9 lint fix (rohitkumarbhagat)
6a93ffc Fix absolute output path handling (rohitkumarbhagat)
fc8d555 Add abs path backup test (rohitkumarbhagat)
7b33667 minor fix in tests (rohitkumarbhagat)
cbec5c1 refactor (rohitkumarbhagat)
1f74094 refactor: Remove `get_steps` method and directly access `pipeline.ste… (rohitkumarbhagat)
31c00f8 feat: Introduce structured SDMX agentic import pipeline with dedicate… (rohitkumarbhagat)
b10c1be refactor: centralize SDMX flag constants (rohitkumarbhagat)
c3c9001 Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp… (rohitkumarbhagat)
e9105a9 fix: support string version identifiers (rohitkumarbhagat)
a2b81bd removed del (rohitkumarbhagat)
4d6afce Merge branch 'master' into sdmx-import-harness (rohitkumarbhagat)
650224d Merge branch 'master' into sdmx-import-harness (rohitkumarbhagat)
cdc944e Merge branch 'master' into sdmx-import-harness (rohitkumarbhagat)
2ef93cc Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp… (rohitkumarbhagat)
9f5d634 Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp… (rohitkumarbhagat)
17f43bf add comment (rohitkumarbhagat)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
|
|
||
| absl-py | ||
| chembl-webresource-client | ||
| dataclasses-json | ||
| deepdiff | ||
| earthengine-api | ||
| flask_restful | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,136 @@ | ||
| # Copyright 2025 Google LLC | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # https://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| """Generic building blocks for lightweight agentic pipelines. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import abc | ||
| from dataclasses import dataclass | ||
| from typing import Sequence | ||
|
|
||
| from absl import logging | ||
|
|
||
|
|
||
| class PipelineAbort(Exception): | ||
| """Raised when pipeline execution should stop early for any reason.""" | ||
|
|
||
|
|
||
| class Step(abc.ABC): | ||
| """Abstract pipeline step interface.""" | ||
|
|
||
| @property | ||
| @abc.abstractmethod | ||
| def name(self) -> str: | ||
| """Human friendly identifier used for logging.""" | ||
|
|
||
| @property | ||
| @abc.abstractmethod | ||
| def version(self) -> str: | ||
| """Version string used for invalidation decisions.""" | ||
|
|
||
| @abc.abstractmethod | ||
| def run(self) -> None: | ||
| """Execute the step. Raise an exception to signal failure.""" | ||
|
|
||
|
||
|
|
||
| class BaseStep(Step, abc.ABC): | ||
| """Helper base class that stores mandatory metadata.""" | ||
|
|
||
| def __init__(self, *, name: str, version: str) -> None: | ||
| if not name: | ||
| raise ValueError("step requires a name") | ||
| self._name = name | ||
| self._version = version | ||
|
|
||
| @property | ||
| def name(self) -> str: | ||
| return self._name | ||
|
|
||
| @property | ||
| def version(self) -> str: | ||
| return self._version | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class Pipeline: | ||
| steps: Sequence[Step] | ||
|
|
||
|
|
||
| class PipelineCallback: | ||
| """Lifecycle hooks consumed by the runner; defaults are no-ops.""" | ||
|
|
||
| def before_step(self, step: Step) -> None: | ||
| """Called immediately before `step.run()`; raising an error skips execution.""" | ||
|
|
||
| def after_step(self, step: Step, *, error: Exception | None = None) -> None: | ||
| """Runs once per step after `step.run()` succeeds or raises.""" | ||
|
|
||
|
|
||
| class CompositeCallback(PipelineCallback): | ||
| """Fans out events to child callbacks in order.""" | ||
|
|
||
| def __init__(self, callbacks: Sequence[PipelineCallback]) -> None: | ||
| self._callbacks = list(callbacks) | ||
|
|
||
| def before_step(self, step: Step) -> None: | ||
| for callback in self._callbacks: | ||
| callback.before_step(step) | ||
|
||
|
|
||
| def after_step(self, step: Step, *, error: Exception | None = None) -> None: | ||
| for callback in self._callbacks: | ||
| callback.after_step(step, error=error) | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class RunnerConfig: | ||
| """Placeholder for future runner toggles.""" | ||
|
|
||
|
|
||
| class PipelineRunner: | ||
|
|
||
| def __init__(self, config: RunnerConfig | None = None) -> None: | ||
| self._config = config or RunnerConfig() | ||
|
|
||
| def run(self, | ||
| pipeline: Pipeline, | ||
| callback: PipelineCallback | None = None) -> None: | ||
| current_step: Step | None = None | ||
| steps = pipeline.steps | ||
| logging.info(f"Starting pipeline with {len(steps)} steps") | ||
| try: | ||
| for step in steps: | ||
| current_step = step | ||
| logging.info(f"[STEP START] {step.name} (v{step.version})") | ||
| if callback: | ||
| callback.before_step(step) | ||
| error: Exception | None = None | ||
| try: | ||
| step.run() | ||
| except Exception as exc: # pylint: disable=broad-except | ||
| error = exc | ||
| logging.exception( | ||
| f"[STEP END] {step.name} (v{step.version}) status=failed" | ||
|
||
| ) | ||
| raise | ||
| finally: | ||
| if callback: | ||
| callback.after_step(step, error=error) | ||
| logging.info( | ||
| f"[STEP END] {step.name} (v{step.version}) status=succeeded" | ||
| ) | ||
| logging.info("Pipeline completed") | ||
| except PipelineAbort: | ||
| name = current_step.name if current_step else "<none>" | ||
|
||
| logging.info(f"[STEP END] {name} status=aborted; pipeline aborted") | ||
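The control flow of `PipelineRunner.run` in the diff above can be sketched roughly as follows. This is a trimmed, self-contained illustration rather than the PR's code: `run_pipeline`, `EchoStep`, `StopBefore`, and the step names `download`, `process`, and `publish` are all hypothetical, the `version` property and logging are omitted, and the stand-in classes only mimic the interfaces shown in the diff.

```python
import abc
from dataclasses import dataclass
from typing import List, Optional, Sequence


class PipelineAbort(Exception):
    """Stand-in for the PipelineAbort class in the diff."""


class Step(abc.ABC):
    """Trimmed stand-in for the Step interface (name and run only)."""

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """Identifier used in the recorded events."""

    @abc.abstractmethod
    def run(self) -> None:
        """Execute the step; raise to signal failure."""


@dataclass(frozen=True)
class Pipeline:
    steps: Sequence[Step]


class PipelineCallback:
    """Lifecycle hooks; the defaults are no-ops."""

    def before_step(self, step: Step) -> None:
        pass

    def after_step(self, step: Step, *,
                   error: Optional[Exception] = None) -> None:
        pass


def run_pipeline(pipeline: Pipeline,
                 callback: Optional[PipelineCallback] = None) -> None:
    """Hypothetical helper mirroring the runner's loop, minus logging."""
    try:
        for step in pipeline.steps:
            if callback:
                callback.before_step(step)  # raising here skips run/after
            error: Optional[Exception] = None
            try:
                step.run()
            except Exception as exc:
                error = exc
                raise
            finally:
                if callback:
                    callback.after_step(step, error=error)
    except PipelineAbort:
        pass  # an abort stops the loop without propagating to the caller


class EchoStep(Step):
    """Hypothetical step that records its own execution."""

    def __init__(self, name: str, events: List[str]) -> None:
        self._name = name
        self._events = events

    @property
    def name(self) -> str:
        return self._name

    def run(self) -> None:
        self._events.append(f"run:{self.name}")


class StopBefore(PipelineCallback):
    """Hypothetical callback that aborts before a named step."""

    def __init__(self, stop_at: str, events: List[str]) -> None:
        self._stop_at = stop_at
        self._events = events

    def before_step(self, step: Step) -> None:
        if step.name == self._stop_at:
            raise PipelineAbort(f"stopping before {step.name}")
        self._events.append(f"before:{step.name}")

    def after_step(self, step: Step, *,
                   error: Optional[Exception] = None) -> None:
        status = "failed" if error else "ok"
        self._events.append(f"after:{step.name}:{status}")


events: List[str] = []
pipeline = Pipeline(steps=[
    EchoStep("download", events),
    EchoStep("process", events),
    EchoStep("publish", events),
])
run_pipeline(pipeline, StopBefore("publish", events))
print(events)
```

In this sketch the abort raised from `before_step` means neither `run` nor `after_step` fires for the `publish` step, which matches the behaviour the unit tests in this PR assert for the real runner.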
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,183 @@ | ||
| #!/usr/bin/env python3 | ||
|
|
||
| # Copyright 2025 Google LLC | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| """Unit tests for the agentic pipeline skeleton.""" | ||
|
|
||
| import os | ||
| import sys | ||
| import unittest | ||
|
|
||
| _SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) | ||
| sys.path.append(_SCRIPT_DIR) | ||
|
|
||
| from pipeline import ( # pylint: disable=import-error | ||
| BaseStep, CompositeCallback, Pipeline, PipelineAbort, PipelineCallback, | ||
| PipelineRunner, RunnerConfig, Step, | ||
| ) | ||
|
|
||
|
|
||
| class _TrackingStep(BaseStep): | ||
|
|
||
| def __init__(self, name: str, events: list[str]) -> None: | ||
| super().__init__(name=name, version="1") | ||
| self._events = events | ||
| self.executed = False | ||
|
|
||
| def run(self) -> None: | ||
| self.executed = True | ||
| self._events.append(f"run:{self.name}") | ||
|
|
||
|
|
||
| class _FailingStep(BaseStep): | ||
|
|
||
| def __init__(self, *, name: str, version: str) -> None: | ||
| super().__init__(name=name, version=version) | ||
|
|
||
| def run(self) -> None: | ||
| raise ValueError("boom") | ||
|
|
||
|
|
||
| class PipelineRunnerTest(unittest.TestCase): | ||
|
|
||
| def _build_pipeline(self, events: list[str]) -> Pipeline: | ||
| step_one = _TrackingStep("one", events) | ||
| step_two = _TrackingStep("two", events) | ||
| return Pipeline(steps=[step_one, step_two]) | ||
|
|
||
| def test_on_before_step_runs_before_each_step(self) -> None: | ||
| events: list[str] = [] | ||
|
|
||
| class RecordingCallback(PipelineCallback): | ||
|
|
||
| def before_step(self, step: Step) -> None: | ||
| events.append(f"before:{step.name}") | ||
|
|
||
| pipeline = self._build_pipeline(events) | ||
| PipelineRunner(RunnerConfig()).run(pipeline, RecordingCallback()) | ||
|
|
||
| self.assertEqual( | ||
| events, | ||
| [ | ||
| "before:one", | ||
| "run:one", | ||
| "before:two", | ||
| "run:two", | ||
| ], | ||
| ) | ||
|
|
||
| def test_pipeline_abort_skips_downstream_steps(self) -> None: | ||
| events: list[str] = [] | ||
| pipeline = self._build_pipeline(events) | ||
| runner = PipelineRunner(RunnerConfig()) | ||
|
|
||
| class AbortOnSecond(PipelineCallback): | ||
|
|
||
| def before_step(self, step: Step) -> None: | ||
| if step.name == "two": | ||
| raise PipelineAbort("stop") | ||
|
|
||
| runner.run(pipeline, AbortOnSecond()) | ||
|
|
||
| self.assertEqual(events, ["run:one"]) | ||
| # PipelineAbort is swallowed by the runner, so execution simply stops. | ||
|
|
||
| def test_before_step_exception_skips_after_step(self) -> None: | ||
| events: list[str] = [] | ||
| pipeline = Pipeline(steps=[_TrackingStep("one", events)]) | ||
| runner = PipelineRunner(RunnerConfig()) | ||
|
|
||
| class RecordingCallback(PipelineCallback): | ||
|
|
||
| def before_step(self, step: Step) -> None: | ||
| events.append(f"before:{step.name}") | ||
| raise RuntimeError("boom") | ||
|
|
||
| def after_step(self, | ||
| step: Step, | ||
| *, | ||
| error: Exception | None = None) -> None: | ||
| del step, error | ||
| events.append("after-called") | ||
|
|
||
| with self.assertRaises(RuntimeError): | ||
| runner.run(pipeline, RecordingCallback()) | ||
|
|
||
| self.assertEqual(events, ["before:one"]) | ||
|
|
||
| def test_after_step_receives_error_when_step_fails(self) -> None: | ||
|
|
||
| class RecordingCallback(PipelineCallback): | ||
|
|
||
| def __init__(self) -> None: | ||
| self.after_calls: list[tuple[str, str | None]] = [] | ||
|
|
||
| def after_step(self, | ||
| step: Step, | ||
| *, | ||
| error: Exception | None = None) -> None: | ||
| name = step.name | ||
| error_name = type(error).__name__ if error else None | ||
| self.after_calls.append((name, error_name)) | ||
|
|
||
| callback = RecordingCallback() | ||
| pipeline = Pipeline(steps=[_FailingStep(name="fail-step", version="1")]) | ||
|
|
||
| with self.assertRaises(ValueError): | ||
| PipelineRunner(RunnerConfig()).run(pipeline, callback) | ||
|
|
||
| self.assertEqual(callback.after_calls, [("fail-step", "ValueError")]) | ||
|
|
||
|
|
||
| class CompositeCallbackTest(unittest.TestCase): | ||
|
|
||
| def test_callbacks_run_in_order_for_each_hook(self) -> None: | ||
| events: list[str] = [] | ||
|
|
||
| class RecordingCallback(PipelineCallback): | ||
|
|
||
| def __init__(self, label: str) -> None: | ||
| self._label = label | ||
|
|
||
| def before_step(self, step: Step) -> None: | ||
| events.append(f"{self._label}:before:{step.name}") | ||
|
|
||
| def after_step(self, | ||
| step: Step, | ||
| *, | ||
| error: Exception | None = None) -> None: | ||
| del error | ||
| events.append(f"{self._label}:after:{step.name}") | ||
|
|
||
| composite = CompositeCallback( | ||
| [RecordingCallback("first"), | ||
| RecordingCallback("second")]) | ||
| step = _TrackingStep("composite", events) | ||
|
|
||
| composite.before_step(step) | ||
| composite.after_step(step) | ||
|
|
||
| self.assertEqual( | ||
| events, | ||
| [ | ||
| "first:before:composite", | ||
| "second:before:composite", | ||
| "first:after:composite", | ||
| "second:after:composite", | ||
| ], | ||
| ) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() |
Should this return a bool run status?
IMO, a run that completes without an error indicates success. Failures can be raised as exceptions. Returning a False status for failure would require the caller to handle both exceptions and False return values, which adds more handling.
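The trade-off in this reply can be illustrated with a small sketch. It is not taken from the PR: `run_raising`, `run_with_status`, and the four toy step functions are hypothetical names used only to compare the two conventions.

```python
from typing import Callable, List, Optional, Tuple


def run_raising(
    steps: List[Tuple[str, Callable[[], None]]]
) -> Tuple[List[str], Optional[Exception]]:
    """Steps signal failure only by raising: one failure path to handle."""
    completed: List[str] = []
    for name, fn in steps:
        try:
            fn()
        except Exception as exc:
            return completed, exc
        completed.append(name)
    return completed, None


def run_with_status(
    steps: List[Tuple[str, Callable[[], bool]]]
) -> Tuple[List[str], Optional[Exception]]:
    """Steps may return False or raise: the caller must check both."""
    completed: List[str] = []
    for name, fn in steps:
        try:
            ok = fn()
        except Exception as exc:  # failure path 1: exception
            return completed, exc
        if not ok:  # failure path 2: False status
            return completed, RuntimeError(f"{name} returned False")
        completed.append(name)
    return completed, None


def ok_step() -> None:
    return None


def failing_step() -> None:
    raise ValueError("boom")


def ok_bool() -> bool:
    return True


def false_bool() -> bool:
    return False


done, err = run_raising([("one", ok_step), ("two", failing_step)])
done2, err2 = run_with_status([("one", ok_bool), ("two", false_bool)])
print(done, repr(err))
print(done2, repr(err2))
```

The status-returning variant still needs the `except` clause, because a step can raise unexpectedly, so the boolean adds a second failure path rather than replacing the first.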