respira-data is the data platform for Proyecto Respira. It ingests raw sensor
data replicated by Airbyte into Postgres, builds a reusable canonical layer with
dbt, and orchestrates canonical plus project-specific pipelines with Prefect.
Today the only active project is respira_gold, but the repository is already
structured as a modular monorepo:
- dbt/models/canonical: reusable ingestion, normalization, dimensions, and silver models
- dbt/models/projects/respira_gold: project-specific marts and inference features
- pipelines/flows: Prefect orchestration for canonical and project pipelines
- pipelines/config/projects.py: registry of project-level runtime configuration
- scripts/prefect_worker_start.sh: worker bootstrap, deployment registration, and scheduling
For a first working deploy against a clean database that already has raw Airbyte
tables in airbyte, use this sequence.
- Prepare .env.

      cp .env.example .env

  Set at least:

  - REMOTE_PG_HOST
  - REMOTE_PG_PORT
  - REMOTE_PG_DB
  - REMOTE_PG_USER
  - REMOTE_PG_PASSWORD
  - REMOTE_PG_SSLMODE
  - MODEL_6H_PATH
  - MODEL_12H_PATH

  Important:

  - If the password contains $, wrap it in single quotes inside .env.
  - MODEL_6H_PATH and MODEL_12H_PATH must point to files available inside the containers, usually under /app/models/....
- Start the local stack.

      docker compose up -d --build
      docker compose ps

- Validate dbt connectivity.

      docker compose exec app bash -lc "cd /app/dbt && dbt debug --target prod"

- Install dbt packages and load seeds.

      docker compose exec app bash -lc "cd /app/dbt && dbt clean"
      docker compose exec app bash -lc "cd /app/dbt && dbt deps"
      docker compose exec app bash -lc "cd /app/dbt && dbt seed --target prod --full-refresh"

- Build canonical layers.

      docker compose exec app bash -lc "cd /app/dbt && dbt run --target prod --selector canonical_core"
      docker compose exec app bash -lc "cd /app/dbt && dbt run --target prod --selector canonical_silver"

- Create operational and inference tables.

      docker compose exec app bash -lc "cd /app && python3 pipelines/flows/warehouse_bootstrap.py"

- Build the project layer.

      docker compose exec app bash -lc "cd /app/dbt && dbt run --target prod --selector project_respira_gold"
      docker compose exec app bash -lc "cd /app/dbt && dbt test --target prod --selector project_respira_gold_tests"

- Run inference or the full project pipeline.

      docker compose exec prefect_worker bash -lc "cd /app && python3 pipelines/flows/project_inference.py"
      docker compose exec prefect_worker bash -lc "cd /app && python3 pipelines/flows/project_pipeline.py"

- Optional: trigger Prefect deployments from the local server instead of running the flow files directly.

      docker compose exec prefect_server prefect deployment ls
      docker compose exec prefect_server prefect deployment run 'canonical_incremental/canonical-incremental'
      docker compose exec prefect_server prefect deployment run 'project_pipeline/project-pipeline-respira_gold'

warehouse_bootstrap.py uses create schema if not exists ..., so the DB runtime user must have CREATE on the database, or those bootstrap steps will fail.

station_inference_features is now a persisted incremental table. If you are upgrading from an older local state where it exists as a view, drop it before rebuilding:

    docker compose exec app bash -lc 'PGPASSWORD="$REMOTE_PG_PASSWORD" psql "host=$REMOTE_PG_HOST port=$REMOTE_PG_PORT dbname=$REMOTE_PG_DB user=$REMOTE_PG_USER sslmode=$REMOTE_PG_SSLMODE" -c "drop view if exists respira_gold.station_inference_features cascade;"'

respira_gold.inference_results now stores one row per inference_run_id + station_id with forecast_6h, forecast_12h, and aqi_input. If you still have the old table shape from a previous run, recreate it before bootstrapping again:

    docker compose exec app bash -lc 'PGPASSWORD="$REMOTE_PG_PASSWORD" psql "host=$REMOTE_PG_HOST port=$REMOTE_PG_PORT dbname=$REMOTE_PG_DB user=$REMOTE_PG_USER sslmode=$REMOTE_PG_SSLMODE" -c "drop table if exists respira_gold.inference_results cascade;"'
    docker compose exec app bash -lc "cd /app && python3 pipelines/flows/warehouse_bootstrap.py"

- Docker Compose does not start a local Postgres instance. This stack connects to an external Postgres warehouse configured through .env.
- Airbyte is assumed to replicate raw tables into the airbyte schema of that external Postgres database.
- The worker auto-registers Prefect deployments on startup. In local development, the worker bootstrap script is the operational source of truth for schedules.
- The platform has two pipeline layers:
  - canonical_* builds reusable shared data
  - project_* builds project-specific marts and optional inference outputs
- All timestamps in the silver layer are expected to be UTC. FIUNA source timestamps arrive as local UTC-3 wall-clock time and are converted to UTC in staging before they reach silver.
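
The UTC rule for FIUNA timestamps can be illustrated with a short Python sketch. The real conversion happens in the dbt staging SQL, which is not shown in this README, so the function name below is hypothetical and the code only restates the rule: a naive wall-clock value is interpreted as UTC-3 and shifted to UTC.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC-3 offset, matching the description of FIUNA wall-clock timestamps.
FIUNA_LOCAL = timezone(timedelta(hours=-3))


def fiuna_local_to_utc(naive_local: datetime) -> datetime:
    """Interpret a naive timestamp as UTC-3 wall-clock time and return it in UTC.

    Hypothetical helper for illustration; the repository does this in dbt staging.
    """
    if naive_local.tzinfo is not None:
        raise ValueError("expected a naive wall-clock timestamp")
    return naive_local.replace(tzinfo=FIUNA_LOCAL).astimezone(timezone.utc)


if __name__ == "__main__":
    # A reading taken late in the local evening lands on the next UTC day.
    print(fiuna_local_to_utc(datetime(2024, 1, 1, 22, 30)))
```

A reading stamped late in the local evening ends up on the following UTC calendar day, which is worth keeping in mind when comparing raw and silver rows.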
docker-compose.yml starts three services:
- prefect_server: local Prefect API and UI on http://localhost:4200
- app: generic runner container used for dbt commands, ad hoc flow execution, and local shell access
- prefect_worker: long-running worker process that creates deployments and polls the Prefect work pool
Important runtime details:
- The repository is mounted into both app and prefect_worker, so local code edits are visible immediately inside containers.
- prefect_worker uses Dockerfile.worker, which includes the extra inference dependencies.
- app uses Dockerfile and is the default place for dbt commands.
- make smoke-test runs with poetry on the host, not inside Docker Compose.
- dbt/: dbt project, macros, seeds, and models
- dbt/models/canonical/: shared canonical models
- dbt/models/projects/respira_gold/: project-specific models for respira_gold
- dbt/seeds/: metadata for organizations, projects, variables, stations, and project scoping
- pipelines/flows/: Prefect flows such as canonical_incremental and project_pipeline
- pipelines/tasks/: dbt execution, warehouse bootstrap, inference, notifications, and audit helpers
- pipelines/config/: runtime settings, dbt selectors, and registered projects
- pipelines/sql/: SQL used by operational bootstrap tasks
- scripts/: operational helper scripts, especially worker startup and deployment registration
- tests/: orchestration and inference-adjacent tests
- src/inference/: inference runtime code used by project inference flows
The warehouse is organized into logical schemas:
- airbyte: raw replicated source tables, managed outside this repo
- staging: source-specific dbt staging views
- intermediate: dbt normalization and shaping views
- core: canonical dimensions and metadata models
- silver: canonical reusable fact layer
- respira_gold: project-specific marts, features, and inference tables
- ops: operational audit and inference status tables
Current architectural rules:
- Canonical models should not depend on project-specific marts.
- Project scope is metadata-driven through seeds such as project_data_sources.csv and project_organizations.csv.
- Project-specific inference tables live in the project schema, while audit tables live in ops.
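
The first rule (canonical models should not depend on project-specific marts) is something a small script could check. The sketch below is not part of the repository: it assumes models are plain .sql files that use single-argument ref('...') calls, and the file names in the example other than int_measurements_long and station_inference_features are made up for illustration.

```python
import re

# Matches single-argument dbt refs such as {{ ref('model_name') }}.
# Two-argument refs (package, model) are deliberately ignored in this sketch.
REF_PATTERN = re.compile(r"ref\(\s*['\"]([^'\"]+)['\"]\s*\)")
CANONICAL_PREFIX = "dbt/models/canonical/"
PROJECTS_PREFIX = "dbt/models/projects/"


def model_name(path: str) -> str:
    """Derive the dbt model name from its file path."""
    return path.rsplit("/", 1)[-1].removesuffix(".sql")


def canonical_to_project_refs(models: dict[str, str]) -> list[tuple[str, str]]:
    """Return (canonical_path, referenced_project_model) pairs that break the rule.

    `models` maps a model file path to its SQL text.
    """
    project_models = {model_name(p) for p in models if p.startswith(PROJECTS_PREFIX)}
    violations = []
    for path, sql in models.items():
        if not path.startswith(CANONICAL_PREFIX):
            continue
        for referenced in REF_PATTERN.findall(sql):
            if referenced in project_models:
                violations.append((path, referenced))
    return sorted(violations)


if __name__ == "__main__":
    example = {
        # Hypothetical canonical model that wrongly reads a project mart.
        "dbt/models/canonical/core/dim_bad.sql":
            "select * from {{ ref('station_inference_features') }}",
        "dbt/models/projects/respira_gold/station_inference_features.sql":
            "select * from {{ ref('int_measurements_long') }}",
    }
    print(canonical_to_project_refs(example))
```

Project models referencing canonical models are allowed, so only refs that originate under the canonical directory are reported.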
The values in .env are loaded into both the app and prefect_worker
containers. For dbt-based operations, the REMOTE_PG_* values are still
required because dbt/profiles.yml reads them directly.
Required database settings:
- REMOTE_PG_HOST
- REMOTE_PG_PORT
- REMOTE_PG_DB
- REMOTE_PG_USER
- REMOTE_PG_PASSWORD
- REMOTE_PG_SSLMODE
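
Before starting the stack, it can help to confirm that all six settings are present. The following is a minimal sketch, not a tool shipped with this repository: the parser is deliberately simplified (KEY=value lines, optional matching quotes, # comments) and does not reproduce every rule Docker Compose applies to .env files.

```python
REQUIRED_DB_KEYS = (
    "REMOTE_PG_HOST",
    "REMOTE_PG_PORT",
    "REMOTE_PG_DB",
    "REMOTE_PG_USER",
    "REMOTE_PG_PASSWORD",
    "REMOTE_PG_SSLMODE",
)


def parse_env_text(text: str) -> dict[str, str]:
    """Parse simple KEY=value lines; strips one pair of matching quotes."""
    values: dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "'\"":
            value = value[1:-1]
        values[key.strip()] = value
    return values


def missing_db_settings(env: dict[str, str]) -> list[str]:
    """Return the required database keys that are absent or empty."""
    return [key for key in REQUIRED_DB_KEYS if not env.get(key)]


if __name__ == "__main__":
    with open(".env", encoding="utf-8") as handle:
        missing = missing_db_settings(parse_env_text(handle.read()))
    print("missing:", ", ".join(missing) if missing else "none")
```

Run it from the repository root on the host; an empty result only means the keys are set, not that the credentials are valid, so dbt debug is still the real connectivity check.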
Optional database setting for Python tasks:
- DB_DSN: optional SQLAlchemy DSN for non-dbt Python tasks. Useful, but it does not replace the REMOTE_PG_* values required by dbt.
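
If DB_DSN is set, it should describe the same warehouse as the REMOTE_PG_* values. The README does not say how the repository builds or consumes the DSN, so the sketch below is only one plausible way to derive a postgresql:// URL from those values; the function name is hypothetical. Percent-encoding the user and password matters when they contain characters such as $ or @.

```python
from typing import Mapping
from urllib.parse import quote


def build_dsn(env: Mapping[str, str]) -> str:
    """Build a postgresql:// URL from REMOTE_PG_* values (illustrative helper)."""
    # safe="" forces reserved characters like '@', ':' and '/' to be encoded.
    user = quote(env["REMOTE_PG_USER"], safe="")
    password = quote(env["REMOTE_PG_PASSWORD"], safe="")
    return (
        f"postgresql://{user}:{password}"
        f"@{env['REMOTE_PG_HOST']}:{env['REMOTE_PG_PORT']}"
        f"/{env['REMOTE_PG_DB']}?sslmode={env['REMOTE_PG_SSLMODE']}"
    )


if __name__ == "__main__":
    # Placeholder values, not real credentials.
    print(build_dsn({
        "REMOTE_PG_HOST": "db.example.org",
        "REMOTE_PG_PORT": "5432",
        "REMOTE_PG_DB": "warehouse",
        "REMOTE_PG_USER": "respira",
        "REMOTE_PG_PASSWORD": "p$ss@1",
        "REMOTE_PG_SSLMODE": "require",
    }))
```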
Prefect and worker settings:
- PREFECT_API_URL: defaults to http://prefect_server:4200/api
- PREFECT_WORKER_TYPE: defaults to process
- PREFECT_CANONICAL_WORK_POOL: defaults to canonical
- PREFECT_PROJECT_RESPIRA_GOLD_WORK_POOL: defaults to respira_gold
- PREFECT_SCHEDULE_TIMEZONE: defaults to UTC
Schedule settings:
- PREFECT_CANONICAL_INCREMENTAL_CRON: defaults to 5 * * * *
- PREFECT_PROJECT_RESPIRA_GOLD_CRON: defaults to 20 * * * *
dbt runtime settings:
- DBT_TARGET: defaults to prod
- DBT_THREADS: defaults to 1
- DBT_TIMEOUT_CANONICAL_CORE_S
- DBT_TIMEOUT_CANONICAL_SILVER_S
- DBT_TIMEOUT_PROJECT_S
- DBT_TIMEOUT_TESTS_S
Inference settings:
- MODEL_6H_PATH
- MODEL_12H_PATH
- MODEL_6H_VERSION
- MODEL_12H_VERSION
- DEFAULT_WINDOW_HOURS
- INFERENCE_MIN_POINTS
Alerting:
- SLACK_WEBHOOK_URL: optional; used for flow failure alerts and dbt test alerts
- Copy the example environment file:

      cp .env.example .env

- Fill in the remote warehouse credentials and, if scheduled inference is needed, set MODEL_6H_PATH and MODEL_12H_PATH.
- Build and start the stack:

      make up-build

- Open Prefect UI at http://localhost:4200.
- For a fresh or reset database, run the initial bootstrap and first load:

      make prefect-bootstrap
      make dbt-deps
      make seed
      make run-canonical-incremental
      make run-project-pipeline

What happens automatically when prefect_worker starts:
- waits for the Prefect API health check
- creates or updates the canonical and respira_gold work pools
- deploys canonical_incremental
- deploys canonical_full_refresh
- deploys project_pipeline(project_code=respira_gold)
- starts one worker process per configured work pool
If both MODEL_6H_PATH and MODEL_12H_PATH are present, the project pipeline
is deployed with its schedule. If either model path is missing, the deployment
is still created but without a schedule.
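
The scheduling rule above can be restated as a small decision function. The actual logic lives in scripts/prefect_worker_start.sh, whose contents are not reproduced in this README, so the Python below is a sketch of the documented behavior rather than the worker's code; the function name is hypothetical.

```python
from typing import Mapping, Optional

# Documented default for PREFECT_PROJECT_RESPIRA_GOLD_CRON.
DEFAULT_PROJECT_CRON = "20 * * * *"


def project_schedule(env: Mapping[str, str]) -> Optional[str]:
    """Return the cron string for the project pipeline, or None for no schedule.

    The deployment itself is always created; only the schedule is conditional.
    """
    if not (env.get("MODEL_6H_PATH") and env.get("MODEL_12H_PATH")):
        return None
    return env.get("PREFECT_PROJECT_RESPIRA_GOLD_CRON") or DEFAULT_PROJECT_CRON


if __name__ == "__main__":
    # Paths below are placeholders under /app/models/.
    print(project_schedule({"MODEL_6H_PATH": "/app/models/6h.bin"}))
    print(project_schedule({
        "MODEL_6H_PATH": "/app/models/6h.bin",
        "MODEL_12H_PATH": "/app/models/12h.bin",
    }))
```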
Common commands:
    make up
    make up-build
    make down
    make ps
    make logs
    make logs-worker
    make shell
    make dbt-debug
    make prefect-bootstrap
    make run-canonical-incremental
    make run-canonical-full-refresh
    make run-project-pipeline
    make run-project-inference
    make smoke-test

What each operational command does:
- make prefect-bootstrap: creates ops audit tables and project inference tables, but does not run dbt
- make run-canonical-incremental: runs dbt deps, canonical core, and canonical silver
- make run-canonical-full-refresh: manual maintenance flow for a full canonical rebuild plus tests
- make run-project-pipeline: runs dbt for respira_gold, project tests, and inference if enabled
- make run-project-inference: runs inference only
- make smoke-test: lightweight orchestration test suite on the host machine
dbt-only layered commands:
    make run-canonical-core
    make run-canonical-silver
    make run-project-respira_gold
    make build
    make build-fr

Use build-fr after major schema or logic changes that require a full dbt rebuild.
The local scheduling model is controlled by
scripts/prefect_worker_start.sh.
Current behavior:
- canonical_incremental is deployed on a cron schedule
- canonical_full_refresh is deployed without a schedule and is intended to be manual
- project_pipeline(project_code=respira_gold) is deployed on a cron schedule only when both model paths are configured
- the worker re-registers these deployments every time it restarts
Operational implications:
- If you change cron settings, restart prefect_worker so deployments are re-created with the new schedule.
- social_broadcast defaults to 0 11,20 * * * in UTC and can be overridden with PREFECT_SOCIAL_BROADCAST_CRON.
- If you add model paths after startup, restart prefect_worker to attach the project schedule.
- Editing deployment YAML files in pipelines/deployments/ is not enough for local behavior unless the worker bootstrap logic is updated or deployments are re-applied explicitly.
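
When editing the cron settings, it helps to see which times of day an expression actually covers. The sketch below is a simplified reader for the minute and hour fields only; it is not how Prefect parses cron, it supports just *, comma lists, ranges, and /step, and it assumes the day, month, and weekday fields are all *.

```python
def expand_cron_field(field: str, low: int, high: int) -> list[int]:
    """Expand one cron field (e.g. '11,20' or '*/15') into sorted integers."""
    values: set[int] = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/", 1)
            step = int(step_text)
        if part == "*":
            start, end = low, high
        elif "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        if step < 1 or not (low <= start <= end <= high):
            raise ValueError(f"unsupported cron field: {field!r}")
        values.update(range(start, end + 1, step))
    return sorted(values)


def describe_daily_runs(cron: str) -> list[str]:
    """List HH:MM run times for a five-field cron that runs every day."""
    fields = cron.split()
    if len(fields) != 5:
        raise ValueError("expected five cron fields")
    minute, hour, day, month, weekday = fields
    if (day, month, weekday) != ("*", "*", "*"):
        raise ValueError("this sketch only handles daily schedules")
    return [
        f"{h:02d}:{m:02d}"
        for h in expand_cron_field(hour, 0, 23)
        for m in expand_cron_field(minute, 0, 59)
    ]


if __name__ == "__main__":
    # Default social_broadcast schedule from this section.
    print(describe_daily_runs("0 11,20 * * *"))
```

The listed times are in whatever timezone the schedule is registered with, which defaults to UTC through PREFECT_SCHEDULE_TIMEZONE.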
This repository uses warehouse tables for runtime auditability.
Created by make prefect-bootstrap:
- ops.dbt_run_audit
- ops.inference_station_status
- respira_gold.inference_runs
- respira_gold.inference_results
Useful operational checks:
- use Prefect UI for run history and task logs
- use make logs-worker for deployment and worker startup issues
- inspect ops.dbt_run_audit for dbt command status and summaries
- inspect ops.inference_station_status for per-station inference failures
- inspect respira_gold.inference_runs and respira_gold.inference_results for project inference lifecycle and outputs
Fresh database or rebuilt warehouse:
    make prefect-bootstrap
    make dbt-deps
    make seed
    make run-canonical-incremental
    make run-project-pipeline

Troubleshooting dbt connectivity:

    make dbt-debug
    make logs
    make logs-worker

If a project deployment is missing from Prefect UI:
- Check make logs-worker
- Confirm PREFECT_API_URL and work pool settings
- Confirm MODEL_6H_PATH and MODEL_12H_PATH if the schedule should exist
- Restart the worker with make down and make up-build
If inference should run but does not:
- Confirm the project has inference_enabled=True in pipelines/config/projects.py
- Confirm model paths exist inside the container filesystem
- Run make run-project-inference
- Inspect ops.inference_station_status and respira_gold.inference_runs
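
The model path check in the list above is easy to get wrong because the paths must resolve inside the container, not on the host. A minimal sketch of that check follows; the helper name is hypothetical, and it only tests that each path is set and points to an existing file, not that the file is a loadable model.

```python
import os
from typing import Mapping

MODEL_PATH_KEYS = ("MODEL_6H_PATH", "MODEL_12H_PATH")


def model_path_problems(env: Mapping[str, str]) -> dict[str, str]:
    """Return a problem description per model path variable; empty if all are fine."""
    problems: dict[str, str] = {}
    for key in MODEL_PATH_KEYS:
        path = env.get(key, "")
        if not path:
            problems[key] = "not set"
        elif not os.path.isfile(path):
            problems[key] = f"file not found: {path}"
    return problems


if __name__ == "__main__":
    # Reads the variables from the current process environment.
    for key, problem in model_path_problems(os.environ).items():
        print(f"{key}: {problem}")
```

To be meaningful, the script has to run in the same container that executes inference (prefect_worker in this stack), since a path that exists on the host may not exist there.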
If you change project registration:
- Update pipelines/config/projects.py
- Add or update the dbt models under dbt/models/projects/<project_code>
- Update seeds for project metadata and project-data-source membership
- Update worker bootstrap deployment logic if the new project needs scheduling
- Restart the worker to register the new deployment
To add a new project:
- create dbt/models/projects/<project_code>/
- register the project in dbt/seeds/projects.csv
- add project_data_sources.csv and project_organizations.csv entries
- add a ProjectConfig entry in pipelines/config/projects.py
- decide whether the project has inference and, if yes, define its source and result tables
- update deployment/bootstrap logic if the project should run on a schedule
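
To make the ProjectConfig step more concrete, here is a sketch of what such a registry entry could look like. This README only confirms that ProjectConfig exists and that projects carry an inference_enabled flag; every other field name below (dbt_selector, inference_source_table, inference_results_table) is an assumption for illustration, so check pipelines/config/projects.py for the real definition.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ProjectConfig:
    """Illustrative shape only; field names other than inference_enabled are assumed."""

    project_code: str
    dbt_selector: str
    inference_enabled: bool = False
    inference_source_table: Optional[str] = None
    inference_results_table: Optional[str] = None

    def __post_init__(self) -> None:
        # A project with inference must say where features come from and results go.
        if self.inference_enabled and not (
            self.inference_source_table and self.inference_results_table
        ):
            raise ValueError(
                f"{self.project_code}: inference requires source and results tables"
            )


# Hypothetical registry keyed by project code.
PROJECTS = {
    "respira_gold": ProjectConfig(
        project_code="respira_gold",
        dbt_selector="project_respira_gold",
        inference_enabled=True,
        inference_source_table="respira_gold.station_inference_features",
        inference_results_table="respira_gold.inference_results",
    ),
}

if __name__ == "__main__":
    print(PROJECTS["respira_gold"])
```

Validating the inference tables at construction time means a misconfigured project fails when the registry is imported, rather than in the middle of a scheduled run.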
To add a new Airbyte data source, use the checklist below.
Use this checklist whenever we connect a new Airbyte stream and want it to flow through the canonical layer and into one or more projects.
- Define the canonical source name.

  Use a stable snake_case name such as my_provider_airbyte. This is the identifier that will appear in dbt models, seeds, project scoping, and audits.

- Register the raw Airbyte table in dbt sources.

  Add the replicated raw table name under dbt/models/canonical/sources/sources_airbyte.yml.

  If Airbyte creates multiple raw tables for the same provider, list all of them there.
- Create a staging model in dbt/models/canonical/staging.

  Add a model such as stg_my_provider_measurements.sql that reads from the raw Airbyte source and normalizes it to the canonical staging contract.

  Every staging model should emit at least:

  - _airbyte_raw_id
  - extracted_at
  - data_source_name
  - station_code
  - measured_at_raw
  - measured_at_parsed
  - is_measured_at_valid
  - raw_payload

  Add cursor_id when the source has a reliable sequential identifier, and keep any extra columns needed later for station enrichment.

- Add tests and documentation for the new staging model.

  Register the model in dbt/models/canonical/staging/schema.yml with:

  - not_null tests on the canonical required fields
  - an accepted_values test for data_source_name
  - uniqueness tests if the source has a natural cursor or key
- Register the source in dbt/dbt_project.yml.

  Add a new entry under vars.measurements_sources with:

  - relation
  - station_code_col
  - measured_at_col
  - raw_payload_col
  - is_measured_at_valid_col
  - cursor_id_col when available
  - the variables mapping from canonical variable code to staging column

  int_measurements_long uses this registry to union all measurement sources, so forgetting this step means the new source will never reach silver.

- Add metadata seeds for the new source.

  Update:

  - dbt/seeds/data_sources.csv to register the source and its organization_code
  - dbt/seeds/project_data_sources.csv for every project that should consume it
  - dbt/seeds/project_organizations.csv if the organization is now part of a project
  - dbt/seeds/organizations.csv if this is a brand-new organization
- Add or update variable metadata if the source introduces new measurements.

  If the source contains variables we do not model yet, update:

  - dbt/seeds/variables.csv
  - dbt/seeds/variable_rules.csv when parsing or validation rules are needed
- Update station enrichment if the source contributes station metadata.

  If the new Airbyte payload provides coordinates, names, or station descriptors that should feed the canonical station dimension, update dbt/models/canonical/intermediate/int_stations_candidates.sql.

  If the source depends on hand-maintained station metadata, update dbt/seeds/stations_static.csv instead.

- Add timestamp repair logic if the source needs custom handling.

  dbt/models/canonical/intermediate/int_measurements_time_silver.sql currently contains source-specific logic for fiuna_airbyte. If the new source has broken timestamps, delayed cursors, or custom imputation rules, add that logic there explicitly.

- Validate the full path from canonical to project.

      make dbt-deps
      make seed
      make run-canonical-incremental
      make run-project-pipeline
      make smoke-test

  After that, confirm that:

  - the new source appears in canonical silver outputs
  - respira_gold only receives it if it was added to dbt/seeds/project_data_sources.csv
  - station and variable dimensions look correct
  - there are no leftover hardcoded references to the old source set
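
Because a missing or incomplete vars.measurements_sources entry silently keeps a source out of silver, a quick structural check on the entry can save a debugging round. The sketch below validates one entry against the keys listed in the checklist. It assumes the entry has already been loaded into a Python dict (for example from dbt_project.yml), and the example values, including the relation string and the variable mapping, are invented for illustration.

```python
from typing import Any, Mapping

# Keys the checklist lists for every measurements_sources entry.
REQUIRED_KEYS = (
    "relation",
    "station_code_col",
    "measured_at_col",
    "raw_payload_col",
    "is_measured_at_valid_col",
    "variables",
)


def validate_measurement_source(name: str, entry: Mapping[str, Any]) -> list[str]:
    """Return human-readable problems for one registry entry; empty if it looks complete."""
    errors = []
    for key in REQUIRED_KEYS:
        if entry.get(key) in (None, "", {}):
            errors.append(f"{name}: missing {key}")
    variables = entry.get("variables")
    if variables and not isinstance(variables, Mapping):
        errors.append(f"{name}: variables must map variable code to staging column")
    return errors


if __name__ == "__main__":
    # Hypothetical entry for the my_provider_airbyte example source.
    entry = {
        "relation": "stg_my_provider_measurements",
        "station_code_col": "station_code",
        "measured_at_col": "measured_at_parsed",
        "raw_payload_col": "raw_payload",
        "is_measured_at_valid_col": "is_measured_at_valid",
        "variables": {"pm2_5": "pm25_value"},
    }
    print(validate_measurement_source("my_provider_airbyte", entry))
```

cursor_id_col is left out of the required keys because the checklist marks it as optional.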