diff --git a/apps/dagster/DISTRIBUTION_SOLUTION.md b/apps/dagster/DISTRIBUTION_SOLUTION.md
new file mode 100644
index 0000000000..b936a43a0a
--- /dev/null
+++ b/apps/dagster/DISTRIBUTION_SOLUTION.md
@@ -0,0 +1,427 @@
+# Distribution Assets - Implementation Solution
+
+## Overview
+
+This document outlines how to dynamically generate distribution assets in Dagster based on product metadata.
+
+## Product Metadata Structure
+
+### Key Classes
+
+From `dcpy.product_metadata.models.metadata`:
+
+- **`OrgMetadata`**: Root organization metadata
+ - Located at repo root (configured via `config.CONF["product_metadata"]["repo_path"]`)
+ - Lists all products in `metadata.products`
+
+- **`ProductMetadata`**: Per-product metadata
+ - Retrieved via `org.product(product_name)`
+ - Has method `all_destinations()` that returns list of destinations
+
+- **`Destination`**: Individual publishing destination
+ - Has `id`, `type`, `tags`, `custom` dict
+
+### Loading Product Metadata
+
+```python
+from dcpy.lifecycle import product_metadata
+
+# Load organization metadata (with template vars if needed)
+org = product_metadata.load(version='26v1', **other_template_vars)
+
+# Get a specific product
+product = org.product('pluto')
+
+# Get all destinations for that product
+destinations = product.all_destinations()
+```
+
+### Destination Structure
+
+Each destination dict contains:
+
+```python
+{
+ "product": "building_elevation_and_subgrade",
+ "dataset_id": "building_elevation_and_subgrade",
+ "destination_id": "socrata",
+ "destination_type": "open_data", # Connector type
+ "tags": set(),
+ "custom": {"four_four": "bsin-59hv"}, # Destination-specific config
+ "destination_path": "building_elevation_and_subgrade.building_elevation_and_subgrade.socrata"
+}
+```
+
+**Key Fields:**
+- `product`: Product name (e.g., "pluto", "edde")
+- `dataset_id`: Dataset within product
+- `destination_id`: Unique destination identifier
+- `destination_type`: Connector type (e.g., "open_data", "bytes", "socrata")
+- `custom`: Dict with destination-specific configuration (e.g., Socrata four-four)
+- `destination_path`: Unique path in format `{product}.{dataset}.{destination_id}`
+
+---
+
+## Dagster Implementation Strategy
+
+### 1. Asset Generation Approach
+
+We'll dynamically generate one Dagster asset **per destination** for each product.
+
+**Asset naming pattern:**
+```
+{product}_dist_{dataset_id}_{destination_id}
+```
+
+Examples:
+- `pluto_dist_pluto_socrata`
+- `pluto_dist_pluto_water_included_socrata`
+- `building_elevation_dist_building_elevation_socrata`
+
+### 2. Implementation in `builds/assets.py`
+
+```python
+from dcpy.lifecycle import product_metadata, lifecycle
+
+def make_distribution_assets_for_product(product: lifecycle.asset_models.Product):
+ """Generate distribution assets for a single product.
+
+ Returns a list of Dagster assets - one per destination.
+ """
+ # Load product metadata
+ # NOTE: Template vars might need to come from partition context
+ org = product_metadata.load()
+ product_md = org.product(product.name)
+
+ # Get all destinations for this product
+ destinations = product_md.all_destinations()
+
+ distribution_assets = []
+
+ for dest in destinations:
+ asset_func = make_single_distribution_asset(product, dest)
+ distribution_assets.append(asset_func)
+
+ return distribution_assets
+
+
+def make_single_distribution_asset(product, destination: dict):
+ """Create a single distribution asset for a specific destination."""
+
+ # Generate unique asset name
+ asset_name = f"{product.name}_dist_{destination['dataset_id']}_{destination['destination_id']}"
+
+ # Determine upstream dependency
+ # This should be external_review for the product
+ upstream_asset = f"{product.name}_external_review"
+
+ @asset(
+ name=asset_name,
+ group_name=product.name, # Keep in product group for lineage
+ partitions_def=build_partition_def,
+ deps=[upstream_asset],
+ tags={
+ "product": product.name,
+ "lifecycle_stage": "dist.publish",
+ "domain": "distribution",
+ "destination_type": destination["destination_type"],
+ "destination_id": destination["destination_id"],
+ },
+ )
+ def _distribution_asset(context: AssetExecutionContext):
+ """Publish to {destination_type}: {destination_id}"""
+
+ partition_key = context.partition_key
+
+ context.log.info(
+ f"Publishing {product.name} to {destination['destination_type']}: {destination['destination_id']}"
+ )
+
+ # Get packaged artifacts from previous step
+ # These should be in blob storage from package_artifacts step
+ artifacts = get_packaged_artifacts(
+ product=product.name,
+ version=partition_key
+ )
+
+ # Get the appropriate connector based on destination_type
+ from dcpy.lifecycle.connector_registry import connectors
+ connector = connectors[destination["destination_type"]]
+
+ # Publish using connector
+ # The connector interface varies by type, so we might need type-specific handling
+ result = publish_to_destination(
+ connector=connector,
+ destination=destination,
+ artifacts=artifacts,
+ partition_key=partition_key
+ )
+
+ return MaterializeResult(
+ metadata={
+ "destination_type": destination["destination_type"],
+ "destination_id": destination["destination_id"],
+ "destination_path": destination["destination_path"],
+ "published_files": [f.name for f in artifacts],
+ "custom_config": destination.get("custom", {}),
+ "publish_result": str(result),
+ }
+ )
+
+ return _distribution_asset
+
+
+def publish_to_destination(connector, destination: dict, artifacts, partition_key: str):
+ """Handle publishing to a destination.
+
+ Different destination types may have different connector interfaces.
+ """
+ destination_type = destination["destination_type"]
+ destination_id = destination["destination_id"]
+ custom_config = destination.get("custom", {})
+
+ # Type-specific publishing logic
+ if destination_type == "open_data":
+ # Socrata/Open Data publishing
+ # custom config might have "four_four" identifier
+ four_four = custom_config.get("four_four")
+ return connector.push(
+ dataset_id=four_four,
+ files=artifacts,
+ version=partition_key
+ )
+
+ elif destination_type == "bytes":
+ # EDM Bytes publishing
+ return connector.push(
+ key=destination_id,
+ files=artifacts,
+ version=partition_key
+ )
+
+ else:
+ # Generic fallback
+ return connector.push(
+ key=destination_id,
+ files=artifacts,
+ version=partition_key
+ )
+
+
+# Generate distribution assets for all products
+products = lifecycle.list_products()
+distribution_assets = []
+for product in products:
+ distribution_assets.extend(make_distribution_assets_for_product(product))
+
+# Add to build_assets export
+build_assets = plan_recipe_assets + load_data_assets + distribution_assets
+```
+
+### 3. Key Challenges & Solutions
+
+#### Challenge 1: Template Variables
+
+**Problem:** Product metadata uses Jinja2 templates that need variables (e.g., `version`, `BUILD_ENGINE_VERSION`)
+
+**Solution:** Pass partition key and other context as template vars when loading metadata:
+
+```python
+def make_distribution_assets_for_product(product):
+ # In asset definition, not at module load time
+ @asset(...)
+ def _distribution_asset(context):
+ partition_key = context.partition_key
+
+ # Load metadata with template vars from context
+ org = product_metadata.load(
+ version=partition_key,
+ BUILD_ENGINE_VERSION=partition_key,
+ **other_vars_from_recipe
+ )
+```
+
+**Alternative:** Load metadata at asset materialization time (not module import time) to have access to partition context.
+
+#### Challenge 2: Product-Dataset-Destination Hierarchy
+
+**Problem:** Destination paths are `{product}.{dataset}.{destination_id}` - one product can have multiple datasets, each with multiple destinations.
+
+**Solution:**
+- Asset name includes dataset: `{product}_dist_{dataset}_{destination_id}`
+- For products with one dataset (common case), this is simple
+- For products with multiple datasets, we get multiple sets of distribution assets
+
+**Example:**
+```
+pluto_dist_pluto_socrata
+pluto_dist_pluto_bytes
+pluto_dist_pluto_clipped_socrata
+pluto_dist_pluto_clipped_bytes
+```
+
+#### Challenge 3: Connector Interface Differences
+
+**Problem:** Different destination types use different connectors with different interfaces.
+
+**Solution:** Create a dispatch function that handles type-specific publishing:
+
+```python
+def publish_to_destination(connector, destination, artifacts, partition_key):
+ """Central dispatch for different destination types."""
+ # See implementation above
+```
+
+**Better Solution:** Standardize connector interface (separate task, not blocking).
+
+---
+
+## Testing Strategy
+
+### 1. Verify Asset Generation
+
+```python
+# In a test or notebook
+from dagster import build_asset_context
+from apps.dagster.builds.assets import distribution_assets
+
+# Check that assets are generated
+print(f"Generated {len(distribution_assets)} distribution assets")
+
+# Check asset names
+for asset in distribution_assets[:5]:
+ print(asset.name)
+```
+
+### 2. Mock Materialization
+
+```python
+# Test a single distribution asset without actually publishing
+context = build_asset_context()
+result = pluto_dist_pluto_socrata(context)
+print(result.metadata)
+```
+
+### 3. Integration Test
+
+```bash
+# In Dagster UI
+dagster dev
+
+# Navigate to pluto group
+# Verify distribution assets appear
+# Check lineage shows: external_review → [dist_1, dist_2, dist_3]
+```
+
+---
+
+## Migration Path
+
+### Phase 1: Generate Assets (No Publishing)
+
+1. Implement `make_distribution_assets_for_product()`
+2. Generate assets that log what they would do
+3. Verify in Dagster UI that assets appear correctly
+4. Check lineage graph shows fan-out pattern
+
+### Phase 2: Implement Publishing
+
+1. Implement `publish_to_destination()` for one destination type (e.g., bytes)
+2. Test with a single product
+3. Add error handling and logging
+4. Expand to other destination types
+
+### Phase 3: Handle Edge Cases
+
+1. Products with no destinations (skip distribution asset generation)
+2. Products with multiple datasets
+3. Template variable resolution from recipe/partition context
+4. Connector interface standardization
+
+---
+
+## Open Questions
+
+### 1. Template Variables for Product Metadata
+
+**Question:** Where do template vars come from for `product_metadata.load()`?
+
+**Options:**
+- A: From partition key (e.g., `version='26v1'`)
+- B: From recipe.yml template vars (same as plan step)
+- C: Both
+
+**Recommendation:** Start with partition key only, expand if needed.
+
+### 2. Dependency on Package vs. External Review
+
+**Question:** Should distribution depend on `package_artifacts` or `external_review`?
+
+**Current Spec:** `external_review` (allows skipping package step)
+
+**Consideration:** If package is optional, distribution must handle both:
+- With package: Get artifacts from `package_artifacts` upload location
+- Without package: Get artifacts from `build_artifacts` upload location
+
+**Solution:** Check if `package_artifacts` exists for product, use that if present, otherwise use `build_artifacts`.
+
+### 3. Products Without Destinations
+
+**Question:** What if a product has no destinations in metadata?
+
+**Answer:** Skip distribution asset generation entirely. Not all products are published externally.
+
+**Implementation:**
+```python
+destinations = product_md.all_destinations()
+if not destinations:
+ return [] # No distribution assets for this product
+```
+
+### 4. Connector Registration
+
+**Question:** Are all destination types already registered in `connectors` registry?
+
+**Answer:** Need to verify. Check `dcpy.lifecycle.connector_registry`.
+
+**Fallback:** If connector not found, log warning and skip (or fail asset materialization).
+
+---
+
+## File Organization
+
+```
+apps/dagster/builds/
+├── assets.py # Build assets + distribution asset generators
+├── distribution.py # NEW: Distribution-specific helpers
+│ ├── make_distribution_assets_for_product()
+│ ├── make_single_distribution_asset()
+│ └── publish_to_destination()
+├── partitions.py
+├── resources.py
+└── definitions.py
+```
+
+**Recommendation:** Keep distribution logic in `assets.py` initially, extract to `distribution.py` if it grows large.
+
+---
+
+## Next Steps
+
+1. ✅ **Read and understand product metadata structure** (DONE)
+2. **Implement asset generation** in `builds/assets.py`:
+ - `make_distribution_assets_for_product()`
+ - `make_single_distribution_asset()`
+3. **Test asset generation** (no publishing yet):
+ - Run `dagster dev`
+ - Verify assets appear in UI
+ - Check lineage graph
+4. **Implement publishing logic**:
+ - Start with `bytes` destination type
+ - Add error handling
+ - Add logging
+5. **Handle edge cases**:
+ - Products without destinations
+ - Multiple datasets per product
+ - Template variable resolution
+6. **Update SPEC.md** with implementation details
diff --git a/apps/dagster/PIPELINE_DIAGRAM.md b/apps/dagster/PIPELINE_DIAGRAM.md
new file mode 100644
index 0000000000..aa8125bef5
--- /dev/null
+++ b/apps/dagster/PIPELINE_DIAGRAM.md
@@ -0,0 +1,233 @@
+# Dagster Build Pipeline - Visual Overview
+
+This diagram shows the complete asset lineage for a typical product in our Dagster implementation.
+
+## Full Product Pipeline
+
+```mermaid
+graph TD
+ %% Styling
+ classDef planStyle fill:#e1f5ff,stroke:#0288d1,stroke-width:2px
+ classDef buildStyle fill:#fff3e0,stroke:#f57c00,stroke-width:2px
+ classDef qaStyle fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
+ classDef packageStyle fill:#e8f5e9,stroke:#388e3c,stroke-width:2px
+ classDef reviewStyle fill:#ffebee,stroke:#c62828,stroke-width:3px
+ classDef distStyle fill:#e0f2f1,stroke:#00796b,stroke-width:2px
+
+ %% Plan Stage
+ plan["product_plan
Resolve recipe & versions"]:::planStyle
+
+ %% Load Stage
+ source["product_source_data
Load datasets into postgres/files"]:::planStyle
+
+ %% Build Stage
+ build["product_build
Execute transformations
📊 Build report → PRIVATE bucket"]:::buildStyle
+ build_artifacts["product_build_artifacts
Export datasets to files
📁 Shapefiles, Parquet, CSV → PUBLIC bucket"]:::buildStyle
+
+ %% QA Stage (Optional)
+ qa["product_qa
Run validations
📊 QA report → PRIVATE bucket
(optional)"]:::qaStyle
+ qa_artifacts["product_qa_artifacts
Export QA outputs
📁 CSV/TXT → qaqc/ subfolder
(optional)"]:::qaStyle
+
+ %% Package Stage (Optional)
+ package["product_package_artifacts
Add READMEs, metadata
📦 Enhanced files → PUBLIC bucket
(optional)"]:::packageStyle
+
+ %% External Review (Manual Gate)
+ review["product_external_review
Manual approval gate
⏸️ Requires manual materialization"]:::reviewStyle
+
+ %% Distribution (Fan-out)
+ dist1["product_dist_socrata_main
Publish to Socrata"]:::distStyle
+ dist2["product_dist_socrata_alt
Publish to Socrata (alt)"]:::distStyle
+ dist3["product_dist_bytes_public
Publish to S3/Bytes"]:::distStyle
+
+ %% Flow
+ plan --> source
+ source --> build
+ build --> build_artifacts
+ build_artifacts --> qa
+ qa --> qa_artifacts
+ qa_artifacts --> package
+ package --> review
+
+ %% Distribution fan-out (parallel)
+ review --> dist1
+ review --> dist2
+ review --> dist3
+
+ %% Notes
+ note1["🔒 PRIVATE bucket:
Build reports, QA reports
(performance stats, internal metadata)"]
+ note2["🌐 PUBLIC bucket:
Dataset files, QA artifacts,
packaged files
(external distribution)"]
+
+ style note1 fill:#ffcdd2,stroke:#c62828,stroke-dasharray: 5 5
+ style note2 fill:#c8e6c9,stroke:#388e3c,stroke-dasharray: 5 5
+```
+
+## Simplified View (Minimal Pipeline)
+
+For products without QA or packaging steps:
+
+```mermaid
+graph TD
+ classDef planStyle fill:#e1f5ff,stroke:#0288d1,stroke-width:2px
+ classDef buildStyle fill:#fff3e0,stroke:#f57c00,stroke-width:2px
+ classDef reviewStyle fill:#ffebee,stroke:#c62828,stroke-width:3px
+ classDef distStyle fill:#e0f2f1,stroke:#00796b,stroke-width:2px
+
+ plan["product_plan"]:::planStyle
+ source["product_source_data"]:::planStyle
+ build["product_build"]:::buildStyle
+ artifacts["product_build_artifacts"]:::buildStyle
+ review["product_external_review
(manual gate)"]:::reviewStyle
+ dist1["product_dist_socrata"]:::distStyle
+ dist2["product_dist_bytes"]:::distStyle
+
+ plan --> source --> build --> artifacts --> review
+ review --> dist1
+ review --> dist2
+```
+
+## Key Characteristics
+
+### Linear Sequential Flow
+- Plan → Source Data → Build → Build Artifacts → QA → QA Artifacts → Package → External Review
+- Each step depends on the previous step completing
+
+### Fan-Out at Distribution
+- **All distribution assets run in parallel** after external review approval
+- Independent destinations don't block each other
+- Example: Socrata publish failure doesn't prevent S3 upload
+
+### Materialized Assets vs. Side Effects
+
+| Asset | Materialized Artifact | Side Effects (Ephemeral) |
+|-------|----------------------|--------------------------|
+| `product_plan` | recipe.lock.yml in blob storage | - |
+| `product_source_data` | build_metadata.json | Postgres tables in build schema |
+| `product_build` | Build report (PRIVATE bucket) | Postgres tables OR local files |
+| `product_build_artifacts` | Dataset files (PUBLIC bucket) | - |
+| `product_qa` | QA report (PRIVATE bucket) | QA validation tables |
+| `product_qa_artifacts` | QA output files (PUBLIC bucket) | - |
+| `product_package_artifacts` | Enhanced files (PUBLIC bucket) | - |
+| `product_external_review` | Approval metadata | - |
+| `product_dist_*` | Published artifacts | - |
+
+### Asset Grouping in Dagster UI
+
+All assets for a product are in one group:
+- `group_name="pluto"` for all PLUTO assets (plan through distribution)
+- `group_name="edde"` for all EDDE assets
+- `group_name="ingest"` for ingest domain (separate)
+
+**UI Benefit:** Click "pluto" in sidebar → see entire pipeline with lineage graph
+
+### Optional Steps
+
+Products can skip optional steps via recipe configuration:
+
+```yaml
+stage_config:
+ qa:
+ enabled: false # Skip QA entirely
+
+ package:
+ enabled: false # Skip package step
+```
+
+If disabled, dependency chain adjusts:
+- No QA: `build_artifacts` → `external_review`
+- No Package: `qa_artifacts` → `external_review`
+- No QA or Package: `build_artifacts` → `external_review`
+
+## Build Types
+
+Different products can use different build approaches:
+
+### SQL/DBT Build (Postgres-based)
+```mermaid
+graph LR
+ A[Source Data
in Postgres] --> B[Build runs
SQL/dbt]
+ B --> C[Output tables
in Postgres]
+ C --> D[Export tables
to files]
+ D --> E[Upload to
blob storage]
+
+ style B fill:#fff3e0
+ style C fill:#ffe0b2
+ style D fill:#ffcc80
+```
+
+### Python Build (File-based)
+```mermaid
+graph LR
+ A[Source Data
in files] --> B[Build runs
Python code]
+ B --> C[Output files
generated]
+ C --> D[Upload to
blob storage]
+
+ style B fill:#fff3e0
+ style C fill:#ffcc80
+```
+
+## Storage Strategy
+
+### Private Bucket (Internal Only)
+- Build reports (performance stats, logs)
+- QA reports (validation details)
+- Any operational/telemetry data
+
+### Public Bucket (External Distribution)
+- Exported dataset files (shapefiles, parquet, CSV)
+- QA artifacts (validation outputs for review)
+- Packaged files (with READMEs, metadata)
+
+Path structure: `{product}/builds/{version}/[qaqc/]`
+- Example: `pluto/builds/26v1/pluto_26v1.shp`
+- Example: `pluto/builds/26v1/qaqc/validation_results.csv`
+
+## Example: PLUTO Product
+
+Concrete example with PLUTO:
+
+```mermaid
+graph TD
+ classDef planStyle fill:#e1f5ff,stroke:#0288d1,stroke-width:2px
+ classDef buildStyle fill:#fff3e0,stroke:#f57c00,stroke-width:2px
+ classDef qaStyle fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
+ classDef reviewStyle fill:#ffebee,stroke:#c62828,stroke-width:3px
+ classDef distStyle fill:#e0f2f1,stroke:#00796b,stroke-width:2px
+
+ plan["pluto_plan
26v1"]:::planStyle
+ source["pluto_source_data
Load MapPLUTO, DTM, etc."]:::planStyle
+ build["pluto_build
Run SQL transformations"]:::buildStyle
+ artifacts["pluto_build_artifacts
Export to Shapefile, Parquet"]:::buildStyle
+ qa["pluto_qa
Geometry validation"]:::qaStyle
+ qa_artifacts["pluto_qa_artifacts
Export validation CSVs"]:::qaStyle
+ review["pluto_external_review
DCP approval"]:::reviewStyle
+
+ dist1["pluto_dist_socrata_main
NYC Open Data - Main"]:::distStyle
+ dist2["pluto_dist_socrata_water
NYC Open Data - With Water"]:::distStyle
+ dist3["pluto_dist_bytes_public
EDM Bytes Public"]:::distStyle
+
+ plan --> source --> build --> artifacts --> qa --> qa_artifacts --> review
+ review --> dist1
+ review --> dist2
+ review --> dist3
+```
+
+**Partition:** `26v1`
+**Group:** `pluto` (all assets)
+**Tags:** `product=pluto`, `lifecycle_stage=builds.*` or `dist.publish`
+
+## Questions?
+
+- **Q: What if build fails?**
+ A: Downstream assets blocked. Postgres schema remains for debugging. No automatic cleanup.
+
+- **Q: Can I retry just the export step?**
+ A: Yes! `build_artifacts` can be re-materialized independently if postgres tables still exist.
+
+- **Q: What if one distribution destination fails?**
+ A: Other destinations proceed independently (fan-out pattern). Fix and retry failed destination.
+
+- **Q: How do I run the entire pipeline?**
+ A: Use per-product job (e.g., `build_pluto_job`) or materialize `product_external_review` and select "Materialize upstream".
+
+- **Q: Can I skip QA for a quick test build?**
+ A: Set `stage_config.qa.enabled: false` in recipe or manually materialize `external_review` after `build_artifacts`.
diff --git a/apps/dagster/SPEC.md b/apps/dagster/SPEC.md
new file mode 100644
index 0000000000..427533605a
--- /dev/null
+++ b/apps/dagster/SPEC.md
@@ -0,0 +1,984 @@
+# Dagster Application Specification
+
+**Version:** 0.4
+**Last Updated:** 2026-05-14
+**Status:** Draft - Evolving Design Document
+
+## Overview
+
+This Dagster application manages three primary domains for the data engineering pipeline:
+
+1. **Ingest**: Regularly pulling data from external sources, normalizing it, and storing in blob storage
+2. **Builds**: Transforming ingested data through planning, loading, processing (dbt/Python), QA, and packaging
+3. **Distribution**: Publishing build artifacts to external destinations (Socrata, S3, etc.)
+
+## Design Principles
+
+- **Asset-based thinking**: Think in nouns (states) rather than verbs (actions)
+- **Idempotency**: Steps should be safely re-runnable
+- **Product-centric**: All assets tagged and organized by product (except for ingest)
+- **Partitioned workflows**: Each domain uses partitions to track versions/time periods
+- **Separation of concerns**: Build process (ephemeral) vs. Build artifacts (durable)
+
+---
+
+## 1. Ingest Domain
+
+**Status:** ✅ Implemented
+
+Handles regular data pulls from external sources and normalization for downstream builds.
+
+*(Details TBD - this is already implemented)*
+
+---
+
+## 2. Builds Domain
+
+**Status:** 🚧 In Progress
+
+### Overview
+
+The builds domain transforms ingested data into publishable products through a series of steps.
+
+### Partitioning Strategy
+
+- **Partition Definition**: `build_partition_def` (shared across all build assets)
+- **Partition Key Format**: Product-specific (e.g., "2024Q1", "26v1")
+- **Scope**: All build steps for a given product share the same partition key
+
+### Asset Naming Convention
+
+**Simplified:** Remove redundant `builds_` prefix since Dagster groups by `group_name`
+
+```
+{product}_{step}
+```
+
+Examples:
+- `edde_plan`
+- `edde_source_data`
+- `edde_build`
+- `edde_build_artifacts`
+- `pluto_qa`
+- `pluto_external_review`
+
+### Asset Organization
+
+**Group by Product:**
+- Each product gets its own group: `group_name="{product}"`
+- Clicking product group in UI shows full end-to-end lineage (plan → distribution)
+- Ingest domain uses separate `group_name="ingest"` (not product-centric)
+
+**Asset Configuration:**
+```python
+@asset(
+ name="pluto_plan",
+ group_name="pluto", # Product-based grouping
+ partitions_def=build_partition_def,
+ tags={
+ "product": "pluto",
+ "lifecycle_stage": "builds.plan",
+ "domain": "builds"
+ }
+)
+```
+
+**Jobs:**
+- Define per-product jobs for easy pipeline execution
+- Example: `build_pluto_job`, `build_edde_job`
+- Allows "run entire product pipeline" with one click
+
+```python
+pluto_job = define_asset_job(
+ name="build_pluto",
+ selection=AssetSelection.groups("pluto"),
+ partitions_def=build_partition_def
+)
+```
+
+### Build Types
+
+Products can have different build types, configured in recipe.yml:
+
+```yaml
+stage_config:
+ build:
+ type: dbt | sql | python | bash
+ # Optional: specify tables to export (for sql/dbt builds)
+ export_tables:
+ - name: pluto_output
+ format: shapefile
+ - name: pluto_csv
+ format: parquet
+```
+
+**Build Type Execution:**
+- **dbt**: Use Dagster's dbt integration - auto-generates assets per dbt model
+- **python**: Dynamically import and execute Python module/function
+- **sql**: Execute SQL scripts
+- **bash**: Execute shell script (fallback)
+
+**Python Module Configuration:**
+```yaml
+stage_config:
+ build:
+ type: python
+ module: path.to.module # e.g., "external_review.collate"
+ function: run_build
+ args: # Optional arguments to pass to function
+ arg1: value1
+ arg2: value2
+```
+
+**Convention over Configuration:**
+- Default: Each product has `build.sh` script in product directory
+- Dagster calls: `./build.sh {stage}` where stage is: `build`, `qa`, `package`
+- If stage not supported by build.sh, it should exit 0 (no-op)
+- Override via recipe.yml `stage_config` if needed
+
+### Build Steps
+
+All steps are **linear and sequential**. Each step is a separate Dagster asset.
+
+#### Step 1: Plan ✅ IMPLEMENTED
+
+**Asset Name:** `{product}_plan`
+
+**Materialized Artifact:** `recipe.lock.yml` uploaded to blob storage (plan connector)
+
+**Purpose:** Resolve recipe templates and dataset versions
+
+**Implementation:**
+- Takes template variables as config (dynamically generated from recipe.yml)
+- Resolves all `{{ vars }}` and dataset versions to specific values
+- Validates that referenced dataset versions exist in ingest storage
+- Creates `recipe.lock.yml` with fully resolved configuration
+- Uploads plan to blob storage via plan connector
+
+**Metadata:**
+- Template variables used
+- Plan upload location
+- Partition key
+
+**Dependencies:** None (first step)
+
+---
+
+#### Step 2: Source Data 🚧 IN PROGRESS
+
+**Asset Name:** `{product}_source_data`
+
+**Materialized Artifact:** `build_metadata.json` documenting loaded datasets
+
+**Purpose:** Ensure source data from ingest is loaded into build environment (postgres/filesystem)
+
+**Implementation:**
+- Pull `recipe.lock.yml` from plan storage
+- Call `dcpy.lifecycle.builds.load.load_source_data_from_resolved_recipe()`
+- Load datasets into postgres schema and/or local filesystem per recipe configuration
+- **Idempotent**: Dagster handles re-materialization
+
+**Behavior:**
+- **Fails immediately** if dataset version is missing
+- **For dbt/sql builds only**: Ensure postgres schema exists (don't drop if already exists)
+ - Check if schema exists before calling `setup_build_pg_schema()`
+ - Skip schema setup if already exists (preserves existing data)
+- Imports datasets according to `destination` field (postgres/file/df)
+
+**Side Effects (not materialized assets):**
+- **For dbt/sql builds**: Datasets in postgres schema `{build_name}` (temporary/scratch space)
+- **For python/bash builds**: Datasets in local data library (if file-based)
+
+**Metadata:**
+- Build type (determines postgres usage)
+- Build schema name (postgres, if applicable)
+- Number of datasets loaded
+- Dataset IDs and versions
+
+**Dependencies:** `{product}_plan`
+
+---
+
+#### Step 3: Build ⏳ NOT IMPLEMENTED
+
+**Asset Name:** `{product}_build`
+
+**Materialized Artifact:** Build report (logs, performance stats, table metadata)
+
+**Purpose:** Execute build transformations to create output tables/files
+
+**Implementation:**
+
+Based on `stage_config.build.type`:
+
+**For dbt builds:**
+```python
+@dbt_assets(manifest=product.dbt_manifest)
+def {product}_dbt_models(context):
+ """Auto-generated assets per dbt model."""
+ yield from dbt.cli(["build"]).stream()
+
+@asset(name="{product}_build", deps=[{product}_dbt_models])
+def {product}_build_report(context):
+ """Aggregate build report from dbt run."""
+ results = parse_dbt_run_results()
+ return MaterializeResult(
+ metadata={
+ "build_type": "dbt",
+ "models_built": results.models,
+ "test_results": results.tests,
+ "duration_seconds": results.duration
+ }
+ )
+```
+
+**For python/sql/bash builds:**
+```python
+@asset(name="{product}_build")
+def {product}_build(context, {product}_source_data):
+ """Execute build transformation."""
+ start_time = time.time()
+
+ # Execute based on type
+ if build_type == "python":
+ # Dynamically import module and function
+ module_path = recipe.stage_config.build.module
+ function_name = recipe.stage_config.build.function
+ args = recipe.stage_config.build.get("args", {})
+
+ module = importlib.import_module(module_path)
+ func = getattr(module, function_name)
+ result = func(context, **args)
+ else:
+ result = subprocess.run(["./build.sh", "build"])
+
+ duration = time.time() - start_time
+
+ return MaterializeResult(
+ metadata={
+ "build_type": build_type,
+ "duration_seconds": duration,
+ "tables_created": [...], # For postgres builds
+ "files_created": [...], # For file builds
+ "build_log": "path/to/build.log"
+ }
+ )
+```
+
+**Side Effects (not materialized assets):**
+- **Postgres builds**: Tables in build schema (temporary/scratch space)
+- **File builds**: Output files in build directory
+
+**Storage:**
+- **Build reports MUST use private bucket** (contains performance stats, internal metadata)
+- Use private connector for uploading build logs/reports
+- Unlike most storage which is public, build telemetry is internal-only
+
+**Metadata:**
+- Build type
+- Duration
+- Tables/files created (listing only, not the artifacts themselves)
+- Build log path
+- Success/failure status per step
+
+**Dependencies:** `{product}_source_data`
+
+---
+
+#### Step 4: Build Artifacts (Export) ⏳ NOT IMPLEMENTED
+
+**Asset Name:** `{product}_build_artifacts`
+
+**Materialized Artifact:** Dataset files (shapefiles, parquet, CSV, etc.) uploaded to cloud storage
+
+**Purpose:** Export build outputs to files and upload to cloud
+
+**Implementation:**
+
+```python
+@asset(name="{product}_build_artifacts")
+def {product}_build_artifacts(context, {product}_build):
+ """Export build outputs to files and upload."""
+
+ build_meta = context.upstream_output.metadata
+ export_config = recipe.stage_config.build.exports
+
+ if build_meta["build_type"] in ["dbt", "sql"]:
+ # Export specified tables from postgres to files
+ files = export_tables_from_postgres(
+ schema=build_meta.get("pg_schema"),
+ datasets=export_config.datasets, # List of {name, filename, format}
+ output_folder=export_config.output_folder
+ )
+ else: # build_type == "python" or "bash" (files already exist)
+ # Collect the files from build directory
+ files = collect_build_outputs(
+ build_directory=build_path,
+ output_folder=export_config.output_folder
+ )
+
+ # Upload to cloud storage (builds connector)
+ upload_result = upload_to_cloud(
+ files=files,
+ connector=get_builds_default_connector(),
+ key=product_name,
+ version=partition_key
+ )
+
+ return MaterializeResult(
+ metadata={
+ "exported_files": [f.name for f in files],
+ "file_sizes_mb": {f.name: f.size for f in files},
+ "upload_location": upload_result.location,
+ "total_size_mb": sum(f.size for f in files)
+ }
+ )
+```
+
+**Export Configuration (recipe.yml):**
+
+Based on existing convention (see `products/cscl/recipe.yml`):
+
+```yaml
+stage_config:
+ build:
+ type: sql # or dbt
+ exports:
+ output_folder: output
+ datasets:
+ - name: manhattan_lion_dat # Table name in postgres
+ filename: ManhattanLION.dat # Output filename
+ format: dat # Output format
+ - name: pluto_output
+ filename: pluto_26v1.shp
+ format: shapefile
+ - name: pluto_csv
+ filename: pluto_26v1.parquet
+ format: parquet
+```
+
+**Notes:**
+- `custom` field (e.g., `custom: { formatting: lion_dat }`) will be implemented later
+- For now, standard format conversion only
+
+**Behavior:**
+- **dbt/sql builds**: Export specified tables to configured formats
+- **Python/bash builds**: Collect files from output folder
+- Upload to builds connector (blob storage) at path: `{product}/builds/{version}/`
+- **This is the first durable artifact** - files persist beyond build lifetime
+
+**Retry Strategy:**
+- Can retry export independently without re-running build
+- Build report still exists, postgres tables may still exist (if not cleaned up)
+
+**Dependencies:** `{product}_build`
+
+---
+
+#### Step 5: QA ⏳ NOT IMPLEMENTED
+
+**Asset Name:** `{product}_qa`
+
+**Materialized Artifact:** QA report (validation results, test outputs) - uploaded to PRIVATE bucket
+
+**Purpose:** Run quality assurance checks and validations
+
+**Implementation:**
+
+```python
+@asset(name="{product}_qa")
+def {product}_qa(context, {product}_build_artifacts):
+ """Run QA validation."""
+ start_time = time.time()
+
+ qa_config = recipe.stage_config.get("qa", {})
+
+ if qa_config.get("type") == "python":
+ # Dynamically import module and function
+ module = importlib.import_module(qa_config["module"])
+ func = getattr(module, qa_config["function"])
+ result = func(context, **qa_config.get("args", {}))
+ else:
+ result = subprocess.run(["./build.sh", "qa"])
+
+ duration = time.time() - start_time
+
+ # Upload QA report to PRIVATE bucket
+ upload_qa_report(result, private_connector)
+
+ return MaterializeResult(
+ metadata={
+ "qa_passed": result.success,
+ "validations_run": result.validation_count,
+ "failures": result.failures,
+ "qa_log": "path/to/qa.log",
+ "duration_seconds": duration
+ }
+ )
+```
+
+**Configuration:**
+```yaml
+stage_config:
+ qa:
+ enabled: true # Optional - if false, skip asset creation entirely
+ type: python
+ module: qa.validate
+ function: run_qa
+ args:
+ threshold: 0.95
+```
+
+**Optional:** If `stage_config.qa.enabled` is false or missing, do not create this asset
+
+**QA Outputs:**
+- Can produce validation tables in postgres
+- Can produce QA reports on filesystem
+- May fail the asset if validations fail (blocking downstream steps)
+
+**Storage:**
+- **QA reports MUST use private bucket** (internal validation details)
+
+**Dependencies:** `{product}_build_artifacts`
+
+---
+
+#### Step 6: QA Artifacts ⏳ NOT IMPLEMENTED
+
+**Asset Name:** `{product}_qa_artifacts`
+
+**Materialized Artifact:** QA output files (CSVs, validation reports) uploaded to build storage
+
+**Purpose:** Export and upload QA validation outputs for review
+
+**Implementation:**
+
+```python
+@asset(name="{product}_qa_artifacts")
+def {product}_qa_artifacts(context, {product}_qa):
+ """Export QA outputs and upload to build storage."""
+
+ qa_config = recipe.stage_config.get("qa", {})
+ export_config = qa_config.get("exports", {})
+
+ # Collect QA output files (mostly CSVs and text files)
+ qa_files = collect_qa_outputs(
+ output_folder=export_config.get("output_folder", "qaqc"),
+ datasets=export_config.get("datasets", [])
+ )
+
+ # Upload to builds connector in a qaqc subfolder
+ # Path: {product}/builds/{version}/qaqc/
+ upload_result = upload_to_cloud(
+ files=qa_files,
+ connector=get_builds_default_connector(),
+ key=product_name,
+ version=partition_key,
+ subfolder="qaqc"
+ )
+
+ return MaterializeResult(
+ metadata={
+ "qa_files": [f.name for f in qa_files],
+ "file_sizes_mb": {f.name: f.size for f in qa_files},
+ "upload_location": upload_result.location,
+ "total_size_mb": sum(f.size for f in qa_files)
+ }
+ )
+```
+
+**Configuration (recipe.yml):**
+```yaml
+stage_config:
+ qa:
+ enabled: true
+ type: python
+ module: qa.validate
+ function: run_qa
+ exports:
+ output_folder: qaqc
+ datasets:
+ - name: validation_results
+ filename: validation_results.csv
+ format: csv
+ - name: qa_summary
+ filename: qa_summary.txt
+ format: txt
+```
+
+**Storage Location:**
+- Files uploaded to: `{product}/builds/{version}/qaqc/`
+- Example: `pluto/builds/26v1/qaqc/validation_results.csv`
+
+**Optional:** If no QA or no QA exports configured, skip this asset
+
+**Dependencies:** `{product}_qa`
+
+---
+
+#### Step 7: Package Artifacts ⏳ NOT IMPLEMENTED
+
+**Asset Name:** `{product}_package_artifacts`
+
+**Materialized Artifact:** Packaged/enhanced dataset files ready for distribution
+
+**Purpose:** Enhance exported files with metadata, READMEs, data dictionaries
+
+**Implementation:**
+
+```python
+@asset(name="{product}_package_artifacts")
+def {product}_package_artifacts(context, {product}_qa_artifacts, {product}_build_artifacts):
+ """Package artifacts for distribution."""
+
+ build_artifacts_meta = context.upstream_output.metadata_for("{product}_build_artifacts")
+ exported_files = get_exported_files(build_artifacts_meta["upload_location"])
+
+ package_config = recipe.stage_config.get("package", {})
+
+ if package_config.get("type") == "python":
+ # Dynamically import module and function
+ module = importlib.import_module(package_config["module"])
+ func = getattr(module, package_config["function"])
+ packaged_files = func(context, exported_files, **package_config.get("args", {}))
+ else:
+ subprocess.run(["./build.sh", "package"])
+ packaged_files = collect_packaged_outputs()
+
+ return MaterializeResult(
+ metadata={
+ "packaged_files": [f.name for f in packaged_files],
+ "enhancements_applied": ["readme", "data_dict", "metadata_stamp"],
+ "package_location": "..."
+ }
+ )
+```
+
+**Configuration:**
+```yaml
+stage_config:
+ package:
+ enabled: true
+ type: python
+ module: package.enhance
+ function: run_package
+ args:
+ include_readme: true
+```
+
+**Optional:** If `stage_config.package.enabled` is false or missing, skip this asset
+
+**Operations:**
+- Pull README, data dictionary from product metadata
+- Stamp metadata into geospatial formats (shapefiles, FGDB)
+- Create ZIP archives if needed (defer to later - CSCL implementation)
+- Generate manifest files
+- Re-upload enhanced artifacts
+
+**Dependencies:**
+- `{product}_qa_artifacts` (if QA artifacts exist)
+- `{product}_build_artifacts` (always)
+
+---
+
+#### Step 8: External Review ⏳ NOT IMPLEMENTED
+
+**Asset Name:** `{product}_external_review`
+
+**Materialized Artifact:** None (approval gate only)
+
+**Purpose:** Manual approval gate before distribution
+
+**Implementation:**
+
+```python
+@asset(name="{product}_external_review")
+def {product}_external_review(context, {product}_package):
+ """Manual approval gate - requires manual materialization."""
+
+ # This asset does nothing except serve as a dependency gate
+ # Must be manually materialized by external QA team
+
+ context.log.info("External review completed - approved for distribution")
+
+ return MaterializeResult(
+ metadata={
+ "approved_by": "external_qa_team",
+ "approval_timestamp": datetime.now().isoformat()
+ }
+ )
+```
+
+**Behavior:**
+- **Requires manual materialization** - cannot be auto-triggered
+- Blocks all distribution assets until approved
+- Future: Could integrate with external approval workflow system
+
+**Lifecycle Stage:** `builds.external_review` (part of builds, not distribution)
+
+**Dependencies:**
+- `{product}_package_artifacts` (if package exists)
+- `{product}_qa_artifacts` (if no package but QA exists)
+- `{product}_build_artifacts` (if no QA or package)
+
+---
+
+### Build Step Summary
+
+**Simplified Linear Flow:**
+
+```
+{product}_plan
+ ↓
+{product}_source_data
+ ↓
+{product}_build ← Materialized: build report (PRIVATE storage)
+ ↓
+{product}_build_artifacts ← Materialized: dataset files (public storage)
+ ↓
+{product}_qa ← Materialized: QA report (PRIVATE storage) [optional]
+ ↓
+{product}_qa_artifacts ← Materialized: QA output files in qaqc/ subfolder [optional]
+ ↓
+{product}_package_artifacts ← Materialized: enhanced files [optional]
+ ↓
+{product}_external_review ← Manual gate
+```
+
+**For dbt products:**
+
+```
+{product}_plan
+ ↓
+{product}_source_data
+ ↓
+{product}_{dbt_model_1} ← Auto-generated dbt assets
+{product}_{dbt_model_2}
+{product}_{dbt_model_3}
+ ↓
+{product}_build ← Aggregate dbt report
+ ↓
+{product}_build_artifacts ← Export dbt outputs to files
+ ↓
+{product}_qa ← Run QA validations
+ ↓
+{product}_qa_artifacts ← Export QA outputs
+ ↓
+{product}_package_artifacts ← Enhance with metadata/READMEs
+ ↓
+{product}_external_review
+```
+
+---
+
+## 3. Distribution Domain
+
+**Status:** ⏳ Not Implemented
+
+### Overview
+
+Distribution publishes build artifacts to external destinations (Socrata, S3, EDM, etc.).
+
+### Partitioning Strategy
+
+- **Partition Definition**: Same as builds (`build_partition_def`)
+- **Rationale**: Straight shot from build → distribution for a given version
+- **Consideration**: Final destinations often don't have versions (e.g., only one PLUTO on opendata)
+
+### Asset Naming Convention
+
+```
+{product}_dist_{destination_key}
+```
+
+Examples:
+- `pluto_dist_socrata_main`
+- `pluto_dist_socrata_water_included`
+- `edde_dist_s3_public`
+
+### Distribution Implementation
+
+**One asset per destination** - all run in **parallel** (fan-out from external_review)
+
+```python
+@asset(
+ name="{product}_dist_{destination_key}",
+ group_name="{product}", # Include in product group for full lineage view
+ partitions_def=build_partition_def,
+ tags={
+ "product": "{product}",
+ "lifecycle_stage": "dist.publish",
+ "domain": "distribution",
+ "destination_type": "{destination_type}"
+ }
+)
+def {product}_dist_{destination_key}(context, {product}_external_review):
+ """Publish to specific destination."""
+
+ # Read product metadata for destination config
+ dest_config = get_destination_config(product, destination_key)
+
+ # Get packaged artifacts from previous step
+ artifacts = get_packaged_artifacts(product, partition_key)
+
+ # Use existing connector infrastructure
+ connector = connectors[dest_config["destination-type"]]
+ result = connector.push(
+ key=dest_config["destination-key"],
+ files=artifacts,
+ **dest_config.get("connector_args", {})
+ )
+
+ return MaterializeResult(
+ metadata={
+ "destination_type": dest_config["destination-type"],
+ "destination_key": dest_config["destination-key"],
+ "published_files": artifacts,
+ "publish_result": result
+ }
+ )
+```
+
+**Destination Configuration:**
+- **Source**: `dcpy/product_metadata/{product}/publishing.yml`
+- **Fields**:
+ - `destination-type`: Connector type (e.g., "socrata", "edm.bytes")
+ - `destination-key`: Specific destination identifier
+
+**Example (PLUTO with 2 Socrata destinations):**
+
+Product metadata (`dcpy/product_metadata/pluto/publishing.yml`):
+```yaml
+destinations:
+ - destination-type: socrata
+ destination-key: pluto-main
+ - destination-type: socrata
+ destination-key: pluto-water-included
+```
+
+Generated assets:
+- `pluto_dist_socrata_pluto_main`
+- `pluto_dist_socrata_pluto_water_included`
+
+Both run in parallel after `pluto_external_review` is approved.
+
+**Dependencies:** `{product}_external_review`
+
+---
+
+### Distribution Step Summary
+
+**Fan-out Pattern:** All distribution assets run in parallel after external review approval
+
+```
+{product}_external_review ← Manual gate (builds domain)
+ ↓
+ ├─→ {product}_dist_socrata_dest1 ← Parallel
+ ├─→ {product}_dist_socrata_dest2 ← Parallel
+ └─→ {product}_dist_s3_dest3 ← Parallel
+```
+
+**Dagster Lineage View:**
+- All distribution assets in same product group
+- Shows fan-out from external_review node
+- Can materialize all destinations at once or individually
+- Clicking product group (e.g., "pluto") shows full pipeline from plan through all distribution destinations
+
+---
+
+## Cross-Cutting Concerns
+
+### Materialized Assets vs. Side Effects
+
+**Key Principle:** Distinguish between durable materialized assets and ephemeral side effects
+
+**Materialized Assets (durable):**
+- Plan: `recipe.lock.yml` in blob storage
+- Source Data: `build_metadata.json`
+- Build: Build report with performance stats (PRIVATE bucket)
+- Build Artifacts: Dataset files (shapefiles, parquet, CSV) (PUBLIC bucket)
+- QA: QA report (PRIVATE bucket)
+- QA Artifacts: QA output files in qaqc/ subfolder (PUBLIC bucket)
+- Package Artifacts: Enhanced dataset files (PUBLIC bucket)
+- Distribution: Published artifacts
+
+**Side Effects (ephemeral):**
+- Postgres tables in build schema (scratch space, can be deleted after export)
+- Intermediate files during build
+- Build logs (unless explicitly uploaded)
+
+**Philosophy:**
+- Postgres tables are **working memory**, not the final product
+- Export step creates **durable artifacts** from ephemeral build state
+- Can clean up postgres schemas after export without "unmaterializing" assets
+- Retry export without re-running build (if postgres tables still exist)
+
+### Storage & Security
+
+**Private vs. Public Storage:**
+
+- **PRIVATE bucket** (internal use only):
+ - Build reports (performance stats, internal metadata)
+ - QA reports (validation details)
+ - Build logs
+ - Any telemetry or operational data
+
+- **PUBLIC bucket** (external access):
+ - Exported dataset files
+ - Packaged artifacts
+ - Distribution-ready files
+
+**Implementation:**
+- Use separate connectors for private vs. public storage
+- Default most storage to public, explicitly configure private for build/QA outputs
+
+### Error Handling
+
+**Philosophy:** Fail fast, leave artifacts for debugging
+
+**Behavior on Failure:**
+- ❌ **No automatic cleanup** of postgres schemas or partial artifacts
+- ✅ **Leave artifacts in place** for manual inspection and debugging
+- Dagster's retry mechanisms available for transient failures
+
+**Rationale:**
+- Initial deployment is single-machine (server + runner)
+- MVP phase prioritizes debuggability over automation
+- Manual intervention acceptable in early stages
+
+### Tagging & Grouping Strategy
+
+**Primary Organization: Product Groups**
+- Most assets grouped by product: `group_name="{product}"`
+- Ingest assets use: `group_name="ingest"`
+- Provides full lineage view in Dagster UI when clicking product group
+
+**Required Tags:**
+- `product`: Product name (e.g., "pluto", "edde") - for all builds/distribution assets
+- `lifecycle_stage`: Stage identifier (e.g., "builds.plan", "builds.build_artifacts", "dist.publish")
+- `domain`: High-level domain (e.g., "builds", "distribution", "ingest")
+
+**Optional Tags:**
+- `destination_type`: For distribution assets (e.g., "socrata", "edm.bytes")
+- `build_type`: For build assets (e.g., "dbt", "python", "sql")
+
+**Examples:**
+
+```python
+# Build asset
+@asset(
+ name="edde_build_artifacts",
+ group_name="edde", # Product group
+ partitions_def=build_partition_def,
+ tags={
+ "product": "edde",
+ "lifecycle_stage": "builds.build_artifacts",
+ "domain": "builds"
+ }
+)
+
+# Distribution asset
+@asset(
+ name="pluto_dist_socrata_main",
+ group_name="pluto", # Same product group for full lineage
+ partitions_def=build_partition_def,
+ tags={
+ "product": "pluto",
+ "lifecycle_stage": "dist.publish",
+ "domain": "distribution",
+ "destination_type": "socrata"
+ }
+)
+
+# Ingest asset (not product-specific)
+@asset(
+ name="ingest_dcp_pluto",
+ group_name="ingest", # Separate ingest group
+ tags={
+ "domain": "ingest",
+ "dataset": "dcp_pluto"
+ }
+)
+```
+
+**UI Benefits:**
+- Click "pluto" group → see plan → build → qa → package → external_review → [dist_socrata_1, dist_socrata_2, ...] with full DAG lineage
+- Filter by `tag:domain=builds` → see all build assets across products
+- Filter by `tag:lifecycle_stage=dist.publish` → see all distribution assets
+- Selection syntax: `pluto*` or `tag:product=pluto` for programmatic access
+
+### Resource Usage
+
+**Local Storage:**
+- `LocalStorageResource`: Manages build directories
+- Path structure: `/tmp/dagster/{domain}/{product}/{partition_key}/`
+
+**Connectors:**
+- Plan connector: `get_plan_default_connector()`
+- Private connector: `get_private_default_connector()` (for build/QA reports)
+- Builds connector: `get_builds_default_connector()` (for exported artifacts)
+- Distribution connectors: Per destination type (from product metadata)
+
+---
+
+## Implementation Status
+
+| Domain | Step | Status | Notes |
+|--------|------|--------|-------|
+| **Ingest** | All | ✅ Implemented | Pre-existing |
+| **Builds** | Plan | ✅ Implemented | `{product}_plan` |
+| **Builds** | Source Data | 🚧 In Progress | `{product}_source_data` |
+| **Builds** | Build | ⏳ Not Started | `{product}_build` - build report to PRIVATE bucket |
+| **Builds** | Build Artifacts | ⏳ Not Started | `{product}_build_artifacts` - export datasets |
+| **Builds** | QA | ⏳ Not Started | `{product}_qa` - QA report to PRIVATE bucket [optional] |
+| **Builds** | QA Artifacts | ⏳ Not Started | `{product}_qa_artifacts` - export QA outputs [optional] |
+| **Builds** | Package | ⏳ Not Started | `{product}_package_artifacts` - enhance files [optional] |
+| **Builds** | External Review | ⏳ Not Started | `{product}_external_review` - manual gate |
+| **Distribution** | Publish | ⏳ Not Started | `{product}_dist_{dest}` - one per destination |
+
+---
+
+## TODOs
+
+1. **Recipe syntax cleanup**: Change `stage_config.builds.*` → `stage_config.*` (assume recipes are build-specific)
+ - Current: `stage_config.builds.build.type`
+ - Future: `stage_config.build.type`
+ - Note: Don't implement yet, just document for future refactor
+
+2. **Private connector implementation**: Ensure private bucket connector exists for build/QA reports
+
+3. **Build type implementations**: Create execution handlers for dbt, python, sql, bash types
+
+4. **Product metadata reader**: Parse `dcpy/product_metadata/{product}/publishing.yml` for distribution destinations
+
+5. **Per-product jobs**: Define Dagster jobs for easy pipeline execution
+ ```python
+ pluto_job = define_asset_job(
+ name="build_pluto",
+ selection=AssetSelection.groups("pluto"),
+ partitions_def=build_partition_def
+ )
+ ```
+
+---
+
+## Open Questions & Future Considerations
+
+1. **Postgres cleanup strategy**: When to delete build schemas? After export? After distribution? Manual?
+2. **Build command dependencies**: For products with complex build steps, how to express dependencies?
+3. **Partition retention**: How long to keep old build partitions? Automatic cleanup policy?
+4. **Monitoring**: What metrics to track (step duration, artifact sizes, failure rates)?
+5. **Approval workflow**: Future integration with formal approval system vs. manual materialization?
+6. **Incremental builds**: Any products that could benefit from incremental processing?
+7. **Build.sh convention enforcement**: Require build.sh to exist, or make truly optional?
+
+---
+
+## Revision History
+
+| Date | Version | Changes |
+|------|---------|---------|
+| 2026-05-13 | 0.1 | Initial specification based on requirements discussion |
+| 2026-05-14 | 0.2 | Simplified model: single asset per stage, build/export separation, private storage for build reports |
+| 2026-05-14 | 0.3 | Refined export configuration (based on cscl convention), added QA artifacts asset, renamed export assets to *_artifacts pattern, added dynamic Python module imports with args, clarified postgres schema creation (ensure not create for dbt/sql) |
+| 2026-05-14 | 0.4 | Added asset organization strategy: product-based groups for full lineage view, distribution fan-out pattern for parallel execution, detailed tagging strategy, per-product jobs |
diff --git a/apps/dagster/builds/assets.py b/apps/dagster/builds/assets.py
index 9efd81e292..65497f5949 100644
--- a/apps/dagster/builds/assets.py
+++ b/apps/dagster/builds/assets.py
@@ -148,9 +148,88 @@ def _plan_recipe_asset(
return _plan_recipe_asset
+def make_load_data_asset(product: lifecycle.asset_models.Product):
+ """Create a load data asset for a specific product.
+
+ Args:
+ product: Product object with name and path attributes
+
+ Returns:
+ A Dagster asset function
+ """
+ plan_asset_name = f"plan_recipe_{product.name}"
+
+ @asset(
+ name=f"load_data_{product.name}",
+ partitions_def=build_partition_def,
+ group_name="build",
+ tags={"product": product.name, "lifecycle_stage": "builds.load"},
+ deps=[plan_asset_name],
+ )
+ def _load_data_asset(
+ context: AssetExecutionContext,
+ local_storage: LocalStorageResource,
+ ):
+ """Load source data for product from the planned recipe."""
+ from dcpy.lifecycle.builds.connector import get_plan_default_connector
+ from dcpy.lifecycle.builds.load import load_source_data_from_resolved_recipe
+
+ partition_key = context.partition_key
+
+ context.log.info(
+ f"Loading data for {product.name} build {partition_key} from plan storage"
+ )
+
+ # Pull the planned recipe from blob storage
+ plan_connector = get_plan_default_connector()
+ build_path = local_storage.get_path("builds", product.name, partition_key)
+
+ context.log.info(f"Pulling plan from storage to {build_path}")
+ pull_result = plan_connector.pull_versioned(
+ key=product.name,
+ version=partition_key,
+ destination_path=Path(build_path),
+ )
+ context.log.info(f"Pull result: {pull_result}")
+
+ # Verify recipe.lock.yml exists
+ lock_file = Path(build_path) / "recipe.lock.yml"
+ if not lock_file.exists():
+ raise FileNotFoundError(
+ f"recipe.lock.yml not found at {lock_file} after pulling from storage"
+ )
+
+ context.log.info(f"Loading source data from {lock_file}")
+
+ # Load the source data using the resolved recipe
+ load_result = load_source_data_from_resolved_recipe(
+ recipe_lock_path=lock_file,
+ clear_pg_schema=True,
+ target_schema=None, # Will use default from metadata.build_name()
+ )
+
+ context.log.info(
+ f"Successfully loaded {len(load_result.datasets)} datasets into build {load_result.build_name}"
+ )
+
+ return MaterializeResult(
+ metadata={
+ "recipe_lock_path": str(lock_file),
+ "build_name": load_result.build_name,
+ "num_datasets": len(load_result.datasets),
+ "dataset_ids": ", ".join(load_result.datasets.keys()),
+ "version": partition_key,
+ "product": product.name,
+ }
+ )
+
+ return _load_data_asset
+
+
# Generate assets for all products
products = lifecycle.list_products()
plan_recipe_assets = [make_plan_recipe_asset(product) for product in products]
+load_data_assets = [make_load_data_asset(product) for product in products]
# Export all assets
-build_assets = plan_recipe_assets
+build_assets = plan_recipe_assets + load_data_assets