
SourceAdapter Protocol for Extensible Data Pipelines #191

@jfrench9


Summary

This RFC proposes a formal SourceAdapter protocol that standardizes how data source integrations are packaged, discovered, and registered with Dagster. The design enables a marketplace of adapters where users can add custom data sources to their RoboSystems fork without merge conflicts.

Motivation: Problem

Currently, each adapter (SEC, QuickBooks, Plaid) follows similar patterns but lacks a formal interface:

  • No standard way to discover what adapters are available
  • No consistent structure for adapter metadata (dependencies, entity types, schema extensions)
  • Custom adapters require understanding internal implementation details
  • dbt projects and Dagster assets are loosely coupled

This makes it difficult for users to:

  1. Add their own data sources (custom ERPs, CRMs, banks)
  2. Share adapters with the community
  3. Understand what an adapter provides before installing it

Motivation: Why Now?

With the SEC pipeline production-ready and QuickBooks/Plaid scaffolded (#117, #118), we are establishing patterns that will be hard to change later. Defining the protocol now ensures:

  • Consistent structure as we build more adapters
  • Clear extension points for community contributions
  • A foundation for a future adapter marketplace on robosystems.io

Proposed Design: Overview

Introduce three components:

  1. SourceAdapter Protocol - Python interface all adapters implement
  2. adapter.yaml Manifest - Declarative metadata for each adapter
  3. schema_mapping.yaml - Declarative source-to-graph entity mapping

Proposed Design: Detailed Design

SourceAdapter Protocol

from __future__ import annotations

from pathlib import Path
from typing import Any, Iterator, Literal, Protocol

from dagster import AssetsDefinition

# ExtractConfig is referenced but not defined in this RFC; postponed
# annotations (the __future__ import) let this module import without it.


class SourceAdapter(Protocol):
    """Interface for consistent adapter packaging."""

    name: str  # e.g., "quickbooks", "custom_erp"
    pipeline_type: Literal["dbt", "custom"]  # "dbt" (most adapters) or "custom" (SEC/Arelle)

    def authenticate(self, credentials: dict) -> Any:
        """Return an authenticated client for this source."""

    def extract_entities(
        self, client: Any, entity_type: str, config: ExtractConfig
    ) -> Iterator[dict]:
        """Yield raw records of the given entity type from the source."""

    def get_dbt_project_path(self) -> Path | None:
        """Path to the dbt project (None for custom pipeline adapters like SEC)."""

    def get_dagster_assets(self) -> list[AssetsDefinition]:
        """Return Dagster asset definitions for auto-registration."""

    def get_schema_extension(self) -> str:
        """Schema extension this source requires (e.g., 'accounting')."""

    def get_entity_types(self) -> list[str]:
        """List of entity types this source can extract."""
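To illustrate how a fork adapter might satisfy the protocol, here is a minimal sketch. Everything in it is hypothetical: the adapter name custom_erp, the in-memory stand-in for an ERP API, the dbt project path, and the single limit field on ExtractConfig (the RFC does not define ExtractConfig). Because SourceAdapter is a typing.Protocol, the class conforms structurally and does not need to subclass it; get_dagster_assets returns an empty list so the sketch runs without Dagster installed.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterator, Optional


@dataclass
class ExtractConfig:
    # Hypothetical field: the RFC does not specify ExtractConfig's contents.
    limit: Optional[int] = None


class CustomERPAdapter:
    """Hypothetical fork adapter that satisfies SourceAdapter structurally."""

    name = "custom_erp"
    pipeline_type = "dbt"

    # Stand-in for a remote ERP API, keyed by entity type (illustrative data).
    _FAKE_API: dict[str, list[dict]] = {
        "Account": [
            {"id": 1, "name": "Cash", "account_type": "asset"},
            {"id": 2, "name": "Revenue", "account_type": "income"},
            {"id": 3, "name": "Rent", "account_type": "expense"},
        ],
        "Vendor": [{"id": 10, "name": "Acme Supplies"}],
    }

    def authenticate(self, credentials: dict) -> Any:
        # A real adapter would build an SDK client; here we only check the key.
        if "api_key" not in credentials:
            raise ValueError("custom_erp requires an 'api_key' credential")
        return self._FAKE_API

    def extract_entities(
        self, client: Any, entity_type: str, config: ExtractConfig
    ) -> Iterator[dict]:
        if entity_type not in self.get_entity_types():
            raise ValueError(f"unsupported entity type: {entity_type}")
        records = client[entity_type]
        if config.limit is not None:
            records = records[: config.limit]
        for record in records:
            yield dict(record)  # copy so callers cannot mutate the source data

    def get_dbt_project_path(self) -> Optional[Path]:
        # Hypothetical location following the custom_* directory convention.
        return Path("robosystems/adapters/custom_erp/transforms")

    def get_dagster_assets(self) -> list:
        # Would return dagster AssetsDefinition objects; empty to avoid the dependency.
        return []

    def get_schema_extension(self) -> str:
        return "accounting"

    def get_entity_types(self) -> list[str]:
        return list(self._FAKE_API)


adapter = CustomERPAdapter()
client = adapter.authenticate({"api_key": "demo"})
accounts = list(adapter.extract_entities(client, "Account", ExtractConfig(limit=2)))
print([a["name"] for a in accounts])  # ['Cash', 'Revenue']
```

Returning copies from extract_entities is a design choice in this sketch: downstream loaders can enrich records without corrupting whatever the client caches.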

adapter.yaml Manifest

name: quickbooks
version: 1.0.0
description: QuickBooks Online integration for RoboSystems
author: RoboFinSystems
license: Apache-2.0

pipeline_type: dbt  # or "custom" for non-dbt pipelines
dependencies:
  - dbt-core>=1.7
  - python-quickbooks>=0.9

dagster_module: robosystems.adapters.quickbooks.pipeline
dbt_project: robosystems/adapters/quickbooks/transforms

schema_extension: schemas/quickbooks.yaml
entity_types:
  - Account
  - Transaction
  - Vendor
  - Customer
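Phase 3 calls for parsing adapter.yaml manifests. A minimal validation sketch follows, assuming the manifest has already been loaded into a dict (for example with PyYAML's yaml.safe_load). The set of required keys, the rule that dbt adapters must declare dbt_project, and the rule that name must match the adapter's directory are assumptions drawn from the example manifest, not rules stated in this RFC.

```python
from pathlib import Path

# Assumed required keys, based on the example manifest above.
REQUIRED_KEYS = {"name", "version", "pipeline_type", "dagster_module", "entity_types"}
PIPELINE_TYPES = {"dbt", "custom"}


def validate_manifest(manifest: dict, adapter_dir: Path) -> list[str]:
    """Return a list of problems in a parsed adapter.yaml (empty if valid)."""
    errors = []
    missing = REQUIRED_KEYS - manifest.keys()
    if missing:
        errors.append(f"missing keys: {sorted(missing)}")
    pipeline_type = manifest.get("pipeline_type")
    if pipeline_type not in PIPELINE_TYPES:
        errors.append(f"pipeline_type must be one of {sorted(PIPELINE_TYPES)}, got {pipeline_type!r}")
    # dbt adapters need a dbt project; custom pipelines (e.g. SEC/Arelle) do not.
    if pipeline_type == "dbt" and not manifest.get("dbt_project"):
        errors.append("dbt adapters must declare dbt_project")
    # Keep the manifest name in sync with its directory so discovery is predictable.
    if manifest.get("name") != adapter_dir.name:
        errors.append(f"name {manifest.get('name')!r} does not match directory {adapter_dir.name!r}")
    return errors


manifest = {
    "name": "quickbooks",
    "version": "1.0.0",
    "pipeline_type": "dbt",
    "dagster_module": "robosystems.adapters.quickbooks.pipeline",
    "dbt_project": "robosystems/adapters/quickbooks/transforms",
    "entity_types": ["Account", "Transaction", "Vendor", "Customer"],
}
print(validate_manifest(manifest, Path("robosystems/adapters/quickbooks")))  # []
```

Collecting every problem instead of raising on the first one lets a contributor fix a manifest in a single pass.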

Schema Mapping (Declarative)

# robosystems/adapters/quickbooks/schema_mapping.yaml
source: quickbooks
schema_extension: accounting

entities:
  - source_table: mart_account
    graph_node: Account
    properties:
      # source column -> graph property
      - {source: id, graph: source_id}
      - {source: name, graph: name}
      - {source: account_type, graph: type}

relationships:
  - source_table: mart_line_item
    graph_rel: HAS_LINE_ITEM
    from_node: Transaction
    to_node: Account
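Phase 4 plans to generate materialization logic from this YAML and validate it against the graph schema. The sketch below shows one possible shape for those two steps, assuming each property entry parses to a {"source": ..., "graph": ...} pair and that the node labels of the schema extension are available as a plain list. check_mapping, map_row, and the sample row are hypothetical; a real implementation would feed the mapped properties into whatever graph writer RoboSystems uses.

```python
from typing import Iterable

# Parsed form of the QuickBooks Account mapping (assumed shape).
ACCOUNT_MAPPING = {
    "source_table": "mart_account",
    "graph_node": "Account",
    "properties": [
        {"source": "id", "graph": "source_id"},
        {"source": "name", "graph": "name"},
        {"source": "account_type", "graph": "type"},
    ],
}


def check_mapping(mapping: dict, known_nodes: Iterable[str]) -> None:
    """Raise if the mapping targets a node label missing from the graph schema."""
    if mapping["graph_node"] not in set(known_nodes):
        raise ValueError(f"unknown graph node: {mapping['graph_node']}")


def map_row(row: dict, mapping: dict) -> dict:
    """Rename mart columns to graph property names, dropping unmapped columns."""
    missing = [p["source"] for p in mapping["properties"] if p["source"] not in row]
    if missing:
        raise KeyError(f"{mapping['source_table']} row lacks columns: {missing}")
    return {p["graph"]: row[p["source"]] for p in mapping["properties"]}


check_mapping(ACCOUNT_MAPPING, known_nodes=["Account", "Transaction", "Vendor", "Customer"])
node = map_row({"id": "42", "name": "Checking", "account_type": "Bank", "currency": "USD"}, ACCOUNT_MAPPING)
print(node)  # {'source_id': '42', 'name': 'Checking', 'type': 'Bank'}
```

Failing loudly on a missing source column surfaces dbt model changes at load time rather than as silently empty graph properties.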

Fork-Friendly Directory Structure

The directory structure serves as a merge boundary:

robosystems/adapters/
├── sec/                 # ← Upstream maintains
├── quickbooks/          # ← Upstream maintains
├── plaid/               # ← Upstream maintains
├── _template/           # ← Copy for new adapters
│
├── custom_erp/          # ← Fork namespace: any custom_* directory (upstream never touches)
├── custom_crm/          #    User additions live here
└── custom_bank/         #    No overlap with upstream adapter files, so git pull upstream main "just works"
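Discovery over this layout (Phase 3) could be as simple as scanning the adapters directory. The sketch below assumes three rules that the RFC implies but does not spell out: a directory counts as an adapter only if it contains adapter.yaml, names starting with "_" (such as _template) are skipped, and names starting with "custom_" belong to the fork namespace. discover_adapters is a hypothetical helper.

```python
import tempfile
from pathlib import Path


def discover_adapters(root: Path) -> tuple[list[str], list[str]]:
    """Split adapter directories under root into (upstream, fork) name lists."""
    upstream, fork = [], []
    for entry in sorted(root.iterdir()):
        # Skip plain files and private directories such as _template.
        if not entry.is_dir() or entry.name.startswith("_"):
            continue
        # Assumption: only directories with a manifest are adapters.
        if not (entry / "adapter.yaml").is_file():
            continue
        (fork if entry.name.startswith("custom_") else upstream).append(entry.name)
    return upstream, fork


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for name in ["sec", "quickbooks", "_template", "custom_erp"]:
        (root / name).mkdir()
        (root / name / "adapter.yaml").write_text(f"name: {name}\n")
    (root / "wip_adapter").mkdir()  # no manifest, so it is ignored
    (root / "README.md").write_text("notes\n")  # not a directory, so it is ignored
    upstream, fork = discover_adapters(root)

print(upstream, fork)  # ['quickbooks', 'sec'] ['custom_erp']
```

Keying the fork namespace off a name prefix means discovery needs no configuration file that both upstream and forks would edit.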

Registration

# robosystems/dagster/definitions.py
UPSTREAM_ADAPTERS = [SECAdapter(), QuickBooksAdapter(), PlaidAdapter()]

# === FORK ADDITIONS (add your adapters here) ===
CUSTOM_ADAPTERS = [
    # CustomERPAdapter(),  # Uncomment and add your adapters
]

ALL_ADAPTERS = UPSTREAM_ADAPTERS + CUSTOM_ADAPTERS
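A sketch of how ALL_ADAPTERS might be turned into registered assets follows. collect_assets and _StubAdapter are hypothetical; the stubs return strings in place of Dagster AssetsDefinition objects so the example runs without Dagster, and the duplicate-name check is an assumed safeguard against a fork registering an adapter twice. In definitions.py the result would be passed to Dagster's Definitions(assets=...).

```python
from typing import Any, Iterable


def collect_assets(adapters: Iterable[Any]) -> list:
    """Flatten each adapter's Dagster assets, rejecting duplicate adapter names."""
    seen: set[str] = set()
    assets: list = []
    for adapter in adapters:
        if adapter.name in seen:
            raise ValueError(f"adapter registered twice: {adapter.name}")
        seen.add(adapter.name)
        assets.extend(adapter.get_dagster_assets())
    return assets


# In definitions.py this would feed Dagster, e.g.:
#   defs = Definitions(assets=collect_assets(ALL_ADAPTERS))


class _StubAdapter:
    """Hypothetical stand-in exposing only the members collect_assets uses."""

    def __init__(self, name: str, assets: list):
        self.name = name
        self._assets = assets

    def get_dagster_assets(self) -> list:
        return self._assets


UPSTREAM = [_StubAdapter("sec", ["sec_filings"]), _StubAdapter("quickbooks", ["qb_accounts"])]
CUSTOM = [_StubAdapter("custom_erp", ["erp_ledger"])]
print(collect_assets(UPSTREAM + CUSTOM))  # ['sec_filings', 'qb_accounts', 'erp_ledger']
```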

Distribution Model

GitHub repos, not pip packages:

  • dbt projects don't fit pip (SQL + YAML, not Python)
  • Customization is the value prop—users modify adapters
  • Adding an adapter is a git clone into adapters/custom_*/

Discovery flow:

robosystems.io → Browse adapters → Fork adapter repo → Clone into adapters/custom_*/ → Register

Alternatives Considered

Alternative | Pros | Cons
--- | --- | ---
pip packages | Standard Python distribution | dbt projects don't fit; customization harder
No formal protocol | Faster to ship | Inconsistent adapters; harder to extend
Plugin system (entry points) | Auto-discovery | Over-engineered for current scale

Why this approach: GitHub repos match how users already fork RoboSystems. The protocol provides structure without requiring complex plugin infrastructure.

Implementation: Phases

  1. Phase 1: Protocol Definition

    • Define the SourceAdapter protocol in adapters/base.py
    • Create a _template/ directory with an example adapter
    • Document in the wiki
  2. Phase 2: Refactor Existing Adapters

    • Update the SEC adapter to implement the protocol
    • Update the QuickBooks adapter to implement the protocol
    • Update the Plaid adapter to implement the protocol
  3. Phase 3: Auto-Registration

    • Implement adapter discovery in Dagster definitions
    • Parse adapter.yaml manifests
    • Auto-register assets from compliant adapters
  4. Phase 4: Schema Mapping

    • Implement declarative schema mapping parser
    • Generate materialization logic from YAML
    • Validate mappings against graph schema

Implementation: Dependencies

Risks & Mitigations

Risk | Mitigation
--- | ---
Protocol too rigid | Start minimal, extend based on real adapter needs
Breaking existing adapters | Incremental migration; protocol is additive
Over-engineering | Only build what's needed for QuickBooks/Plaid first

Open Questions

  • Should adapter.yaml support declaring a schema for required secrets/credentials?
  • How should adapters declare their schedule preferences (daily, hourly, etc.)?
  • Should we support adapter versioning for breaking changes?

References
