diff --git a/apps/engine/src/lib/index.ts b/apps/engine/src/lib/index.ts index 32cba14ee..d42cc3814 100644 --- a/apps/engine/src/lib/index.ts +++ b/apps/engine/src/lib/index.ts @@ -28,7 +28,7 @@ export { destinationTest, destinationTestSpec } from './destination-test.js' export type { DestinationTestConfig } from './destination-test.js' export { readonlyStateStore } from './state-store.js' export type { StateStore } from './state-store.js' -export { maybeDestinationStateStore } from './select-state-store.js' +export { maybeDestinationStateStore, selectStateStore } from './select-state-store.js' export { createConnectorSchemas, connectorSchemaName, diff --git a/apps/engine/src/lib/select-state-store.ts b/apps/engine/src/lib/select-state-store.ts index e82e8b96a..3c8721647 100644 --- a/apps/engine/src/lib/select-state-store.ts +++ b/apps/engine/src/lib/select-state-store.ts @@ -6,9 +6,9 @@ import type { PipelineConfig } from '@stripe/sync-protocol' * Tries to resolve a destination-colocated state store. * * Imports `@stripe/sync-state-${destination.type}` and calls its - * `createStateStore(destConfig)`. Not all destinations support this — - * Postgres does (state table alongside synced data), Google Sheets doesn't. - * Falls back to a read-only no-op store when unavailable. + * `createStateStore(destConfig, pipelineId)`. Not all destinations support this + * convention; Postgres does (state table alongside synced data), Google Sheets + * doesn't. Falls back to a read-only store when unavailable. * * If the package exports a `setupStateStore(destConfig)` function, * it is called first to ensure the state table exists (runs migrations). @@ -20,24 +20,27 @@ import type { PipelineConfig } from '@stripe/sync-protocol' * e.g. the HTTP API (state flows in via X-State header, out via NDJSON stream) * or Temporal workflows (workflow memory is the source of truth). * Writing state to the destination DB in those cases creates unexpected tables. + * + * @param pipelineId Identifies the sync slot. Defaults to `'default'` downstream. + * Pass a unique value per pipeline (e.g. the Stripe account ID) to isolate cursor state. */ -export async function maybeDestinationStateStore( - params: PipelineConfig +export async function selectStateStore( + params: PipelineConfig, + pipelineId?: string ): Promise }> { try { const { type: destType, ...destConfig } = params.destination const pkg = await import(`@stripe/sync-state-${destType}`) if (typeof pkg.createStateStore === 'function') { - // Run migrations if the package provides a setup function if (typeof pkg.setupStateStore === 'function') { await pkg.setupStateStore(destConfig) } - return pkg.createStateStore(destConfig) as StateStore & { - close?(): Promise - } + return pkg.createStateStore(destConfig, pipelineId) } } catch { // Package not installed — fall through to readonly } return readonlyStateStore() } + +export const maybeDestinationStateStore = selectStateStore diff --git a/e2e/test-server-sync.test.ts b/e2e/test-server-sync.test.ts index 2e31875a0..bc22686ec 100644 --- a/e2e/test-server-sync.test.ts +++ b/e2e/test-server-sync.test.ts @@ -668,6 +668,37 @@ describe('test-server sync via Docker engine', () => { expect((state.streams.customers as StripeStreamState).status).toBe('complete') }, 600_000) + it('multiple keys: concurrent syncs with different API keys do not interfere', async () => { + const COUNT = 5000 + const KEYS = ['sk_test_key_alpha', 'sk_test_key_bravo', 'sk_test_key_charlie'] + + await seedCustomers(generateCustomers(COUNT, 'cus_mk_')) + + const syncs = KEYS.map(async (apiKey) => { + const destSchema = uniqueSchema(`multikey_${apiKey.slice(-5)}`) + const { state } = await runSync({ + destSchema, + sourceOverrides: { api_key: apiKey, backfill_concurrency: 3 }, + }) + return { apiKey, destSchema, state } + }) + + const results = await Promise.all(syncs) + + for (const { apiKey, destSchema, state } of results) { + const ids = await listIds(destSchema, 'customers') + expect(ids.length, `key ${apiKey}: expected ${COUNT} rows`).toBe(COUNT) + + const destIds = new Set(ids) + for (let i = 0; i < COUNT; i++) { + const expected = `cus_mk_${String(i).padStart(5, '0')}` + expect(destIds.has(expected), `key ${apiKey}: missing ${expected}`).toBe(true) + } + + expect((state.streams.customers as StripeStreamState).status).toBe('complete') + } + }, 180_000) + it('v2 stream: syncs v2_core_event_destinations via cursor pagination', async () => { const destSchema = uniqueSchema('v2sync') const STREAM = 'v2_core_event_destinations' diff --git a/packages/destination-postgres/src/index.test.ts b/packages/destination-postgres/src/index.test.ts index 9c3289a07..a4552dcc6 100644 --- a/packages/destination-postgres/src/index.test.ts +++ b/packages/destination-postgres/src/index.test.ts @@ -240,6 +240,94 @@ describe('destination default export', () => { }) }) +describe('multi-org sync (two account IDs)', () => { + const multiOrgCatalog: ConfiguredCatalog = { + streams: [ + { + stream: { + name: 'customers', + primary_key: [['id'], ['_account_id']], + metadata: {}, + }, + sync_mode: 'full_refresh', + destination_sync_mode: 'overwrite', + }, + ], + } + + beforeEach(async () => { + await drain(destination.setup!({ config: makeConfig(), catalog: multiOrgCatalog })) + }) + + it('creates table with composite primary key (id, _account_id)', async () => { + const { rows } = await pool.query( + `SELECT column_name FROM information_schema.columns + WHERE table_schema = $1 AND table_name = 'customers' + ORDER BY ordinal_position`, + [SCHEMA] + ) + const columnNames = rows.map((r) => r.column_name) + expect(columnNames).toContain('id') + expect(columnNames).toContain('_account_id') + + const { rows: pkRows } = await pool.query( + `SELECT a.attname + FROM pg_index i + JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) + WHERE i.indrelid = $1::regclass AND i.indisprimary + ORDER BY array_position(i.indkey, a.attnum)`, + [`"${SCHEMA}"."customers"`] + ) + expect(pkRows.map((r) => r.attname)).toEqual(['id', '_account_id']) + }) + + it('stores rows from two accounts with the same object id as separate rows', async () => { + const messages = toAsyncIter([ + makeRecord('customers', { id: 'cus_1', name: 'Alice (Acct A)', _account_id: 'acct_AAA' }), + makeRecord('customers', { id: 'cus_1', name: 'Alice (Acct B)', _account_id: 'acct_BBB' }), + ]) + + await collectOutputs( + destination.write({ config: makeConfig(), catalog: multiOrgCatalog }, messages) + ) + + const { rows } = await pool.query( + `SELECT id, _account_id, _raw_data->>'name' AS name + FROM "${SCHEMA}".customers ORDER BY _account_id` + ) + expect(rows).toEqual([ + { id: 'cus_1', _account_id: 'acct_AAA', name: 'Alice (Acct A)' }, + { id: 'cus_1', _account_id: 'acct_BBB', name: 'Alice (Acct B)' }, + ]) + }) + + it('upserts per-account: same id + same account updates, different account inserts', async () => { + const batch1 = toAsyncIter([ + makeRecord('customers', { id: 'cus_1', name: 'Alice v1', _account_id: 'acct_AAA' }), + makeRecord('customers', { id: 'cus_1', name: 'Alice v1', _account_id: 'acct_BBB' }), + ]) + await collectOutputs( + destination.write({ config: makeConfig(), catalog: multiOrgCatalog }, batch1) + ) + + const batch2 = toAsyncIter([ + makeRecord('customers', { id: 'cus_1', name: 'Alice v2', _account_id: 'acct_AAA' }), + ]) + await collectOutputs( + destination.write({ config: makeConfig(), catalog: multiOrgCatalog }, batch2) + ) + + const { rows } = await pool.query( + `SELECT _account_id, _raw_data->>'name' AS name + FROM "${SCHEMA}".customers ORDER BY _account_id` + ) + expect(rows).toEqual([ + { _account_id: 'acct_AAA', name: 'Alice v2' }, + { _account_id: 'acct_BBB', name: 'Alice v1' }, + ]) + }) +}) + describe('upsertMany standalone', () => { beforeEach(async () => { await drain(destination.setup!({ config: makeConfig(), catalog })) diff --git a/packages/destination-postgres/src/index.ts b/packages/destination-postgres/src/index.ts index 8552e8c78..84cb72c91 100644 --- a/packages/destination-postgres/src/index.ts +++ b/packages/destination-postgres/src/index.ts @@ -68,7 +68,8 @@ export async function upsertMany( schema: string, table: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any - entries: Record[] + entries: Record[], + keyColumns: string[] = ['id'] ): Promise { if (!entries.length) return await upsert( @@ -77,7 +78,7 @@ export async function upsertMany( { schema, table, - keyColumns: ['id'], + keyColumns, } ) } @@ -160,23 +161,12 @@ const destination = { `) await Promise.all( catalog.streams.map(async (cs) => { - if (cs.stream.json_schema) { - await pool.query( - buildCreateTableDDL(config.schema, cs.stream.name, cs.stream.json_schema, { - system_columns: cs.system_columns, - }) - ) - } else { - await pool.query(sql` - CREATE TABLE IF NOT EXISTS "${config.schema}"."${cs.stream.name}" ( - "_raw_data" jsonb NOT NULL, - "_last_synced_at" timestamptz, - "_updated_at" timestamptz NOT NULL DEFAULT now(), - "id" text GENERATED ALWAYS AS (("_raw_data"->>'id')::text) STORED, - PRIMARY KEY ("id") - ) - `) - } + await pool.query( + buildCreateTableDDL(config.schema, cs.stream.name, cs.stream.json_schema ?? {}, { + system_columns: cs.system_columns, + primary_key: cs.stream.primary_key, + }) + ) }) ) } finally { @@ -199,16 +189,28 @@ const destination = { } }, - async *write({ config, catalog: _catalog }, $stdin) { + async *write({ config, catalog }, $stdin) { const pool = withQueryLogging(createPool(await buildPoolConfig(config))) const batchSize = config.batch_size // eslint-disable-next-line @typescript-eslint/no-explicit-any const streamBuffers = new Map[]>() + const streamKeyColumns = new Map( + catalog.streams.map((cs) => [ + cs.stream.name, + cs.stream.primary_key?.map((pk) => pk[0]) ?? ['id'], + ]) + ) const flushStream = async (streamName: string) => { const buffer = streamBuffers.get(streamName) if (!buffer || buffer.length === 0) return - await upsertMany(pool, config.schema, streamName, buffer) + await upsertMany( + pool, + config.schema, + streamName, + buffer, + streamKeyColumns.get(streamName) ?? ['id'] + ) streamBuffers.set(streamName, []) } diff --git a/packages/destination-postgres/src/schemaProjection.test.ts b/packages/destination-postgres/src/schemaProjection.test.ts index 7ff07dc47..4d51b310e 100644 --- a/packages/destination-postgres/src/schemaProjection.test.ts +++ b/packages/destination-postgres/src/schemaProjection.test.ts @@ -121,6 +121,25 @@ describe('buildCreateTableWithSchema', () => { expect(stmts[0]).toContain("WHEN jsonb_typeof(_raw_data->'customer') = 'object'") }) + it('generates composite primary key with _account_id when primary_key option is set', () => { + const stmts = buildCreateTableWithSchema('stripe', 'customers', SAMPLE_JSON_SCHEMA, { + primary_key: [['id'], ['_account_id']], + }) + + // Both PK columns present as generated columns + expect(stmts[0]).toContain(`"id" text GENERATED ALWAYS AS ((_raw_data->>'id')::text) STORED`) + expect(stmts[0]).toContain( + `"_account_id" text GENERATED ALWAYS AS ((_raw_data->>'_account_id')::text) STORED` + ) + + // Composite PRIMARY KEY + expect(stmts[0]).toContain('PRIMARY KEY ("id", "_account_id")') + + // _account_id should NOT appear as a regular generated column from json_schema + const alterStmts = stmts.filter((s) => s.includes('ADD COLUMN IF NOT EXISTS')) + expect(alterStmts.every((s) => !s.includes('"_account_id"'))).toBe(true) + }) + it('produces stable output across repeated calls', () => { const first = buildCreateTableWithSchema('mydata', 'customers', SAMPLE_JSON_SCHEMA) const second = buildCreateTableWithSchema('mydata', 'customers', SAMPLE_JSON_SCHEMA) diff --git a/packages/destination-postgres/src/schemaProjection.ts b/packages/destination-postgres/src/schemaProjection.ts index 691f3aa63..cb3da2787 100644 --- a/packages/destination-postgres/src/schemaProjection.ts +++ b/packages/destination-postgres/src/schemaProjection.ts @@ -98,6 +98,8 @@ export type SystemColumn = { export type BuildTableOptions = { /** Extra system columns to add to the table (e.g. _account_id). */ system_columns?: SystemColumn[] + /** Primary key paths from the stream (e.g. [['id'], ['_account_id']]). Defaults to [['id']]. */ + primary_key?: string[][] } /** @@ -114,7 +116,10 @@ export function buildCreateTableWithSchema( const quotedSchema = quoteIdent(schema) const quotedTable = quoteIdent(tableName) - const columns = jsonSchemaToColumns(jsonSchema) + const pkFields = (options.primary_key ?? [['id']]).map((pk) => pk[0]) + const pkSet = new Set(pkFields) + + const columns = jsonSchemaToColumns(jsonSchema).filter((c) => !pkSet.has(c.name)) const generatedColumnDefs = columns.map( (col) => `${quoteIdent(col.name)} ${col.pgType} GENERATED ALWAYS AS (${col.expression}) STORED` @@ -124,14 +129,19 @@ export function buildCreateTableWithSchema( (col) => `${quoteIdent(col.name)} ${col.type}` ) + const pkColumnDefs = pkFields.map((field) => { + const escapedField = field.replace(/'/g, "''") + return `${quoteIdent(field)} text GENERATED ALWAYS AS ((_raw_data->>'${escapedField}')::text) STORED` + }) + const columnDefs = [ '"_raw_data" jsonb NOT NULL', '"_last_synced_at" timestamptz', '"_updated_at" timestamptz NOT NULL DEFAULT now()', ...systemColumnDefs, - `"id" text GENERATED ALWAYS AS ((_raw_data->>'id')::text) STORED`, + ...pkColumnDefs, ...generatedColumnDefs, - 'PRIMARY KEY ("id")', + `PRIMARY KEY (${pkFields.map((f) => quoteIdent(f)).join(', ')})`, ] const stmts: string[] = [ @@ -305,6 +315,8 @@ export type ApplySchemaFromCatalogConfig = { syncSchema?: string /** Extra system columns to add to each table. */ system_columns?: SystemColumn[] + /** Primary key paths (e.g. [['id'], ['_account_id']]). Defaults to [['id']]. */ + primary_key?: string[][] apiVersion?: string /** Progress callback — emitting logs signals liveness to the orchestrator. */ onLog?: (message: string) => void @@ -371,6 +383,7 @@ export async function applySchemaFromCatalog( await client.query( buildCreateTableDDL(dataSchema, stream.name, stream.json_schema!, { system_columns: config.system_columns, + primary_key: config.primary_key, }) ) config.onLog?.( diff --git a/packages/source-stripe/src/catalog.ts b/packages/source-stripe/src/catalog.ts index 605a2b92a..27cd833c1 100644 --- a/packages/source-stripe/src/catalog.ts +++ b/packages/source-stripe/src/catalog.ts @@ -10,7 +10,7 @@ export function catalogFromRegistry(registry: Record): C .sort(([, a], [, b]) => a.order - b.order) .map(([name, cfg]) => ({ name: cfg.tableName, - primary_key: [['id']], + primary_key: [['id'], ['_account_id']], metadata: { resource_name: name }, })) @@ -19,7 +19,8 @@ export function catalogFromRegistry(registry: Record): C /** * Derive a CatalogPayload by merging OpenAPI-parsed tables with registry metadata. - * Each stream gets json_schema from the parsed OpenAPI spec. + * Each stream gets json_schema from the parsed OpenAPI spec, with `_account_id` + * injected so downstream consumers see the full data shape. */ export function catalogFromOpenApi( tables: ParsedResourceTable[], @@ -34,12 +35,25 @@ export function catalogFromOpenApi( const table = tableMap.get(cfg.tableName) const stream: Stream = { name: cfg.tableName, - primary_key: [['id']], + primary_key: [['id'], ['_account_id']], metadata: { resource_name: name }, } + if (table) { - stream.json_schema = parsedTableToJsonSchema(table) + const jsonSchema = parsedTableToJsonSchema(table) + const properties = (jsonSchema.properties ?? {}) as Record + properties._account_id = { type: 'string' } + jsonSchema.properties = properties + + const required = Array.isArray(jsonSchema.required) ? [...jsonSchema.required] : [] + if (!required.includes('_account_id')) { + required.push('_account_id') + } + jsonSchema.required = required + + stream.json_schema = jsonSchema } + return stream }) diff --git a/packages/source-stripe/src/index.test.ts b/packages/source-stripe/src/index.test.ts index deeca737e..8b02c88d3 100644 --- a/packages/source-stripe/src/index.test.ts +++ b/packages/source-stripe/src/index.test.ts @@ -41,6 +41,13 @@ vi.mock('./resourceRegistry', async (importOriginal) => ({ buildResourceRegistry: vi.fn(), })) +vi.mock('./client', async (importOriginal) => ({ + ...(await importOriginal()), + makeClient: vi.fn(() => ({ + getAccount: vi.fn(async () => ({ id: 'acct_test_fake123' })), + })), +})) + /** Wrap a single item as an AsyncIterable for source.read()'s $stdin param. */ async function* toIter(item: T): AsyncIterable { yield item @@ -644,6 +651,7 @@ describe('StripeSource', () => { state: undefined, registry, client: mockClient, + accountId: 'acct_test', rateLimiter: async () => 0, backfillConcurrency: 2, }) @@ -1915,6 +1923,7 @@ describe('StripeSource', () => { }, registry, client: mockClient, + accountId: 'acct_test', rateLimiter, backfillConcurrency: 3, }) @@ -1969,6 +1978,7 @@ describe('StripeSource', () => { state: undefined, registry, client: mockClient, + accountId: 'acct_test', rateLimiter, backfillConcurrency: 3, }) @@ -2015,6 +2025,7 @@ describe('StripeSource', () => { state: undefined, registry, client: mockClient, + accountId: 'acct_test', rateLimiter, }) ) @@ -2054,6 +2065,7 @@ describe('StripeSource', () => { state: undefined, registry, client: mockClient, + accountId: 'acct_test', rateLimiter, }) ) @@ -2108,6 +2120,7 @@ describe('StripeSource', () => { state: undefined, registry, client: mockClient, + accountId: 'acct_test', rateLimiter, backfillConcurrency: 3, }) @@ -2191,6 +2204,7 @@ describe('StripeSource', () => { state: undefined, registry, client: {} as unknown as StripeClient, + accountId: 'acct_test', rateLimiter: rateLimiterSpy, }) ) diff --git a/packages/source-stripe/src/index.ts b/packages/source-stripe/src/index.ts index 0efdcf53b..1704fb90e 100644 --- a/packages/source-stripe/src/index.ts +++ b/packages/source-stripe/src/index.ts @@ -1,7 +1,5 @@ import type { CatalogPayload, - ConfiguredCatalog, - Message, Source, SpecOutput, CheckOutput, @@ -24,12 +22,12 @@ import { } from '@stripe/sync-openapi' import { processStripeEvent } from './process-event.js' import { processWebhookInput, createInputQueue, startWebhookServer } from './src-webhook.js' -import { listApiBackfill } from './src-list-api.js' +import { listApiBackfill, errorToTrace } from './src-list-api.js' import { pollEvents } from './src-events-api.js' import type { StripeWebSocketClient, StripeWebhookEvent } from './src-websocket.js' import { createStripeWebSocketClient } from './src-websocket.js' import type { ResourceConfig } from './types.js' -import { makeClient } from './client.js' +import { makeClient, type StripeClient } from './client.js' import type { RateLimiter } from './rate-limiter.js' import { createInMemoryRateLimiter, DEFAULT_MAX_RPS } from './rate-limiter.js' import { fetchWithProxy } from './transport.js' @@ -78,6 +76,17 @@ export type StripeStreamState = { backfill?: BackfillState } +// MARK: - Account ID resolution + +export async function resolveAccountId(config: Config, client: StripeClient): Promise { + if (config.account_id) { + return config.account_id + } + + const account = await client.getAccount() + return account.id +} + // MARK: - Source export type StripeSourceDeps = { @@ -237,6 +246,13 @@ export function createStripeSource( config.base_url ) const streamNames = new Set(catalog.streams.map((s) => s.stream.name)) + let accountId: string + try { + accountId = await resolveAccountId(config, client) + } catch (err) { + yield errorToTrace(err, catalog.streams[0]?.stream.name ?? 'unknown') + return + } // Event-driven mode: iterate over incoming webhook inputs if ($stdin) { @@ -247,10 +263,18 @@ export function createStripeSource( config, catalog, registry, - streamNames + streamNames, + accountId ) } else { - yield* processStripeEvent(input as StripeEvent, config, catalog, registry, streamNames) + yield* processStripeEvent( + input as StripeEvent, + config, + catalog, + registry, + streamNames, + accountId + ) } } return @@ -281,10 +305,11 @@ export function createStripeSource( registry, rateLimiter, client, + accountId, backfillLimit: config.backfill_limit, backfillConcurrency: config.backfill_concurrency, drainQueue: wsClient - ? () => inputQueue.drain(config, catalog, registry, streamNames) + ? () => inputQueue.drain(config, catalog, registry, streamNames, accountId) : undefined, }) @@ -297,6 +322,7 @@ export function createStripeSource( streamNames, state: state?.streams as Record | undefined, startTimestamp, + accountId, }) // Start HTTP server for live mode if configured @@ -307,16 +333,30 @@ export function createStripeSource( // After backfill: stream live events from WebSocket and/or HTTP if (wsClient || httpServer) { // Drain anything that arrived during backfill - yield* inputQueue.drain(config, catalog, registry, streamNames) + yield* inputQueue.drain(config, catalog, registry, streamNames, accountId) // Block on new events (infinite loop until all live sources close) while (wsClient || httpServer) { const queued = await inputQueue.wait() try { if ('body' in queued.data) { - yield* processWebhookInput(queued.data, config, catalog, registry, streamNames) + yield* processWebhookInput( + queued.data, + config, + catalog, + registry, + streamNames, + accountId + ) } else { - yield* processStripeEvent(queued.data, config, catalog, registry, streamNames) + yield* processStripeEvent( + queued.data, + config, + catalog, + registry, + streamNames, + accountId + ) } queued.resolve?.() } catch (err) { diff --git a/packages/source-stripe/src/process-event.ts b/packages/source-stripe/src/process-event.ts index 762bfb0a9..cf0021577 100644 --- a/packages/source-stripe/src/process-event.ts +++ b/packages/source-stripe/src/process-event.ts @@ -45,7 +45,8 @@ function isDeleteEvent(event: StripeEvent): boolean { */ export function fromStripeEvent( event: StripeEvent, - registry: Record + registry: Record, + accountId?: string ): { record: RecordMessage; state: SourceStateMessage } | null { const dataObject = event.data?.object as unknown as | { id?: string; object?: string; deleted?: boolean; [key: string]: unknown } @@ -59,7 +60,10 @@ export function fromStripeEvent( // Skip objects without an id (preview/draft objects like invoice.upcoming) if (!dataObject.id) return null - const record = toRecordMessage(config.tableName, dataObject as Record) + const data = accountId + ? { ...(dataObject as Record), _account_id: accountId } + : (dataObject as Record) + const record = toRecordMessage(config.tableName, data) const state: SourceStateMessage = stateMsg({ stream: config.tableName, data: { @@ -86,7 +90,8 @@ export async function* processStripeEvent( config: Config, catalog: ConfiguredCatalog, registry: Record, - streamNames: Set + streamNames: Set, + accountId?: string ): AsyncGenerator { // 1. Extract object const dataObject = event.data?.object as unknown as @@ -118,6 +123,7 @@ export async function* processStripeEvent( customer: summary.customer, livemode: e.livemode, lookup_key: e.lookup_key, + ...(accountId ? { _account_id: accountId } : {}), }) } yield stateMsg({ @@ -136,7 +142,11 @@ export async function* processStripeEvent( // 4. Delete events — yield record with deleted: true if (isDeleteEvent(event)) { - yield toRecordMessage(resourceConfig.tableName, { ...dataObject, deleted: true }) + yield toRecordMessage(resourceConfig.tableName, { + ...dataObject, + deleted: true, + ...(accountId ? { _account_id: accountId } : {}), + }) yield stateMsg({ stream: resourceConfig.tableName, data: { eventId: event.id, eventCreated: event.created }, @@ -155,12 +165,16 @@ export async function* processStripeEvent( } // 6. Yield main record - yield toRecordMessage(resourceConfig.tableName, data) + const recordData = accountId ? { ...data, _account_id: accountId } : data + yield toRecordMessage(resourceConfig.tableName, recordData) // 7. Yield subscription items if applicable if (objectType === 'subscriptions' && (data as { items?: { data?: unknown[] } }).items?.data) { for (const item of (data as { items: { data: Record[] } }).items.data) { - yield toRecordMessage('subscription_items', item) + yield toRecordMessage( + 'subscription_items', + accountId ? { ...item, _account_id: accountId } : item + ) } } diff --git a/packages/source-stripe/src/src-events-api.ts b/packages/source-stripe/src/src-events-api.ts index 89f52030e..741140076 100644 --- a/packages/source-stripe/src/src-events-api.ts +++ b/packages/source-stripe/src/src-events-api.ts @@ -18,8 +18,9 @@ export async function* pollEvents(opts: { streamNames: Set state: Record | undefined startTimestamp: number + accountId: string }): AsyncGenerator { - const { config, client, catalog, registry, streamNames, state, startTimestamp } = opts + const { config, client, catalog, registry, streamNames, state, startTimestamp, accountId } = opts if (!config.poll_events) return @@ -86,7 +87,14 @@ export async function* pollEvents(opts: { events.reverse() for (const event of events) { - for await (const msg of processStripeEvent(event, config, catalog, registry, streamNames)) { + for await (const msg of processStripeEvent( + event, + config, + catalog, + registry, + streamNames, + accountId + )) { if (msg.type === 'source_state' && msg.source_state.state_type !== 'global') { // Intercept state messages to preserve complete status + update events_cursor const existing = state?.[msg.source_state.stream] diff --git a/packages/source-stripe/src/src-list-api.ts b/packages/source-stripe/src/src-list-api.ts index 7f0f41e58..965335821 100644 --- a/packages/source-stripe/src/src-list-api.ts +++ b/packages/source-stripe/src/src-list-api.ts @@ -6,6 +6,23 @@ import type { RateLimiter } from './rate-limiter.js' import { StripeApiRequestError } from '@stripe/sync-openapi' import type { StripeClient } from './client.js' +export function errorToTrace(err: unknown, stream: string): TraceMessage { + const isRateLimit = err instanceof Error && err.message.includes('Rate limit') + const isAuth = err instanceof StripeApiRequestError && (err.status === 401 || err.status === 403) + return { + type: 'trace', + trace: { + trace_type: 'error', + error: { + failure_type: isRateLimit ? 'transient_error' : isAuth ? 'auth_error' : 'system_error', + message: err instanceof Error ? err.message : String(err), + stream, + ...(err instanceof Error ? { stack_trace: err.stack } : {}), + }, + }, + } +} + // Errors matching these patterns are silently skipped during backfill. // The stream is marked complete without yielding records. // NOTE: these are band-aids — the underlying issue is that the OpenAPI spec @@ -240,6 +257,7 @@ async function* paginateSegment(opts: { range: { gte: number; lt: number } numSegments: number streamName: string + accountId: string supportsLimit: boolean supportsForwardPagination: boolean backfillLimit?: number @@ -253,6 +271,7 @@ async function* paginateSegment(opts: { range, numSegments, streamName, + accountId, supportsLimit, supportsForwardPagination, backfillLimit, @@ -279,7 +298,10 @@ async function* paginateSegment(opts: { const response = await listFn(params as Parameters[0]) for (const item of response.data) { - yield toRecordMessage(streamName, item as Record) + yield toRecordMessage(streamName, { + ...(item as Record), + _account_id: accountId, + }) totalEmitted.count++ } @@ -315,12 +337,13 @@ async function* paginateSegment(opts: { async function* sequentialBackfillStream(opts: { resourceConfig: ResourceConfig streamName: string + accountId: string pageCursor: string | null backfillLimit?: number rateLimiter: RateLimiter drainQueue?: () => AsyncGenerator }): AsyncGenerator { - const { resourceConfig, streamName, backfillLimit, rateLimiter, drainQueue } = opts + const { resourceConfig, streamName, accountId, backfillLimit, rateLimiter, drainQueue } = opts let pageCursor = opts.pageCursor let hasMore = true let totalEmitted = 0 @@ -347,7 +370,10 @@ async function* sequentialBackfillStream(opts: { ) for (const item of response.data) { - yield toRecordMessage(streamName, item as Record) + yield toRecordMessage(streamName, { + ...(item as Record), + _account_id: accountId, + }) totalEmitted++ } @@ -389,6 +415,7 @@ export async function* listApiBackfill(opts: { | undefined registry: Record client: StripeClient + accountId: string rateLimiter: RateLimiter backfillLimit?: number backfillConcurrency?: number @@ -399,6 +426,7 @@ export async function* listApiBackfill(opts: { state, registry, client, + accountId, rateLimiter, backfillLimit, backfillConcurrency = DEFAULT_BACKFILL_CONCURRENCY, @@ -479,6 +507,7 @@ export async function* listApiBackfill(opts: { range, numSegments, streamName: stream.name, + accountId, supportsLimit: resourceConfig.supportsLimit !== false, supportsForwardPagination: resourceConfig.supportsForwardPagination !== false, backfillLimit: streamBackfillLimit, @@ -495,6 +524,7 @@ export async function* listApiBackfill(opts: { yield* sequentialBackfillStream({ resourceConfig, streamName: stream.name, + accountId, pageCursor, backfillLimit: streamBackfillLimit, rateLimiter, @@ -525,25 +555,7 @@ export async function* listApiBackfill(opts: { stream: stream.name, error: err instanceof Error ? err.message : String(err), }) - const isRateLimit = err instanceof Error && err.message.includes('Rate limit') - const isAuthError = - err instanceof StripeApiRequestError && (err.status === 401 || err.status === 403) - yield { - type: 'trace', - trace: { - trace_type: 'error', - error: { - failure_type: isRateLimit - ? 'transient_error' - : isAuthError - ? 'auth_error' - : 'system_error', - message: String(err), - stream: stream.name, - ...(err instanceof Error ? { stack_trace: err.stack } : {}), - }, - }, - } satisfies TraceMessage + yield errorToTrace(err, stream.name) } } } diff --git a/packages/source-stripe/src/src-webhook.ts b/packages/source-stripe/src/src-webhook.ts index 932ea665a..16dd14cd0 100644 --- a/packages/source-stripe/src/src-webhook.ts +++ b/packages/source-stripe/src/src-webhook.ts @@ -18,14 +18,15 @@ export async function* processWebhookInput( config: Config, catalog: ConfiguredCatalog, registry: Record, - streamNames: Set + streamNames: Set, + accountId?: string ): AsyncGenerator { if (!config.webhook_secret) { throw new Error('webhook_secret is required for raw webhook signature verification') } const signature = (input.headers['stripe-signature'] as string) ?? '' const event = verifyWebhookSignature(input.body, signature, config.webhook_secret) - yield* processStripeEvent(event, config, catalog, registry, streamNames) + yield* processStripeEvent(event, config, catalog, registry, streamNames, accountId) } // MARK: - LiveInput queue @@ -62,11 +63,19 @@ export function createInputQueue() { config: Config, catalog: ConfiguredCatalog, registry: Record, - streamNames: Set + streamNames: Set, + accountId?: string ): AsyncGenerator { while (queue.length > 0) { const queued = queue.shift()! - yield* processStripeEvent(queued.data as StripeEvent, config, catalog, registry, streamNames) + yield* processStripeEvent( + queued.data as StripeEvent, + config, + catalog, + registry, + streamNames, + accountId + ) } }