Add a multi-key sync support #211

Open
Yostra wants to merge 7 commits into v2 from multi_org

@Yostra Yostra commented Mar 31, 2026

Summary

  • Adds support for syncing multiple Stripe accounts into a single Postgres schema. Each row includes an _account_id column with the real Stripe account ID, and tables use a composite primary key (id, _account_id) so records from different accounts coexist without collisions.
  • Introduces a sync-multi CLI subcommand that accepts a JSON config with multiple pipelines, auto-resolves Stripe account IDs from API keys, runs setup sequentially (to avoid Postgres schema creation races), then syncs all pipelines concurrently.
  • Uses the resolved account ID as the syncId for the state store, so page cursors are isolated per account and don't interfere across pipelines.
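The ordering described in the second bullet (setup one pipeline at a time, then sync all of them together) could be sketched roughly as follows. The `Pipeline` interface and `runMulti` are hypothetical names for illustration, not the engine's actual types, and `Promise.all` is an assumption about how failures propagate.

```typescript
// Hypothetical pipeline shape; the engine's real types are not shown in this PR.
interface Pipeline {
  setup(): Promise<void>
  sync(): Promise<void>
}

// Run setup for each pipeline in turn, then start every sync concurrently.
async function runMulti(pipelines: Pipeline[]): Promise<void> {
  for (const p of pipelines) {
    // Sequential on purpose: two pipelines creating the same schema or
    // table at the same moment can race in Postgres.
    await p.setup()
  }
  // Assumption: the first failing sync rejects the whole run.
  await Promise.all(pipelines.map((p) => p.sync()))
}
```

With `Promise.all`, one failing account rejects the combined promise while the other syncs keep running in the background; whether that is the desired behaviour is a design choice this sketch does not settle.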

How to test (optional)

Replace [STRIPE_API_KEY_1] and [STRIPE_API_KEY_2] with actual keys:

node apps/engine/dist/cli/index.js sync-multi --config '{
  "pipelines": [
    {
      "source": { "name": "stripe", "api_key": "[STRIPE_API_KEY_1]" },
      "destination": {
        "name": "postgres",
        "connection_string": "postgresql://postgres:postgres@localhost:55432/postgres?sslmode=disable",
        "schema": "stripe"
      },
      "streams": [{ "name": "customers" }]
    },
    {
      "source": { "name": "stripe", "api_key": "[STRIPE_API_KEY_2]" },
      "destination": {
        "name": "postgres",
        "connection_string": "postgresql://postgres:postgres@localhost:55432/postgres?sslmode=disable",
        "schema": "stripe"
      },
      "streams": [{ "name": "customers" }]
    }
  ]
}'

@Yostra Yostra marked this pull request as ready for review March 31, 2026 23:54

@tonyxiao tonyxiao left a comment

Good feature overall — the composite PK approach is the right way to multiplex accounts into a shared schema, and isolating cursor state via syncId is exactly what needs to happen. A few things worth addressing before merging:


Breaking change for existing single-account setups

discover() now unconditionally calls resolveAccountId() and always returns primary_key: [['id'], ['_account_id']]. This means every existing single-account deployment will:

  1. See a new migration fingerprint → applySchemaFromCatalog re-runs DDL
  2. ADD COLUMN IF NOT EXISTS "_account_id" succeeds (additive)
  3. But the old PRIMARY KEY ("id") constraint stays — DDL is additive-only, so the PK can't be changed in-place
  4. write() now builds keyColumns = ['id', '_account_id'] from the catalog → ON CONFLICT ("id", "_account_id") → Postgres error: there is no unique or exclusion constraint matching the ON CONFLICT specification

If this is a v2 intentional break (users must recreate tables), it needs prominent docs/migration guidance. If it should be backwards-compatible, the single-account path needs to keep PK [['id']].
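To make step 4 concrete, here is a minimal sketch of how a conflict target built from the catalog's key columns can disagree with the primary key a table already has. `quoteIdent`, `conflictTarget`, and `pkMatches` are hypothetical helpers, not functions from this repo, and the sketch assumes column names within a key are distinct.

```typescript
// Quote a Postgres identifier, doubling any embedded double quotes.
function quoteIdent(name: string): string {
  return '"' + name.replace(/"/g, '""') + '"'
}

// Render an ON CONFLICT target from a list of key columns.
function conflictTarget(keyColumns: string[]): string {
  return `ON CONFLICT (${keyColumns.map(quoteIdent).join(', ')})`
}

// True when the table's existing primary key covers exactly the same columns.
// Column order is ignored, since ON CONFLICT inference does not depend on it.
function pkMatches(existingPk: string[], keyColumns: string[]): boolean {
  return (
    existingPk.length === keyColumns.length &&
    keyColumns.every((c) => existingPk.includes(c))
  )
}

// An existing single-account table still has PRIMARY KEY ("id"):
const target = conflictTarget(['id', '_account_id'])
const ok = pkMatches(['id'], ['id', '_account_id'])
```

Here `ok` is false, which is the situation where Postgres rejects the upsert. A check like this at setup time could turn the runtime error into an explicit "recreate or migrate this table" message.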


new Stripe(api_key as string) in the CLI bypasses makeClient

// command.ts
const account = await new Stripe(pipeline.source.api_key as string).accounts.retrieve()

makeClient in source-stripe already exists to centralise client construction (proxy config, custom fetch, timeouts, etc.). Rather than constructing a client itself, the CLI should call resolveAccountId() from source-stripe, exporting it as a helper if it isn't exported already. Also, the as string cast is unchecked: if someone passes a non-Stripe source config, api_key is undefined at runtime and is passed straight to the Stripe constructor.
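A rough sketch of what the CLI side could look like with a runtime check in place of the cast and the client factory injected. `AccountClient`, `requireStripeApiKey`, and `resolveAccountIdVia` are hypothetical names, and the `makeClient` signature shown is an assumption; the real one in source-stripe may take different arguments.

```typescript
// Minimal structural type for the single call used here.
interface AccountClient {
  accounts: { retrieve(): Promise<{ id: string }> }
}

// Narrow an untyped source config at runtime instead of casting with `as string`.
function requireStripeApiKey(source: { name?: unknown; api_key?: unknown }): string {
  if (source.name !== 'stripe') {
    throw new Error(`expected a stripe source, got ${String(source.name)}`)
  }
  if (typeof source.api_key !== 'string' || source.api_key.length === 0) {
    throw new Error('stripe source is missing api_key')
  }
  return source.api_key
}

// Resolve the account ID through an injected client factory (assumed signature),
// so proxy, fetch, and timeout settings stay in one place.
async function resolveAccountIdVia(
  makeClient: (apiKey: string) => AccountClient,
  source: { name?: unknown; api_key?: unknown },
): Promise<string> {
  const client = makeClient(requireStripeApiKey(source))
  const account = await client.accounts.retrieve()
  return account.id
}
```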


Missing engine.teardown() in sync-multi

The sync-multi command calls setup() and syncs, but never calls engine.teardown(). If any pipeline set up a webhook endpoint (via config.webhook_url), it won't be cleaned up on normal exit. The finally only closes the state store.
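One possible shape for the cleanup, as a sketch only. The `Engine` and `StateStore` interfaces below are simplified assumptions, and the sketch assumes teardown() is safe to call even when setup() failed partway.

```typescript
// Simplified, assumed shapes; the real engine and state store have more members.
interface Engine {
  setup(): Promise<void>
  sync(): Promise<void>
  teardown(): Promise<void>
}
interface StateStore {
  close?(): Promise<void>
}

async function runWithCleanup(engine: Engine, stateStore: StateStore): Promise<void> {
  try {
    await engine.setup()
    await engine.sync()
  } finally {
    try {
      // Removes anything setup() registered, such as a webhook endpoint.
      await engine.teardown()
    } finally {
      // Still close the store if teardown itself throws.
      await stateStore.close?.()
    }
  }
}
```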


Dead code: if (!accountId)

resolveAccountId either returns a non-empty string or throws — it never returns a falsy value. The branch:

if (!accountId) {
  yield* inner
  return
}

is never reachable. Safe to remove.


Minor

  • stateStore as { close?(): Promise<void> } — redundant cast; selectStateStore's return type already includes close?().
  • applySchemaFromCatalog takes a single primary_key that applies to all streams. The destination's setup() correctly passes per-stream PKs from cs.stream.primary_key, but if external callers use applySchemaFromCatalog with mixed-PK catalogs it won't work. Low risk for now since all streams use the same PK in practice.

@tonyxiao tonyxiao left a comment

I'm thinking _account_id should be part of the modified OpenAPI spec that we give to downstream.

This is the main change I think we should make.

@tonyxiao tonyxiao left a comment

Review

The core idea is solid — composite PK with _account_id is the right way to let multiple Stripe accounts coexist in one schema. Tests cover the key cases well. A few things to address:

1. account_id in stream metadata doesn't belong

metadata: { account_id: accountId } on each stream is the wrong abstraction level. A stream isn't "owned by" an account — records within it are. The record-level injection (_account_id into each record's data) is already correct and sufficient. Drop account_id from stream metadata.
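For reference, record-level injection can be as small as the sketch below. `StripeRecord` and `withAccountId` are hypothetical names, and the real read() path presumably wraps records in a message envelope that is not shown here.

```typescript
// Assumed record shape: a plain JSON object.
type StripeRecord = Record<string, unknown>

// Wrap an inner record stream and stamp each record with the owning account.
async function* withAccountId(
  inner: AsyncIterable<StripeRecord>,
  accountId: string,
): AsyncGenerator<StripeRecord> {
  for await (const record of inner) {
    // Copy rather than mutate, so the upstream object is left untouched.
    yield { ...record, _account_id: accountId }
  }
}
```

Since every record carries its own _account_id, nothing downstream needs to consult stream metadata to know which account a row belongs to.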

2. sync-multi CLI command should not live in the engine

The engine is a single-pipeline tool. Running N pipelines concurrently with sequential setup (to avoid schema creation races) is orchestration — that belongs in the service/workflow layer (Temporal), not the engine CLI. Suggest removing this command from the PR and handling multi-account orchestration at the service level.

3. _account_id composite PK is unconditional — should it be opt-in?

Every stream now gets primary_key: [['id'], ['_account_id']], even for single-account syncs. This means:

  • Every table gets an _account_id generated column even when there's only one account
  • Existing single-account deployments will hit a schema migration issue (tables have PRIMARY KEY ("id"), now expects PRIMARY KEY ("id", "_account_id"))

Consider making this opt-in (flag or config) rather than changing the default for all syncs.
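A sketch of the opt-in variant, assuming a boolean option named `multiAccount` (a hypothetical name; the actual flag or config key is up for discussion):

```typescript
// Primary key as a list of key paths, matching the [['id'], ['_account_id']] form above.
type PrimaryKey = string[][]

// Keep the existing single-column key unless multi-account mode is requested.
function primaryKeyFor(opts: { multiAccount?: boolean }): PrimaryKey {
  return opts.multiAccount ? [['id'], ['_account_id']] : [['id']]
}
```

With the default left at [['id']], existing single-account tables keep a matching PRIMARY KEY ("id") and see no migration.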

4. Extra API call on every read() and discover()

resolveAccountId() calls stripe.accounts.retrieve() on every read() invocation. In the Temporal workflow, read() is called per activity execution (each reconciliation page). This could be resolved once at pipeline creation/setup and threaded through, rather than hitting the API repeatedly.
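One way to thread the value through, sketched under the assumption that the source config can carry an optional `account_id` field (hypothetical; not part of the current config shape):

```typescript
// Assumed config shape with an optional, pre-resolved account ID.
interface SourceConfig {
  api_key: string
  account_id?: string
}

// Resolve the account ID only if the config does not already carry one.
// Intended to run once at pipeline creation or setup; read() and discover()
// would then use config.account_id directly.
async function ensureAccountId(
  config: SourceConfig,
  resolve: (apiKey: string) => Promise<string>,
): Promise<SourceConfig> {
  if (config.account_id) return config
  return { ...config, account_id: await resolve(config.api_key) }
}
```

Persisting the returned config with the pipeline means later activity executions skip the accounts.retrieve() round trip entirely.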


2 participants