Conversation
tonyxiao
left a comment
There was a problem hiding this comment.
Good feature overall — the composite PK approach is the right way to multiplex accounts into a shared schema, and isolating cursor state via syncId is exactly what needs to happen. A few things worth addressing before merging:
Breaking change for existing single-account setups
discover() now unconditionally calls resolveAccountId() and always returns primary_key: [['id'], ['_account_id']]. This means every existing single-account deployment will:
- See a new migration fingerprint →
applySchemaFromCatalogre-runs DDL ADD COLUMN IF NOT EXISTS "_account_id"succeeds (additive)- But the old
PRIMARY KEY ("id")constraint stays — DDL is additive-only, so the PK can't be changed in-place write()now buildskeyColumns = ['id', '_account_id']from the catalog →ON CONFLICT ("id", "_account_id")→ Postgres error: no unique constraint matching given keys
If this is a v2 intentional break (users must recreate tables), it needs prominent docs/migration guidance. If it should be backwards-compatible, the single-account path needs to keep PK [['id']].
new Stripe(api_key as string) in the CLI bypasses makeClient
// command.ts
const account = await new Stripe(pipeline.source.api_key as string).accounts.retrieve()makeClient in source-stripe already exists to centralise client construction (proxy config, custom fetch, timeouts, etc.). The CLI should either call the exported resolveAccountId() from source-stripe, or make it an exported helper. Also the as string cast will silently produce undefined if someone passes a non-Stripe source config.
Missing engine.teardown() in sync-multi
The sync-multi command calls setup() and syncs, but never calls engine.teardown(). If any pipeline set up a webhook endpoint (via config.webhook_url), it won't be cleaned up on normal exit. The finally only closes the state store.
Dead code: if (!accountId)
resolveAccountId either returns a non-empty string or throws — it never returns a falsy value. The branch:
if (!accountId) {
yield* inner
return
}is never reachable. Safe to remove.
Minor
stateStore as { close?(): Promise<void> }— redundant cast;selectStateStore's return type already includesclose?().applySchemaFromCatalogtakes a singleprimary_keythat applies to all streams. The destination'ssetup()correctly passes per-stream PKs fromcs.stream.primary_key, but if external callers useapplySchemaFromCatalogwith mixed-PK catalogs it won't work. Low risk for now since all streams use the same PK in practice.
tonyxiao
left a comment
There was a problem hiding this comment.
I'm thinking _account_id should be part of the modified openapi spec that we give to downstream.
This is the main change i think wwe should do
tonyxiao
left a comment
There was a problem hiding this comment.
Review
The core idea is solid — composite PK with _account_id is the right way to let multiple Stripe accounts coexist in one schema. Tests cover the key cases well. A few things to address:
1. _account_id in stream metadata doesn't belong
metadata: { account_id: accountId } on each stream is the wrong abstraction level. A stream isn't "owned by" an account — records within it are. The record-level injection (_account_id into each record's data) is already correct and sufficient. Drop account_id from stream metadata.
2. sync-multi CLI command should not live in the engine
The engine is a single-pipeline tool. Running N pipelines concurrently with sequential setup (to avoid schema creation races) is orchestration — that belongs in the service/workflow layer (Temporal), not the engine CLI. Suggest removing this command from the PR and handling multi-account orchestration at the service level.
3. _account_id composite PK is unconditional — should it be opt-in?
Every stream now gets primary_key: [['id'], ['_account_id']], even for single-account syncs. This means:
- Every table gets an
_account_idgenerated column even when there's only one account - Existing single-account deployments will hit a schema migration issue (tables have
PRIMARY KEY ("id"), now expectsPRIMARY KEY ("id", "_account_id"))
Consider making this opt-in (flag or config) rather than changing the default for all syncs.
4. Extra API call on every read() and discover()
resolveAccountId() calls stripe.accounts.retrieve() on every read() invocation. In the Temporal workflow, read() is called per activity execution (each reconciliation page). This could be resolved once at pipeline creation/setup and threaded through, rather than hitting the API repeatedly.
Summary
How to test (optional)
replace STRIPE_API_KEY_1 and STRIPE_API_KEY_2 with actual keys