Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
780074e
Add agentic pipeline runner scaffold
rohitkumarbhagat Nov 10, 2025
c125d78
Add SDMX pipeline state callback and tests
rohitkumarbhagat Nov 10, 2025
5271fd3
Polish agentic import docstrings and format
rohitkumarbhagat Nov 10, 2025
d6441f6
Improve agentic import runner observability
rohitkumarbhagat Nov 11, 2025
593db87
Add interactive confirmation to SDMX runner
rohitkumarbhagat Nov 11, 2025
cb78e38
Log dry-run previews instead of returning text
rohitkumarbhagat Nov 11, 2025
746e130
Add SDMX pipeline planning
rohitkumarbhagat Nov 11, 2025
561d61c
Generalize SDMX pipeline naming
rohitkumarbhagat Nov 11, 2025
0cfcc21
Use explicit SDMX registry builder
rohitkumarbhagat Nov 11, 2025
8d3525a
feat(sdmx-import): Add timestamps and improve pipeline planning
rohitkumarbhagat Nov 12, 2025
d6b0c98
refactor(sdmx-import): Simplify phase factory lambda
rohitkumarbhagat Nov 12, 2025
7f15f17
Refactor(sdmx-import): Flatten pipeline steps and remove phases
rohitkumarbhagat Nov 12, 2025
475ba5b
refactor(sdmx-import): Simplify pipeline step definition and construc…
rohitkumarbhagat Nov 12, 2025
24d0e0c
feat(sdmx-import): add run_sdmx_pipeline orchestration
rohitkumarbhagat Nov 13, 2025
c41eb3f
feat(sdmx-import): add CLI entrypoint for SDMX pipeline
rohitkumarbhagat Nov 13, 2025
4be87a7
refactor: centralize SDMX pipeline planning
rohitkumarbhagat Nov 14, 2025
55b5015
Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp…
rohitkumarbhagat Nov 14, 2025
69831c2
fix: add repo root to sys.path
rohitkumarbhagat Nov 14, 2025
95214cb
chore: log SDMX pipeline step outcomes
rohitkumarbhagat Nov 14, 2025
d257f0c
Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp…
rohitkumarbhagat Nov 14, 2025
acca4aa
Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp…
rohitkumarbhagat Nov 18, 2025
7400c0d
feat: implement metadata download step
rohitkumarbhagat Nov 25, 2025
3ff6bee
Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp…
rohitkumarbhagat Nov 25, 2025
3ea7f21
Refactor SDMX pipeline to use shared subprocess wrapper and cached co…
rohitkumarbhagat Nov 25, 2025
c1936bd
Implement DownloadDataStep in sdmx_import_pipeline
rohitkumarbhagat Nov 25, 2025
db4caa4
Refactor SDMX config resolution and align flag names
rohitkumarbhagat Nov 25, 2025
fc7c2e4
feat: Add early input file existence check to `CreateSampleStep` and …
rohitkumarbhagat Nov 25, 2025
e7c8db5
refactor: Replace `CommandPlan` dataclass with a nested `_StepContext…
rohitkumarbhagat Nov 25, 2025
9a7e38d
refactor: move `dry_run` from the general `Step` interface to `SdmxSt…
rohitkumarbhagat Nov 25, 2025
8fed16a
feat: Enable CreateSchemaMapStep to execute pvmap_generator.py and ad…
rohitkumarbhagat Nov 26, 2025
26800ad
feat: implement `ProcessFullDataStep` using `stat_var_processor` and …
rohitkumarbhagat Nov 26, 2025
55360a9
feat: Implement `CreateDcConfigStep` to generate custom DC configurat…
rohitkumarbhagat Nov 26, 2025
76128ab
Merge branch 'master' of https://github.com/datacommonsorg/data into …
rohitkumarbhagat Nov 26, 2025
1926be1
refactor: Rename dummy config to `_TEST_CONFIG` and centralize dummy …
rohitkumarbhagat Nov 26, 2025
503ea7d
refactor: replace global flags with structured dataclass-based config…
rohitkumarbhagat Nov 26, 2025
58f5d51
feat: Reorder imports and adjust `PipelineBuilder` parameter indentation
rohitkumarbhagat Nov 26, 2025
6cc6172
feat: Add SDMX agentic import pipeline with new documentation, code, …
rohitkumarbhagat Nov 26, 2025
8e4e7a7
fix: Move input file existence checks to `run` methods, allowing dry …
rohitkumarbhagat Nov 26, 2025
7b4bb8d
feat: Introduce `working_dir` flag and encapsulate flag definitions t…
rohitkumarbhagat Nov 27, 2025
5fd7e25
docs: clarify SDMX import pipeline prerequisites, usage, step names, …
rohitkumarbhagat Nov 27, 2025
eff8705
refactor: extract common SDMX test setup and helper methods into a ne…
rohitkumarbhagat Nov 27, 2025
0fe16c5
Merge branch 'master' of https://github.com/datacommonsorg/data into …
rohitkumarbhagat Nov 27, 2025
11abd36
feat: Resolve relative input and output paths against the working dir…
rohitkumarbhagat Nov 27, 2025
b08f8ca
feat: Add working directory flag to SDMX import pipeline and ensure a…
rohitkumarbhagat Nov 27, 2025
cd9041a
docs: Consolidate comments for output_path parsing logic.
rohitkumarbhagat Nov 27, 2025
650800b
fix: enforce absolute generator paths
rohitkumarbhagat Nov 27, 2025
a97e212
Refactor run/dry-run test helpers
rohitkumarbhagat Nov 27, 2025
b98bbac
Refactor cache plan tests
rohitkumarbhagat Nov 27, 2025
c178bfa
Refactor missing-input test helpers
rohitkumarbhagat Nov 27, 2025
ef361a9
lint fix
rohitkumarbhagat Nov 27, 2025
6a93ffc
Fix absolute output path handling
rohitkumarbhagat Nov 27, 2025
fc8d555
Add abs path backup test
rohitkumarbhagat Nov 27, 2025
7b33667
minor fix in tests
rohitkumarbhagat Nov 27, 2025
cbec5c1
refactor
rohitkumarbhagat Nov 27, 2025
1f74094
refactor: Remove `get_steps` method and directly access `pipeline.ste…
rohitkumarbhagat Nov 27, 2025
31c00f8
feat: Introduce structured SDMX agentic import pipeline with dedicate…
rohitkumarbhagat Nov 27, 2025
b10c1be
refactor: centralize SDMX flag constants
rohitkumarbhagat Nov 27, 2025
c3c9001
Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp…
rohitkumarbhagat Dec 8, 2025
e9105a9
fix: support string version identifiers
rohitkumarbhagat Dec 8, 2025
a2b81bd
removed del
rohitkumarbhagat Dec 8, 2025
4d6afce
Merge branch 'master' into sdmx-import-harness
rohitkumarbhagat Dec 12, 2025
650224d
Merge branch 'master' into sdmx-import-harness
rohitkumarbhagat Dec 15, 2025
cdc944e
Merge branch 'master' into sdmx-import-harness
rohitkumarbhagat Dec 16, 2025
2ef93cc
Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp…
rohitkumarbhagat Dec 16, 2025
9f5d634
Merge branch 'master' of github.com:datacommonsorg/data into sdmx-imp…
rohitkumarbhagat Dec 17, 2025
17f43bf
add comment
rohitkumarbhagat Dec 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

absl-py
chembl-webresource-client
dataclasses-json
deepdiff
earthengine-api
flask_restful
Expand Down
11 changes: 11 additions & 0 deletions tools/agentic_import/backup_processor_run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ def _read_manifest(self, backup_dir: Path) -> str:
with open(manifest_path, 'r') as manifest_file:
return manifest_file.read()

def test_absolute_path_copied(self):
absolute_file = self.working_dir / 'abs.txt'
absolute_file.write_text('absolute')

backup_dir = self._run_backup([str(absolute_file)])

self.assertTrue((backup_dir / 'abs.txt').exists())
manifest = self._read_manifest(backup_dir)
self.assertIn(str(absolute_file), manifest)
self.assertNotIn('Skipped (missing or blocked):', manifest)

def test_copies_requested_files(self):
first = self.working_dir / 'a.txt'
second = self.working_dir / 'b.txt'
Expand Down
136 changes: 136 additions & 0 deletions tools/agentic_import/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generic building blocks for lightweight agentic pipelines.
"""

from __future__ import annotations

import abc
from dataclasses import dataclass
from typing import Sequence

from absl import logging


class PipelineAbort(Exception):
"""Raised when pipeline execution should stop early for any reason."""


class Step(abc.ABC):
"""Abstract pipeline step interface."""

@property
@abc.abstractmethod
def name(self) -> str:
"""Human friendly identifier used for logging."""

@property
@abc.abstractmethod
def version(self) -> str:
"""Version string used for invalidation decisions."""

@abc.abstractmethod
def run(self) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return a bool run status?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, run without an error indicates a successful run. Failures can be raised as an exception. Using false status for failure would require caller to handle both exception and false for failures - adding more handling.

"""Execute the step. Raise an exception to signal failure."""


class BaseStep(Step, abc.ABC):
"""Helper base class that stores mandatory metadata."""

def __init__(self, *, name: str, version: str) -> None:
if not name:
raise ValueError("step requires a name")
self._name = name
self._version = version

@property
def name(self) -> str:
return self._name

@property
def version(self) -> str:
return self._version


@dataclass(frozen=True)
class Pipeline:
steps: Sequence[Step]


class PipelineCallback:
"""Lifecycle hooks consumed by the runner; defaults are no-ops."""

def before_step(self, step: Step) -> None:
"""Called immediately before `step.run()`; raising an error skips execution."""

def after_step(self, step: Step, *, error: Exception | None = None) -> None:
"""Runs once per step after `step.run()` succeeds or raises."""


class CompositeCallback(PipelineCallback):
"""Fans out events to child callbacks in order."""

def __init__(self, callbacks: Sequence[PipelineCallback]) -> None:
self._callbacks = list(callbacks)

def before_step(self, step: Step) -> None:
for callback in self._callbacks:
callback.before_step(step)

def after_step(self, step: Step, *, error: Exception | None = None) -> None:
for callback in self._callbacks:
callback.after_step(step, error=error)


@dataclass(frozen=True)
class RunnerConfig:
"""Placeholder for future runner toggles."""


class PipelineRunner:

def __init__(self, config: RunnerConfig | None = None) -> None:
self._config = config or RunnerConfig()

def run(self,
pipeline: Pipeline,
callback: PipelineCallback | None = None) -> None:
current_step: Step | None = None
steps = pipeline.steps
logging.info(f"Starting pipeline with {len(steps)} steps")
try:
for step in steps:
current_step = step
logging.info(f"[STEP START] {step.name} (v{step.version})")
if callback:
callback.before_step(step)
error: Exception | None = None
try:
step.run()
except Exception as exc: # pylint: disable=broad-except
error = exc
logging.exception(
f"[STEP END] {step.name} (v{step.version}) status=failed"
)
raise
finally:
if callback:
callback.after_step(step, error=error)
logging.info(
f"[STEP END] {step.name} (v{step.version}) status=succeeded"
)
logging.info("Pipeline completed")
except PipelineAbort:
name = current_step.name if current_step else "<none>"
logging.info(f"[STEP END] {name} status=aborted; pipeline aborted")
183 changes: 183 additions & 0 deletions tools/agentic_import/pipeline_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#!/usr/bin/env python3

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for the agentic pipeline skeleton."""

import os
import sys
import unittest

_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(_SCRIPT_DIR)

from pipeline import ( # pylint: disable=import-error
BaseStep, CompositeCallback, Pipeline, PipelineAbort, PipelineCallback,
PipelineRunner, RunnerConfig, Step,
)


class _TrackingStep(BaseStep):

def __init__(self, name: str, events: list[str]) -> None:
super().__init__(name=name, version="1")
self._events = events
self.executed = False

def run(self) -> None:
self.executed = True
self._events.append(f"run:{self.name}")


class _FailingStep(BaseStep):

def __init__(self, *, name: str, version: str) -> None:
super().__init__(name=name, version=version)

def run(self) -> None:
raise ValueError("boom")


class PipelineRunnerTest(unittest.TestCase):

def _build_pipeline(self, events: list[str]) -> Pipeline:
step_one = _TrackingStep("one", events)
step_two = _TrackingStep("two", events)
return Pipeline(steps=[step_one, step_two])

def test_on_before_step_runs_before_each_step(self) -> None:
events: list[str] = []

class RecordingCallback(PipelineCallback):

def before_step(self, step: Step) -> None:
events.append(f"before:{step.name}")

pipeline = self._build_pipeline(events)
PipelineRunner(RunnerConfig()).run(pipeline, RecordingCallback())

self.assertEqual(
events,
[
"before:one",
"run:one",
"before:two",
"run:two",
],
)

def test_pipeline_abort_skips_downstream_steps(self) -> None:
events: list[str] = []
pipeline = self._build_pipeline(events)
runner = PipelineRunner(RunnerConfig())

class AbortOnSecond(PipelineCallback):

def before_step(self, step: Step) -> None:
if step.name == "two":
raise PipelineAbort("stop")

runner.run(pipeline, AbortOnSecond())

self.assertEqual(events, ["run:one"])
# PipelineAbort is swallowed by the runner, so execution simply stops.

def test_before_step_exception_skips_after_step(self) -> None:
events: list[str] = []
pipeline = Pipeline(steps=[_TrackingStep("one", events)])
runner = PipelineRunner(RunnerConfig())

class RecordingCallback(PipelineCallback):

def before_step(self, step: Step) -> None:
events.append(f"before:{step.name}")
raise RuntimeError("boom")

def after_step(self,
step: Step,
*,
error: Exception | None = None) -> None:
del step, error
events.append("after-called")

with self.assertRaises(RuntimeError):
runner.run(pipeline, RecordingCallback())

self.assertEqual(events, ["before:one"])

def test_after_step_receives_error_when_step_fails(self) -> None:

class RecordingCallback(PipelineCallback):

def __init__(self) -> None:
self.after_calls: list[tuple[str, str | None]] = []

def after_step(self,
step: Step,
*,
error: Exception | None = None) -> None:
name = step.name
error_name = type(error).__name__ if error else None
self.after_calls.append((name, error_name))

callback = RecordingCallback()
pipeline = Pipeline(steps=[_FailingStep(name="fail-step", version="1")])

with self.assertRaises(ValueError):
PipelineRunner(RunnerConfig()).run(pipeline, callback)

self.assertEqual(callback.after_calls, [("fail-step", "ValueError")])


class CompositeCallbackTest(unittest.TestCase):

def test_callbacks_run_in_order_for_each_hook(self) -> None:
events: list[str] = []

class RecordingCallback(PipelineCallback):

def __init__(self, label: str) -> None:
self._label = label

def before_step(self, step: Step) -> None:
events.append(f"{self._label}:before:{step.name}")

def after_step(self,
step: Step,
*,
error: Exception | None = None) -> None:
del error
events.append(f"{self._label}:after:{step.name}")

composite = CompositeCallback(
[RecordingCallback("first"),
RecordingCallback("second")])
step = _TrackingStep("composite", events)

composite.before_step(step)
composite.after_step(step)

self.assertEqual(
events,
[
"first:before:composite",
"second:before:composite",
"first:after:composite",
"second:after:composite",
],
)


if __name__ == "__main__":
unittest.main()
Loading
Loading