Summary
This RFC proposes a formal SourceAdapter protocol that standardizes how data source integrations are packaged, discovered, and registered with Dagster. The design enables a marketplace of adapters where users can add custom data sources to their RoboSystems fork without merge conflicts.
Motivation: Problem
Currently, each adapter (SEC, QuickBooks, Plaid) follows similar patterns but lacks a formal interface:
- No standard way to discover what adapters are available
- No consistent structure for adapter metadata (dependencies, entity types, schema extensions)
- Custom adapters require understanding internal implementation details
- dbt projects and Dagster assets are loosely coupled
This makes it difficult for users to:
- Add their own data sources (custom ERPs, CRMs, banks)
- Share adapters with the community
- Understand what an adapter provides before installing it
Motivation: Why Now?
With the SEC pipeline production-ready and QuickBooks/Plaid scaffolded (#117, #118), we're establishing patterns that will be difficult to change later. Defining the protocol now ensures:
- Consistent structure as we build more adapters
- Clear extension points for community contributions
- Foundation for future adapter marketplace on robosystems.io
Proposed Design: Overview
Introduce three components:
- `SourceAdapter` Protocol - Python interface all adapters implement
- `adapter.yaml` Manifest - Declarative metadata for each adapter
- `schema_mapping.yaml` - Declarative source-to-graph entity mapping
Proposed Design: Detailed Design
SourceAdapter Protocol
from __future__ import annotations  # defer annotation evaluation
from pathlib import Path
from typing import Any, Iterator, Protocol

from dagster import AssetsDefinition

# ExtractConfig is assumed to be defined elsewhere in the adapters package.
class SourceAdapter(Protocol):
    """Interface for consistent adapter packaging."""

    name: str  # e.g., "quickbooks", "custom_erp"
    pipeline_type: str  # "dbt" (most adapters) or "custom" (SEC/Arelle)

    def authenticate(self, credentials: dict) -> Any:
        """Return authenticated client for this source."""

    def extract_entities(self, client: Any, entity_type: str, config: ExtractConfig) -> Iterator[dict]:
        """Yield raw records from source."""

    def get_dbt_project_path(self) -> Path | None:
        """Path to dbt project (None for custom pipeline adapters like SEC)."""

    def get_dagster_assets(self) -> list[AssetsDefinition]:
        """Return Dagster asset definitions for auto-registration."""

    def get_schema_extension(self) -> str:
        """Schema extension this source requires (e.g., 'accounting')."""

    def get_entity_types(self) -> list[str]:
        """List of entity types this source can extract."""

adapter.yaml Manifest
name: quickbooks
version: 1.0.0
description: QuickBooks Online integration for RoboSystems
author: RoboFinSystems
license: Apache-2.0
pipeline_type: dbt  # or "custom" for non-dbt pipelines
dependencies:
  - dbt-core>=1.7
  - python-quickbooks>=0.9
dagster_module: robosystems.adapters.quickbooks.pipeline
dbt_project: robosystems/adapters/quickbooks/transforms
schema_extension: schemas/quickbooks.yaml
entity_types:
  - Account
  - Transaction
  - Vendor
  - Customer

Schema Mapping (Declarative)
# adapters/quickbooks/schema_mapping.yaml
source: quickbooks
schema_extension: accounting
entities:
  - source_table: mart_account
    graph_node: Account
    properties:
      # Each entry maps a source column to a graph property.
      - { source: id, graph: source_id }
      - { source: name, graph: name }
      - { source: account_type, graph: type }
relationships:
  - source_table: mart_line_item
    graph_rel: HAS_LINE_ITEM
    from_node: Transaction
    to_node: Account

Fork-Friendly Directory Structure
The directory structure serves as a merge boundary:
robosystems/adapters/
├── sec/                 # ← Upstream maintains
├── quickbooks/          # ← Upstream maintains
├── plaid/               # ← Upstream maintains
├── _template/           # ← Copy for new adapters
│
└── custom_*/            # ← Fork namespace (upstream never touches)
    ├── custom_erp/      # User additions live here
    ├── custom_crm/      # Merge conflicts impossible
    └── custom_bank/     # git pull upstream main "just works"
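The merge-boundary convention can be checked mechanically. The sketch below is one way a fork might classify adapter directories by name, assuming the `custom_` prefix and the `_template` directory shown in the tree; `classify_adapter_dirs` is a hypothetical helper, not part of RoboSystems.

```python
from pathlib import Path


def classify_adapter_dirs(adapters_root: Path) -> dict[str, list[str]]:
    """Split adapter directories into upstream-owned and fork-owned names.

    Assumption: fork additions use the `custom_` prefix, and directories
    starting with `_` (such as `_template`) are scaffolding, not adapters.
    """
    result: dict[str, list[str]] = {"upstream": [], "custom": []}
    for entry in sorted(adapters_root.iterdir()):
        if not entry.is_dir() or entry.name.startswith("_"):
            continue  # skip plain files, _template/, __pycache__/
        key = "custom" if entry.name.startswith("custom_") else "upstream"
        result[key].append(entry.name)
    return result
```

A check like this could run in a fork's CI to confirm that local additions stay inside the fork namespace.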
Registration
# robosystems/dagster/definitions.py
UPSTREAM_ADAPTERS = [SECAdapter(), QuickBooksAdapter(), PlaidAdapter()]

# === FORK ADDITIONS (add your adapters here) ===
CUSTOM_ADAPTERS = [
    # CustomERPAdapter(),  # Uncomment and add your adapters
]

ALL_ADAPTERS = UPSTREAM_ADAPTERS + CUSTOM_ADAPTERS

Distribution Model
GitHub repos, not pip packages:
- dbt projects don't fit pip (SQL + YAML, not Python)
- Customization is the value prop—users modify adapters
- Adding an adapter is `git clone` into `adapters/custom_*/`
Discovery flow:
robosystems.io → Browse adapters → Fork adapter repo → Clone into adapters/custom_*/ → Register
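Before the Register step, a fork may want to sanity-check a cloned adapter's manifest. The following is a minimal sketch, assuming the manifest has already been parsed into a dict (for example with PyYAML's `yaml.safe_load`) and that the fields listed in `REQUIRED_FIELDS` are mandatory. `AdapterManifest` and `parse_manifest` are hypothetical names, and the set of required fields is an assumption rather than a settled part of this RFC.

```python
from dataclasses import dataclass
from typing import Optional

# Assumption: these fields are treated as mandatory; the RFC does not say so.
REQUIRED_FIELDS = ("name", "version", "pipeline_type", "dagster_module", "entity_types")
VALID_PIPELINE_TYPES = {"dbt", "custom"}


@dataclass(frozen=True)
class AdapterManifest:
    """Typed view of the adapter.yaml fields this sketch cares about."""

    name: str
    version: str
    pipeline_type: str
    dagster_module: str
    entity_types: tuple[str, ...]
    dbt_project: Optional[str] = None


def parse_manifest(data: dict) -> AdapterManifest:
    """Validate an already-parsed manifest dict and return a typed manifest."""
    missing = [key for key in REQUIRED_FIELDS if key not in data]
    if missing:
        raise ValueError(f"adapter.yaml is missing fields: {', '.join(missing)}")
    if data["pipeline_type"] not in VALID_PIPELINE_TYPES:
        raise ValueError(f"unknown pipeline_type: {data['pipeline_type']!r}")
    # Assumption: only dbt-based adapters need a dbt project path.
    if data["pipeline_type"] == "dbt" and not data.get("dbt_project"):
        raise ValueError("dbt adapters must set dbt_project")
    return AdapterManifest(
        name=data["name"],
        version=str(data["version"]),
        pipeline_type=data["pipeline_type"],
        dagster_module=data["dagster_module"],
        entity_types=tuple(data["entity_types"]),
        dbt_project=data.get("dbt_project"),
    )
```

Failing early with a `ValueError` keeps a malformed manifest from reaching Dagster registration, where the error would be harder to trace back to the adapter.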
Alternatives Considered
| Alternative | Pros | Cons |
|---|---|---|
| pip packages | Standard Python distribution | dbt projects don't fit; customization harder |
| No formal protocol | Faster to ship | Inconsistent adapters; harder to extend |
| Plugin system (entry points) | Auto-discovery | Over-engineered for current scale |
Why this approach: GitHub repos match how users already fork RoboSystems. The protocol provides structure without requiring complex plugin infrastructure.
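To illustrate how a protocol gives structure without plugin machinery, here is a trimmed-down sketch using `typing.Protocol` with `runtime_checkable`. It covers only a subset of the proposed members so that it runs without Dagster installed; `MiniSourceAdapter`, `CustomERPAdapter`, and `register` are hypothetical names used for illustration only.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class MiniSourceAdapter(Protocol):
    """Subset of the proposed SourceAdapter interface (illustrative only)."""

    name: str
    pipeline_type: str

    def authenticate(self, credentials: dict) -> Any: ...

    def get_entity_types(self) -> list[str]: ...


class CustomERPAdapter:
    """Conforms structurally; there is no base class to inherit from."""

    name = "custom_erp"
    pipeline_type = "dbt"

    def authenticate(self, credentials: dict) -> Any:
        return {"token": credentials.get("api_key")}  # stand-in for a real client

    def get_entity_types(self) -> list[str]:
        return ["Account", "Vendor"]


def register(adapters: list[object]) -> dict[str, MiniSourceAdapter]:
    """Index adapters by name, rejecting non-conforming or duplicate entries."""
    registry: dict[str, MiniSourceAdapter] = {}
    for adapter in adapters:
        # isinstance() on a runtime_checkable Protocol only checks that the
        # members exist, not their signatures or types.
        if not isinstance(adapter, MiniSourceAdapter):
            raise TypeError(f"{type(adapter).__name__} does not implement the protocol")
        if adapter.name in registry:
            raise ValueError(f"duplicate adapter name: {adapter.name}")
        registry[adapter.name] = adapter
    return registry
```

Because conformance is structural, a fork's adapter needs no import from upstream beyond what it actually uses, which keeps the custom namespace decoupled from upstream code.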
Implementation: Phases
- Phase 1: Protocol Definition
  - Define `SourceAdapter` protocol in `adapters/base.py`
  - Create `_template/` directory with example adapter
  - Document in wiki
- Phase 2: Refactor Existing Adapters
  - Update SEC adapter to implement protocol
  - Update QuickBooks adapter to implement protocol
  - Update Plaid adapter to implement protocol
- Phase 3: Auto-Registration
  - Implement adapter discovery in Dagster definitions
  - Parse `adapter.yaml` manifests
  - Auto-register assets from compliant adapters
- Phase 4: Schema Mapping
  - Implement declarative schema mapping parser
  - Generate materialization logic from YAML
  - Validate mappings against graph schema
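As a sketch of what Phase 4's "generate materialization logic from YAML" could look like, the function below turns one entity mapping into a Cypher `MERGE` statement. It assumes the mapping has already been loaded into a dict, that each property is a source/graph pair, that the graph store accepts Cypher, and that `source_id` is the merge key. None of these are confirmed by this RFC, and `build_node_merge` is a hypothetical name.

```python
def build_node_merge(entity: dict, key: str = "source_id") -> str:
    """Build a Cypher MERGE statement for one entity mapping.

    Assumption: `entity` has `graph_node` and a `properties` list of
    {"source": <column>, "graph": <property>} dicts.
    """
    label = entity["graph_node"]
    # Map graph property name -> source column name, preserving order.
    props = {p["graph"]: p["source"] for p in entity["properties"]}
    if key not in props:
        raise ValueError(f"mapping for {label} has no {key!r} property")
    # `$row` is a query parameter holding one record from the source table.
    sets = ", ".join(
        f"n.{graph} = $row.{source}"
        for graph, source in props.items()
        if graph != key
    )
    statement = f"MERGE (n:{label} {{{key}: $row.{props[key]}}})"
    return f"{statement} SET {sets}" if sets else statement


account_mapping = {
    "source_table": "mart_account",
    "graph_node": "Account",
    "properties": [
        {"source": "id", "graph": "source_id"},
        {"source": "name", "graph": "name"},
        {"source": "account_type", "graph": "type"},
    ],
}
print(build_node_merge(account_mapping))
```

Merging on a stable key rather than creating nodes makes re-running a pipeline idempotent, which is one reason to require a key property during mapping validation.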
Implementation: Dependencies
- Depends on: SEC Shared Repository Production Launch #117 (SEC pipeline complete), QuickBooks Data Pipeline #118 (QuickBooks pipeline)
- Blocked by: None (can start protocol definition now)
Risks & Mitigations
| Risk | Mitigation |
|---|---|
| Protocol too rigid | Start minimal, extend based on real adapter needs |
| Breaking existing adapters | Incremental migration, protocol is additive |
| Over-engineering | Only build what's needed for QuickBooks/Plaid first |
Open Questions
- Should `adapter.yaml` support specifying required secrets/credentials schema?
- How should adapters declare their schedule preferences (daily, hourly, etc.)?
- Should we support adapter versioning for breaking changes?
References
- Related: SEC Shared Repository Production Launch #117 (SEC Shared Repository), QuickBooks Data Pipeline #118 (QuickBooks Pipeline)
- Related: Shared Replicas Deployment #113 (Shared Replicas Deployment)
- Internal: `local/docs/PIPELINES.md` (design details)
- Wiki: Pipeline Guide