Configuration-driven ETL pipeline for ingesting structured data into PostgreSQL.
The system implements modular ingestion with validation, transformation, reject handling, and structured loading into staging tables.
- YAML-driven ingestion workflows
- CSV and JSON source support
- PostgreSQL staging table creation
- Schema-based type casting
- Primary key enforcement
- Rule-based validation
- Reject capture via stg_rejects
- Upsert-based loading strategy
- Structured logging
- Unit tests with pytest and coverage reporting

CSV / JSON / API Sources
↓
Reader (extract raw data)
↓
Validator (schema + rule checks)
↓
Cleaner (type casting + normalization)
↓
Loader (insert into PostgreSQL staging tables)
↓
stg_* tables (clean dataset)

Invalid Records
↓
Validation Failure Reason Attached
↓
stg_rejects (audit table for debugging and replay)
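
The reject flow above can be sketched roughly as follows. This is a minimal illustration, not the project's actual validator: the function names `parse_rule` and `validate`, the reject dictionary layout, and the restriction to simple `field <op> number` rules are all assumptions made for the example.

```python
import operator
import re

# Comparison operators this sketch understands (an assumed, minimal set).
_OPS = {
    ">=": operator.ge, "<=": operator.le, "==": operator.eq,
    "!=": operator.ne, ">": operator.gt, "<": operator.lt,
}
# Two-character operators are listed first so ">=" is not read as ">".
_RULE_RE = re.compile(r"^\s*(\w+)\s*(>=|<=|==|!=|>|<)\s*(-?\d+(?:\.\d+)?)\s*$")


def parse_rule(rule):
    """Turn a rule such as 'temperature >= -100' into (field, compare, threshold)."""
    match = _RULE_RE.match(rule)
    if match is None:
        raise ValueError(f"unsupported rule: {rule!r}")
    field, symbol, threshold = match.groups()
    return field, _OPS[symbol], float(threshold)


def validate(records, rules):
    """Split records into (clean, rejects); every reject carries a failure reason."""
    parsed = [(rule, *parse_rule(rule)) for rule in rules]
    clean, rejects = [], []
    for record in records:
        reason = None
        for rule, field, compare, threshold in parsed:
            value = record.get(field)
            if value is None:
                reason = f"missing field: {field}"
            else:
                try:
                    passed = compare(float(value), threshold)
                except (TypeError, ValueError):
                    reason = f"not numeric: {field}"
                else:
                    if not passed:
                        reason = f"rule failed: {rule}"
            if reason:
                break
        if reason:
            # Hypothetical reject shape: the raw record plus the reason it failed.
            rejects.append({"record": record, "reason": reason})
        else:
            clean.append(record)
    return clean, rejects
```

Keeping the raw record next to its reason is what makes a reject table usable for replay: a corrected rule or a fixed source value can be re-run against exactly the rows that failed.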
src/
├── cleaners/ # Data normalization and type casting
├── database/ # PostgreSQL connection + query execution
├── loaders/ # Insert and upsert logic
├── loggers/ # Structured logging utilities
├── readers/ # CSV, JSON, API ingestion
├── tests/ # Unit and integration tests
├── utils/ # Shared helpers
├── validators/ # Schema + rule validation engine
└── main.py # Pipeline entry point
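
To illustrate what the cleaners stage might do, here is a small sketch of schema-based type casting. It assumes a schema that maps column names to type names such as `datetime` and `float`; the `CASTERS` table and the `cast_record` function are hypothetical names for this example and may not match the code in `src/cleaners/`.

```python
from datetime import datetime

# Hypothetical mapping from schema type names to Python converters.
CASTERS = {
    "datetime": datetime.fromisoformat,
    "float": float,
    "int": int,
    "str": str,
}


def cast_record(record, schema):
    """Return a copy of record with each schema field cast to its declared type."""
    cast = dict(record)
    for field, type_name in schema.items():
        # Missing or null fields are left alone; the validator decides their fate.
        if cast.get(field) is not None:
            cast[field] = CASTERS[type_name](cast[field])
    return cast


row = cast_record(
    {"time": "2024-01-01T00:00", "temperature": "3.5"},
    {"time": "datetime", "temperature": "float"},
)
```

A converter that raises `ValueError` on bad input is a natural signal for routing the record to the reject path instead of the staging table.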
sources:
  - name: weather_data
    type: json
    target_table: stg_weather
    pk:
      - time
    schema:
      time: datetime
      temperature: float
    rules:
      - temperature >= -100

- Python 3.11+
- PostgreSQL
python -m venv .venv
source .venv/bin/activate  # on Windows: .venv\Scripts\activate
pip install -r requirements.txt

Environment variables:
DATABASE_URL=postgresql://user:password@localhost:5432/database

python src/main.py

pytest
pytest --cov --cov-report=term-missing

- Open-Meteo API ingestion
- NOAA GSOM ingestion
- Integration testing
- Docker support
- CI/CD pipeline
- Incremental loading
- Audit tracking

- Modular pipeline stages (extract → validate → transform → load)
- Configuration-driven design
- Explicit reject handling (no silent failures)
- Reproducibility and idempotency
- Testable components
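
The idempotency principle pairs naturally with the upsert-based loading strategy: re-running a load with the same primary keys should update rows, not duplicate them. The sketch below builds a PostgreSQL `INSERT ... ON CONFLICT` statement from a table name, column list, and primary key. The function name `build_upsert_sql` is hypothetical, and the `%(name)s` placeholder style assumes a psycopg-style driver, which the project may or may not use.

```python
def build_upsert_sql(table, columns, pk):
    """Build a parameterized INSERT ... ON CONFLICT statement for PostgreSQL.

    Identifiers are interpolated directly, so they must come from trusted
    configuration, never from user input.
    """
    non_key = [c for c in columns if c not in pk]
    col_list = ", ".join(columns)
    # Named placeholders in psycopg's pyformat style (an assumption of this sketch).
    placeholders = ", ".join(f"%({c})s" for c in columns)
    conflict = ", ".join(pk)
    if non_key:
        updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in non_key)
        action = f"DO UPDATE SET {updates}"
    else:
        # A table made only of key columns has nothing to update.
        action = "DO NOTHING"
    return (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({conflict}) {action}"
    )


sql = build_upsert_sql("stg_weather", ["time", "temperature"], ["time"])
```

`ON CONFLICT` requires a unique constraint or primary key on the conflict columns, which is where primary key enforcement on the staging tables comes in.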