diff --git a/requirements_all.txt b/requirements_all.txt index 9ce4e5a975..6983f07837 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -9,6 +9,7 @@ absl-py chembl-webresource-client +dataclasses-json deepdiff earthengine-api flask_restful diff --git a/tools/agentic_import/backup_processor_run_test.py b/tools/agentic_import/backup_processor_run_test.py index 3681b3e7ae..027f1f81f3 100644 --- a/tools/agentic_import/backup_processor_run_test.py +++ b/tools/agentic_import/backup_processor_run_test.py @@ -45,6 +45,17 @@ def _read_manifest(self, backup_dir: Path) -> str: with open(manifest_path, 'r') as manifest_file: return manifest_file.read() + def test_absolute_path_copied(self): + absolute_file = self.working_dir / 'abs.txt' + absolute_file.write_text('absolute') + + backup_dir = self._run_backup([str(absolute_file)]) + + self.assertTrue((backup_dir / 'abs.txt').exists()) + manifest = self._read_manifest(backup_dir) + self.assertIn(str(absolute_file), manifest) + self.assertNotIn('Skipped (missing or blocked):', manifest) + def test_copies_requested_files(self): first = self.working_dir / 'a.txt' second = self.working_dir / 'b.txt' diff --git a/tools/agentic_import/pipeline.py b/tools/agentic_import/pipeline.py new file mode 100644 index 0000000000..57b234da98 --- /dev/null +++ b/tools/agentic_import/pipeline.py @@ -0,0 +1,136 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Generic building blocks for lightweight agentic pipelines. +""" + +from __future__ import annotations + +import abc +from dataclasses import dataclass +from typing import Sequence + +from absl import logging + + +class PipelineAbort(Exception): + """Raised when pipeline execution should stop early for any reason.""" + + +class Step(abc.ABC): + """Abstract pipeline step interface.""" + + @property + @abc.abstractmethod + def name(self) -> str: + """Human friendly identifier used for logging.""" + + @property + @abc.abstractmethod + def version(self) -> str: + """Version string used for invalidation decisions.""" + + @abc.abstractmethod + def run(self) -> None: + """Execute the step. 
Raise an exception to signal failure.""" + + +class BaseStep(Step, abc.ABC): + """Helper base class that stores mandatory metadata.""" + + def __init__(self, *, name: str, version: str) -> None: + if not name: + raise ValueError("step requires a name") + self._name = name + self._version = version + + @property + def name(self) -> str: + return self._name + + @property + def version(self) -> str: + return self._version + + +@dataclass(frozen=True) +class Pipeline: + steps: Sequence[Step] + + +class PipelineCallback: + """Lifecycle hooks consumed by the runner; defaults are no-ops.""" + + def before_step(self, step: Step) -> None: + """Called immediately before `step.run()`; raising an error skips execution.""" + + def after_step(self, step: Step, *, error: Exception | None = None) -> None: + """Runs once per step after `step.run()` succeeds or raises.""" + + +class CompositeCallback(PipelineCallback): + """Fans out events to child callbacks in order.""" + + def __init__(self, callbacks: Sequence[PipelineCallback]) -> None: + self._callbacks = list(callbacks) + + def before_step(self, step: Step) -> None: + for callback in self._callbacks: + callback.before_step(step) + + def after_step(self, step: Step, *, error: Exception | None = None) -> None: + for callback in self._callbacks: + callback.after_step(step, error=error) + + +@dataclass(frozen=True) +class RunnerConfig: + """Placeholder for future runner toggles.""" + + +class PipelineRunner: + + def __init__(self, config: RunnerConfig | None = None) -> None: + self._config = config or RunnerConfig() + + def run(self, + pipeline: Pipeline, + callback: PipelineCallback | None = None) -> None: + current_step: Step | None = None + steps = pipeline.steps + logging.info(f"Starting pipeline with {len(steps)} steps") + try: + for step in steps: + current_step = step + logging.info(f"[STEP START] {step.name} (v{step.version})") + if callback: + callback.before_step(step) + error: Exception | None = None + try: + step.run() + except Exception as exc: # pylint: disable=broad-except + error = exc + logging.exception( + f"[STEP END] {step.name} (v{step.version}) status=failed" + ) + raise + finally: + if callback: + callback.after_step(step, error=error) + logging.info( + f"[STEP END] {step.name} (v{step.version}) status=succeeded" + ) + logging.info("Pipeline completed") + except PipelineAbort: + name = current_step.name if current_step else "" + logging.info(f"[STEP END] {name} status=aborted; pipeline aborted") diff --git a/tools/agentic_import/pipeline_test.py b/tools/agentic_import/pipeline_test.py new file mode 100644 index 0000000000..2abfc4c45c --- /dev/null +++ b/tools/agentic_import/pipeline_test.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Unit tests for the agentic pipeline skeleton.""" + +import os +import sys +import unittest + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(_SCRIPT_DIR) + +from pipeline import ( # pylint: disable=import-error + BaseStep, CompositeCallback, Pipeline, PipelineAbort, PipelineCallback, + PipelineRunner, RunnerConfig, Step, +) + + +class _TrackingStep(BaseStep): + + def __init__(self, name: str, events: list[str]) -> None: + super().__init__(name=name, version="1") + self._events = events + self.executed = False + + def run(self) -> None: + self.executed = True + self._events.append(f"run:{self.name}") + + +class _FailingStep(BaseStep): + + def __init__(self, *, name: str, version: str) -> None: + super().__init__(name=name, version=version) + + def run(self) -> None: + raise ValueError("boom") + + +class PipelineRunnerTest(unittest.TestCase): + + def _build_pipeline(self, events: list[str]) -> Pipeline: + step_one = _TrackingStep("one", events) + step_two = _TrackingStep("two", events) + return Pipeline(steps=[step_one, step_two]) + + def test_on_before_step_runs_before_each_step(self) -> None: + events: list[str] = [] + + class RecordingCallback(PipelineCallback): + + def before_step(self, step: Step) -> None: + events.append(f"before:{step.name}") + + pipeline = self._build_pipeline(events) + PipelineRunner(RunnerConfig()).run(pipeline, RecordingCallback()) + + self.assertEqual( + events, + [ + "before:one", + "run:one", + "before:two", + "run:two", + ], + ) + + def test_pipeline_abort_skips_downstream_steps(self) -> None: + events: list[str] = [] + pipeline = self._build_pipeline(events) + runner = PipelineRunner(RunnerConfig()) + + class AbortOnSecond(PipelineCallback): + + def before_step(self, step: Step) -> None: + if step.name == "two": + raise PipelineAbort("stop") + + runner.run(pipeline, AbortOnSecond()) + + self.assertEqual(events, ["run:one"]) + # PipelineAbort is swallowed by the runner, so execution simply stops. 
+ + def test_before_step_exception_skips_after_step(self) -> None: + events: list[str] = [] + pipeline = Pipeline(steps=[_TrackingStep("one", events)]) + runner = PipelineRunner(RunnerConfig()) + + class RecordingCallback(PipelineCallback): + + def before_step(self, step: Step) -> None: + events.append(f"before:{step.name}") + raise RuntimeError("boom") + + def after_step(self, + step: Step, + *, + error: Exception | None = None) -> None: + del step, error + events.append("after-called") + + with self.assertRaises(RuntimeError): + runner.run(pipeline, RecordingCallback()) + + self.assertEqual(events, ["before:one"]) + + def test_after_step_receives_error_when_step_fails(self) -> None: + + class RecordingCallback(PipelineCallback): + + def __init__(self) -> None: + self.after_calls: list[tuple[str, str | None]] = [] + + def after_step(self, + step: Step, + *, + error: Exception | None = None) -> None: + name = step.name + error_name = type(error).__name__ if error else None + self.after_calls.append((name, error_name)) + + callback = RecordingCallback() + pipeline = Pipeline(steps=[_FailingStep(name="fail-step", version="1")]) + + with self.assertRaises(ValueError): + PipelineRunner(RunnerConfig()).run(pipeline, callback) + + self.assertEqual(callback.after_calls, [("fail-step", "ValueError")]) + + +class CompositeCallbackTest(unittest.TestCase): + + def test_callbacks_run_in_order_for_each_hook(self) -> None: + events: list[str] = [] + + class RecordingCallback(PipelineCallback): + + def __init__(self, label: str) -> None: + self._label = label + + def before_step(self, step: Step) -> None: + events.append(f"{self._label}:before:{step.name}") + + def after_step(self, + step: Step, + *, + error: Exception | None = None) -> None: + del error + events.append(f"{self._label}:after:{step.name}") + + composite = CompositeCallback( + [RecordingCallback("first"), + RecordingCallback("second")]) + step = _TrackingStep("composite", events) + + composite.before_step(step) + composite.after_step(step) + + self.assertEqual( + events, + [ + "first:before:composite", + "second:before:composite", + "first:after:composite", + "second:after:composite", + ], + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/agentic_import/pvmap_generator.py b/tools/agentic_import/pvmap_generator.py index 2bf0217d98..eb1cdf5bdb 100644 --- a/tools/agentic_import/pvmap_generator.py +++ b/tools/agentic_import/pvmap_generator.py @@ -33,45 +33,60 @@ _FLAGS = flags.FLAGS _SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) -flags.DEFINE_list('input_data', None, - 'List of input data file paths (required)') -flags.mark_flag_as_required('input_data') -# TODO: Allow users to provide original source path and auto-generate sample data files internally -flags.DEFINE_list('input_metadata', [], - 'List of input metadata file paths (optional)') +def _define_flags(): + try: + flags.DEFINE_list('input_data', None, + 'List of input data file paths (required)') + flags.mark_flag_as_required('input_data') -flags.DEFINE_boolean('sdmx_dataset', False, - 'Whether the dataset is in SDMX format (default: False)') + flags.DEFINE_list('input_metadata', [], + 'List of input metadata file paths (optional)') -flags.DEFINE_boolean('dry_run', False, - 'Generate prompt only without calling Gemini CLI') + flags.DEFINE_boolean( + 'sdmx_dataset', False, + 'Whether the dataset is in SDMX format (default: False)') -flags.DEFINE_string('maps_api_key', None, 'Google Maps API key (optional)') + flags.DEFINE_boolean('dry_run', False, + 
'Generate prompt only without calling Gemini CLI') -flags.DEFINE_string('dc_api_key', None, 'Data Commons API key (optional)') + flags.DEFINE_string('maps_api_key', None, + 'Google Maps API key (optional)') -flags.DEFINE_integer('max_iterations', 10, - 'Maximum number of attempts for statvar processor.') + flags.DEFINE_string('dc_api_key', None, + 'Data Commons API key (optional)') -flags.DEFINE_boolean( - 'skip_confirmation', False, - 'Skip user confirmation before starting PV map generation') + flags.DEFINE_integer( + 'max_iterations', 10, + 'Maximum number of attempts for statvar processor.') -flags.DEFINE_boolean( - 'enable_sandboxing', - platform.system() == 'Darwin', - 'Enable sandboxing for Gemini CLI (default: True on macOS, False elsewhere)' -) + flags.DEFINE_boolean( + 'skip_confirmation', False, + 'Skip user confirmation before starting PV map generation') -flags.DEFINE_string( - 'output_path', 'output/output', - 'Output path prefix for all generated files (default: output/output)') + flags.DEFINE_boolean( + 'enable_sandboxing', + platform.system() == 'Darwin', + 'Enable sandboxing for Gemini CLI (default: True on macOS, False elsewhere)' + ) + + flags.DEFINE_string( + 'output_path', 'output/output', + 'Output path prefix for all generated files (default: output/output)' + ) -flags.DEFINE_string( - 'gemini_cli', 'gemini', 'Custom path or command to invoke Gemini CLI. ' - 'Example: "/usr/local/bin/gemini". ' - 'WARNING: This value is executed in a shell - use only with trusted input.') + flags.DEFINE_string( + 'gemini_cli', 'gemini', + 'Custom path or command to invoke Gemini CLI. ' + 'Example: "/usr/local/bin/gemini". ' + 'WARNING: This value is executed in a shell - use only with trusted input.' + ) + + flags.DEFINE_string( + 'working_dir', None, + 'Working directory for the generator (default: current directory)') + except flags.DuplicateFlagError: + pass @dataclass @@ -93,6 +108,7 @@ class Config: enable_sandboxing: bool = False output_path: str = 'output/output' gemini_cli: Optional[str] = None + working_dir: Optional[str] = None @dataclass @@ -110,7 +126,12 @@ class PVMapGenerator: def __init__(self, config: Config): # Define working directory once for consistent path resolution - self._working_dir = Path.cwd() + self._working_dir = Path( + config.working_dir).resolve() if config.working_dir else Path.cwd() + if self._working_dir.exists() and not self._working_dir.is_dir(): + raise ValueError( + f"working_dir is not a directory: {self._working_dir}") + self._working_dir.mkdir(parents=True, exist_ok=True) # Copy config to avoid modifying the original self._config = copy.deepcopy(config) @@ -129,14 +150,18 @@ def __init__(self, config: Config): for path in self._config.data_config.input_metadata ] - # Parse output_path into directory and basename components - output_path = Path(self._config.output_path) - self._output_dir = output_path.parent - self._output_basename = output_path.name + # Parse output_path into absolute path, handling relative paths and ~ expansion + output_path = Path(self._config.output_path).expanduser() + if not output_path.is_absolute(): + output_path = self._working_dir / output_path + self._output_path_abs = output_path.resolve() + + self._output_dir_abs = self._output_path_abs.parent + self._output_basename = self._output_path_abs.name + self._config.output_path = str(self._output_path_abs) # Create output directory if it doesn't exist - output_full_dir = self._working_dir / self._output_dir - output_full_dir.mkdir(parents=True, exist_ok=True) + 
self._output_dir_abs.mkdir(parents=True, exist_ok=True) self._datacommons_dir = self._initialize_datacommons_dir() @@ -150,7 +175,10 @@ def __init__(self, config: Config): def _validate_and_convert_path(self, path: str) -> Path: """Convert path to absolute and validate it's within working directory.""" - real_path = Path(path).expanduser().resolve() + p = Path(path).expanduser() + if not p.is_absolute(): + p = self._working_dir / p + real_path = p.resolve() working_dir = self._working_dir.resolve() try: real_path.relative_to(working_dir) @@ -184,7 +212,7 @@ def _get_user_confirmation(self, prompt_file: Path) -> bool: print(f"Generated prompt: {prompt_file}") print(f"Working directory: {self._working_dir}") print(f"Output path: {self._config.output_path}") - print(f"Output directory: {self._output_dir}") + print(f"Output directory: {self._output_dir_abs}") print(f"Output basename: {self._output_basename}") print( f"Sandboxing: {'Enabled' if self._config.enable_sandboxing else 'Disabled'}" @@ -314,6 +342,7 @@ def _run_subprocess(self, command: str) -> int: stdout=subprocess.PIPE, stderr=subprocess.STDOUT, # Combine stderr with stdout shell=True, # Using shell to support pipe operations + cwd=self._working_dir, # Run in the specified working directory encoding='utf-8', errors='replace', bufsize=1, # Line buffered @@ -348,21 +377,21 @@ def _generate_prompt(self) -> Path: template = env.get_template('generate_pvmap_prompt.j2') # Calculate paths and prepare template variables - working_dir = str(self._working_dir) # Use defined working directory + working_dir = str(self._working_dir) # Absolute working directory # Point to tools/ directory (parent of agentic_import) - tools_dir = os.path.abspath(os.path.join(_SCRIPT_DIR, '..')) + tools_dir = os.path.abspath(os.path.join(_SCRIPT_DIR, '..')) # Absolute template_vars = { - 'working_dir': + 'working_dir_abs': working_dir, 'python_interpreter': sys.executable, - 'script_dir': + 'script_dir_abs': tools_dir, - 'input_data': + 'input_data_abs': str(self._config.data_config.input_data[0]) if self._config.data_config.input_data else "", - 'input_metadata': [ + 'input_metadata_abs': [ str(path) for path in self._config.data_config.input_metadata ] if self._config.data_config.input_metadata else [], 'dataset_type': @@ -372,10 +401,10 @@ def _generate_prompt(self) -> Path: 'gemini_run_id': self. 
_gemini_run_id, # Pass the gemini run ID for backup tracking - 'output_path': - self._config.output_path, # Full path for statvar processor - 'output_dir': - str(self._output_dir), # Directory for pvmap/metadata files + 'output_path_abs': + str(self._output_path_abs), # Absolute path prefix for outputs + 'output_dir_abs': + str(self._output_dir_abs), # Directory for pvmap/metadata files 'output_basename': self._output_basename # Base name for pvmap/metadata files } @@ -406,7 +435,8 @@ def prepare_config() -> Config: skip_confirmation=_FLAGS.skip_confirmation, enable_sandboxing=_FLAGS.enable_sandboxing, output_path=_FLAGS.output_path, - gemini_cli=_FLAGS.gemini_cli) + gemini_cli=_FLAGS.gemini_cli, + working_dir=_FLAGS.working_dir) def main(_): @@ -424,4 +454,5 @@ def main(_): if __name__ == '__main__': + _define_flags() app.run(main) diff --git a/tools/agentic_import/pvmap_generator_test.py b/tools/agentic_import/pvmap_generator_test.py index 32ad242787..5323a684f8 100644 --- a/tools/agentic_import/pvmap_generator_test.py +++ b/tools/agentic_import/pvmap_generator_test.py @@ -116,6 +116,9 @@ def _assert_prompt_content(self, prompt_path: Path, *, expect_sdmx: bool, self.assertIn(f'You have exactly {config.max_iterations} attempts', prompt_text) + # Output path should be absolute in the prompt + self.assertIn(f'--output-path "{config.output_path}"', prompt_text) + if expect_sdmx: # SDMX prompts highlight dataset type and show SDMX-specific banner. self.assertIn('"dataset_type": "sdmx"', prompt_text) @@ -178,6 +181,81 @@ def test_rejects_paths_outside_working_directory(self): input_data=[str(external_file)], input_metadata=[]), dry_run=True)) + def test_generate_prompt_with_relative_working_dir(self): + # Create a subdirectory for the relative working directory test + sub_dir_name = 'sub_working_dir' + sub_dir = Path(self._temp_dir.name) / sub_dir_name + sub_dir.mkdir() + + # Create input files inside the subdirectory + data_file = sub_dir / 'input.csv' + data_file.write_text('header\nvalue') + metadata_file = sub_dir / 'metadata.csv' + metadata_file.write_text('parameter,value') + + # Use relative path for working_dir + config = Config( + data_config=DataConfig( + input_data=['input.csv'], # Relative to working_dir + input_metadata=['metadata.csv'], # Relative to working_dir + is_sdmx_dataset=False, + ), + dry_run=True, + max_iterations=3, + output_path='output/output_file', + working_dir=sub_dir_name, # Relative path + ) + + # We need to run from the parent directory so the relative path is valid + # The setUp already changed to self._temp_dir.name, so we are in the right place + + generator = PVMapGenerator(config) + result = generator.generate() + + self._assert_generation_result(result) + prompt_path = self._read_prompt_path(result) + prompt_text = prompt_path.read_text() + + # Verify that the working directory in the prompt is the absolute path of the subdirectory + expected_working_dir = str(sub_dir.resolve()) + self.assertIn(expected_working_dir, prompt_text) + self.assertIn(f'"working_dir": "{expected_working_dir}"', prompt_text) + + # Verify input paths are also absolute in the prompt + self.assertIn(str(data_file.resolve()), prompt_text) + self.assertIn(str(metadata_file.resolve()), prompt_text) + + def test_relative_paths_resolved_against_working_dir(self): + # Create a separate working directory + with tempfile.TemporaryDirectory() as work_dir: + work_path = Path(work_dir) + # Create input files inside the working directory + data_file = work_path / 'input.csv' + 
data_file.write_text('header\nvalue') + + # Run from a different directory (current temp dir) + # Use relative path to input file, which should be resolved against work_dir + config = Config( + data_config=DataConfig( + input_data=['input.csv'], # Relative to work_dir + input_metadata=[], + is_sdmx_dataset=False, + ), + dry_run=True, + working_dir=work_dir, + ) + + # This should not raise ValueError because input.csv is found in work_dir + generator = PVMapGenerator(config) + result = generator.generate() + self._assert_generation_result(result) + self.assertEqual(str(generator._config.data_config.input_data[0]), + str(data_file.resolve())) + # Verify output directory is also under working_dir + self.assertTrue( + str(generator._output_dir_abs).startswith( + str(work_path.resolve()))) + if __name__ == '__main__': unittest.main() diff --git a/tools/agentic_import/run_statvar_processor.sh b/tools/agentic_import/run_statvar_processor.sh index d907e4f724..2c2cd83fca 100755 --- a/tools/agentic_import/run_statvar_processor.sh +++ b/tools/agentic_import/run_statvar_processor.sh @@ -67,6 +67,13 @@ if [[ -z "$PYTHON_INTERPRETER" || -z "$SCRIPT_DIR" || -z "$WORKING_DIR" || -z "$ exit 1 fi +# Normalize output prefix: respect absolute paths, otherwise anchor under working dir. +if [[ "${OUTPUT_PATH}" = /* ]]; then + OUTPUT_PREFIX="${OUTPUT_PATH}" +else + OUTPUT_PREFIX="${WORKING_DIR}/${OUTPUT_PATH}" +fi + # Create .datacommons directory if it doesn't exist mkdir -p "${WORKING_DIR}/.datacommons" @@ -81,20 +88,19 @@ OUTPUT_COLUMNS="observationDate,observationAbout,variableMeasured,value,observat echo "Running statvar processor..." "${PYTHON_INTERPRETER}" "${SCRIPT_DIR}/statvar_importer/stat_var_processor.py" \ --input_data="${INPUT_DATA}" \ - --pv_map="${WORKING_DIR}/${OUTPUT_PATH}_pvmap.csv" \ - --config_file="${WORKING_DIR}/${OUTPUT_PATH}_metadata.csv" \ + --pv_map="${OUTPUT_PREFIX}_pvmap.csv" \ + --config_file="${OUTPUT_PREFIX}_metadata.csv" \ --generate_statvar_name=True \ --skip_constant_csv_columns=False \ --output_columns="${OUTPUT_COLUMNS}" \ --output_counters="${WORKING_DIR}/.datacommons/output_counters.csv" \ - --output_path="${WORKING_DIR}/${OUTPUT_PATH}" > "${PROCESSOR_LOG}" 2>&1 + --output_path="${OUTPUT_PREFIX}" > "${PROCESSOR_LOG}" 2>&1 # Capture the processor exit code PROCESSOR_EXIT_CODE=${PIPESTATUS[0]} # Run backup script silently (redirect output to backup log) echo "Backing up run data..." -OUTPUT_PREFIX="${WORKING_DIR}/${OUTPUT_PATH}" declare -a BACKUP_ARGS=( "--working_dir=${WORKING_DIR}" "--gemini_run_id=${GEMINI_RUN_ID}" diff --git a/tools/agentic_import/sdmx_import_pipeline.md b/tools/agentic_import/sdmx_import_pipeline.md new file mode 100644 index 0000000000..32f7eae252 --- /dev/null +++ b/tools/agentic_import/sdmx_import_pipeline.md @@ -0,0 +1,100 @@ +# SDMX Agentic Import Pipeline + +The SDMX Agentic Import Pipeline is a Python-based system designed to automate the retrieval and processing of SDMX (Statistical Data and Metadata eXchange) data for Data Commons. It provides a structured, step-based approach to downloading, sampling, mapping, and processing SDMX data into Data Commons artifacts. + +## Overview + +The pipeline orchestrates several tools to handle the end-to-end import process: +1. **Download**: Retrieves data and metadata from SDMX endpoints. +2. **Sample**: Creates a manageable sample of the data for analysis. +3. **Schema Mapping**: Generates Property-Value (PV) mappings using LLM-based tools. +4. 
**Full Data Processing**: Converts the full dataset into Data Commons MCF and CSV formats. +5. **Custom DC Config**: Generates configuration for custom Data Commons instances. + +## Prerequisites + +Before running the pipeline, ensure you have set up your environment as described in the [main README](./README.md#step-2-environment-setup). Key requirements include: + +1. **DC_DATA_REPO_PATH**: Environment variable pointing to your cloned Data Commons data repository. +2. **WORKING_DIR**: Environment variable pointing to your working directory. +3. **Python Environment**: Activated virtual environment with required dependencies. +4. **Gemini CLI**: Installed and configured for schema mapping. +5. **Data Commons API Key**: Set in your environment. + +## Usage + +The pipeline is executed using the `sdmx_import_pipeline.py` script. + +**Important:** The command must be run from within your working directory. + +### Basic Command + +```bash +# Ensure you are in your working directory +cd $WORKING_DIR + +# Run the pipeline using the full path to the script +python $DC_DATA_REPO_PATH/tools/agentic_import/sdmx_import_pipeline.py \ + --sdmx.endpoint="https://sdmx.example.org/data" \ + --sdmx.agency="AGENCY_ID" \ + --sdmx.dataflow.id="DATAFLOW_ID" \ + --dataset_prefix="my_dataset" +``` + +### Key Flags + +- `--sdmx.endpoint`: The SDMX API endpoint URL. +- `--sdmx.agency`: The SDMX agency ID. +- `--sdmx.dataflow.id`: The SDMX dataflow ID. +- `--sdmx.dataflow.key`: (Optional) Filter key for data download. +- `--sdmx.dataflow.param`: (Optional) Additional parameters for data download. +- `--dataset_prefix`: (Optional) Prefix for generated artifacts. Useful for disambiguating multiple datasets in the same working directory. If not provided, it is derived from the dataflow ID. +- `--sample.rows`: Number of rows for the sample dataset (default: 1000). +- `--force`: Force re-execution of all steps, ignoring saved state. +- `--skip_confirmation`: Skip interactive confirmation prompts during schema mapping. +- `--verbose`: Enable verbose logging. + +## Pipeline Steps + +The pipeline consists of the following steps, executed in order: + +1. **DownloadDataStep**: Downloads SDMX data to `_data.csv`. +2. **DownloadMetadataStep**: Downloads SDMX metadata to `_metadata.xml`. +3. **CreateSampleStep**: Creates `_sample.csv` from the downloaded data. +4. **CreateSchemaMapStep**: Generates PV map and config in `sample_output/` using `pvmap_generator.py`. +5. **ProcessFullDataStep**: Processes the full data using `stat_var_processor.py` to generate artifacts in `output/`. +6. **CreateDcConfigStep**: Generates `output/_config.json` for custom DC imports. + +## Directory Structure + +The pipeline organizes outputs within the specified working directory: + +``` +working_dir/ +├── _data.csv # Raw downloaded data +├── _metadata.xml # Raw downloaded metadata +├── _sample.csv # Sampled data +├── .datacommons/ +│ └── .state.json # Pipeline state for resuming runs +├── sample_output/ # Intermediate artifacts from mapping +│ ├── _pvmap.csv +│ └── _metadata.csv +└── output/ # Final Data Commons artifacts + ├── .csv + ├── .mcf + ├── .tmcf + └── _config.json +``` + +## State Management + +The pipeline automatically saves its state to a `.state.json` file in the `.datacommons/` directory within your working directory. +- **Resuming**: If a run is interrupted, running the same command again will resume from the last successful step. 
+- **Skipping**: Steps that have already completed successfully will be skipped unless `--force` is used. +- **Input Hashing**: The pipeline tracks input configuration. If critical configuration changes, it may trigger re-execution of steps. + +## Troubleshooting + +- **Gemini CLI Errors**: If the schema mapping step fails, check the Gemini CLI logs (usually in `.datacommons/runs/` within the working directory). Refer to the [main README](./README.md#debugging) for detailed debugging instructions. +- **Missing Data**: Ensure the SDMX endpoint, agency, and dataflow ID are correct. Use `--verbose` to see the exact commands being run. +- **State Issues**: If the pipeline is stuck or behaving unexpectedly, you can delete the state file to reset the state, or use `--force`. diff --git a/tools/agentic_import/sdmx_import_pipeline.py b/tools/agentic_import/sdmx_import_pipeline.py new file mode 100644 index 0000000000..32957e6c6d --- /dev/null +++ b/tools/agentic_import/sdmx_import_pipeline.py @@ -0,0 +1,333 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Helpers for the SDMX agentic import pipeline.""" + +from __future__ import annotations + +import copy +import hashlib +import json +import os +import re +import shlex +import sys +import dataclasses +from datetime import datetime, timezone +from pathlib import Path +from typing import Callable, Sequence + +from absl import app, flags, logging + +REPO_ROOT = Path(__file__).resolve().parents[2] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +from tools.agentic_import.pipeline import (CompositeCallback, PipelineAbort, + PipelineCallback, PipelineRunner, + RunnerConfig, Step) +from tools.agentic_import.sdmx_pipeline_builder import build_sdmx_pipeline +from tools.agentic_import.sdmx_pipeline_config import ( + FLAG_SDMX_AGENCY, + FLAG_SDMX_DATAFLOW_ID, + FLAG_SDMX_DATAFLOW_KEY, + FLAG_SDMX_DATAFLOW_PARAM, + FLAG_SDMX_ENDPOINT, + PipelineConfig, + RunConfig, + SampleConfig, + SdmxConfig, + SdmxDataflowConfig, +) +from tools.agentic_import.sdmx_pipeline_steps import SdmxStep +from tools.agentic_import.state_handler import StateHandler, StepState + +# Flag names +_FLAG_SAMPLE_ROWS = "sample.rows" + +FLAGS = flags.FLAGS + + +def _define_flags() -> None: + flags.DEFINE_string(FLAG_SDMX_ENDPOINT, None, "SDMX service endpoint.") + flags.mark_flag_as_required(FLAG_SDMX_ENDPOINT) + + flags.DEFINE_string(FLAG_SDMX_AGENCY, None, + "Owning SDMX agency identifier.") + flags.mark_flag_as_required(FLAG_SDMX_AGENCY) + + flags.DEFINE_string(FLAG_SDMX_DATAFLOW_ID, None, + "Target SDMX dataflow identifier.") + flags.mark_flag_as_required(FLAG_SDMX_DATAFLOW_ID) + + flags.DEFINE_string(FLAG_SDMX_DATAFLOW_KEY, None, + "Optional SDMX key or filter.") + + flags.DEFINE_string( + FLAG_SDMX_DATAFLOW_PARAM, None, + "Optional SDMX parameter appended to the dataflow query.") + + flags.DEFINE_integer(_FLAG_SAMPLE_ROWS, 1000, + "Number of rows to sample from downloaded data.") + + flags.DEFINE_string( + "dataset_prefix", 
None, + "Optional dataset prefix to override auto-derived values.") + + flags.DEFINE_string("run_only", None, + "Execute only a specific pipeline step by name.") + + flags.DEFINE_boolean("force", False, "Force all steps to run.") + + flags.DEFINE_boolean("verbose", False, "Enable verbose logging.") + + flags.DEFINE_boolean("skip_confirmation", False, + "Skip interactive confirmation prompts.") + + flags.DEFINE_string("gemini_cli", "gemini", + "Path to Gemini CLI executable.") + + flags.DEFINE_string("working_dir", None, + "Working directory for the pipeline.") + + +def _format_time(value: datetime) -> str: + if value.tzinfo is None: + value = value.replace(tzinfo=timezone.utc) + return value.isoformat() + + +def _sanitize_run_id(dataflow: str) -> str: + normalized = dataflow.lower() + normalized = re.sub(r"[^a-z0-9_]+", "_", normalized) + normalized = re.sub(r"_+", "_", normalized) + return normalized.strip("_") + + +def _resolve_dataset_prefix(config: PipelineConfig) -> str: + if config.run.dataset_prefix: + return config.run.dataset_prefix + if not config.sdmx.dataflow.id: + raise ValueError( + "dataflow.id or dataset_prefix is required to derive dataset prefix" + ) + sanitized = _sanitize_run_id(config.sdmx.dataflow.id) + if not sanitized: + raise ValueError("dataflow value is invalid after sanitization") + return sanitized + + +def _compute_critical_input_hash(config: PipelineConfig) -> str: + payload = { + FLAG_SDMX_AGENCY: config.sdmx.agency, + FLAG_SDMX_DATAFLOW_ID: config.sdmx.dataflow.id, + FLAG_SDMX_ENDPOINT: config.sdmx.endpoint, + FLAG_SDMX_DATAFLOW_KEY: config.sdmx.dataflow.key, + FLAG_SDMX_DATAFLOW_PARAM: config.sdmx.dataflow.param, + } + serialized = json.dumps(payload, sort_keys=True, separators=(",", ":")) + return hashlib.sha256(serialized.encode("utf-8")).hexdigest() + + +def _resolve_working_dir(config: PipelineConfig) -> Path: + directory = Path(config.run.working_dir or os.getcwd()).resolve() + if directory.exists() and not directory.is_dir(): + raise ValueError(f"working_dir is not a directory: {directory}") + directory.mkdir(parents=True, exist_ok=True) + return directory + + +def _resolve_config(config: PipelineConfig) -> PipelineConfig: + """Resolves dynamic configuration values and returns a new config.""" + dataset_prefix = _resolve_dataset_prefix(config) + working_dir = _resolve_working_dir(config) + new_run = dataclasses.replace(config.run, + dataset_prefix=dataset_prefix, + working_dir=str(working_dir)) + return dataclasses.replace(config, run=new_run) + + +class InteractiveCallback(PipelineCallback): + """Prompts the user before each step runs.""" + + def before_step(self, step: Step) -> None: + if isinstance(step, SdmxStep): + logging.info(f"Dry run for {step.name} (v{step.version}):") + step.dry_run() + prompt = f"Run step {step.name} (v{step.version})? [Y/n] " + response = input(prompt).strip().lower() + if response in ("n", "no"): + raise PipelineAbort("user declined interactive prompt") + + +class JSONStateCallback(PipelineCallback): + """Persists pipeline progress to the SDMX state file via StateHandler. + + This callback assumes a single process owns the state file for the lifetime + of the run. The CLI or builder sets run metadata up-front; this class only + mutates state after a step executes. 
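+    Steps that raise PipelineAbort are not recorded; their previously saved
+    state, if any, is left untouched.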
+ """ + + def __init__(self, + *, + state_handler: StateHandler, + dataset_prefix: str, + critical_input_hash: str, + command: str, + now_fn: Callable[[], datetime] | None = None) -> None: + self._handler = state_handler + self._now_fn = now_fn or (lambda: datetime.now(timezone.utc)) + self._state = self._handler.get_state() + self._state.dataset_prefix = dataset_prefix + self._state.critical_input_hash = critical_input_hash + self._state.command = command + self._step_start_times: dict[str, datetime] = {} + logging.info(f"JSON state will be written to {self._handler.path}") + + def before_step(self, step: Step) -> None: + started_at = self._now() + self._step_start_times[step.name] = started_at + + def after_step(self, step: Step, *, error: Exception | None = None) -> None: + ended_at = self._now() + started_at = self._step_start_times.pop(step.name, None) + if started_at is None: + started_at = ended_at + duration = max(0.0, (ended_at - started_at).total_seconds()) + started_at_ts = int(started_at.timestamp() * 1000) + ended_at_ts = int(ended_at.timestamp() * 1000) + if isinstance(error, PipelineAbort): + logging.info( + f"Skipping state update for {step.name} due to pipeline abort") + return + if error: + message = str(error) or error.__class__.__name__ + else: + message = None + # Step stats are persisted only after the step finishes; steps can still + # be skipped after their before_step callback runs, so we leave skipped + # steps untouched to preserve prior state. + step_state = StepState( + version=step.version, + status="failed" if error else "succeeded", + started_at=_format_time(started_at), + ended_at=_format_time(ended_at), + duration_s=duration, + started_at_ts=started_at_ts, + ended_at_ts=ended_at_ts, + message=message, + ) + self._state.steps[step.name] = step_state + self._state.updated_at = step_state.ended_at + self._state.updated_at_ts = ended_at_ts + self._handler.save_state() + + def _now(self) -> datetime: + return self._now_fn() + + +def build_pipeline_callback( + *, + state_handler: StateHandler, + dataset_prefix: str, + critical_input_hash: str, + command: str, + skip_confirmation: bool, + now_fn: Callable[[], datetime] | None = None, +) -> PipelineCallback: + """Constructs the pipeline callback stack for the SDMX runner.""" + json_callback = JSONStateCallback(state_handler=state_handler, + dataset_prefix=dataset_prefix, + critical_input_hash=critical_input_hash, + command=command, + now_fn=now_fn) + if skip_confirmation: + return json_callback + interactive = InteractiveCallback() + return CompositeCallback([interactive, json_callback]) + + +def run_sdmx_pipeline( + *, + config: PipelineConfig, + now_fn: Callable[[], datetime] | None = None, +) -> None: + """Orchestrates the SDMX pipeline for the provided configuration.""" + resolved_config = _resolve_config(config) + working_dir = Path(resolved_config.run.working_dir) + dataset_prefix = resolved_config.run.dataset_prefix + state_handler = StateHandler( + state_path=working_dir / ".datacommons" / + f"{dataset_prefix}.state.json", + dataset_prefix=dataset_prefix, + ) + state = state_handler.get_state() + # Snapshot state for planning so callback mutations do not affect scheduling. 
+ state_snapshot = copy.deepcopy(state) + critical_hash = _compute_critical_input_hash(resolved_config) + pipeline = build_sdmx_pipeline(config=resolved_config, + state=state_snapshot, + critical_input_hash=critical_hash) + callback = build_pipeline_callback( + state_handler=state_handler, + dataset_prefix=dataset_prefix, + critical_input_hash=critical_hash, + command=resolved_config.run.command, + skip_confirmation=resolved_config.run.skip_confirmation, + now_fn=now_fn, + ) + if resolved_config.run.verbose: + logging.set_verbosity(logging.DEBUG) + runner = PipelineRunner(RunnerConfig()) + runner.run(pipeline, callback) + + +def prepare_config() -> PipelineConfig: + """Builds PipelineConfig from CLI flags.""" + # absl.flags doesn't support dots in attribute access easily, + # so we access the flag values directly from the flag names. + command = shlex.join(sys.argv) if sys.argv else "python" + return PipelineConfig( + sdmx=SdmxConfig( + endpoint=FLAGS[FLAG_SDMX_ENDPOINT].value, + agency=FLAGS[FLAG_SDMX_AGENCY].value, + dataflow=SdmxDataflowConfig( + id=FLAGS[FLAG_SDMX_DATAFLOW_ID].value, + key=FLAGS[FLAG_SDMX_DATAFLOW_KEY].value, + param=FLAGS[FLAG_SDMX_DATAFLOW_PARAM].value, + ), + ), + sample=SampleConfig(rows=FLAGS[_FLAG_SAMPLE_ROWS].value,), + run=RunConfig( + command=command, + dataset_prefix=FLAGS.dataset_prefix, + working_dir=FLAGS.working_dir, + run_only=FLAGS.run_only, + force=FLAGS.force, + verbose=FLAGS.verbose, + skip_confirmation=FLAGS.skip_confirmation, + gemini_cli=FLAGS.gemini_cli, + ), + ) + + +def main(_: list[str]) -> int: + config = prepare_config() + logging.info(f"SDMX pipeline configuration: {config}") + run_sdmx_pipeline(config=config) + return 0 + + +if __name__ == "__main__": + _define_flags() + app.run(main) diff --git a/tools/agentic_import/sdmx_import_pipeline_test.py b/tools/agentic_import/sdmx_import_pipeline_test.py new file mode 100644 index 0000000000..367ee0364f --- /dev/null +++ b/tools/agentic_import/sdmx_import_pipeline_test.py @@ -0,0 +1,1059 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Unit tests for SDMX pipeline helpers.""" + +from __future__ import annotations + +import hashlib +import dataclasses +import json +import os +import sys +import tempfile +import unittest +from datetime import datetime, timedelta, timezone +from pathlib import Path +from unittest import mock + +from absl import logging + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +_REPO_ROOT = os.path.dirname(_SCRIPT_DIR) +_PROJECT_ROOT = os.path.dirname(_REPO_ROOT) +for path in (_PROJECT_ROOT,): + if path not in sys.path: + sys.path.append(path) + +_TEST_COMMAND = "sdmx pipeline test" + +from tools.agentic_import.pipeline import ( # pylint: disable=import-error + BaseStep, CompositeCallback, Pipeline, PipelineAbort, PipelineRunner, + RunnerConfig, +) +from tools.agentic_import.sdmx_import_pipeline import ( # pylint: disable=import-error + InteractiveCallback, JSONStateCallback, build_pipeline_callback, + run_sdmx_pipeline) +from tools.agentic_import.sdmx_pipeline_builder import ( # pylint: disable=import-error + PipelineBuilder, StepDecision, build_sdmx_pipeline, build_steps) +from tools.agentic_import.sdmx_pipeline_config import ( # pylint: disable=import-error + PipelineConfig, RunConfig, SampleConfig, SdmxConfig, SdmxDataflowConfig) +from tools.agentic_import.sdmx_pipeline_steps import ( # pylint: disable=import-error + CreateDcConfigStep, CreateSampleStep, CreateSchemaMapStep, DownloadDataStep, + DownloadMetadataStep, ProcessFullDataStep, SdmxStep, _run_command) +from tools.agentic_import.state_handler import ( # pylint: disable=import-error + PipelineState, StateHandler, StepState) + +_TEST_CONFIG = PipelineConfig(run=RunConfig(command="test")) + + +class _IncrementingClock: + + def __init__(self, start: datetime, step: timedelta) -> None: + self._value = start + self._step = step + self._first_call = True + + def __call__(self) -> datetime: + if self._first_call: + self._first_call = False + return self._value + self._value = self._value + self._step + return self._value + + +class _RecordingStep(SdmxStep): + + def __init__(self, name: str, *, should_fail: bool = False) -> None: + super().__init__(name=name, version="1", config=_TEST_CONFIG) + self._should_fail = should_fail + + def run(self) -> None: + if self._should_fail: + raise ValueError("boom") + + def dry_run(self) -> None: + logging.info("noop") + + +class _VersionedStep(SdmxStep): + + def __init__(self, name: str, version: str) -> None: + super().__init__(name=name, version=version, config=_TEST_CONFIG) + + def run(self) -> None: + logging.info("noop") + + def dry_run(self) -> None: + logging.info("noop") + + +class JSONStateCallbackTest(unittest.TestCase): + + def _build_callback( + self, *, tmpdir: str, clock: _IncrementingClock + ) -> tuple[JSONStateCallback, StateHandler]: + state_path = os.path.join(tmpdir, ".datacommons", "demo.state.json") + handler = StateHandler(state_path=state_path, dataset_prefix="demo") + callback = JSONStateCallback( + state_handler=handler, + dataset_prefix="demo", + critical_input_hash="abc123", + command="python run", + now_fn=clock, + ) + return callback, handler + + def test_successful_step_persists_expected_schema(self) -> None: + clock = _IncrementingClock( + datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc), + timedelta(seconds=5)) + with tempfile.TemporaryDirectory() as tmpdir: + callback, handler = self._build_callback(tmpdir=tmpdir, clock=clock) + pipeline = Pipeline( + steps=[_RecordingStep("download.download-data")]) + runner = PipelineRunner(RunnerConfig()) + runner.run(pipeline, 
callback) + + with open(handler.path, encoding="utf-8") as fp: + state = json.load(fp) + + step_state = state["steps"]["download.download-data"] + self.assertEqual(state["dataset_prefix"], "demo") + self.assertEqual(state["critical_input_hash"], "abc123") + self.assertEqual(step_state["status"], "succeeded") + self.assertIn("started_at", step_state) + self.assertIn("ended_at", step_state) + self.assertAlmostEqual(step_state["duration_s"], 5.0) + self.assertIn("message", step_state) + self.assertIsNone(step_state["message"]) + self.assertEqual(state["updated_at"], step_state["ended_at"]) + ended_at_dt = datetime.fromisoformat(step_state["ended_at"]) + started_at_dt = datetime.fromisoformat(step_state["started_at"]) + self.assertEqual(step_state["ended_at_ts"], + int(ended_at_dt.timestamp() * 1000)) + self.assertEqual(step_state["started_at_ts"], + int(started_at_dt.timestamp() * 1000)) + self.assertEqual(state["updated_at_ts"], step_state["ended_at_ts"]) + + def test_failed_step_records_error_and_persists_file(self) -> None: + clock = _IncrementingClock( + datetime(2025, 1, 2, 0, 0, tzinfo=timezone.utc), + timedelta(seconds=7)) + with tempfile.TemporaryDirectory() as tmpdir: + callback, handler = self._build_callback(tmpdir=tmpdir, clock=clock) + pipeline = Pipeline(steps=[ + _RecordingStep("sample.create-sample", should_fail=True) + ]) + runner = PipelineRunner(RunnerConfig()) + + with self.assertRaisesRegex(ValueError, "boom"): + runner.run(pipeline, callback) + + with open(handler.path, encoding="utf-8") as fp: + state = json.load(fp) + + step_state = state["steps"]["sample.create-sample"] + self.assertEqual(step_state["status"], "failed") + self.assertIn("boom", step_state["message"]) + self.assertAlmostEqual(step_state["duration_s"], 7.0) + self.assertIn("ended_at_ts", step_state) + self.assertIn("started_at_ts", step_state) + + def test_abort_skips_state_persistence(self) -> None: + clock = _IncrementingClock( + datetime(2025, 1, 3, 0, 0, tzinfo=timezone.utc), + timedelta(seconds=3)) + with tempfile.TemporaryDirectory() as tmpdir: + state_dir = os.path.join(tmpdir, ".datacommons") + os.makedirs(state_dir, exist_ok=True) + state_path = os.path.join(state_dir, "demo.state.json") + previous = { + "dataset_prefix": "previous", + "critical_input_hash": "old", + "command": "old command", + "updated_at": "2025-01-01T00:00:00Z", + "updated_at_ts": 1, + "steps": { + "existing.step": { + "version": "1", + "status": "succeeded", + "started_at": "2025-01-01T00:00:00Z", + "started_at_ts": 0, + "ended_at": "2025-01-01T00:05:00Z", + "ended_at_ts": 300000, + "duration_s": 300.0, + "message": None, + } + }, + } + with open(state_path, "w", encoding="utf-8") as fp: + json.dump(previous, fp) + callback, handler = self._build_callback(tmpdir=tmpdir, clock=clock) + + class _AbortStep(SdmxStep): + + def __init__(self) -> None: + super().__init__(name="download.download-data", + version="1", + config=_TEST_CONFIG) + + def run(self) -> None: + raise PipelineAbort("user requested stop") + + def dry_run(self) -> None: + logging.info("noop") + + pipeline = Pipeline(steps=[_AbortStep()]) + runner = PipelineRunner(RunnerConfig()) + runner.run(pipeline, callback) + + with open(handler.path, encoding="utf-8") as fp: + state = json.load(fp) + + self.assertEqual(state, previous) + + +class InteractiveCallbackTest(unittest.TestCase): + + def test_prompt_accepts_and_runs_step(self) -> None: + callback = InteractiveCallback() + pipeline = Pipeline(steps=[_RecordingStep("download.preview")]) + runner = 
PipelineRunner(RunnerConfig()) + + with mock.patch("builtins.input", return_value="y"): + with self.assertLogs(logging.get_absl_logger(), + level="INFO") as logs: + runner.run(pipeline, callback) + + self.assertTrue( + any("Dry run for download.preview" in entry + for entry in logs.output)) + + def test_prompt_decline_aborts_pipeline(self) -> None: + events: list[str] = [] + + class _TrackingStep(_RecordingStep): + + def __init__(self) -> None: + super().__init__("sample.interactive") + self.executed = False + + def run(self) -> None: + self.executed = True + super().run() + + def dry_run(self) -> None: + events.append("dry_run") + logging.info("noop") + + callback = InteractiveCallback() + step = _TrackingStep() + pipeline = Pipeline(steps=[step]) + runner = PipelineRunner(RunnerConfig()) + + with mock.patch("builtins.input", return_value="n"): + with self.assertLogs(logging.get_absl_logger(), level="INFO"): + runner.run(pipeline, callback) + + self.assertFalse(step.executed) + self.assertTrue(events) + + +class CallbackFactoryTest(unittest.TestCase): + + def _state_handler_for_tmpdir(self, tmpdir: str) -> StateHandler: + path = os.path.join(tmpdir, ".datacommons", "demo.state.json") + return StateHandler(state_path=path, dataset_prefix="demo") + + def test_skip_confirmation_returns_json_callback(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + handler = self._state_handler_for_tmpdir(tmpdir) + callback = build_pipeline_callback( + state_handler=handler, + dataset_prefix="demo", + critical_input_hash="abc", + command="python run", + skip_confirmation=True, + ) + self.assertIsInstance(callback, JSONStateCallback) + + def test_interactive_mode_returns_composite(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + handler = self._state_handler_for_tmpdir(tmpdir) + with mock.patch("builtins.input", return_value="y"): + callback = build_pipeline_callback( + state_handler=handler, + dataset_prefix="demo", + critical_input_hash="abc", + command="python run", + skip_confirmation=False, + ) + self.assertIsInstance(callback, CompositeCallback) + + +class PlanningTest(unittest.TestCase): + + def _empty_state(self) -> PipelineState: + return PipelineState(dataset_prefix="demo", + critical_input_hash="", + command="", + updated_at="", + steps={}) + + def _state_with( + self, versions: dict[str, tuple[str, str, + int | None]]) -> PipelineState: + steps = { + name: + StepState(version=v, + status=st, + started_at="t", + ended_at="t", + duration_s=0.0, + started_at_ts=ts, + ended_at_ts=ts, + message=None) + for name, (v, st, ts) in versions.items() + } + return PipelineState(dataset_prefix="demo", + critical_input_hash="", + command="", + updated_at="", + updated_at_ts=None, + steps=steps) + + def _names_from_builder(self, + cfg: PipelineConfig, + steps: list[BaseStep] | None = None, + state: PipelineState | None = None) -> list[str]: + builder_steps = steps or build_steps(cfg) + builder = PipelineBuilder(config=cfg, + state=state or self._empty_state(), + steps=builder_steps) + result = builder.build() + pipeline = result.pipeline + return [step.name for step in pipeline.steps] + + def test_run_only_step(self) -> None: + cfg_step = PipelineConfig( + run=RunConfig(command=_TEST_COMMAND, run_only="download-data")) + names_step = self._names_from_builder(cfg_step) + self.assertEqual(names_step, ["download-data"]) + + with self.assertRaisesRegex(ValueError, "run_only step not found"): + self._names_from_builder( + PipelineConfig( + run=RunConfig(command=_TEST_COMMAND, 
run_only="nope"))) + with self.assertRaisesRegex(ValueError, "run_only step not found"): + self._names_from_builder( + PipelineConfig(run=RunConfig(command=_TEST_COMMAND, + run_only="download.nope"))) + + def test_force_semantics(self) -> None: + cfg_all = PipelineConfig( + run=RunConfig(command=_TEST_COMMAND, force=True)) + names_all = self._names_from_builder(cfg_all) + self.assertEqual(names_all, [ + "download-data", + "download-metadata", + "create-sample", + "create-schema-mapping", + "process-full-data", + "create-dc-config", + ]) + + def test_timestamp_chaining_triggers_next_step(self) -> None: + newer = 2_000 + older = 1_000 + state = self._state_with({ + "download-data": ("1", "succeeded", newer), + "download-metadata": ("1", "succeeded", older), + "create-sample": ("1", "succeeded", older), + "create-schema-mapping": ("1", "succeeded", older), + "process-full-data": ("1", "succeeded", older), + "create-dc-config": ("1", "succeeded", older), + }) + cfg = PipelineConfig(run=RunConfig(command=_TEST_COMMAND)) + names = self._names_from_builder(cfg, state=state) + self.assertEqual(names, [ + "download-metadata", + "create-sample", + "create-schema-mapping", + "process-full-data", + "create-dc-config", + ]) + + def test_force_branch_records_decisions(self) -> None: + cfg = PipelineConfig(run=RunConfig(command=_TEST_COMMAND, force=True)) + steps = build_steps(cfg) + builder = PipelineBuilder(config=cfg, + state=self._empty_state(), + steps=steps) + result = builder.build() + self.assertEqual(len(result.decisions), len(steps)) + for decision in result.decisions: + self.assertEqual(decision.decision, StepDecision.RUN) + self.assertIn("Force flag set", decision.reason) + + def test_run_only_ignores_timestamp_chaining(self) -> None: + newer = 4_000 + older = 3_000 + state = self._state_with({ + "download-data": ("1", "succeeded", newer), + "download-metadata": ("1", "succeeded", older), + }) + cfg = PipelineConfig( + run=RunConfig(command=_TEST_COMMAND, run_only="download-data")) + names = self._names_from_builder(cfg, state=state) + self.assertEqual(names, ["download-data"]) + + def test_version_bump_schedules_downstream(self) -> None: + steps = [ + _VersionedStep("download-data", "1"), + _VersionedStep("process-full-data", "2"), + _VersionedStep("create-dc-config", "1"), + ] + state = self._state_with({ + "download-data": ("1", "succeeded", 1000), + "process-full-data": ("1", "succeeded", 1000), + "create-dc-config": ("1", "succeeded", 1000), + }) + cfg = PipelineConfig(run=RunConfig(command=_TEST_COMMAND)) + names = self._names_from_builder(cfg, steps, state) + self.assertEqual(names, ["process-full-data", "create-dc-config"]) + + pipeline = build_sdmx_pipeline(config=cfg, state=state, steps=steps) + self.assertEqual([s.name for s in pipeline.steps], + ["process-full-data", "create-dc-config"]) + + def test_incremental_records_skip_reasons(self) -> None: + state = self._state_with({ + "download-data": ("1", "succeeded", 1_000), + "download-metadata": ("1", "succeeded", 1_000), + "create-sample": ("1", "succeeded", 1_000), + "create-schema-mapping": ("1", "succeeded", 1_000), + "process-full-data": ("1", "succeeded", 1_000), + "create-dc-config": ("1", "succeeded", 1_000), + }) + cfg = PipelineConfig(run=RunConfig(command=_TEST_COMMAND)) + steps = build_steps(cfg) + builder = PipelineBuilder(config=cfg, state=state, steps=steps) + result = builder.build() + self.assertFalse(result.pipeline.steps) + self.assertEqual(len(result.decisions), len(steps)) + for decision in result.decisions: + 
self.assertEqual(decision.decision, StepDecision.SKIP) + self.assertIn("up-to-date", decision.reason) + + +class SdmxTestBase(unittest.TestCase): + + def setUp(self) -> None: + self._tmpdir_obj = tempfile.TemporaryDirectory() + self.addCleanup(self._tmpdir_obj.cleanup) + self._tmpdir = self._tmpdir_obj.name + + def _create_test_input_files(self, prefix: str) -> None: + (Path(self._tmpdir) / f"{prefix}_data.csv").write_text("data") + (Path(self._tmpdir) / f"{prefix}_sample.csv").write_text("sample") + (Path(self._tmpdir) / f"{prefix}_metadata.xml").write_text("metadata") + + sample_output_dir = Path(self._tmpdir) / "sample_output" + sample_output_dir.mkdir(parents=True, exist_ok=True) + (sample_output_dir / f"{prefix}_pvmap.csv").write_text("pvmap") + (sample_output_dir / f"{prefix}_metadata.csv").write_text("metadata") + + output_dir = Path(self._tmpdir) / "output" + output_dir.mkdir(parents=True, exist_ok=True) + (output_dir / f"{prefix}.csv").write_text("output") + + def _build_config(self, + dataset_prefix: str | None, + dataflow: str | None = "FLOW", + command: str = "test", + endpoint: str = "https://example.com", + agency: str = "AGENCY") -> PipelineConfig: + return PipelineConfig( + sdmx=SdmxConfig( + endpoint=endpoint, + agency=agency, + dataflow=SdmxDataflowConfig( + id=dataflow, + key="test-key", + param="area=US", + ), + ), + run=RunConfig( + dataset_prefix=dataset_prefix, + working_dir=self._tmpdir, + skip_confirmation=True, + command=command, + ), + ) + + +class RunPipelineTest(SdmxTestBase): + + def setUp(self) -> None: + super().setUp() + # Mock _run_command to avoid actual execution during pipeline tests + self._run_command_patcher = mock.patch( + "tools.agentic_import.sdmx_pipeline_steps._run_command") + self._mock_run_command = self._run_command_patcher.start() + self.addCleanup(self._run_command_patcher.stop) + + def test_run_pipeline_updates_state_and_hash(self) -> None: + command = "sdmx run pipeline" + config = self._build_config(dataset_prefix="demo", + dataflow="df.1", + command=command) + clock = _IncrementingClock(datetime(2025, 1, 1, tzinfo=timezone.utc), + timedelta(seconds=1)) + + # Create test files for ProcessFullDataStep + self._create_test_input_files("demo") + + run_sdmx_pipeline(config=config, now_fn=clock) + + state_path = Path(self._tmpdir) / ".datacommons" / "demo.state.json" + self.assertTrue(state_path.exists()) + with state_path.open(encoding="utf-8") as fp: + state = json.load(fp) + + expected_hash = hashlib.sha256( + json.dumps( + { + "sdmx.agency": config.sdmx.agency, + "sdmx.dataflow.id": config.sdmx.dataflow.id, + "sdmx.endpoint": config.sdmx.endpoint, + "sdmx.dataflow.key": config.sdmx.dataflow.key, + "sdmx.dataflow.param": config.sdmx.dataflow.param, + }, + sort_keys=True, + separators=(",", ":")).encode("utf-8")).hexdigest() + + self.assertEqual(state["dataset_prefix"], "demo") + self.assertEqual(state["command"], command) + self.assertEqual(state["critical_input_hash"], expected_hash) + self.assertEqual(len(state["steps"]), 6) + + for step_name in [ + "download-data", "download-metadata", "create-sample", + "create-schema-mapping", "process-full-data", "create-dc-config" + ]: + self.assertIn(step_name, state["steps"]) + self.assertEqual(state["steps"][step_name]["status"], "succeeded") + + def test_run_id_sanitizes_dataflow_when_prefix_missing(self) -> None: + dataflow = "My Flow-Name 2025!!!" 
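+        # _sanitize_run_id() lowercases the id, collapses runs of characters
+        # outside [a-z0-9_] into "_", and strips leading/trailing underscores,
+        # so this dataflow becomes "my_flow_name_2025".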
+ config = self._build_config(dataset_prefix=None, + dataflow=dataflow, + command="sdmx run sanitized") + + # Create test files for ProcessFullDataStep with sanitized name + sanitized_prefix = "my_flow_name_2025" + self._create_test_input_files(sanitized_prefix) + + run_sdmx_pipeline(config=config, + now_fn=_IncrementingClock( + datetime(2025, 1, 2, tzinfo=timezone.utc), + timedelta(seconds=2))) + + expected_run_id = "my_flow_name_2025" + state_path = Path( + self._tmpdir) / ".datacommons" / f"{expected_run_id}.state.json" + self.assertTrue(state_path.exists()) + with state_path.open(encoding="utf-8") as fp: + state = json.load(fp) + self.assertEqual(state["dataset_prefix"], expected_run_id) + + def test_invalid_working_dir_raises(self) -> None: + path = Path(self._tmpdir) / "not_a_dir" + path.write_text("content") + base_config = self._build_config(dataset_prefix="demo", + dataflow="df", + command="sdmx run invalid") + updated_run = dataclasses.replace(base_config.run, + working_dir=str(path)) + config = dataclasses.replace(base_config, run=updated_run) + with self.assertRaisesRegex(ValueError, + "working_dir is not a directory"): + run_sdmx_pipeline(config=config) + + def test_hash_change_forces_full_rerun(self) -> None: + config = self._build_config(dataset_prefix="demo", + dataflow="df.2", + command="sdmx rerun force") + first_clock = _IncrementingClock( + datetime(2025, 1, 2, tzinfo=timezone.utc), timedelta(seconds=1)) + + # Create test files for ProcessFullDataStep + self._create_test_input_files("demo") + + # Run 1 with original config + run_sdmx_pipeline(config=config, now_fn=first_clock) + + state_path = Path(self._tmpdir) / ".datacommons" / "demo.state.json" + with state_path.open(encoding="utf-8") as fp: + first_state = json.load(fp) + + updated_dataflow = dataclasses.replace(config.sdmx.dataflow, + key="changed-key") + updated_sdmx = dataclasses.replace(config.sdmx, + dataflow=updated_dataflow) + updated_config = dataclasses.replace(config, sdmx=updated_sdmx) + second_clock = _IncrementingClock( + datetime(2025, 1, 3, tzinfo=timezone.utc), timedelta(seconds=1)) + run_sdmx_pipeline(config=updated_config, now_fn=second_clock) + + with state_path.open(encoding="utf-8") as fp: + second_state = json.load(fp) + + self.assertNotEqual(first_state["critical_input_hash"], + second_state["critical_input_hash"]) + self.assertGreater( + second_state["steps"]["download-data"]["ended_at_ts"], + first_state["steps"]["download-data"]["ended_at_ts"]) + + def test_hash_unchanged_skips_rerun(self) -> None: + config = self._build_config(dataset_prefix="demo", + dataflow="df.3", + command="sdmx rerun noop") + initial_clock = _IncrementingClock( + datetime(2025, 1, 3, tzinfo=timezone.utc), timedelta(seconds=1)) + + # Create test files for ProcessFullDataStep + self._create_test_input_files("demo") + + # Run 1 + run_sdmx_pipeline(config=config, now_fn=initial_clock) + + state_path = Path(self._tmpdir) / ".datacommons" / "demo.state.json" + with state_path.open(encoding="utf-8") as fp: + first_state = json.load(fp) + + later_clock = _IncrementingClock( + datetime(2025, 1, 7, tzinfo=timezone.utc), timedelta(seconds=1)) + run_sdmx_pipeline(config=config, now_fn=later_clock) + + with state_path.open(encoding="utf-8") as fp: + second_state = json.load(fp) + + self.assertEqual(first_state, second_state) + + +class SdmxStepTest(SdmxTestBase): + + def _assert_run_and_dry_run_use_same_plan( + self, + step, + *, + log_contains: str, + cmd_contains: str, + extra_cmd_checks=None, + expect_verbose: bool = True) -> 
None: + extra_cmd_checks = extra_cmd_checks or [] + with mock.patch("tools.agentic_import.sdmx_pipeline_steps._run_command" + ) as mock_run_cmd: + with self.assertLogs(logging.get_absl_logger(), + level="INFO") as logs: + step.dry_run() + step.run() + + self.assertTrue( + any("test-step (dry run): would run" in entry + for entry in logs.output)) + self.assertTrue(any(log_contains in entry for entry in logs.output)) + mock_run_cmd.assert_called_once() + args, kwargs = mock_run_cmd.call_args + command = args[0] + self.assertTrue(any(cmd_contains in arg for arg in command)) + self.assertEqual(kwargs["verbose"], expect_verbose) + for check in extra_cmd_checks: + check(command) + + def _assert_step_caches_plan(self, + step, + *, + command_contains=None, + path_attrs=None) -> None: + command_contains = command_contains or [] + path_attrs = path_attrs or [] + + context1 = step._prepare_command() + context2 = step._prepare_command() + self.assertIs(context1, context2) + + for attr in path_attrs: + self.assertTrue(getattr(context1, attr).is_absolute()) + + if command_contains: + for expected in command_contains: + self.assertTrue( + any(expected in arg for arg in context1.full_command)) + + def _assert_dry_run_succeeds_without_input(self, step) -> None: + step.dry_run() + + def _assert_run_fails_without_input(self, step, error_pattern: str) -> None: + with self.assertRaisesRegex(RuntimeError, error_pattern): + step.run() + + def test_run_command_logs_and_executes(self) -> None: + with mock.patch("subprocess.run") as mock_run: + with self.assertLogs(logging.get_absl_logger(), + level="DEBUG") as logs: + _run_command(["echo", "hello"], verbose=True) + + mock_run.assert_called_once_with(["echo", "hello"], check=True) + self.assertTrue( + any("Running command: echo hello" in entry + for entry in logs.output)) + + def test_download_metadata_step_caches_plan(self) -> None: + config = PipelineConfig( + sdmx=SdmxConfig( + endpoint="https://example.com", + agency="AGENCY", + dataflow=SdmxDataflowConfig(id="FLOW"), + ), + run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ), + ) + step = DownloadMetadataStep(name="test-step", config=config) + self._assert_step_caches_plan( + step, + command_contains=[ + "download-metadata", "--endpoint=https://example.com" + ], + path_attrs=["output_path"], + ) + + def test_download_metadata_step_run_and_dry_run_use_same_plan(self) -> None: + config = PipelineConfig( + sdmx=SdmxConfig( + endpoint="https://example.com", + agency="AGENCY", + dataflow=SdmxDataflowConfig(id="FLOW"), + ), + run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ), + ) + step = DownloadMetadataStep(name="test-step", config=config) + self._assert_run_and_dry_run_use_same_plan( + step, + log_contains="download-metadata", + cmd_contains="download-metadata", + ) + + def test_download_data_step_caches_plan(self) -> None: + config = PipelineConfig( + sdmx=SdmxConfig( + endpoint="https://example.com", + agency="AGENCY", + dataflow=SdmxDataflowConfig( + id="FLOW", + key="test-key", + param="area=US", + ), + ), + run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ), + ) + step = DownloadDataStep(name="test-step", config=config) + self._assert_step_caches_plan( + step, + command_contains=[ + "download-data", + "--endpoint=https://example.com", + "--key=test-key", + "--param=area=US", + ], + path_attrs=["output_path"], + ) + + def 
test_download_data_step_run_and_dry_run_use_same_plan(self) -> None: + config = PipelineConfig( + sdmx=SdmxConfig( + endpoint="https://example.com", + agency="AGENCY", + dataflow=SdmxDataflowConfig(id="FLOW"), + ), + run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ), + ) + step = DownloadDataStep(name="test-step", config=config) + self._assert_run_and_dry_run_use_same_plan( + step, + log_contains="download-data", + cmd_contains="download-data", + ) + + def test_create_sample_step_caches_plan(self) -> None: + config = PipelineConfig( + run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ), + sample=SampleConfig(rows=500), + ) + step = CreateSampleStep(name="test-step", config=config) + self._assert_step_caches_plan( + step, + command_contains=["data_sampler.py", "--sampler_output_rows=500"], + path_attrs=["output_path"], + ) + + def test_create_sample_step_run_and_dry_run_use_same_plan(self) -> None: + config = PipelineConfig( + run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ), + sample=SampleConfig(rows=500), + ) + step = CreateSampleStep(name="test-step", config=config) + + # Create test input file for run() + input_path = Path(self._tmpdir) / "demo_data.csv" + input_path.write_text("header\nrow1") + self._assert_run_and_dry_run_use_same_plan( + step, + log_contains="data_sampler.py", + cmd_contains="data_sampler.py", + ) + + def test_create_sample_step_dry_run_succeeds_if_input_missing(self) -> None: + config = PipelineConfig( + run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ), + sample=SampleConfig(rows=500), + ) + step = CreateSampleStep(name="test-step", config=config) + # No input file created, dry run should still succeed + self._assert_dry_run_succeeds_without_input(step) + + def test_create_sample_step_run_fails_if_input_missing(self) -> None: + config = PipelineConfig( + run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ), + sample=SampleConfig(rows=500), + ) + step = CreateSampleStep(name="test-step", config=config) + # No input file created, run should fail + self._assert_run_fails_without_input(step, + "Input file missing for sampling") + + def test_create_schema_map_step_caches_plan(self) -> None: + config = PipelineConfig(run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + gemini_cli="custom-gemini", + skip_confirmation=True, + ),) + step = CreateSchemaMapStep(name="test-step", config=config) + self._assert_step_caches_plan( + step, + command_contains=[ + "pvmap_generator.py", + "--gemini_cli=custom-gemini", + "--skip_confirmation", + ], + path_attrs=["sample_path", "metadata_path", "output_prefix"], + ) + + def test_create_schema_map_step_run_and_dry_run_use_same_plan(self) -> None: + config = PipelineConfig(run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ),) + step = CreateSchemaMapStep(name="test-step", config=config) + + # Create test input files for run() + (Path(self._tmpdir) / "demo_sample.csv").write_text("header\nrow1") + (Path(self._tmpdir) / "demo_metadata.xml").write_text("") + self._assert_run_and_dry_run_use_same_plan( + step, + log_contains="pvmap_generator.py", + cmd_contains="pvmap_generator.py", + ) + + def test_create_schema_map_step_dry_run_succeeds_if_input_missing( + self) -> 
None: + config = PipelineConfig(run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ),) + step = CreateSchemaMapStep(name="test-step", config=config) + # No input files created, dry run should still succeed + self._assert_dry_run_succeeds_without_input(step) + + def test_create_schema_map_step_run_fails_if_input_missing(self) -> None: + config = PipelineConfig(run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ),) + step = CreateSchemaMapStep(name="test-step", config=config) + # No input files created, run should fail + self._assert_run_fails_without_input(step, ".*") + + def test_process_full_data_step_caches_plan(self) -> None: + config = PipelineConfig(run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ),) + step = ProcessFullDataStep(name="test-step", config=config) + self._assert_step_caches_plan( + step, + path_attrs=[ + "input_data_path", + "pv_map_path", + "metadata_path", + "output_prefix", + ], + ) + + def test_process_full_data_step_run_and_dry_run_use_same_plan(self) -> None: + config = PipelineConfig(run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ),) + step = ProcessFullDataStep(name="test-step", config=config) + + # Create test files + self._create_test_input_files("demo") + self._assert_run_and_dry_run_use_same_plan( + step, + log_contains="stat_var_processor.py", + cmd_contains="stat_var_processor.py", + extra_cmd_checks=[ + lambda command: self.assertTrue( + any(arg.startswith("--input_data=") for arg in command)), + ], + ) + + def test_process_full_data_step_dry_run_succeeds_if_input_missing( + self) -> None: + config = PipelineConfig(run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ),) + step = ProcessFullDataStep(name="test-step", config=config) + # Missing input files, dry run should still succeed + self._assert_dry_run_succeeds_without_input(step) + + def test_process_full_data_step_run_fails_if_input_missing(self) -> None: + config = PipelineConfig(run=RunConfig( + command="test", + dataset_prefix="demo", + working_dir=self._tmpdir, + verbose=True, + ),) + step = ProcessFullDataStep(name="test-step", config=config) + # Missing input files, run should fail + self._assert_run_fails_without_input(step, ".*") + + def test_create_dc_config_step_caches_plan(self) -> None: + config = self._build_config(dataset_prefix="demo", + endpoint="https://example.com", + agency="AGENCY", + dataflow="FLOW") + step = CreateDcConfigStep(name="test-step", config=config) + self._assert_step_caches_plan( + step, + path_attrs=["input_csv", "output_config"], + ) + + def test_create_dc_config_step_run_and_dry_run_use_same_plan(self) -> None: + config = self._build_config(dataset_prefix="demo", + endpoint="https://example.com", + agency="AGENCY", + dataflow="FLOW") + step = CreateDcConfigStep(name="test-step", config=config) + + # Create test files + self._create_test_input_files("demo") + # Create final output dir and input csv + final_output_dir = Path(self._tmpdir) / "output" + final_output_dir.mkdir(parents=True, exist_ok=True) + (final_output_dir / "demo.csv").write_text("data") + self._assert_run_and_dry_run_use_same_plan( + step, + log_contains="generate_custom_dc_config.py", + cmd_contains="generate_custom_dc_config.py", + extra_cmd_checks=[ + lambda command: self.assertIn( + f"--input_csv={final_output_dir}/demo.csv", 
command), + lambda command: self.assertIn( + f"--output_config={final_output_dir}/demo_config.json", + command), + lambda command: self.assertIn("--provenance_name=FLOW", command + ), + lambda command: self.assertIn("--source_name=AGENCY", command), + lambda command: self.assertIn( + "--data_source_url=https://example.com", command), + lambda command: self.assertIn( + "--dataset_url=https://example.com/data/AGENCY,FLOW,", + command), + ], + expect_verbose=False, + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/agentic_import/sdmx_pipeline_builder.py b/tools/agentic_import/sdmx_pipeline_builder.py new file mode 100644 index 0000000000..e7588577e5 --- /dev/null +++ b/tools/agentic_import/sdmx_pipeline_builder.py @@ -0,0 +1,230 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Builder for the SDMX agentic import pipeline.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import ClassVar, Sequence + +from absl import logging + +from tools.agentic_import.pipeline import Pipeline, Step +from tools.agentic_import.sdmx_pipeline_config import PipelineConfig +from tools.agentic_import.sdmx_pipeline_steps import ( + CreateDcConfigStep, CreateSampleStep, CreateSchemaMapStep, DownloadDataStep, + DownloadMetadataStep, ProcessFullDataStep) +from tools.agentic_import.state_handler import PipelineState + + +@dataclass(frozen=True) +class StepDecision: + """Represents whether a step will run and why.""" + + RUN: ClassVar[str] = "RUN" + SKIP: ClassVar[str] = "SKIP" + + step_name: str + decision: str + reason: str + + +@dataclass(frozen=True) +class BuildResult: + """Output of planning that includes the pipeline and per-step decisions.""" + + pipeline: Pipeline + decisions: list[StepDecision] + + +class PipelineBuilder: + + def __init__(self, + *, + config: PipelineConfig, + state: PipelineState, + steps: Sequence[Step], + critical_input_hash: str | None = None) -> None: + self._config = config + self._state = state + self._steps = steps + self._critical_input_hash = critical_input_hash + + def build(self) -> BuildResult: + if self._config.run.run_only: + planned, decisions = self._plan_run_only(self._config.run.run_only) + elif self._config.run.force: + logging.info("Force flag set; scheduling all SDMX steps") + planned, decisions = self._plan_all_steps( + "Force flag set; scheduling this step") + elif self._hash_changed(): + logging.info("Critical inputs changed; scheduling all SDMX steps") + planned, decisions = self._plan_all_steps( + "Critical inputs changed; scheduling this step") + else: + planned, decisions = self._plan_incremental() + logging.info("Built SDMX pipeline with %d steps", len(planned)) + return BuildResult(pipeline=Pipeline(steps=planned), + decisions=decisions) + + def _plan_run_only(self, + run_only: str) -> tuple[list[Step], list[StepDecision]]: + planned: list[Step] = [] + decisions: list[StepDecision] = [] + for step in self._steps: + if step.name == run_only: + planned.append(step) + 
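+ # The matching step gets a RUN decision; every other step below records a SKIP decision.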
decisions.append( + StepDecision( + step_name=step.name, + decision=StepDecision.RUN, + reason=(f"run_only={run_only} requested; running only " + "this step"), + )) + else: + decisions.append( + StepDecision( + step_name=step.name, + decision=StepDecision.SKIP, + reason=(f"run_only={run_only} requested; skipping " + "this step"), + )) + if not planned: + raise ValueError(f"run_only step not found: {run_only}") + return planned, decisions + + def _plan_all_steps(self, + reason: str) -> tuple[list[Step], list[StepDecision]]: + planned: list[Step] = [] + decisions: list[StepDecision] = [] + for step in self._steps: + planned.append(step) + decisions.append( + StepDecision(step_name=step.name, + decision=StepDecision.RUN, + reason=reason)) + return planned, decisions + + def _plan_incremental(self) -> tuple[list[Step], list[StepDecision]]: + planned: list[Step] = [] + decisions: list[StepDecision] = [] + schedule_all_remaining = False + previous: Step | None = None + for step in self._steps: + if schedule_all_remaining: + planned.append(step) + decisions.append( + StepDecision( + step_name=step.name, + decision=StepDecision.RUN, + reason=("Upstream step triggered rerun for remaining " + "steps"), + )) + previous = step + continue + + prev_state = self._state.steps.get(step.name) + if prev_state is None: + needs_run = True + reason = "No previous state recorded; scheduling step" + elif prev_state.status != "succeeded": + needs_run = True + reason = (f"Previous run status was {prev_state.status}; " + "rerunning step") + elif prev_state.version != step.version: + needs_run = True + reason = (f"Step version changed from {prev_state.version} to " + f"{step.version}; rerunning step") + else: + needs_run = False + reason = ("Previous run succeeded with same version; step is " + "up-to-date") + + if not needs_run and previous is not None: + if self._predecessor_newer(previous, step): + needs_run = True + reason = (f"Previous step {previous.name} finished more " + "recently; rerunning downstream steps") + + if needs_run: + planned.append(step) + decisions.append( + StepDecision(step_name=step.name, + decision=StepDecision.RUN, + reason=reason)) + schedule_all_remaining = True + else: + decisions.append( + StepDecision(step_name=step.name, + decision=StepDecision.SKIP, + reason=reason)) + previous = step + + if not planned: + logging.info("No steps scheduled.") + return planned, decisions + + def _hash_changed(self) -> bool: + if not self._critical_input_hash: + return False + previous = self._state.critical_input_hash + if not previous: + return True + return previous != self._critical_input_hash + + def _predecessor_newer(self, prev_step: Step, step: Step) -> bool: + prev_state = self._state.steps.get(prev_step.name) + curr_state = self._state.steps.get(step.name) + if prev_state is None or prev_state.ended_at_ts is None: + return False + if curr_state is None: + return True + if curr_state.status != "succeeded": + return True + if curr_state.ended_at_ts is None: + return True + return prev_state.ended_at_ts > curr_state.ended_at_ts + + +def build_steps(config: PipelineConfig) -> list[Step]: + """Constructs the hard-coded list of canonical steps.""" + return [ + DownloadDataStep(name="download-data", config=config), + DownloadMetadataStep(name="download-metadata", config=config), + CreateSampleStep(name="create-sample", config=config), + CreateSchemaMapStep(name="create-schema-mapping", config=config), + ProcessFullDataStep(name="process-full-data", config=config), + 
CreateDcConfigStep(name="create-dc-config", config=config), + ] + + +def _log_step_decisions(decisions: Sequence[StepDecision]) -> None: + for decision in decisions: + logging.info("step=%s decision=%s reason=%s", decision.step_name, + decision.decision, decision.reason) + + +def build_sdmx_pipeline(*, + config: PipelineConfig, + state: PipelineState, + steps: Sequence[Step] | None = None, + critical_input_hash: str | None = None) -> Pipeline: + builder_steps = steps if steps is not None else build_steps(config) + builder = PipelineBuilder(config=config, + state=state, + steps=builder_steps, + critical_input_hash=critical_input_hash) + result = builder.build() + _log_step_decisions(result.decisions) + return result.pipeline diff --git a/tools/agentic_import/sdmx_pipeline_config.py b/tools/agentic_import/sdmx_pipeline_config.py new file mode 100644 index 0000000000..d6260eabd7 --- /dev/null +++ b/tools/agentic_import/sdmx_pipeline_config.py @@ -0,0 +1,68 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Configuration dataclasses for the SDMX agentic import pipeline.""" + +from __future__ import annotations + +from dataclasses import dataclass, field + +# SDMX flag names shared across pipeline modules. +FLAG_SDMX_ENDPOINT = "sdmx.endpoint" +FLAG_SDMX_AGENCY = "sdmx.agency" +FLAG_SDMX_DATAFLOW_ID = "sdmx.dataflow.id" +FLAG_SDMX_DATAFLOW_KEY = "sdmx.dataflow.key" +FLAG_SDMX_DATAFLOW_PARAM = "sdmx.dataflow.param" + + +@dataclass(frozen=True) +class SdmxDataflowConfig: + """Configuration for SDMX dataflow.""" + id: str | None = None + key: str | None = None + param: str | None = None + + +@dataclass(frozen=True) +class SdmxConfig: + """Configuration for SDMX data access.""" + endpoint: str | None = None + agency: str | None = None + dataflow: SdmxDataflowConfig = field(default_factory=SdmxDataflowConfig) + + +@dataclass(frozen=True) +class SampleConfig: + """Configuration for data sampling.""" + rows: int = 1000 + + +@dataclass(frozen=True) +class RunConfig: + """Configuration for pipeline execution.""" + command: str + dataset_prefix: str | None = None + working_dir: str | None = None + run_only: str | None = None + force: bool = False + verbose: bool = False + skip_confirmation: bool = False + gemini_cli: str | None = None + + +@dataclass(frozen=True) +class PipelineConfig: + """Aggregated configuration for the pipeline.""" + sdmx: SdmxConfig = field(default_factory=SdmxConfig) + sample: SampleConfig = field(default_factory=SampleConfig) + run: RunConfig = field(default_factory=lambda: RunConfig(command="python")) diff --git a/tools/agentic_import/sdmx_pipeline_steps.py b/tools/agentic_import/sdmx_pipeline_steps.py new file mode 100644 index 0000000000..9455a811c3 --- /dev/null +++ b/tools/agentic_import/sdmx_pipeline_steps.py @@ -0,0 +1,461 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Step implementations for the SDMX agentic import pipeline.""" + +from __future__ import annotations + +import abc +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import ClassVar, Sequence + +from absl import logging + +from tools.agentic_import.pipeline import Step +from tools.agentic_import.sdmx_pipeline_config import ( + FLAG_SDMX_AGENCY, + FLAG_SDMX_DATAFLOW_ID, + FLAG_SDMX_ENDPOINT, + PipelineConfig, +) + +REPO_ROOT = Path(__file__).resolve().parents[2] + +SDMX_CLI_PATH = REPO_ROOT / "tools" / "sdmx_import" / "sdmx_cli.py" +DATA_SAMPLER_PATH = REPO_ROOT / "tools" / "statvar_importer" / "data_sampler.py" +STAT_VAR_PROCESSOR_PATH = (REPO_ROOT / "tools" / "statvar_importer" / + "stat_var_processor.py") +PVMAP_GENERATOR_PATH = REPO_ROOT / "tools" / "agentic_import" / "pvmap_generator.py" +DC_CONFIG_GENERATOR_PATH = (REPO_ROOT / "tools" / "agentic_import" / + "generate_custom_dc_config.py") + +SAMPLE_OUTPUT_DIR = Path("sample_output") +FINAL_OUTPUT_DIR = Path("output") + + +def _require_config_field(value: str | None, field_name: str, + step_name: str) -> str: + if value: + return value + raise ValueError(f"{step_name} requires config.{field_name}") + + +def _run_command(command: Sequence[str], *, verbose: bool) -> None: + if verbose: + logging.debug(f"Running command: {' '.join(command)}") + subprocess.run(command, check=True) + + +class SdmxStep(Step): + """Base class for SDMX steps that carries immutable config and version.""" + + def __init__(self, *, name: str, version: str, + config: PipelineConfig) -> None: + if not name: + raise ValueError("step requires a name") + self._name = name + self._version = version + self._config = config + + @property + def name(self) -> str: + return self._name + + @property + def version(self) -> str: + return self._version + + @abc.abstractmethod + def dry_run(self) -> None: + """Log a read-only preview of the work to be done.""" + + +class DownloadDataStep(SdmxStep): + """Downloads SDMX data payloads.""" + + VERSION = "1" + + @dataclass(frozen=True) + class _StepContext: + full_command: list[str] + output_path: Path + + def __init__(self, *, name: str, config: PipelineConfig) -> None: + super().__init__(name=name, version=self.VERSION, config=config) + self._context: DownloadDataStep._StepContext | None = None + + def _prepare_command(self) -> _StepContext: + if self._context: + return self._context + endpoint = _require_config_field(self._config.sdmx.endpoint, + FLAG_SDMX_ENDPOINT, self.name) + agency = _require_config_field(self._config.sdmx.agency, + FLAG_SDMX_AGENCY, self.name) + dataflow = _require_config_field(self._config.sdmx.dataflow.id, + FLAG_SDMX_DATAFLOW_ID, self.name) + dataset_prefix = self._config.run.dataset_prefix + working_dir = Path(self._config.run.working_dir).resolve() + output_path = working_dir / f"{dataset_prefix}_data.csv" + args = [ + "download-data", + f"--endpoint={endpoint}", + f"--agency={agency}", + f"--dataflow={dataflow}", + f"--output_path={output_path}", + ] + if self._config.sdmx.dataflow.key: + 
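+ # Optional dataflow selectors (key, param) are only forwarded when configured.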
args.append(f"--key={self._config.sdmx.dataflow.key}") + if self._config.sdmx.dataflow.param: + args.append(f"--param={self._config.sdmx.dataflow.param}") + if self._config.run.verbose: + args.append("--verbose") + full_command = [sys.executable, str(SDMX_CLI_PATH)] + args + self._context = DownloadDataStep._StepContext(full_command=full_command, + output_path=output_path) + return self._context + + def run(self) -> None: + context = self._prepare_command() + if self._config.run.verbose: + logging.info( + f"Starting SDMX data download: {' '.join(context.full_command)} -> {context.output_path}" + ) + else: + logging.info(f"Downloading SDMX data to {context.output_path}") + _run_command(context.full_command, verbose=self._config.run.verbose) + + def dry_run(self) -> None: + context = self._prepare_command() + logging.info( + f"{self.name} (dry run): would run {' '.join(context.full_command)}" + ) + + +class DownloadMetadataStep(SdmxStep): + """Downloads SDMX metadata payloads.""" + + VERSION = "1" + + @dataclass(frozen=True) + class _StepContext: + full_command: list[str] + output_path: Path + + def __init__(self, *, name: str, config: PipelineConfig) -> None: + super().__init__(name=name, version=self.VERSION, config=config) + self._context: DownloadMetadataStep._StepContext | None = None + + def _prepare_command(self) -> _StepContext: + if self._context: + return self._context + endpoint = _require_config_field(self._config.sdmx.endpoint, + FLAG_SDMX_ENDPOINT, self.name) + agency = _require_config_field(self._config.sdmx.agency, + FLAG_SDMX_AGENCY, self.name) + dataflow = _require_config_field(self._config.sdmx.dataflow.id, + FLAG_SDMX_DATAFLOW_ID, self.name) + dataset_prefix = self._config.run.dataset_prefix + working_dir = Path(self._config.run.working_dir).resolve() + output_path = working_dir / f"{dataset_prefix}_metadata.xml" + args = [ + "download-metadata", + f"--endpoint={endpoint}", + f"--agency={agency}", + f"--dataflow={dataflow}", + f"--output_path={output_path}", + ] + if self._config.run.verbose: + args.append("--verbose") + full_command = [sys.executable, str(SDMX_CLI_PATH)] + args + self._context = DownloadMetadataStep._StepContext( + full_command=full_command, output_path=output_path) + return self._context + + def run(self) -> None: + context = self._prepare_command() + if self._config.run.verbose: + logging.info( + f"Starting SDMX metadata download: {' '.join(context.full_command)} -> {context.output_path}" + ) + else: + logging.info(f"Downloading SDMX metadata to {context.output_path}") + _run_command(context.full_command, verbose=self._config.run.verbose) + + def dry_run(self) -> None: + context = self._prepare_command() + logging.info( + f"{self.name} (dry run): would run {' '.join(context.full_command)}" + ) + + +class CreateSampleStep(SdmxStep): + """Creates a sample dataset from downloaded data.""" + + VERSION = "1" + + @dataclass(frozen=True) + class _StepContext: + input_path: Path + full_command: list[str] + output_path: Path + + def __init__(self, *, name: str, config: PipelineConfig) -> None: + super().__init__(name=name, version=self.VERSION, config=config) + self._context: CreateSampleStep._StepContext | None = None + + def _prepare_command(self) -> _StepContext: + if self._context: + return self._context + dataset_prefix = self._config.run.dataset_prefix + working_dir = Path(self._config.run.working_dir).resolve() + input_path = working_dir / f"{dataset_prefix}_data.csv" + output_path = working_dir / f"{dataset_prefix}_sample.csv" + + args = [ + 
f"--sampler_input={input_path}", + f"--sampler_output={output_path}", + f"--sampler_output_rows={self._config.sample.rows}", + ] + full_command = [sys.executable, str(DATA_SAMPLER_PATH)] + args + self._context = CreateSampleStep._StepContext(input_path=input_path, + full_command=full_command, + output_path=output_path) + return self._context + + def run(self) -> None: + context = self._prepare_command() + if not context.input_path.is_file(): + raise RuntimeError( + f"Input file missing for sampling: {context.input_path}") + if self._config.run.verbose: + logging.info( + f"Starting data sampling: {' '.join(context.full_command)} -> {context.output_path}" + ) + else: + logging.info(f"Sampling data to {context.output_path}") + _run_command(context.full_command, verbose=self._config.run.verbose) + + def dry_run(self) -> None: + context = self._prepare_command() + logging.info( + f"{self.name} (dry run): would run {' '.join(context.full_command)}" + ) + + +class CreateSchemaMapStep(SdmxStep): + """Builds schema mappings for transformed data.""" + + VERSION = "1" + + @dataclass(frozen=True) + class _StepContext: + sample_path: Path + metadata_path: Path + output_prefix: Path + full_command: list[str] + + def __init__(self, *, name: str, config: PipelineConfig) -> None: + super().__init__(name=name, version=self.VERSION, config=config) + self._context: CreateSchemaMapStep._StepContext | None = None + + def _prepare_command(self) -> _StepContext: + if self._context: + return self._context + dataset_prefix = self._config.run.dataset_prefix + working_dir = Path(self._config.run.working_dir).resolve() + sample_path = working_dir / f"{dataset_prefix}_sample.csv" + metadata_path = working_dir / f"{dataset_prefix}_metadata.xml" + output_prefix = working_dir / SAMPLE_OUTPUT_DIR / dataset_prefix + + args = [ + f"--input_data={sample_path}", + f"--input_metadata={metadata_path}", + "--sdmx_dataset", + f"--output_path={output_prefix}", + ] + if self._config.run.skip_confirmation: + args.append("--skip_confirmation") + if self._config.run.gemini_cli: + args.append(f"--gemini_cli={self._config.run.gemini_cli}") + args.append(f"--working_dir={working_dir}") + + full_command = [sys.executable, str(PVMAP_GENERATOR_PATH)] + args + self._context = CreateSchemaMapStep._StepContext( + sample_path=sample_path, + metadata_path=metadata_path, + output_prefix=output_prefix, + full_command=full_command) + return self._context + + def run(self) -> None: + context = self._prepare_command() + if not context.sample_path.is_file(): + raise RuntimeError(f"Sample file missing: {context.sample_path}") + if not context.metadata_path.is_file(): + raise RuntimeError( + f"Metadata file missing: {context.metadata_path}") + context.output_prefix.parent.mkdir(parents=True, exist_ok=True) + logging.info( + f"Starting PV map generation: {' '.join(context.full_command)} -> {context.output_prefix}" + ) + _run_command(context.full_command, verbose=self._config.run.verbose) + + def dry_run(self) -> None: + context = self._prepare_command() + logging.info( + f"{self.name} (dry run): would run {' '.join(context.full_command)}" + ) + + +class ProcessFullDataStep(SdmxStep): + """Processes full SDMX data into DC artifacts.""" + + VERSION = "1" + + RUN_OUTPUT_COLUMNS: ClassVar[str] = ( + "observationDate,observationAbout,variableMeasured,value," + "observationPeriod,measurementMethod,unit,scalingFactor") + + @dataclass(frozen=True) + class _StepContext: + input_data_path: Path + pv_map_path: Path + metadata_path: Path + full_command: list[str] + 
output_prefix: Path + + def __init__(self, *, name: str, config: PipelineConfig) -> None: + super().__init__(name=name, version=self.VERSION, config=config) + self._context: ProcessFullDataStep._StepContext | None = None + + def _prepare_command(self) -> _StepContext: + if self._context: + return self._context + dataset_prefix = self._config.run.dataset_prefix + working_dir = Path(self._config.run.working_dir).resolve() + input_data_path = working_dir / f"{dataset_prefix}_data.csv" + pv_map_path = (working_dir / SAMPLE_OUTPUT_DIR / + f"{dataset_prefix}_pvmap.csv") + metadata_path = (working_dir / SAMPLE_OUTPUT_DIR / + f"{dataset_prefix}_metadata.csv") + output_prefix = working_dir / FINAL_OUTPUT_DIR / dataset_prefix + + args = [ + f"--input_data={input_data_path}", + f"--pv_map={pv_map_path}", + f"--config_file={metadata_path}", + "--generate_statvar_name=True", + "--skip_constant_csv_columns=False", + f"--output_columns={self.RUN_OUTPUT_COLUMNS}", + f"--output_path={output_prefix}", + ] + full_command = [sys.executable, str(STAT_VAR_PROCESSOR_PATH)] + args + self._context = ProcessFullDataStep._StepContext( + input_data_path=input_data_path, + pv_map_path=pv_map_path, + metadata_path=metadata_path, + full_command=full_command, + output_prefix=output_prefix, + ) + return self._context + + def run(self) -> None: + context = self._prepare_command() + for required in (context.input_data_path, context.pv_map_path, + context.metadata_path): + if not required.is_file(): + raise RuntimeError( + f"{self.name} requires existing input: {required}") + # Ensure output directory exists + context.output_prefix.parent.mkdir(parents=True, exist_ok=True) + logging.info( + f"Starting stat_var_processor: input={context.input_data_path} " + f"pvmap={context.pv_map_path} metadata={context.metadata_path} -> " + f"{context.output_prefix}") + _run_command(context.full_command, verbose=self._config.run.verbose) + + def dry_run(self) -> None: + context = self._prepare_command() + logging.info( + f"{self.name} (dry run): would run {' '.join(context.full_command)}" + ) + + +class CreateDcConfigStep(SdmxStep): + """Generates Datacommons configuration artifacts.""" + + VERSION = "1" + + @dataclass(frozen=True) + class _StepContext: + input_csv: Path + output_config: Path + full_command: list[str] + + def __init__(self, *, name: str, config: PipelineConfig) -> None: + super().__init__(name=name, version=self.VERSION, config=config) + self._context: CreateDcConfigStep._StepContext | None = None + + def _prepare_command(self) -> _StepContext: + if self._context: + return self._context + dataset_prefix = self._config.run.dataset_prefix + working_dir = Path(self._config.run.working_dir).resolve() + input_csv = working_dir / FINAL_OUTPUT_DIR / f"{dataset_prefix}.csv" + output_config = (working_dir / FINAL_OUTPUT_DIR / + f"{dataset_prefix}_config.json") + + endpoint = _require_config_field(self._config.sdmx.endpoint, + FLAG_SDMX_ENDPOINT, self.name) + agency = _require_config_field(self._config.sdmx.agency, + FLAG_SDMX_AGENCY, self.name) + dataflow = _require_config_field(self._config.sdmx.dataflow.id, + FLAG_SDMX_DATAFLOW_ID, self.name) + + dataset_url = (f"{endpoint.rstrip('/')}/data/" + f"{agency},{dataflow},") + + args = [ + f"--input_csv={input_csv}", + f"--output_config={output_config}", + f"--provenance_name={dataflow}", + f"--source_name={agency}", + f"--data_source_url={endpoint}", + f"--dataset_url={dataset_url}", + ] + full_command = [sys.executable, str(DC_CONFIG_GENERATOR_PATH)] + args + self._context = 
CreateDcConfigStep._StepContext( + input_csv=input_csv, + output_config=output_config, + full_command=full_command) + return self._context + + def run(self) -> None: + context = self._prepare_command() + if not context.input_csv.is_file(): + raise RuntimeError( + f"{self.name} requires existing input: {context.input_csv}") + + logging.info( + f"Starting custom DC config generation: input={context.input_csv} -> {context.output_config}" + ) + _run_command(context.full_command, verbose=self._config.run.verbose) + + def dry_run(self) -> None: + context = self._prepare_command() + logging.info( + f"{self.name} (dry run): would run {' '.join(context.full_command)}" + ) diff --git a/tools/agentic_import/state_handler.py b/tools/agentic_import/state_handler.py new file mode 100644 index 0000000000..ea1d593197 --- /dev/null +++ b/tools/agentic_import/state_handler.py @@ -0,0 +1,130 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""State file helpers shared by the SDMX agentic pipeline components. + +The handler centralizes JSON persistence so callers (builder, callbacks) can +operate on an in-memory `PipelineState`. This implementation assumes a single +process has exclusive ownership of the state file for the duration of a run. 
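+State writes go through a temporary file that is renamed over the target path, so an interrupted run does not leave a partially written state file.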
+""" + +from __future__ import annotations + +import json +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from pathlib import Path + +from absl import logging +from dataclasses_json import dataclass_json + + +@dataclass_json +@dataclass +class StepState: + version: str + status: str + started_at: str + ended_at: str + duration_s: float + started_at_ts: int | None = None + ended_at_ts: int | None = None + message: str | None = None + + +@dataclass_json +@dataclass +class PipelineState: + dataset_prefix: str + critical_input_hash: str + command: str + updated_at: str + updated_at_ts: int | None = None + steps: dict[str, StepState] = field(default_factory=dict) + + +class StateHandler: + """Minimal state manager that owns JSON file I/O.""" + + def __init__(self, state_path: str | Path, dataset_prefix: str) -> None: + self._state_path = Path(state_path) + self._dataset_prefix = dataset_prefix + self._state: PipelineState | None = None + + @property + def path(self) -> Path: + """Returns the backing state file path.""" + return self._state_path + + def get_state(self) -> PipelineState: + if self._state is None: + self._state = self._load_or_init() + return self._state + + def save_state(self) -> None: + state = self.get_state() + self._write_state(state) + + def _load_or_init(self) -> PipelineState: + path = self._state_path + path.parent.mkdir(parents=True, exist_ok=True) + if not path.exists(): + state = self._empty_state() + logging.info(f"Creating new state file at {path}") + self._write_state(state) + return state + try: + with path.open("r", encoding="utf-8") as fp: + data = json.load(fp) + state = PipelineState.from_dict(data) + if not state.dataset_prefix: + # Ensure a manual or corrupted state file still has prefix metadata. + state.dataset_prefix = self._dataset_prefix + return state + except (OSError, json.JSONDecodeError, ValueError, TypeError) as exc: + logging.warning(f"Failed to load state file {path}: {exc}") + self._backup_bad_file() + state = self._empty_state() + self._write_state(state) + return state + + def _write_state(self, state: PipelineState) -> None: + directory = self._state_path.parent + directory.mkdir(parents=True, exist_ok=True) + payload = json.dumps(asdict(state), indent=2, sort_keys=True) + "\n" + tmp_path = self._state_path.with_suffix(".tmp") + tmp_path.write_text(payload, encoding="utf-8") + tmp_path.replace(self._state_path) + + def _empty_state(self) -> PipelineState: + return PipelineState( + dataset_prefix=self._dataset_prefix, + critical_input_hash="", + command="", + updated_at="", + updated_at_ts=None, + ) + + def _backup_bad_file(self) -> None: + path = self._state_path + if not path.exists(): + return + timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S") + backup_name = f"{path.name}.bad.{timestamp}.bak" + backup_path = path.with_name(backup_name) + try: + path.replace(backup_path) + logging.warning(f"Backed up corrupt state to {backup_path}") + except OSError as exc: + logging.warning( + f"Failed to backup corrupt state file {path}: {exc}") diff --git a/tools/agentic_import/state_handler_test.py b/tools/agentic_import/state_handler_test.py new file mode 100644 index 0000000000..dfcdd416dc --- /dev/null +++ b/tools/agentic_import/state_handler_test.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 + +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unit tests for the SDMX state handler.""" + +from __future__ import annotations + +import json +import os +import sys +import tempfile +import unittest + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +if _SCRIPT_DIR not in sys.path: + sys.path.append(_SCRIPT_DIR) + +from state_handler import StateHandler # pylint: disable=import-error + + +class StateHandlerTest(unittest.TestCase): + + def test_missing_file_creates_empty_state(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, "demo.state.json") + handler = StateHandler(state_path=path, dataset_prefix="demo") + + state = handler.get_state() + + self.assertTrue(os.path.exists(path)) + self.assertEqual(state.dataset_prefix, "demo") + self.assertEqual(state.steps, {}) + + with open(path, encoding="utf-8") as fp: + data = json.load(fp) + self.assertEqual(data["dataset_prefix"], "demo") + self.assertEqual(data["steps"], {}) + self.assertIsNone(data.get("updated_at_ts")) + + def test_corrupt_file_creates_backup_and_resets_state(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + path = os.path.join(tmpdir, "demo.state.json") + with open(path, "w", encoding="utf-8") as fp: + fp.write("{not-json}") + + handler = StateHandler(state_path=path, dataset_prefix="demo") + state = handler.get_state() + + backups = [ + name for name in os.listdir(tmpdir) + if name.startswith("demo.state.json.bad.") + ] + self.assertEqual(state.steps, {}) + self.assertGreaterEqual(len(backups), 1) + + with open(path, encoding="utf-8") as fp: + data = json.load(fp) + self.assertEqual(data["steps"], {}) + self.assertIsNone(data.get("updated_at_ts")) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/agentic_import/templates/generate_pvmap_prompt.j2 b/tools/agentic_import/templates/generate_pvmap_prompt.j2 index 99c6445d13..8fafc6ab47 100644 --- a/tools/agentic_import/templates/generate_pvmap_prompt.j2 +++ b/tools/agentic_import/templates/generate_pvmap_prompt.j2 @@ -1242,7 +1242,7 @@ When working with SDMX datasets, follow these additional guidelines: # CORE TASK Your primary goal is to analyze the provided CSV data and generate a complete -and valid `{{output_dir}}/{{output_basename}}_pvmap.csv` and `{{output_dir}}/{{output_basename}}_metadata.csv` files which can be used with Statvar +and valid `{{output_path_abs}}_pvmap.csv` and `{{output_path_abs}}_metadata.csv` files which can be used with Statvar processor tool to produce the final DataCommons artifacts. ## 📌 IMPORTANT: FILE NAMING CONVENTION @@ -1262,18 +1262,18 @@ This naming convention allows multiple datasets to be processed in the same work - ✅ No file conflicts or overwrites - ✅ Easy to organize outputs by topic or date -**Current Task**: For this specific run, your output path is `{{output_path}}`. +**Current Task**: For this specific run, your output path is `{{output_path_abs}}`. Throughout this documentation, you will see references to generic file names. 
**You MUST use the following specific file names for this task:** | Documentation Reference | Actual File You Must Create | |------------------------|----------------------------| -| `pvmap.csv` | `{{output_dir}}/{{output_basename}}_pvmap.csv` | -| `metadata.csv` | `{{output_dir}}/{{output_basename}}_metadata.csv` | -| `output.csv` | `{{output_path}}.csv` | +| `pvmap.csv` | `{{output_path_abs}}_pvmap.csv` | +| `metadata.csv` | `{{output_path_abs}}_metadata.csv` | +| `output.csv` | `{{output_path_abs}}.csv` | -**Example**: When the documentation says "create pvmap.csv", you must actually create `{{output_dir}}/{{output_basename}}_pvmap.csv` +**Example**: When the documentation says "create pvmap.csv", you must actually create `{{output_path_abs}}_pvmap.csv` **REMEMBER**: Whenever you see generic file names in the instructions, always use the specific names with the output path prefix. @@ -1343,11 +1343,11 @@ Follow these steps sequentially. {%- endif %} -**2. Generate `{{output_dir}}/{{output_basename}}_pvmap.csv` and `{{output_dir}}/{{output_basename}}_metadata.csv`** +**2. Generate `{{output_path_abs}}_pvmap.csv` and `{{output_path_abs}}_metadata.csv`** -- Create the `{{output_dir}}/{{output_basename}}_pvmap.csv` file, mapping the source data columns to DataCommons properties based on your findings. -- Create the `{{output_dir}}/{{output_basename}}_metadata.csv` file and define the necessary `statvar_processor` configuration parameters within it. -- Configuration rule: All processor flags/settings must live in `{{output_dir}}/{{output_basename}}_metadata.csv`. Do not embed configuration in `{{output_dir}}/{{output_basename}}_pvmap.csv` and do not rely on extra CLI flags. +- Create the `{{output_path_abs}}_pvmap.csv` file, mapping the source data columns to DataCommons properties based on your findings. +- Create the `{{output_path_abs}}_metadata.csv` file and define the necessary `statvar_processor` configuration parameters within it. +- Configuration rule: All processor flags/settings must live in `{{output_path_abs}}_metadata.csv`. Do not embed configuration in `{{output_path_abs}}_pvmap.csv` and do not rely on extra CLI flags. ### Validation Checklist While generating the files, ensure: @@ -1365,9 +1365,9 @@ While generating the files, ensure: - [ ] **Special/missing values mapped appropriately** - Use `#ignore` ONLY to drop entire rows. 
For skipping individual cell values, use empty mapping: `column:value,IntermediateProperty,''` (preserves row, skips cell) #### Metadata CSV Validation: -- [ ] **{{output_dir}}/{{output_basename}}_metadata.csv covers processor flags** - Includes required parameters (e.g., `header_rows`) -- [ ] **No config in {{output_dir}}/{{output_basename}}_pvmap.csv** - `{{output_dir}}/{{output_basename}}_pvmap.csv` contains only PV mappings, not processor settings -- [ ] **No extra CLI flags** - Configuration is exclusively in `{{output_dir}}/{{output_basename}}_metadata.csv`; wrapper provides input paths +- [ ] **{{output_path_abs}}_metadata.csv covers processor flags** - Includes required parameters (e.g., `header_rows`) +- [ ] **No config in {{output_path_abs}}_pvmap.csv** - `{{output_path_abs}}_pvmap.csv` contains only PV mappings, not processor settings +- [ ] **No extra CLI flags** - Configuration is exclusively in `{{output_path_abs}}_metadata.csv`; wrapper provides input paths - [ ] **Parameter names match documentation** - Not CLI flag names - [ ] **Quote values containing commas** - `key,"value1,value2,value3"` @@ -1394,13 +1394,13 @@ For SDMX datasets, also ensure: ```bash # Run statvar processor using dedicated script -{{script_dir}}/agentic_import/run_statvar_processor.sh \ +{{script_dir_abs}}/agentic_import/run_statvar_processor.sh \ --python "{{python_interpreter}}" \ - --script-dir "{{script_dir}}" \ - --working-dir "{{working_dir}}" \ - --input-data "{{input_data}}" \ + --script-dir "{{script_dir_abs}}" \ + --working-dir "{{working_dir_abs}}" \ + --input-data "{{input_data_abs}}" \ --gemini-run-id "{{gemini_run_id}}" \ - --output-path "{{output_path}}" + --output-path "{{output_path_abs}}" ``` The wrapper reads `metadata.csv` for all processor configuration. Do not add extra flags to this command. @@ -1409,11 +1409,11 @@ The wrapper reads `metadata.csv` for all processor configuration. Do not add ext **📊 VALIDATION CHECKLIST**: - Check the command exit code (0 = success, non-zero = failure) -- Verify that `{{working_dir}}/{{output_path}}.csv` exists and is not empty +- Verify that `{{output_path_abs}}.csv` exists and is not empty - Confirm no duplicate entries for same place, date, and variable - **Verify output.csv contains all required columns**: Must include at minimum `observationAbout`, `observationDate`, `variableMeasured`, `value` -- **Verify complete column mapping**: Any observation properties mapped in {{output_dir}}/{{output_basename}}_pvmap.csv (like `unit`, `scalingFactor`, `measurementMethod`, `observationPeriod`) must be present as columns in `{{working_dir}}/{{output_path}}.csv` -- **Verify `{{output_dir}}/{{output_basename}}_metadata.csv` completeness**: Confirm `header_rows` parameter is present and correctly specified +- **Verify complete column mapping**: Any observation properties mapped in {{output_path_abs}}_pvmap.csv (like `unit`, `scalingFactor`, `measurementMethod`, `observationPeriod`) must be present as columns in `{{output_path_abs}}.csv` +- **Verify `{{output_path_abs}}_metadata.csv` completeness**: Confirm `header_rows` parameter is present and correctly specified **🎯 DECISION LOGIC - APPLY THIS EXACTLY**: @@ -1428,22 +1428,22 @@ IF all items in the VALIDATION CHECKLIST above pass: ELIF CURRENT_ATTEMPT < {{max_iterations}}: → OUTPUT: "❌ ATTEMPT CURRENT_ATTEMPT FAILED - Error details: [describe specific error]" → OUTPUT: "🔄 Starting attempt [CURRENT_ATTEMPT + 1] of {{max_iterations}}..." - → Analyze the error from logs. 
In case statvar processor failed, read log file at: {{working_dir}}/.datacommons/processor.log + → Analyze the error from logs. In case statvar processor failed, read log file at: {{working_dir_abs}}/.datacommons/processor.log {# TODO: move debugging instructions to separate section #} - → **Common {{output_dir}}/{{output_basename}}_metadata.csv issues to check:** + → **Common {{output_path_abs}}_metadata.csv issues to check:** • Missing or wrong `header_rows` (should be 1 for standard CSV with headers) • Wrong `skip_rows` value skipping too much data • Debugging parameters left in production (`process_rows`, `input_rows`, `input_columns`) • Place resolution issues: missing `places_within` or wrong `place_type` - → Modify {{output_dir}}/{{output_basename}}_pvmap.csv and/or {{output_dir}}/{{output_basename}}_metadata.csv to fix identified issues + → Modify {{output_path_abs}}_pvmap.csv and/or {{output_path_abs}}_metadata.csv to fix identified issues → INCREMENT ATTEMPT COUNTER → Return to Step 5 (Run the Processor) ELSE (CURRENT_ATTEMPT >= {{max_iterations}}): → OUTPUT: "⛔ ITERATION LIMIT REACHED: Failed after {{max_iterations}} attempts" → OUTPUT: "📋 Final Status: FAILED - Manual intervention required" - → OUTPUT: "📁 Check logs at: {{working_dir}}/.datacommons/ for debugging" - → OUTPUT: "📁 Check backup at: {{working_dir}}/runs/{{gemini_run_id}}/ for debugging" + → OUTPUT: "📁 Check logs at: {{working_dir_abs}}/.datacommons/ for debugging" + → OUTPUT: "📁 Check backup at: {{working_dir_abs}}/runs/{{gemini_run_id}}/ for debugging" → STOP EXECUTION IMMEDIATELY → DO NOT MAKE ANY MORE ATTEMPTS ``` @@ -1462,21 +1462,21 @@ CRITICAL: Follow all SDMX-specific guidelines and use metadata for semantic mapp ```json { - "input_data": ["{{input_data}}"], - "input_metadata": {{input_metadata | tojson}}, - "working_dir": "{{working_dir}}", - "output_dir": "{{working_dir}}/{{output_dir}}", + "input_data": ["{{input_data_abs}}"], + "input_metadata": {{input_metadata_abs | tojson}}, + "working_dir": "{{working_dir_abs}}", + "output_dir": "{{output_dir_abs}}", "dataset_type": "{{dataset_type}}" } ``` # OUTPUT REQUIREMENTS & FINAL INSTRUCTION -- Generate `{{output_dir}}/{{output_basename}}_pvmap.csv` and `{{output_dir}}/{{output_basename}}_metadata.csv` +- Generate `{{output_path_abs}}_pvmap.csv` and `{{output_path_abs}}_metadata.csv` - **Adhere to Rules:** Strictly follow all schema rules, property requirements, and formatting guidelines from the knowledge base. - DO NOT deviate from the documented standards. -- Configuration location: Place all processor flags/settings in `{{output_dir}}/{{output_basename}}_metadata.csv` only. Do not embed settings in `{{output_dir}}/{{output_basename}}_pvmap.csv` and do not propose additional CLI flags. +- Configuration location: Place all processor flags/settings in `{{output_path_abs}}_metadata.csv` only. Do not embed settings in `{{output_path_abs}}_pvmap.csv` and do not propose additional CLI flags. # 🛑 FINAL EXECUTION REMINDERS @@ -1491,7 +1491,7 @@ CRITICAL: Follow all SDMX-specific guidelines and use metadata for semantic mapp # ACTION REQUIRED NOW -**Execute** the data analysis and generate the `{{output_dir}}/{{output_basename}}_pvmap.csv` and `{{output_dir}}/{{output_basename}}_metadata.csv` +**Execute** the data analysis and generate the `{{output_path_abs}}_pvmap.csv` and `{{output_path_abs}}_metadata.csv` files now. Follow the primary workflow **WITHOUT** deviation. **REMEMBER**: You have {{max_iterations}} attempts maximum. 
Track each attempt and stop when you succeed or reach the limit.
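
For orientation, here is a minimal sketch of how the builder and state modules introduced in this change might be wired together outside of `run_sdmx_pipeline`; the endpoint, agency, dataflow id, and paths are illustrative assumptions, not values taken from this change.

from pathlib import Path

from tools.agentic_import.sdmx_pipeline_builder import build_sdmx_pipeline
from tools.agentic_import.sdmx_pipeline_config import (PipelineConfig, RunConfig,
                                                       SdmxConfig,
                                                       SdmxDataflowConfig)
from tools.agentic_import.state_handler import StateHandler

# Illustrative configuration; a real run would populate these from CLI flags.
working_dir = Path("/tmp/sdmx_demo")
config = PipelineConfig(
    sdmx=SdmxConfig(endpoint="https://example.com",
                    agency="AGENCY",
                    dataflow=SdmxDataflowConfig(id="FLOW")),
    run=RunConfig(command="sdmx demo",
                  dataset_prefix="demo",
                  working_dir=str(working_dir)),
)

# Load (or create) the per-dataset state file that records step outcomes.
handler = StateHandler(state_path=working_dir / ".datacommons" / "demo.state.json",
                       dataset_prefix="demo")
state = handler.get_state()

# Plan the pipeline: only steps that are new, failed, version-bumped, or
# downstream of a more recently finished step are scheduled.
pipeline = build_sdmx_pipeline(config=config, state=state)
for step in pipeline.steps:
    step.dry_run()  # logs the command each step would run, without executing it

In the tool itself, `run_sdmx_pipeline` (exercised by the tests above) performs this wiring and also executes the planned steps, recording their outcomes in the state file.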