A production-style end-to-end data pipeline built on the Bronze → Silver → Gold medallion architecture
Overview · Architecture · Quick Start · Pipeline Layers · Data Model · Project Structure · Commands
This project implements a full data engineering pipeline for the Olist Brazilian E-Commerce public dataset. Raw CSV files are ingested, cleaned, and modelled into a star schema warehouse that feeds a Power BI dashboard.
- 4 Airflow DAGs orchestrating the full ETL lifecycle on a daily schedule
- Bronze layer — raw data ingested as-is with `_loaded_at` audit timestamps
- Silver layer — deduplicated, typed, null-safe, and standardised tables
- Gold layer — star schema with dimensions, facts, ML features, and Power BI views
- Quality gate — automated row count, null rate, and business rule validation
- 🔲 FastAPI backend — `/api/metrics`, `/api/trends`, `/api/pipeline/status`
- 🔲 React + Recharts dashboard — KPI cards, time-series charts, pipeline monitor
┌──────────────────────────────────────────────────────────────┐
│ DATA SOURCES │
│ Olist Brazilian E-Commerce (9 CSV files) │
└─────────────────────────────┬────────────────────────────────┘
│
▼ DAG 1 · daily 01:00 UTC
┌──────────────────────────────────────────────────────────────┐
│ 🥉 BRONZE · schema: bronze │
│ │
│ Raw ingestion — all columns stored as TEXT │
│ _loaded_at audit timestamp appended to every table │
│ No transformations, no type casts, no filtering │
│ │
│ bronze.orders bronze.customers bronze.products │
│ bronze.order_items bronze.payments bronze.sellers │
│ bronze.reviews bronze.geolocation │
└─────────────────────────────┬────────────────────────────────┘
│
▼ DAG 2 · daily 03:00 UTC
┌──────────────────────────────────────────────────────────────┐
│ 🥈 SILVER · schema: silver │
│ │
│ Deduplication ROW_NUMBER() OVER (PARTITION BY pk │
│ ORDER BY loaded_at DESC)│
│ Null handling NULLIF(col, '') before every type cast │
│ Type casting ::TIMESTAMP ::NUMERIC(12,2) ::INT │
│ Standardisation UPPER(TRIM()) city/state │
│ LOWER(TRIM()) category names │
│ Imputation Per-category median for product dimensions │
│ Ordering ORDER BY natural key ASC │
└─────────────────────────────┬────────────────────────────────┘
│
▼ DAG 3 · daily 05:00 UTC
┌──────────────────────────────────────────────────────────────┐
│ 🥇 GOLD · schema: gold │
│ │
│ Dimensions dim_customers dim_products │
│ dim_sellers dim_dates │
│ │
│ Facts fact_orders fact_order_payments │
│ fact_seller_performance │
│ │
│ ML features fulfilment_days approval_hours │
│ delivery_delta_days is_high_instalment │
│ │
│ Public views public.* ──▶ Power BI navigator ready │
└─────────────────────────────┬────────────────────────────────┘
│
▼ DAG 4 · daily 07:00 UTC
┌──────────────────────────────────────────────────────────────┐
│ ✅ QUALITY CHECKS │
│ │
│ Row count minimums per gold table │
│ Null rate thresholds on key columns │
│ Business rules no negative revenue · no orphan FK keys │
│ no duplicate order IDs · valid review scores│
└─────────────────────────────┬────────────────────────────────┘
│
┌───────────────┴────────────────┐
▼ ▼ (next phase)
Power BI Desktop FastAPI + React Dashboard
(public.* views via PostgreSQL)
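The quality gate in DAG 4 can be pictured as a small validation function. The sketch below is illustrative only — the thresholds, column names, and failure labels are hypothetical, not the project's real configuration:

```python
# Hypothetical sketch of the kind of checks DAG 4 runs; thresholds,
# column names, and failure labels are illustrative.
def null_rate(rows, col):
    """Fraction of rows where `col` is None."""
    return sum(r[col] is None for r in rows) / len(rows)

def run_quality_gate(rows):
    failures = []
    if len(rows) < 3:                              # row-count minimum
        failures.append("row_count_below_minimum")
    if null_rate(rows, "review_score") > 0.5:      # null-rate threshold
        failures.append("review_score_null_rate_breach")
    if any(r["revenue"] < 0 for r in rows):        # business rule
        failures.append("negative_revenue")
    return failures

sample = [
    {"revenue": 120.0, "review_score": 5},
    {"revenue": 89.9,  "review_score": None},
    {"revenue": 45.0,  "review_score": 4},
]
print(run_quality_gate(sample))  # [] — all checks pass
```

In the real DAG, a non-empty failure list would raise and mark the run failed, blocking the downstream BI refresh.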
| Tool | Version | Purpose |
|---|---|---|
| Docker Desktop | Latest | Runs the PostgreSQL warehouse container |
| Astro CLI | Latest | Manages local Airflow environment |
| Python | 3.12 | Local test execution |
```powershell
# Run PowerShell as Administrator
winget install -e --id Astronomer.Astro

# Confirm the install
astro version
```

Already using Docker Desktop? Keep it by skipping the Podman dependency:

```powershell
winget install -e --id Astronomer.Astro --skip-dependencies
```
```shell
git clone https://github.com/your-username/ecommerce-pipeline.git
cd ecommerce-pipeline
```

Create a `.env` file in the project root:
```
WAREHOUSE_HOST=host.docker.internal
WAREHOUSE_PORT=5433
WAREHOUSE_DB=ecommerce_dw
WAREHOUSE_USER=warehouse
WAREHOUSE_PASSWORD=warehouse
DATA_DIR=/usr/local/airflow/include/data
```

Download the dataset from Kaggle and place all CSV files into `include/data/`:
include/data/
├── olist_orders_dataset.csv
├── olist_order_items_dataset.csv
├── olist_customers_dataset.csv
├── olist_products_dataset.csv
├── olist_sellers_dataset.csv
├── olist_order_payments_dataset.csv
├── olist_order_reviews_dataset.csv
├── olist_geolocation_dataset.csv
└── product_category_name_translation.csv
Add `include/data/` to your `.gitignore` — do not commit raw data files.
```shell
docker compose -f docker-compose.warehouse.yml up -d
```

```shell
# First time only — initialises the Astro project files
astro dev init

# Start the Airflow environment
astro dev start
```

Open http://localhost:8080 · credentials: admin / admin
In the Airflow UI navigate to Admin → Connections → + and enter:
| Field | Value |
|---|---|
| Connection ID | postgres_warehouse |
| Connection Type | Postgres |
| Host | host.docker.internal |
| Database | ecommerce_dw |
| Login | warehouse |
| Password | warehouse |
| Port | 5433 |
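Outside Airflow, the same connection details can be assembled into a libpq-style DSN for ad-hoc scripts. This is a sketch only: the variable names match the `.env` file above, and the defaults mirror its example values (they are not secrets):

```python
import os

# Sketch: build a libpq-style DSN from the .env variables shown earlier.
# Defaults mirror the example .env values; override via the environment.
def warehouse_dsn() -> str:
    return (
        f"host={os.getenv('WAREHOUSE_HOST', 'host.docker.internal')} "
        f"port={os.getenv('WAREHOUSE_PORT', '5433')} "
        f"dbname={os.getenv('WAREHOUSE_DB', 'ecommerce_dw')} "
        f"user={os.getenv('WAREHOUSE_USER', 'warehouse')} "
        f"password={os.getenv('WAREHOUSE_PASSWORD', 'warehouse')}"
    )

print(warehouse_dsn())
```

A string in this shape can be passed directly to `psycopg2.connect(...)`.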
```shell
astro dev run dags trigger 01_bronze_ingest
astro dev run dags trigger 02_silver_clean
astro dev run dags trigger 03_gold_model
astro dev run dags trigger 04_quality_checks
```

DAGs also run automatically on their daily schedule — see DAG Schedules.
Every bronze table passes through the same cleaning pipeline before data reaches gold:
| Operation | Method | Reason |
|---|---|---|
| Deduplication | `ROW_NUMBER() OVER (PARTITION BY pk ORDER BY loaded_at DESC)` | CSV re-exports introduce duplicate rows per entity |
| Empty string → NULL | `NULLIF(col, '')` before every `::TYPE` cast | Ingest stores everything as TEXT; empty strings crash type casts |
| NULL imputation | `COALESCE(value, category_median, 0)` | Product dimensions use per-category medians so ML features are always populated |
| Type casting | `::TIMESTAMP`, `::NUMERIC(12,2)`, `::INT` | Applied after the NULLIF guard to prevent runtime cast errors |
| Standardisation | `UPPER(TRIM(...))` on city/state · `LOWER(TRIM(...))` on categories | Prevents broken GROUP BY from mixed-case data ("SP" vs "sp") |
| Ordering | `ORDER BY` on natural key ascending | Reduces sort overhead in downstream window functions and joins |
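The dedup-then-cast pattern can be demonstrated end to end on a miniature table. The sketch below uses Python's built-in `sqlite3` as a stand-in for PostgreSQL (so `CAST(... AS NUMERIC)` replaces the `::NUMERIC` syntax used in the real silver scripts), and the table and rows are made up:

```python
import sqlite3

# Miniature of the silver cleaning pattern: keep the newest copy of each
# entity, then null-guard the TEXT value before casting it.
# SQLite stands in for PostgreSQL here; data is illustrative.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE bronze_orders (order_id TEXT, price TEXT, loaded_at TEXT);
INSERT INTO bronze_orders VALUES
  ('o1', '10.50', '2024-01-01'),
  ('o1', '12.00', '2024-01-02'),  -- re-export duplicate: keep newest
  ('o2', '',      '2024-01-01');  -- empty string must become NULL
""")
rows = conn.execute("""
SELECT order_id, CAST(NULLIF(price, '') AS NUMERIC) AS price
FROM (
  SELECT *, ROW_NUMBER() OVER (
              PARTITION BY order_id ORDER BY loaded_at DESC) AS rn
  FROM bronze_orders
)
WHERE rn = 1
ORDER BY order_id
""").fetchall()
print(rows)
```

The duplicate `o1` row collapses to the most recently loaded copy, and `o2`'s empty-string price survives the cast as NULL instead of raising an error.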
The gold fact tables include derived feature columns designed for ML workloads:
| Feature | Table | Description |
|---|---|---|
| `fulfilment_days` | `fact_orders` | Days from purchase to delivery |
| `approval_hours` | `fact_orders` | Hours from purchase to payment approval |
| `delivery_delta_days` | `fact_orders` | Estimated minus actual delivery (negative = late) |
| `delivered_on_time` | `fact_orders` | Boolean — delivered before estimated date |
| `is_high_instalment` | `fact_order_payments` | Flag for orders with > 6 payment instalments |
| `avg_review_score` | `fact_seller_performance` | Monthly rolling review average per seller |
| `on_time_pct` | `fact_seller_performance` | Monthly on-time delivery rate per seller |
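The date arithmetic behind two of these features can be recomputed in a few lines. The column names follow `gold.fact_orders`, but the function bodies and sample timestamps are a sketch, not the project's SQL:

```python
from datetime import datetime

# Illustrative recomputation of two gold-layer feature columns.
def fulfilment_days(purchased: datetime, delivered: datetime) -> int:
    """Whole days from purchase to delivery."""
    return (delivered - purchased).days

def delivery_delta_days(estimated: datetime, delivered: datetime) -> int:
    """Estimated minus actual delivery; negative means the order was late."""
    return (estimated - delivered).days

purchased = datetime(2018, 3, 1, 10, 0)
delivered = datetime(2018, 3, 9, 14, 30)
estimated = datetime(2018, 3, 7)

print(fulfilment_days(purchased, delivered))      # 8
print(delivery_delta_days(estimated, delivered))  # -3
```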
| DAG | ID | Schedule | Runs after |
|---|---|---|---|
| Bronze ingest | `01_bronze_ingest` | `0 1 * * *` | — |
| Silver clean | `02_silver_clean` | `0 3 * * *` | Bronze |
| Gold model | `03_gold_model` | `0 5 * * *` | Silver |
| Quality checks | `04_quality_checks` | `0 7 * * *` | Gold |
| Table | Grain | Primary key | Notable columns |
|---|---|---|---|
| `gold.dim_customers` | 1 row per unique customer | `customer_key` | city, state, order_account_count |
| `gold.dim_products` | 1 row per product | `product_key` | category, weight_g, volumetric_weight_kg |
| `gold.dim_sellers` | 1 row per seller | `seller_key` | city, state |
| `gold.dim_dates` | 1 row per calendar day | `date_key` | year, quarter, month_name, is_weekend, is_public_holiday, fiscal_period |
| Table | Grain | Key measures |
|---|---|---|
| `gold.fact_orders` | 1 row per order | revenue, freight_revenue, gross_revenue, fulfilment_days, delivered_on_time, review_score |
| `gold.fact_order_payments` | 1 row per order × payment type × instalments | total_amount, installments, avg_instalment_amount, is_high_instalment |
| `gold.fact_seller_performance` | 1 row per seller × month | total_revenue, total_orders, avg_review_score, on_time_pct, avg_fulfilment_days |
The public schema exposes thin views over every gold table. Power BI's PostgreSQL navigator surfaces public by default — no schema configuration required.
public.dim_customers → gold.dim_customers
public.dim_products → gold.dim_products
public.dim_sellers → gold.dim_sellers
public.dim_dates → gold.dim_dates
public.fact_orders → gold.fact_orders
public.fact_order_payments → gold.fact_order_payments
public.fact_seller_performance → gold.fact_seller_performance

ecommerce-pipeline/
│
├── 📄 Dockerfile Astro Runtime image (generated by astro dev init)
├── 📄 .env Warehouse credentials — gitignored
├── 📄 requirements.txt Python dependencies
├── 📄 docker-compose.warehouse.yml PostgreSQL 15 warehouse on port 5433
│
├── 📁 .astro/ Astro CLI project config
│
├── 📁 dags/
│ ├── 01_bronze_ingest.py Load CSVs → bronze via psycopg2 COPY
│ ├── 02_silver_clean.py Execute sql/silver/*.sql scripts
│ ├── 03_gold_model.py Execute sql/gold/*.sql scripts
│ └── 04_quality_checks.py Validate gold layer — fails loudly on breach
│
├── 📁 sql/
│ ├── silver/
│ │ ├── 00_create_schemas.sql CREATE SCHEMA bronze / silver / gold
│ │ ├── 01_orders.sql Dedup + NULLIF casts + derived fulfilment cols
│ │ ├── 02_order_items.sql Dedup + price cast + line_total
│ │ ├── 03_customers.sql Dedup + UPPER(TRIM()) geography
│ │ ├── 04_products.sql Dedup + per-category median imputation
│ │ └── 05_sellers_payments_reviews.sql
│ └── gold/
│ ├── 01_dimensions.sql dim_* tables + PKs + indexes
│ ├── 02_facts.sql fact_* tables + ML features
│ └── 03_powerbi_views.sql public.* views for Power BI
│
├── 📁 include/
│ └── data/ Olist CSVs — gitignored
│
├── 📁 plugins/ Custom Airflow plugins
├── 📁 config/ Airflow connections / variables
├── 📁 tests/ DAG integrity and unit tests
│
├── 📁 backend/ FastAPI REST API ← next phase
└── 📁 frontend/ React + Recharts dashboard ← next phase
| Command | Description |
|---|---|
| `astro dev start` | Start Airflow (webserver, scheduler, triggerer) |
| `astro dev stop` | Stop all Airflow containers |
| `astro dev restart` | Rebuild the Docker image and restart — required after editing `requirements.txt` |
| `astro dev logs` | Stream live scheduler and webserver logs |
| `astro dev parse` | Validate all DAG files for import errors without starting containers |
| `astro dev run dags trigger <dag_id>` | Manually trigger a DAG by its ID |
```shell
pip install pytest
pytest tests/
```

| Layer | Technology |
|---|---|
| Orchestration | Apache Airflow 2.8 · Astronomer Astro CLI |
| Warehouse | PostgreSQL 15 |
| Ingestion | Python · pandas 2.1 · psycopg2 COPY |
| Transformation | SQL executed via PostgresHook.get_conn() |
| BI | Power BI Desktop |
| Containerisation | Docker · Astro Runtime (based on Python 3.12) |
- Bronze ingest — raw CSV loading with audit timestamps
- Silver clean — deduplication, type casting, null handling, standardisation
- Gold model — star schema with ML-ready feature columns
- Quality checks — automated validation gate before BI refresh
- Power BI integration — public schema views
- FastAPI backend — metrics, trends, and pipeline status endpoints
- React dashboard — KPI cards, charts, pipeline run monitor
- dbt integration — replace raw SQL files with versioned dbt models
- CI/CD — GitHub Actions for DAG linting, parsing, and test runs
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change, and update the tests as appropriate.
Built with the Olist Brazilian E-Commerce public dataset