From 2c90119b4f2ce11d0bc86930106fa970ca2d0ddb Mon Sep 17 00:00:00 2001 From: Yostra Date: Thu, 9 Apr 2026 20:45:08 +0200 Subject: [PATCH 1/6] squash --- apps/engine/src/cli/command.ts | 107 +++++- apps/engine/src/lib/index.ts | 2 +- apps/engine/src/lib/select-state-store.ts | 19 +- .../destination-postgres/src/index.test.ts | 88 +++++ packages/destination-postgres/src/index.ts | 41 ++- .../src/schemaProjection.test.ts | 19 ++ .../src/schemaProjection.ts | 19 +- packages/source-stripe/src/catalog.ts | 42 ++- packages/source-stripe/src/index.test.ts | 9 + packages/source-stripe/src/index.ts | 315 +++++++++++------- 10 files changed, 490 insertions(+), 171 deletions(-) diff --git a/apps/engine/src/cli/command.ts b/apps/engine/src/cli/command.ts index c229fef36..89c89b289 100644 --- a/apps/engine/src/cli/command.ts +++ b/apps/engine/src/cli/command.ts @@ -3,11 +3,20 @@ import { Readable } from 'node:stream' import { defineCommand } from 'citty' import { createCliFromSpec } from '@stripe/sync-ts-cli/openapi' import { parseJsonOrFile } from '@stripe/sync-ts-cli' -import { createConnectorResolver } from '../lib/index.js' +import { + createConnectorResolver, + createEngine, + selectStateStore, + pipe, + persistState, +} from '../lib/index.js' +import { collectMessages, PipelineConfig, writeLine } from '@stripe/sync-protocol' import { createApp } from '../api/app.js' import { serveAction } from '../serve-command.js' import { supabaseCmd } from './supabase.js' import { defaultConnectors } from '../lib/default-connectors.js' +import { logger } from '../logger.js' +import { resolveAccountId, type Config as StripeSourceConfig } from '@stripe/sync-source-stripe' /** Connector discovery flags shared by all commands (serve + one-shot). */ const connectorArgs = { @@ -95,8 +104,102 @@ export async function createProgram() { }, }) + const syncMultiCmd = defineCommand({ + meta: { + name: 'sync-multi', + description: + 'Sync multiple Stripe accounts into a shared schema. Accepts a JSON config with a "pipelines" array of PipelineConfig objects.', + }, + args: { + config: { + type: 'string', + description: 'JSON file path or inline JSON: { "pipelines": [PipelineConfig, ...] }', + required: true, + }, + }, + async run({ args }) { + const raw = parseJsonOrFile(args.config) + const pipelinesRaw = (raw as { pipelines?: unknown[] }).pipelines + if (!Array.isArray(pipelinesRaw) || pipelinesRaw.length === 0) { + logger.error('Config must contain a non-empty "pipelines" array') + process.exit(1) + } + + const pipelines = pipelinesRaw.map((p, i) => { + try { + return PipelineConfig.parse(p) + } catch (err) { + logger.error({ err, index: i }, `Invalid pipeline config at index ${i}`) + process.exit(1) + } + }) + + const engine = await createEngine(resolver) + const runs: Array<{ + pipeline: PipelineConfig + stateStore: Awaited> + index: number + }> = [] + + // Setup sequentially to avoid racing on CREATE SCHEMA + for (let i = 0; i < pipelines.length; i++) { + let pipeline = pipelines[i] + const { messages: controlMessages } = await collectMessages( + engine.pipeline_setup(pipeline), + 'control' + ) + for (const message of controlMessages) { + if (message.control.control_type === 'source_config') { + const type = pipeline.source.type + pipeline = { + ...pipeline, + source: { type, [type]: message.control.source_config } as PipelineConfig['source'], + } + } else if (message.control.control_type === 'destination_config') { + const type = pipeline.destination.type + pipeline = { + ...pipeline, + destination: { + type, + [type]: message.control.destination_config, + } as PipelineConfig['destination'], + } + } + } + const accountId = await resolveAccountId(pipeline.source as unknown as StripeSourceConfig) + const stateStore = await selectStateStore(pipeline, accountId) + logger.info({ pipeline: i }, 'sync-multi: setup completed') + runs.push({ pipeline, stateStore, index: i }) + } + + // Read/write concurrently — setup is already done + await Promise.all( + runs.map(async ({ pipeline, stateStore, index: i }) => { + logger.info({ pipeline: i }, 'sync-multi: starting sync') + try { + const state = await stateStore.get() + for await (const msg of pipe( + engine.pipeline_write(pipeline, engine.pipeline_read(pipeline, { state })), + persistState(stateStore) + )) { + writeLine(msg) + } + logger.info({ pipeline: i }, 'sync-multi: pipeline completed') + } finally { + await stateStore.close?.() + } + }) + ) + }, + }) + return defineCommand({ ...specCli, - subCommands: { serve: serveCmd, supabase: supabaseCmd, ...specCli.subCommands }, + subCommands: { + serve: serveCmd, + supabase: supabaseCmd, + 'sync-multi': syncMultiCmd, + ...specCli.subCommands, + }, }) } diff --git a/apps/engine/src/lib/index.ts b/apps/engine/src/lib/index.ts index 32cba14ee..d42cc3814 100644 --- a/apps/engine/src/lib/index.ts +++ b/apps/engine/src/lib/index.ts @@ -28,7 +28,7 @@ export { destinationTest, destinationTestSpec } from './destination-test.js' export type { DestinationTestConfig } from './destination-test.js' export { readonlyStateStore } from './state-store.js' export type { StateStore } from './state-store.js' -export { maybeDestinationStateStore } from './select-state-store.js' +export { maybeDestinationStateStore, selectStateStore } from './select-state-store.js' export { createConnectorSchemas, connectorSchemaName, diff --git a/apps/engine/src/lib/select-state-store.ts b/apps/engine/src/lib/select-state-store.ts index e82e8b96a..81d5c708b 100644 --- a/apps/engine/src/lib/select-state-store.ts +++ b/apps/engine/src/lib/select-state-store.ts @@ -6,9 +6,9 @@ import type { PipelineConfig } from '@stripe/sync-protocol' * Tries to resolve a destination-colocated state store. * * Imports `@stripe/sync-state-${destination.type}` and calls its - * `createStateStore(destConfig)`. Not all destinations support this — - * Postgres does (state table alongside synced data), Google Sheets doesn't. - * Falls back to a read-only no-op store when unavailable. + * `createStateStore(destConfig, pipelineId)`. Not all destinations support this + * convention; Postgres does (state table alongside synced data), Google Sheets + * doesn't. Falls back to a read-only store when unavailable. * * If the package exports a `setupStateStore(destConfig)` function, * it is called first to ensure the state table exists (runs migrations). @@ -20,19 +20,22 @@ import type { PipelineConfig } from '@stripe/sync-protocol' * e.g. the HTTP API (state flows in via X-State header, out via NDJSON stream) * or Temporal workflows (workflow memory is the source of truth). * Writing state to the destination DB in those cases creates unexpected tables. + * + * @param pipelineId Identifies the sync slot. Defaults to `'default'` downstream. + * Pass a unique value per pipeline (e.g. the Stripe account ID) to isolate cursor state. */ -export async function maybeDestinationStateStore( - params: PipelineConfig +export async function selectStateStore( + params: PipelineConfig, + pipelineId?: string ): Promise }> { try { const { type: destType, ...destConfig } = params.destination const pkg = await import(`@stripe/sync-state-${destType}`) if (typeof pkg.createStateStore === 'function') { - // Run migrations if the package provides a setup function if (typeof pkg.setupStateStore === 'function') { await pkg.setupStateStore(destConfig) } - return pkg.createStateStore(destConfig) as StateStore & { + return pkg.createStateStore(destConfig, pipelineId) as StateStore & { close?(): Promise } } @@ -41,3 +44,5 @@ export async function maybeDestinationStateStore( } return readonlyStateStore() } + +export const maybeDestinationStateStore = selectStateStore diff --git a/packages/destination-postgres/src/index.test.ts b/packages/destination-postgres/src/index.test.ts index 9c3289a07..65c87a65d 100644 --- a/packages/destination-postgres/src/index.test.ts +++ b/packages/destination-postgres/src/index.test.ts @@ -240,6 +240,94 @@ describe('destination default export', () => { }) }) +describe('multi-org sync (two account IDs)', () => { + const multiOrgCatalog: ConfiguredCatalog = { + streams: [ + { + stream: { + name: 'customers', + primary_key: [['id'], ['_account_id']], + metadata: {}, + }, + sync_mode: 'full_refresh', + destination_sync_mode: 'overwrite', + }, + ], + } + + beforeEach(async () => { + await destination.setup({ config: makeConfig(), catalog: multiOrgCatalog }) + }) + + it('creates table with composite primary key (id, _account_id)', async () => { + const { rows } = await pool.query( + `SELECT column_name FROM information_schema.columns + WHERE table_schema = $1 AND table_name = 'customers' + ORDER BY ordinal_position`, + [SCHEMA] + ) + const columnNames = rows.map((r) => r.column_name) + expect(columnNames).toContain('id') + expect(columnNames).toContain('_account_id') + + const { rows: pkRows } = await pool.query( + `SELECT a.attname + FROM pg_index i + JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) + WHERE i.indrelid = $1::regclass AND i.indisprimary + ORDER BY array_position(i.indkey, a.attnum)`, + [`"${SCHEMA}"."customers"`] + ) + expect(pkRows.map((r) => r.attname)).toEqual(['id', '_account_id']) + }) + + it('stores rows from two accounts with the same object id as separate rows', async () => { + const messages = toAsyncIter([ + makeRecord('customers', { id: 'cus_1', name: 'Alice (Acct A)', _account_id: 'acct_AAA' }), + makeRecord('customers', { id: 'cus_1', name: 'Alice (Acct B)', _account_id: 'acct_BBB' }), + ]) + + await collectOutputs( + destination.write({ config: makeConfig(), catalog: multiOrgCatalog }, messages) + ) + + const { rows } = await pool.query( + `SELECT id, _account_id, _raw_data->>'name' AS name + FROM "${SCHEMA}".customers ORDER BY _account_id` + ) + expect(rows).toEqual([ + { id: 'cus_1', _account_id: 'acct_AAA', name: 'Alice (Acct A)' }, + { id: 'cus_1', _account_id: 'acct_BBB', name: 'Alice (Acct B)' }, + ]) + }) + + it('upserts per-account: same id + same account updates, different account inserts', async () => { + const batch1 = toAsyncIter([ + makeRecord('customers', { id: 'cus_1', name: 'Alice v1', _account_id: 'acct_AAA' }), + makeRecord('customers', { id: 'cus_1', name: 'Alice v1', _account_id: 'acct_BBB' }), + ]) + await collectOutputs( + destination.write({ config: makeConfig(), catalog: multiOrgCatalog }, batch1) + ) + + const batch2 = toAsyncIter([ + makeRecord('customers', { id: 'cus_1', name: 'Alice v2', _account_id: 'acct_AAA' }), + ]) + await collectOutputs( + destination.write({ config: makeConfig(), catalog: multiOrgCatalog }, batch2) + ) + + const { rows } = await pool.query( + `SELECT _account_id, _raw_data->>'name' AS name + FROM "${SCHEMA}".customers ORDER BY _account_id` + ) + expect(rows).toEqual([ + { _account_id: 'acct_AAA', name: 'Alice v2' }, + { _account_id: 'acct_BBB', name: 'Alice v1' }, + ]) + }) +}) + describe('upsertMany standalone', () => { beforeEach(async () => { await drain(destination.setup!({ config: makeConfig(), catalog })) diff --git a/packages/destination-postgres/src/index.ts b/packages/destination-postgres/src/index.ts index 8552e8c78..bd3be3885 100644 --- a/packages/destination-postgres/src/index.ts +++ b/packages/destination-postgres/src/index.ts @@ -68,7 +68,8 @@ export async function upsertMany( schema: string, table: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any - entries: Record[] + entries: Record[], + keyColumns: string[] = ['id'] ): Promise { if (!entries.length) return await upsert( @@ -77,7 +78,7 @@ export async function upsertMany( { schema, table, - keyColumns: ['id'], + keyColumns, } ) } @@ -160,23 +161,12 @@ const destination = { `) await Promise.all( catalog.streams.map(async (cs) => { - if (cs.stream.json_schema) { - await pool.query( - buildCreateTableDDL(config.schema, cs.stream.name, cs.stream.json_schema, { - system_columns: cs.system_columns, - }) - ) - } else { - await pool.query(sql` - CREATE TABLE IF NOT EXISTS "${config.schema}"."${cs.stream.name}" ( - "_raw_data" jsonb NOT NULL, - "_last_synced_at" timestamptz, - "_updated_at" timestamptz NOT NULL DEFAULT now(), - "id" text GENERATED ALWAYS AS (("_raw_data"->>'id')::text) STORED, - PRIMARY KEY ("id") - ) - `) - } + await pool.query( + buildCreateTableDDL(config.schema, cs.stream.name, cs.stream.json_schema ?? {}, { + system_columns: cs.system_columns, + primary_key: cs.stream.primary_key, + }) + ) }) ) } finally { @@ -199,16 +189,25 @@ const destination = { } }, - async *write({ config, catalog: _catalog }, $stdin) { + async *write({ config, catalog }, $stdin) { const pool = withQueryLogging(createPool(await buildPoolConfig(config))) const batchSize = config.batch_size // eslint-disable-next-line @typescript-eslint/no-explicit-any const streamBuffers = new Map[]>() + const streamKeyColumns = new Map() + for (const cs of catalog.streams) { + streamKeyColumns.set( + cs.stream.name, + cs.stream.primary_key.map((pk) => pk[0]) + ) + } + const flushStream = async (streamName: string) => { const buffer = streamBuffers.get(streamName) if (!buffer || buffer.length === 0) return - await upsertMany(pool, config.schema, streamName, buffer) + const keyColumns = streamKeyColumns.get(streamName) ?? ['id'] + await upsertMany(pool, config.schema, streamName, buffer, keyColumns) streamBuffers.set(streamName, []) } diff --git a/packages/destination-postgres/src/schemaProjection.test.ts b/packages/destination-postgres/src/schemaProjection.test.ts index 7ff07dc47..4d51b310e 100644 --- a/packages/destination-postgres/src/schemaProjection.test.ts +++ b/packages/destination-postgres/src/schemaProjection.test.ts @@ -121,6 +121,25 @@ describe('buildCreateTableWithSchema', () => { expect(stmts[0]).toContain("WHEN jsonb_typeof(_raw_data->'customer') = 'object'") }) + it('generates composite primary key with _account_id when primary_key option is set', () => { + const stmts = buildCreateTableWithSchema('stripe', 'customers', SAMPLE_JSON_SCHEMA, { + primary_key: [['id'], ['_account_id']], + }) + + // Both PK columns present as generated columns + expect(stmts[0]).toContain(`"id" text GENERATED ALWAYS AS ((_raw_data->>'id')::text) STORED`) + expect(stmts[0]).toContain( + `"_account_id" text GENERATED ALWAYS AS ((_raw_data->>'_account_id')::text) STORED` + ) + + // Composite PRIMARY KEY + expect(stmts[0]).toContain('PRIMARY KEY ("id", "_account_id")') + + // _account_id should NOT appear as a regular generated column from json_schema + const alterStmts = stmts.filter((s) => s.includes('ADD COLUMN IF NOT EXISTS')) + expect(alterStmts.every((s) => !s.includes('"_account_id"'))).toBe(true) + }) + it('produces stable output across repeated calls', () => { const first = buildCreateTableWithSchema('mydata', 'customers', SAMPLE_JSON_SCHEMA) const second = buildCreateTableWithSchema('mydata', 'customers', SAMPLE_JSON_SCHEMA) diff --git a/packages/destination-postgres/src/schemaProjection.ts b/packages/destination-postgres/src/schemaProjection.ts index 691f3aa63..cb3da2787 100644 --- a/packages/destination-postgres/src/schemaProjection.ts +++ b/packages/destination-postgres/src/schemaProjection.ts @@ -98,6 +98,8 @@ export type SystemColumn = { export type BuildTableOptions = { /** Extra system columns to add to the table (e.g. _account_id). */ system_columns?: SystemColumn[] + /** Primary key paths from the stream (e.g. [['id'], ['_account_id']]). Defaults to [['id']]. */ + primary_key?: string[][] } /** @@ -114,7 +116,10 @@ export function buildCreateTableWithSchema( const quotedSchema = quoteIdent(schema) const quotedTable = quoteIdent(tableName) - const columns = jsonSchemaToColumns(jsonSchema) + const pkFields = (options.primary_key ?? [['id']]).map((pk) => pk[0]) + const pkSet = new Set(pkFields) + + const columns = jsonSchemaToColumns(jsonSchema).filter((c) => !pkSet.has(c.name)) const generatedColumnDefs = columns.map( (col) => `${quoteIdent(col.name)} ${col.pgType} GENERATED ALWAYS AS (${col.expression}) STORED` @@ -124,14 +129,19 @@ export function buildCreateTableWithSchema( (col) => `${quoteIdent(col.name)} ${col.type}` ) + const pkColumnDefs = pkFields.map((field) => { + const escapedField = field.replace(/'/g, "''") + return `${quoteIdent(field)} text GENERATED ALWAYS AS ((_raw_data->>'${escapedField}')::text) STORED` + }) + const columnDefs = [ '"_raw_data" jsonb NOT NULL', '"_last_synced_at" timestamptz', '"_updated_at" timestamptz NOT NULL DEFAULT now()', ...systemColumnDefs, - `"id" text GENERATED ALWAYS AS ((_raw_data->>'id')::text) STORED`, + ...pkColumnDefs, ...generatedColumnDefs, - 'PRIMARY KEY ("id")', + `PRIMARY KEY (${pkFields.map((f) => quoteIdent(f)).join(', ')})`, ] const stmts: string[] = [ @@ -305,6 +315,8 @@ export type ApplySchemaFromCatalogConfig = { syncSchema?: string /** Extra system columns to add to each table. */ system_columns?: SystemColumn[] + /** Primary key paths (e.g. [['id'], ['_account_id']]). Defaults to [['id']]. */ + primary_key?: string[][] apiVersion?: string /** Progress callback — emitting logs signals liveness to the orchestrator. */ onLog?: (message: string) => void @@ -371,6 +383,7 @@ export async function applySchemaFromCatalog( await client.query( buildCreateTableDDL(dataSchema, stream.name, stream.json_schema!, { system_columns: config.system_columns, + primary_key: config.primary_key, }) ) config.onLog?.( diff --git a/packages/source-stripe/src/catalog.ts b/packages/source-stripe/src/catalog.ts index 605a2b92a..e950c3aea 100644 --- a/packages/source-stripe/src/catalog.ts +++ b/packages/source-stripe/src/catalog.ts @@ -1,17 +1,20 @@ import type { CatalogPayload, Stream } from '@stripe/sync-protocol' -import type { ResourceConfig } from './types.js' import type { ParsedResourceTable } from '@stripe/sync-openapi' import { parsedTableToJsonSchema } from '@stripe/sync-openapi' +import type { ResourceConfig } from './types.js' /** Derive a CatalogPayload from the existing resource registry (no json_schema). */ -export function catalogFromRegistry(registry: Record): CatalogPayload { +export function catalogFromRegistry( + registry: Record, + accountId: string +): CatalogPayload { const streams: Stream[] = Object.entries(registry) .filter(([, cfg]) => cfg.sync !== false) .sort(([, a], [, b]) => a.order - b.order) - .map(([name, cfg]) => ({ + .map(([, cfg]) => ({ name: cfg.tableName, - primary_key: [['id']], - metadata: { resource_name: name }, + primary_key: [['id'], ['_account_id']], + metadata: { account_id: accountId }, })) return { streams } @@ -19,27 +22,44 @@ export function catalogFromRegistry(registry: Record): C /** * Derive a CatalogPayload by merging OpenAPI-parsed tables with registry metadata. - * Each stream gets json_schema from the parsed OpenAPI spec. + * Each stream gets json_schema from the parsed OpenAPI spec, with `_account_id` + * injected so downstream consumers see the full data shape. */ export function catalogFromOpenApi( tables: ParsedResourceTable[], - registry: Record + registry: Record, + accountId: string ): CatalogPayload { const tableMap = new Map(tables.map((t) => [t.tableName, t])) const streams: Stream[] = Object.entries(registry) .filter(([, cfg]) => cfg.sync !== false) .sort(([, a], [, b]) => a.order - b.order) - .map(([name, cfg]) => { + .map(([, cfg]) => { const table = tableMap.get(cfg.tableName) const stream: Stream = { name: cfg.tableName, - primary_key: [['id']], - metadata: { resource_name: name }, + primary_key: [['id'], ['_account_id']], + metadata: { account_id: accountId }, } + if (table) { - stream.json_schema = parsedTableToJsonSchema(table) + const jsonSchema = parsedTableToJsonSchema(table) + const properties = (jsonSchema.properties ?? {}) as Record + properties._account_id = { type: 'string' } + jsonSchema.properties = properties + + const required = Array.isArray(jsonSchema.required) + ? [...jsonSchema.required] + : [] + if (!required.includes('_account_id')) { + required.push('_account_id') + } + jsonSchema.required = required + + stream.json_schema = jsonSchema } + return stream }) diff --git a/packages/source-stripe/src/index.test.ts b/packages/source-stripe/src/index.test.ts index deeca737e..548332305 100644 --- a/packages/source-stripe/src/index.test.ts +++ b/packages/source-stripe/src/index.test.ts @@ -41,6 +41,15 @@ vi.mock('./resourceRegistry', async (importOriginal) => ({ buildResourceRegistry: vi.fn(), })) +vi.mock('./client', async (importOriginal) => ({ + ...(await importOriginal()), + makeClient: vi.fn(() => ({ + accounts: { + retrieve: vi.fn(async () => ({ id: 'acct_test_fake123' })), + }, + })), +})) + /** Wrap a single item as an AsyncIterable for source.read()'s $stdin param. */ async function* toIter(item: T): AsyncIterable { yield item diff --git a/packages/source-stripe/src/index.ts b/packages/source-stripe/src/index.ts index 0efdcf53b..d871efd07 100644 --- a/packages/source-stripe/src/index.ts +++ b/packages/source-stripe/src/index.ts @@ -10,10 +10,8 @@ import type { TeardownOutput, } from '@stripe/sync-protocol' import { sourceControlMsg } from '@stripe/sync-protocol' -import { z } from 'zod' -import defaultSpec, { configSchema } from './spec.js' -import type { Config } from './spec.js' -import type { StripeEvent } from './spec.js' +import defaultSpec, { configSchema, stripeEventSchema } from './spec.js' +import type { Config, StripeEvent } from './spec.js' import { buildResourceRegistry } from './resourceRegistry.js' import { catalogFromRegistry, catalogFromOpenApi } from './catalog.js' import { @@ -28,17 +26,15 @@ import { listApiBackfill } from './src-list-api.js' import { pollEvents } from './src-events-api.js' import type { StripeWebSocketClient, StripeWebhookEvent } from './src-websocket.js' import { createStripeWebSocketClient } from './src-websocket.js' -import type { ResourceConfig } from './types.js' import { makeClient } from './client.js' import type { RateLimiter } from './rate-limiter.js' import { createInMemoryRateLimiter, DEFAULT_MAX_RPS } from './rate-limiter.js' import { fetchWithProxy } from './transport.js' -import { stripeEventSchema } from './spec.js' const apiFetch: typeof globalThis.fetch = (input, init) => fetchWithProxy(input as URL | string, init ?? {}) -/** In-memory cache of discover results keyed by api_version. */ +/** In-memory cache of discover results keyed by api_version + account_id. */ export const discoverCache = new Map() // MARK: - Spec @@ -78,6 +74,158 @@ export type StripeStreamState = { backfill?: BackfillState } +type LegacyAccountClient = { + accounts?: { + retrieve?: () => Promise<{ id: string }> + } +} + +async function getAccountFromClient( + client: ReturnType +): Promise<{ id: string }> { + if (typeof client.getAccount === 'function') { + return client.getAccount() + } + + const legacyClient = client as ReturnType & LegacyAccountClient + if (typeof legacyClient.accounts?.retrieve === 'function') { + return legacyClient.accounts.retrieve() + } + + throw new Error('Stripe client does not support account lookup') +} + +// MARK: - Account ID resolution + +export async function resolveAccountId(config: Config): Promise { + if (config.account_id) { + return config.account_id + } + + const client = makeClient({ + ...config, + api_version: config.api_version ?? BUNDLED_API_VERSION, + }) + const account = await getAccountFromClient(client) + return account.id +} + +// MARK: - Read implementation + +async function* readImpl( + config: Config, + catalog: ConfiguredCatalog, + state: Record | undefined, + $stdin?: AsyncIterable, + rateLimiter?: RateLimiter, + backfillConcurrency?: number +): AsyncGenerator { + const apiVersion = config.api_version ?? BUNDLED_API_VERSION + const client = makeClient({ ...config, api_version: apiVersion }) + const resolved = await resolveOpenApiSpec({ apiVersion }, apiFetch) + const registry = buildResourceRegistry( + resolved.spec, + config.api_key, + resolved.apiVersion, + config.base_url + ) + const streamNames = new Set(catalog.streams.map((s) => s.stream.name)) + + // Event-driven mode: iterate over incoming webhook inputs + if ($stdin) { + for await (const input of $stdin) { + if ('body' in (input as object)) { + yield* processWebhookInput( + input as WebhookInput, + config, + catalog, + registry, + streamNames + ) + } else { + yield* processStripeEvent( + input as StripeEvent, + config, + catalog, + registry, + streamNames + ) + } + } + return + } + + const inputQueue = createInputQueue() + + let wsClient: StripeWebSocketClient | null = null + if (config.websocket) { + wsClient = await createStripeWebSocketClient({ + stripeApiKey: config.api_key, + onEvent: (wsEvent: StripeWebhookEvent) => { + const event = stripeEventSchema.parse(JSON.parse(wsEvent.event_payload)) + inputQueue.push({ data: event }) + }, + }) + } + + let httpServer: ReturnType | null = null + + try { + const startTimestamp = Math.floor(Date.now() / 1000) + + // Backfill: paginate through each configured stream + yield* listApiBackfill({ + catalog, + state, + registry, + client, + rateLimiter: rateLimiter ?? (() => Promise.resolve(0)), + backfillLimit: config.backfill_limit, + backfillConcurrency, + drainQueue: wsClient + ? () => inputQueue.drain(config, catalog, registry, streamNames) + : undefined, + }) + + // Events polling: incremental sync via /v1/events after backfill + yield* pollEvents({ config, client, catalog, registry, streamNames, state, startTimestamp }) + + // Start HTTP server for live mode if configured + if (config.webhook_port) { + httpServer = startWebhookServer(config.webhook_port, inputQueue.push) + } + + // After backfill: stream live events from WebSocket and/or HTTP + if (wsClient || httpServer) { + // Drain anything that arrived during backfill + yield* inputQueue.drain(config, catalog, registry, streamNames) + + // Block on new events (infinite loop until all live sources close) + while (wsClient || httpServer) { + const queued = await inputQueue.wait() + try { + if ('body' in queued.data) { + yield* processWebhookInput(queued.data, config, catalog, registry, streamNames) + } else { + yield* processStripeEvent(queued.data, config, catalog, registry, streamNames) + } + queued.resolve?.() + } catch (err) { + queued.reject?.(err instanceof Error ? err : new Error(String(err))) + } + } + } + } finally { + if (wsClient) { + wsClient.close() + wsClient = null + } + if (httpServer) { + httpServer.close() + httpServer = null + } + } +} // MARK: - Source export type StripeSourceDeps = { @@ -100,7 +248,7 @@ export function createStripeSource( ...config, api_version: config.api_version ?? BUNDLED_API_VERSION, }) - await client.getAccount() + await getAccountFromClient(client) yield { type: 'connection_status' as const, connection_status: { status: 'succeeded' as const }, @@ -113,17 +261,15 @@ export function createStripeSource( } }, - // For the default api_version (bundled), discover is CPU-only — no HTTP. - // resolveOpenApiSpec serves the bundled spec from the filesystem, so the - // cost is SpecParser.parse + catalogFromOpenApi (pure computation). We - // cache the result in-memory keyed by api_version so that pipeline_sync - // (which calls discover twice — once in pipeline_read, once in - // pipeline_write) doesn't repeat the work. - // TODO: Custom objects (not yet supported) would require a more specific cache - // since they aren't discoverable from the OpenAPI spec alone. + // For the default api_version (bundled), discover is CPU-only aside from + // resolving the Stripe account ID needed for multi-account metadata. + // Cache by api_version + account_id so repeated discover calls during the + // same sync don't re-parse the OpenAPI spec. async *discover({ config }): AsyncGenerator { const apiVersion = config.api_version ?? BUNDLED_API_VERSION - const cached = discoverCache.get(apiVersion) + const accountId = await resolveAccountId(config) + const cacheKey = `${apiVersion}:${accountId}` + const cached = discoverCache.get(cacheKey) if (cached) { yield { type: 'catalog' as const, catalog: cached } return @@ -142,15 +288,15 @@ export function createStripeSource( const parsed = parser.parse(resolved.spec, { resourceAliases: OPENAPI_RESOURCE_TABLE_ALIASES, }) - catalog = catalogFromOpenApi(parsed.tables, registry) + catalog = catalogFromOpenApi(parsed.tables, registry, accountId) } catch { - catalog = catalogFromRegistry(registry) + catalog = catalogFromRegistry(registry, accountId) } - discoverCache.set(apiVersion, catalog) + discoverCache.set(cacheKey, catalog) yield { type: 'catalog' as const, catalog } }, - async *setup({ config, catalog }): AsyncGenerator { + async *setup({ config }): AsyncGenerator { const updates: Partial = {} const client = makeClient({ ...config, @@ -159,7 +305,7 @@ export function createStripeSource( // Resolve account_id if not already set if (!config.account_id) { - const account = await client.getAccount() + const account = await getAccountFromClient(client) updates.account_id = account.id } @@ -225,119 +371,36 @@ export function createStripeSource( }, async *read({ config, catalog, state }, $stdin?) { - const apiVersion = config.api_version ?? BUNDLED_API_VERSION const rateLimiter = externalRateLimiter ?? createInMemoryRateLimiter(config.rate_limit ?? DEFAULT_MAX_RPS) - const client = makeClient({ ...config, api_version: apiVersion }) - const resolved = await resolveOpenApiSpec({ apiVersion }, apiFetch) - const registry = buildResourceRegistry( - resolved.spec, - config.api_key, - resolved.apiVersion, - config.base_url + const accountId = await resolveAccountId(config) + const inner = readImpl( + config, + catalog, + state?.streams as Record | undefined, + $stdin, + rateLimiter, + config.backfill_concurrency ) - const streamNames = new Set(catalog.streams.map((s) => s.stream.name)) - - // Event-driven mode: iterate over incoming webhook inputs - if ($stdin) { - for await (const input of $stdin) { - if ('body' in (input as object)) { - yield* processWebhookInput( - input as WebhookInput, - config, - catalog, - registry, - streamNames - ) - } else { - yield* processStripeEvent(input as StripeEvent, config, catalog, registry, streamNames) - } - } - return - } - - const inputQueue = createInputQueue() - - let wsClient: StripeWebSocketClient | null = null - if (config.websocket) { - wsClient = await createStripeWebSocketClient({ - stripeApiKey: config.api_key, - onEvent: (wsEvent: StripeWebhookEvent) => { - const event = stripeEventSchema.parse(JSON.parse(wsEvent.event_payload)) - inputQueue.push({ data: event }) - }, - }) - } - - let httpServer: ReturnType | null = null - - try { - const startTimestamp = Math.floor(Date.now() / 1000) - - // Backfill: paginate through each configured stream - yield* listApiBackfill({ - catalog, - state: state?.streams as Parameters[0]['state'], - registry, - rateLimiter, - client, - backfillLimit: config.backfill_limit, - backfillConcurrency: config.backfill_concurrency, - drainQueue: wsClient - ? () => inputQueue.drain(config, catalog, registry, streamNames) - : undefined, - }) - - // Events polling: incremental sync via /v1/events after backfill - yield* pollEvents({ - config, - client, - catalog, - registry, - streamNames, - state: state?.streams as Record | undefined, - startTimestamp, - }) - - // Start HTTP server for live mode if configured - if (config.webhook_port) { - httpServer = startWebhookServer(config.webhook_port, inputQueue.push) - } - - // After backfill: stream live events from WebSocket and/or HTTP - if (wsClient || httpServer) { - // Drain anything that arrived during backfill - yield* inputQueue.drain(config, catalog, registry, streamNames) - - // Block on new events (infinite loop until all live sources close) - while (wsClient || httpServer) { - const queued = await inputQueue.wait() - try { - if ('body' in queued.data) { - yield* processWebhookInput(queued.data, config, catalog, registry, streamNames) - } else { - yield* processStripeEvent(queued.data, config, catalog, registry, streamNames) - } - queued.resolve?.() - } catch (err) { - queued.reject?.(err instanceof Error ? err : new Error(String(err))) - } + for await (const msg of inner) { + if (msg.type === 'record') { + yield { + ...msg, + record: { + ...msg.record, + data: { + ...msg.record.data, + _account_id: accountId, + }, + }, } - } - } finally { - if (wsClient) { - wsClient.close() - wsClient = null - } - if (httpServer) { - httpServer.close() - httpServer = null + } else { + yield msg } } }, } } - export default createStripeSource() // MARK: - Re-exports From a6d3fa40160924d92e4db781efb524b32568e0c6 Mon Sep 17 00:00:00 2001 From: Yostra Date: Fri, 10 Apr 2026 00:28:16 +0200 Subject: [PATCH 2/6] rebase clean --- apps/engine/src/cli/command.ts | 107 +----- apps/engine/src/lib/select-state-store.ts | 4 +- .../destination-postgres/src/index.test.ts | 2 +- packages/source-stripe/src/catalog.ts | 18 +- packages/source-stripe/src/index.test.ts | 11 +- packages/source-stripe/src/index.ts | 329 ++++++++---------- packages/source-stripe/src/process-event.ts | 26 +- packages/source-stripe/src/src-events-api.ts | 12 +- packages/source-stripe/src/src-list-api.ts | 19 +- packages/source-stripe/src/src-webhook.ts | 17 +- 10 files changed, 229 insertions(+), 316 deletions(-) diff --git a/apps/engine/src/cli/command.ts b/apps/engine/src/cli/command.ts index 89c89b289..c229fef36 100644 --- a/apps/engine/src/cli/command.ts +++ b/apps/engine/src/cli/command.ts @@ -3,20 +3,11 @@ import { Readable } from 'node:stream' import { defineCommand } from 'citty' import { createCliFromSpec } from '@stripe/sync-ts-cli/openapi' import { parseJsonOrFile } from '@stripe/sync-ts-cli' -import { - createConnectorResolver, - createEngine, - selectStateStore, - pipe, - persistState, -} from '../lib/index.js' -import { collectMessages, PipelineConfig, writeLine } from '@stripe/sync-protocol' +import { createConnectorResolver } from '../lib/index.js' import { createApp } from '../api/app.js' import { serveAction } from '../serve-command.js' import { supabaseCmd } from './supabase.js' import { defaultConnectors } from '../lib/default-connectors.js' -import { logger } from '../logger.js' -import { resolveAccountId, type Config as StripeSourceConfig } from '@stripe/sync-source-stripe' /** Connector discovery flags shared by all commands (serve + one-shot). */ const connectorArgs = { @@ -104,102 +95,8 @@ export async function createProgram() { }, }) - const syncMultiCmd = defineCommand({ - meta: { - name: 'sync-multi', - description: - 'Sync multiple Stripe accounts into a shared schema. Accepts a JSON config with a "pipelines" array of PipelineConfig objects.', - }, - args: { - config: { - type: 'string', - description: 'JSON file path or inline JSON: { "pipelines": [PipelineConfig, ...] }', - required: true, - }, - }, - async run({ args }) { - const raw = parseJsonOrFile(args.config) - const pipelinesRaw = (raw as { pipelines?: unknown[] }).pipelines - if (!Array.isArray(pipelinesRaw) || pipelinesRaw.length === 0) { - logger.error('Config must contain a non-empty "pipelines" array') - process.exit(1) - } - - const pipelines = pipelinesRaw.map((p, i) => { - try { - return PipelineConfig.parse(p) - } catch (err) { - logger.error({ err, index: i }, `Invalid pipeline config at index ${i}`) - process.exit(1) - } - }) - - const engine = await createEngine(resolver) - const runs: Array<{ - pipeline: PipelineConfig - stateStore: Awaited> - index: number - }> = [] - - // Setup sequentially to avoid racing on CREATE SCHEMA - for (let i = 0; i < pipelines.length; i++) { - let pipeline = pipelines[i] - const { messages: controlMessages } = await collectMessages( - engine.pipeline_setup(pipeline), - 'control' - ) - for (const message of controlMessages) { - if (message.control.control_type === 'source_config') { - const type = pipeline.source.type - pipeline = { - ...pipeline, - source: { type, [type]: message.control.source_config } as PipelineConfig['source'], - } - } else if (message.control.control_type === 'destination_config') { - const type = pipeline.destination.type - pipeline = { - ...pipeline, - destination: { - type, - [type]: message.control.destination_config, - } as PipelineConfig['destination'], - } - } - } - const accountId = await resolveAccountId(pipeline.source as unknown as StripeSourceConfig) - const stateStore = await selectStateStore(pipeline, accountId) - logger.info({ pipeline: i }, 'sync-multi: setup completed') - runs.push({ pipeline, stateStore, index: i }) - } - - // Read/write concurrently — setup is already done - await Promise.all( - runs.map(async ({ pipeline, stateStore, index: i }) => { - logger.info({ pipeline: i }, 'sync-multi: starting sync') - try { - const state = await stateStore.get() - for await (const msg of pipe( - engine.pipeline_write(pipeline, engine.pipeline_read(pipeline, { state })), - persistState(stateStore) - )) { - writeLine(msg) - } - logger.info({ pipeline: i }, 'sync-multi: pipeline completed') - } finally { - await stateStore.close?.() - } - }) - ) - }, - }) - return defineCommand({ ...specCli, - subCommands: { - serve: serveCmd, - supabase: supabaseCmd, - 'sync-multi': syncMultiCmd, - ...specCli.subCommands, - }, + subCommands: { serve: serveCmd, supabase: supabaseCmd, ...specCli.subCommands }, }) } diff --git a/apps/engine/src/lib/select-state-store.ts b/apps/engine/src/lib/select-state-store.ts index 81d5c708b..3c8721647 100644 --- a/apps/engine/src/lib/select-state-store.ts +++ b/apps/engine/src/lib/select-state-store.ts @@ -35,9 +35,7 @@ export async function selectStateStore( if (typeof pkg.setupStateStore === 'function') { await pkg.setupStateStore(destConfig) } - return pkg.createStateStore(destConfig, pipelineId) as StateStore & { - close?(): Promise - } + return pkg.createStateStore(destConfig, pipelineId) } } catch { // Package not installed — fall through to readonly diff --git a/packages/destination-postgres/src/index.test.ts b/packages/destination-postgres/src/index.test.ts index 65c87a65d..a4552dcc6 100644 --- a/packages/destination-postgres/src/index.test.ts +++ b/packages/destination-postgres/src/index.test.ts @@ -256,7 +256,7 @@ describe('multi-org sync (two account IDs)', () => { } beforeEach(async () => { - await destination.setup({ config: makeConfig(), catalog: multiOrgCatalog }) + await drain(destination.setup!({ config: makeConfig(), catalog: multiOrgCatalog })) }) it('creates table with composite primary key (id, _account_id)', async () => { diff --git a/packages/source-stripe/src/catalog.ts b/packages/source-stripe/src/catalog.ts index e950c3aea..a5bd6e3cd 100644 --- a/packages/source-stripe/src/catalog.ts +++ b/packages/source-stripe/src/catalog.ts @@ -1,20 +1,17 @@ import type { CatalogPayload, Stream } from '@stripe/sync-protocol' +import type { ResourceConfig } from './types.js' import type { ParsedResourceTable } from '@stripe/sync-openapi' import { parsedTableToJsonSchema } from '@stripe/sync-openapi' -import type { ResourceConfig } from './types.js' /** Derive a CatalogPayload from the existing resource registry (no json_schema). */ -export function catalogFromRegistry( - registry: Record, - accountId: string -): CatalogPayload { +export function catalogFromRegistry(registry: Record): CatalogPayload { const streams: Stream[] = Object.entries(registry) .filter(([, cfg]) => cfg.sync !== false) .sort(([, a], [, b]) => a.order - b.order) .map(([, cfg]) => ({ name: cfg.tableName, primary_key: [['id'], ['_account_id']], - metadata: { account_id: accountId }, + metadata: {}, })) return { streams } @@ -27,8 +24,7 @@ export function catalogFromRegistry( */ export function catalogFromOpenApi( tables: ParsedResourceTable[], - registry: Record, - accountId: string + registry: Record ): CatalogPayload { const tableMap = new Map(tables.map((t) => [t.tableName, t])) @@ -40,7 +36,7 @@ export function catalogFromOpenApi( const stream: Stream = { name: cfg.tableName, primary_key: [['id'], ['_account_id']], - metadata: { account_id: accountId }, + metadata: {}, } if (table) { @@ -49,9 +45,7 @@ export function catalogFromOpenApi( properties._account_id = { type: 'string' } jsonSchema.properties = properties - const required = Array.isArray(jsonSchema.required) - ? [...jsonSchema.required] - : [] + const required = Array.isArray(jsonSchema.required) ? [...jsonSchema.required] : [] if (!required.includes('_account_id')) { required.push('_account_id') } diff --git a/packages/source-stripe/src/index.test.ts b/packages/source-stripe/src/index.test.ts index 548332305..8b02c88d3 100644 --- a/packages/source-stripe/src/index.test.ts +++ b/packages/source-stripe/src/index.test.ts @@ -44,9 +44,7 @@ vi.mock('./resourceRegistry', async (importOriginal) => ({ vi.mock('./client', async (importOriginal) => ({ ...(await importOriginal()), makeClient: vi.fn(() => ({ - accounts: { - retrieve: vi.fn(async () => ({ id: 'acct_test_fake123' })), - }, + getAccount: vi.fn(async () => ({ id: 'acct_test_fake123' })), })), })) @@ -653,6 +651,7 @@ describe('StripeSource', () => { state: undefined, registry, client: mockClient, + accountId: 'acct_test', rateLimiter: async () => 0, backfillConcurrency: 2, }) @@ -1924,6 +1923,7 @@ describe('StripeSource', () => { }, registry, client: mockClient, + accountId: 'acct_test', rateLimiter, backfillConcurrency: 3, }) @@ -1978,6 +1978,7 @@ describe('StripeSource', () => { state: undefined, registry, client: mockClient, + accountId: 'acct_test', rateLimiter, backfillConcurrency: 3, }) @@ -2024,6 +2025,7 @@ describe('StripeSource', () => { state: undefined, registry, client: mockClient, + accountId: 'acct_test', rateLimiter, }) ) @@ -2063,6 +2065,7 @@ describe('StripeSource', () => { state: undefined, registry, client: mockClient, + accountId: 'acct_test', rateLimiter, }) ) @@ -2117,6 +2120,7 @@ describe('StripeSource', () => { state: undefined, registry, client: mockClient, + accountId: 'acct_test', rateLimiter, backfillConcurrency: 3, }) @@ -2200,6 +2204,7 @@ describe('StripeSource', () => { state: undefined, registry, client: {} as unknown as StripeClient, + accountId: 'acct_test', rateLimiter: rateLimiterSpy, }) ) diff --git a/packages/source-stripe/src/index.ts b/packages/source-stripe/src/index.ts index d871efd07..4fe05cdb9 100644 --- a/packages/source-stripe/src/index.ts +++ b/packages/source-stripe/src/index.ts @@ -1,7 +1,5 @@ import type { CatalogPayload, - ConfiguredCatalog, - Message, Source, SpecOutput, CheckOutput, @@ -10,8 +8,10 @@ import type { TeardownOutput, } from '@stripe/sync-protocol' import { sourceControlMsg } from '@stripe/sync-protocol' -import defaultSpec, { configSchema, stripeEventSchema } from './spec.js' -import type { Config, StripeEvent } from './spec.js' +import { z } from 'zod' +import defaultSpec, { configSchema } from './spec.js' +import type { Config } from './spec.js' +import type { StripeEvent } from './spec.js' import { buildResourceRegistry } from './resourceRegistry.js' import { catalogFromRegistry, catalogFromOpenApi } from './catalog.js' import { @@ -26,15 +26,17 @@ import { listApiBackfill } from './src-list-api.js' import { pollEvents } from './src-events-api.js' import type { StripeWebSocketClient, StripeWebhookEvent } from './src-websocket.js' import { createStripeWebSocketClient } from './src-websocket.js' +import type { ResourceConfig } from './types.js' import { makeClient } from './client.js' import type { RateLimiter } from './rate-limiter.js' import { createInMemoryRateLimiter, DEFAULT_MAX_RPS } from './rate-limiter.js' import { fetchWithProxy } from './transport.js' +import { stripeEventSchema } from './spec.js' const apiFetch: typeof globalThis.fetch = (input, init) => fetchWithProxy(input as URL | string, init ?? {}) -/** In-memory cache of discover results keyed by api_version + account_id. */ +/** In-memory cache of discover results keyed by api_version. */ export const discoverCache = new Map() // MARK: - Spec @@ -74,27 +76,6 @@ export type StripeStreamState = { backfill?: BackfillState } -type LegacyAccountClient = { - accounts?: { - retrieve?: () => Promise<{ id: string }> - } -} - -async function getAccountFromClient( - client: ReturnType -): Promise<{ id: string }> { - if (typeof client.getAccount === 'function') { - return client.getAccount() - } - - const legacyClient = client as ReturnType & LegacyAccountClient - if (typeof legacyClient.accounts?.retrieve === 'function') { - return legacyClient.accounts.retrieve() - } - - throw new Error('Stripe client does not support account lookup') -} - // MARK: - Account ID resolution export async function resolveAccountId(config: Config): Promise { @@ -106,126 +87,10 @@ export async function resolveAccountId(config: Config): Promise { ...config, api_version: config.api_version ?? BUNDLED_API_VERSION, }) - const account = await getAccountFromClient(client) + const account = await client.getAccount() return account.id } -// MARK: - Read implementation - -async function* readImpl( - config: Config, - catalog: ConfiguredCatalog, - state: Record | undefined, - $stdin?: AsyncIterable, - rateLimiter?: RateLimiter, - backfillConcurrency?: number -): AsyncGenerator { - const apiVersion = config.api_version ?? BUNDLED_API_VERSION - const client = makeClient({ ...config, api_version: apiVersion }) - const resolved = await resolveOpenApiSpec({ apiVersion }, apiFetch) - const registry = buildResourceRegistry( - resolved.spec, - config.api_key, - resolved.apiVersion, - config.base_url - ) - const streamNames = new Set(catalog.streams.map((s) => s.stream.name)) - - // Event-driven mode: iterate over incoming webhook inputs - if ($stdin) { - for await (const input of $stdin) { - if ('body' in (input as object)) { - yield* processWebhookInput( - input as WebhookInput, - config, - catalog, - registry, - streamNames - ) - } else { - yield* processStripeEvent( - input as StripeEvent, - config, - catalog, - registry, - streamNames - ) - } - } - return - } - - const inputQueue = createInputQueue() - - let wsClient: StripeWebSocketClient | null = null - if (config.websocket) { - wsClient = await createStripeWebSocketClient({ - stripeApiKey: config.api_key, - onEvent: (wsEvent: StripeWebhookEvent) => { - const event = stripeEventSchema.parse(JSON.parse(wsEvent.event_payload)) - inputQueue.push({ data: event }) - }, - }) - } - - let httpServer: ReturnType | null = null - - try { - const startTimestamp = Math.floor(Date.now() / 1000) - - // Backfill: paginate through each configured stream - yield* listApiBackfill({ - catalog, - state, - registry, - client, - rateLimiter: rateLimiter ?? (() => Promise.resolve(0)), - backfillLimit: config.backfill_limit, - backfillConcurrency, - drainQueue: wsClient - ? () => inputQueue.drain(config, catalog, registry, streamNames) - : undefined, - }) - - // Events polling: incremental sync via /v1/events after backfill - yield* pollEvents({ config, client, catalog, registry, streamNames, state, startTimestamp }) - - // Start HTTP server for live mode if configured - if (config.webhook_port) { - httpServer = startWebhookServer(config.webhook_port, inputQueue.push) - } - - // After backfill: stream live events from WebSocket and/or HTTP - if (wsClient || httpServer) { - // Drain anything that arrived during backfill - yield* inputQueue.drain(config, catalog, registry, streamNames) - - // Block on new events (infinite loop until all live sources close) - while (wsClient || httpServer) { - const queued = await inputQueue.wait() - try { - if ('body' in queued.data) { - yield* processWebhookInput(queued.data, config, catalog, registry, streamNames) - } else { - yield* processStripeEvent(queued.data, config, catalog, registry, streamNames) - } - queued.resolve?.() - } catch (err) { - queued.reject?.(err instanceof Error ? err : new Error(String(err))) - } - } - } - } finally { - if (wsClient) { - wsClient.close() - wsClient = null - } - if (httpServer) { - httpServer.close() - httpServer = null - } - } -} // MARK: - Source export type StripeSourceDeps = { @@ -248,7 +113,7 @@ export function createStripeSource( ...config, api_version: config.api_version ?? BUNDLED_API_VERSION, }) - await getAccountFromClient(client) + await client.getAccount() yield { type: 'connection_status' as const, connection_status: { status: 'succeeded' as const }, @@ -261,15 +126,17 @@ export function createStripeSource( } }, - // For the default api_version (bundled), discover is CPU-only aside from - // resolving the Stripe account ID needed for multi-account metadata. - // Cache by api_version + account_id so repeated discover calls during the - // same sync don't re-parse the OpenAPI spec. + // For the default api_version (bundled), discover is CPU-only — no HTTP. + // resolveOpenApiSpec serves the bundled spec from the filesystem, so the + // cost is SpecParser.parse + catalogFromOpenApi (pure computation). We + // cache the result in-memory keyed by api_version so that pipeline_sync + // (which calls discover twice — once in pipeline_read, once in + // pipeline_write) doesn't repeat the work. + // TODO: Custom objects (not yet supported) would require a more specific cache + // since they aren't discoverable from the OpenAPI spec alone. async *discover({ config }): AsyncGenerator { const apiVersion = config.api_version ?? BUNDLED_API_VERSION - const accountId = await resolveAccountId(config) - const cacheKey = `${apiVersion}:${accountId}` - const cached = discoverCache.get(cacheKey) + const cached = discoverCache.get(apiVersion) if (cached) { yield { type: 'catalog' as const, catalog: cached } return @@ -288,15 +155,15 @@ export function createStripeSource( const parsed = parser.parse(resolved.spec, { resourceAliases: OPENAPI_RESOURCE_TABLE_ALIASES, }) - catalog = catalogFromOpenApi(parsed.tables, registry, accountId) + catalog = catalogFromOpenApi(parsed.tables, registry) } catch { - catalog = catalogFromRegistry(registry, accountId) + catalog = catalogFromRegistry(registry) } - discoverCache.set(cacheKey, catalog) + discoverCache.set(apiVersion, catalog) yield { type: 'catalog' as const, catalog } }, - async *setup({ config }): AsyncGenerator { + async *setup({ config, catalog }): AsyncGenerator { const updates: Partial = {} const client = makeClient({ ...config, @@ -305,7 +172,7 @@ export function createStripeSource( // Resolve account_id if not already set if (!config.account_id) { - const account = await getAccountFromClient(client) + const account = await client.getAccount() updates.account_id = account.id } @@ -371,36 +238,144 @@ export function createStripeSource( }, async *read({ config, catalog, state }, $stdin?) { + const apiVersion = config.api_version ?? BUNDLED_API_VERSION const rateLimiter = externalRateLimiter ?? createInMemoryRateLimiter(config.rate_limit ?? DEFAULT_MAX_RPS) - const accountId = await resolveAccountId(config) - const inner = readImpl( - config, - catalog, - state?.streams as Record | undefined, - $stdin, - rateLimiter, - config.backfill_concurrency + const client = makeClient({ ...config, api_version: apiVersion }) + const resolved = await resolveOpenApiSpec({ apiVersion }, apiFetch) + const registry = buildResourceRegistry( + resolved.spec, + config.api_key, + resolved.apiVersion, + config.base_url ) - for await (const msg of inner) { - if (msg.type === 'record') { - yield { - ...msg, - record: { - ...msg.record, - data: { - ...msg.record.data, - _account_id: accountId, - }, - }, + const streamNames = new Set(catalog.streams.map((s) => s.stream.name)) + const accountId = await resolveAccountId(config) + + // Event-driven mode: iterate over incoming webhook inputs + if ($stdin) { + for await (const input of $stdin) { + if ('body' in (input as object)) { + yield* processWebhookInput( + input as WebhookInput, + config, + catalog, + registry, + streamNames, + accountId + ) + } else { + yield* processStripeEvent( + input as StripeEvent, + config, + catalog, + registry, + streamNames, + accountId + ) } - } else { - yield msg + } + return + } + + const inputQueue = createInputQueue() + + let wsClient: StripeWebSocketClient | null = null + if (config.websocket) { + wsClient = await createStripeWebSocketClient({ + stripeApiKey: config.api_key, + onEvent: (wsEvent: StripeWebhookEvent) => { + const event = stripeEventSchema.parse(JSON.parse(wsEvent.event_payload)) + inputQueue.push({ data: event }) + }, + }) + } + + let httpServer: ReturnType | null = null + + try { + const startTimestamp = Math.floor(Date.now() / 1000) + + // Backfill: paginate through each configured stream + yield* listApiBackfill({ + catalog, + state: state?.streams as Parameters[0]['state'], + registry, + rateLimiter, + client, + accountId, + backfillLimit: config.backfill_limit, + backfillConcurrency: config.backfill_concurrency, + drainQueue: wsClient + ? () => inputQueue.drain(config, catalog, registry, streamNames, accountId) + : undefined, + }) + + // Events polling: incremental sync via /v1/events after backfill + yield* pollEvents({ + config, + client, + catalog, + registry, + streamNames, + state: state?.streams as Record | undefined, + startTimestamp, + accountId, + }) + + // Start HTTP server for live mode if configured + if (config.webhook_port) { + httpServer = startWebhookServer(config.webhook_port, inputQueue.push) + } + + // After backfill: stream live events from WebSocket and/or HTTP + if (wsClient || httpServer) { + // Drain anything that arrived during backfill + yield* inputQueue.drain(config, catalog, registry, streamNames, accountId) + + // Block on new events (infinite loop until all live sources close) + while (wsClient || httpServer) { + const queued = await inputQueue.wait() + try { + if ('body' in queued.data) { + yield* processWebhookInput( + queued.data, + config, + catalog, + registry, + streamNames, + accountId + ) + } else { + yield* processStripeEvent( + queued.data, + config, + catalog, + registry, + streamNames, + accountId + ) + } + queued.resolve?.() + } catch (err) { + queued.reject?.(err instanceof Error ? err : new Error(String(err))) + } + } + } + } finally { + if (wsClient) { + wsClient.close() + wsClient = null + } + if (httpServer) { + httpServer.close() + httpServer = null } } }, } } + export default createStripeSource() // MARK: - Re-exports diff --git a/packages/source-stripe/src/process-event.ts b/packages/source-stripe/src/process-event.ts index 762bfb0a9..cf0021577 100644 --- a/packages/source-stripe/src/process-event.ts +++ b/packages/source-stripe/src/process-event.ts @@ -45,7 +45,8 @@ function isDeleteEvent(event: StripeEvent): boolean { */ export function fromStripeEvent( event: StripeEvent, - registry: Record + registry: Record, + accountId?: string ): { record: RecordMessage; state: SourceStateMessage } | null { const dataObject = event.data?.object as unknown as | { id?: string; object?: string; deleted?: boolean; [key: string]: unknown } @@ -59,7 +60,10 @@ export function fromStripeEvent( // Skip objects without an id (preview/draft objects like invoice.upcoming) if (!dataObject.id) return null - const record = toRecordMessage(config.tableName, dataObject as Record) + const data = accountId + ? { ...(dataObject as Record), _account_id: accountId } + : (dataObject as Record) + const record = toRecordMessage(config.tableName, data) const state: SourceStateMessage = stateMsg({ stream: config.tableName, data: { @@ -86,7 +90,8 @@ export async function* processStripeEvent( config: Config, catalog: ConfiguredCatalog, registry: Record, - streamNames: Set + streamNames: Set, + accountId?: string ): AsyncGenerator { // 1. Extract object const dataObject = event.data?.object as unknown as @@ -118,6 +123,7 @@ export async function* processStripeEvent( customer: summary.customer, livemode: e.livemode, lookup_key: e.lookup_key, + ...(accountId ? { _account_id: accountId } : {}), }) } yield stateMsg({ @@ -136,7 +142,11 @@ export async function* processStripeEvent( // 4. Delete events — yield record with deleted: true if (isDeleteEvent(event)) { - yield toRecordMessage(resourceConfig.tableName, { ...dataObject, deleted: true }) + yield toRecordMessage(resourceConfig.tableName, { + ...dataObject, + deleted: true, + ...(accountId ? { _account_id: accountId } : {}), + }) yield stateMsg({ stream: resourceConfig.tableName, data: { eventId: event.id, eventCreated: event.created }, @@ -155,12 +165,16 @@ export async function* processStripeEvent( } // 6. Yield main record - yield toRecordMessage(resourceConfig.tableName, data) + const recordData = accountId ? { ...data, _account_id: accountId } : data + yield toRecordMessage(resourceConfig.tableName, recordData) // 7. Yield subscription items if applicable if (objectType === 'subscriptions' && (data as { items?: { data?: unknown[] } }).items?.data) { for (const item of (data as { items: { data: Record[] } }).items.data) { - yield toRecordMessage('subscription_items', item) + yield toRecordMessage( + 'subscription_items', + accountId ? { ...item, _account_id: accountId } : item + ) } } diff --git a/packages/source-stripe/src/src-events-api.ts b/packages/source-stripe/src/src-events-api.ts index 89f52030e..741140076 100644 --- a/packages/source-stripe/src/src-events-api.ts +++ b/packages/source-stripe/src/src-events-api.ts @@ -18,8 +18,9 @@ export async function* pollEvents(opts: { streamNames: Set state: Record | undefined startTimestamp: number + accountId: string }): AsyncGenerator { - const { config, client, catalog, registry, streamNames, state, startTimestamp } = opts + const { config, client, catalog, registry, streamNames, state, startTimestamp, accountId } = opts if (!config.poll_events) return @@ -86,7 +87,14 @@ export async function* pollEvents(opts: { events.reverse() for (const event of events) { - for await (const msg of processStripeEvent(event, config, catalog, registry, streamNames)) { + for await (const msg of processStripeEvent( + event, + config, + catalog, + registry, + streamNames, + accountId + )) { if (msg.type === 'source_state' && msg.source_state.state_type !== 'global') { // Intercept state messages to preserve complete status + update events_cursor const existing = state?.[msg.source_state.stream] diff --git a/packages/source-stripe/src/src-list-api.ts b/packages/source-stripe/src/src-list-api.ts index 7f0f41e58..d81bbad31 100644 --- a/packages/source-stripe/src/src-list-api.ts +++ b/packages/source-stripe/src/src-list-api.ts @@ -240,6 +240,7 @@ async function* paginateSegment(opts: { range: { gte: number; lt: number } numSegments: number streamName: string + accountId: string supportsLimit: boolean supportsForwardPagination: boolean backfillLimit?: number @@ -253,6 +254,7 @@ async function* paginateSegment(opts: { range, numSegments, streamName, + accountId, supportsLimit, supportsForwardPagination, backfillLimit, @@ -279,7 +281,10 @@ async function* paginateSegment(opts: { const response = await listFn(params as Parameters[0]) for (const item of response.data) { - yield toRecordMessage(streamName, item as Record) + yield toRecordMessage(streamName, { + ...(item as Record), + _account_id: accountId, + }) totalEmitted.count++ } @@ -315,12 +320,13 @@ async function* paginateSegment(opts: { async function* sequentialBackfillStream(opts: { resourceConfig: ResourceConfig streamName: string + accountId: string pageCursor: string | null backfillLimit?: number rateLimiter: RateLimiter drainQueue?: () => AsyncGenerator }): AsyncGenerator { - const { resourceConfig, streamName, backfillLimit, rateLimiter, drainQueue } = opts + const { resourceConfig, streamName, accountId, backfillLimit, rateLimiter, drainQueue } = opts let pageCursor = opts.pageCursor let hasMore = true let totalEmitted = 0 @@ -347,7 +353,10 @@ async function* sequentialBackfillStream(opts: { ) for (const item of response.data) { - yield toRecordMessage(streamName, item as Record) + yield toRecordMessage(streamName, { + ...(item as Record), + _account_id: accountId, + }) totalEmitted++ } @@ -389,6 +398,7 @@ export async function* listApiBackfill(opts: { | undefined registry: Record client: StripeClient + accountId: string rateLimiter: RateLimiter backfillLimit?: number backfillConcurrency?: number @@ -399,6 +409,7 @@ export async function* listApiBackfill(opts: { state, registry, client, + accountId, rateLimiter, backfillLimit, backfillConcurrency = DEFAULT_BACKFILL_CONCURRENCY, @@ -479,6 +490,7 @@ export async function* listApiBackfill(opts: { range, numSegments, streamName: stream.name, + accountId, supportsLimit: resourceConfig.supportsLimit !== false, supportsForwardPagination: resourceConfig.supportsForwardPagination !== false, backfillLimit: streamBackfillLimit, @@ -495,6 +507,7 @@ export async function* listApiBackfill(opts: { yield* sequentialBackfillStream({ resourceConfig, streamName: stream.name, + accountId, pageCursor, backfillLimit: streamBackfillLimit, rateLimiter, diff --git a/packages/source-stripe/src/src-webhook.ts b/packages/source-stripe/src/src-webhook.ts index 932ea665a..16dd14cd0 100644 --- a/packages/source-stripe/src/src-webhook.ts +++ b/packages/source-stripe/src/src-webhook.ts @@ -18,14 +18,15 @@ export async function* processWebhookInput( config: Config, catalog: ConfiguredCatalog, registry: Record, - streamNames: Set + streamNames: Set, + accountId?: string ): AsyncGenerator { if (!config.webhook_secret) { throw new Error('webhook_secret is required for raw webhook signature verification') } const signature = (input.headers['stripe-signature'] as string) ?? '' const event = verifyWebhookSignature(input.body, signature, config.webhook_secret) - yield* processStripeEvent(event, config, catalog, registry, streamNames) + yield* processStripeEvent(event, config, catalog, registry, streamNames, accountId) } // MARK: - LiveInput queue @@ -62,11 +63,19 @@ export function createInputQueue() { config: Config, catalog: ConfiguredCatalog, registry: Record, - streamNames: Set + streamNames: Set, + accountId?: string ): AsyncGenerator { while (queue.length > 0) { const queued = queue.shift()! - yield* processStripeEvent(queued.data as StripeEvent, config, catalog, registry, streamNames) + yield* processStripeEvent( + queued.data as StripeEvent, + config, + catalog, + registry, + streamNames, + accountId + ) } } From 5fc0f8cff2e0c24bf3295c2edf1eb3d7fbe53c23 Mon Sep 17 00:00:00 2001 From: Yostra Date: Fri, 10 Apr 2026 01:06:46 +0200 Subject: [PATCH 3/6] don't strip metadata --- packages/destination-postgres/src/index.ts | 13 ++----------- packages/source-stripe/src/catalog.ts | 8 ++++---- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/packages/destination-postgres/src/index.ts b/packages/destination-postgres/src/index.ts index bd3be3885..076e8a7b6 100644 --- a/packages/destination-postgres/src/index.ts +++ b/packages/destination-postgres/src/index.ts @@ -189,25 +189,16 @@ const destination = { } }, - async *write({ config, catalog }, $stdin) { + async *write({ config, catalog: _catalog }, $stdin) { const pool = withQueryLogging(createPool(await buildPoolConfig(config))) const batchSize = config.batch_size // eslint-disable-next-line @typescript-eslint/no-explicit-any const streamBuffers = new Map[]>() - const streamKeyColumns = new Map() - for (const cs of catalog.streams) { - streamKeyColumns.set( - cs.stream.name, - cs.stream.primary_key.map((pk) => pk[0]) - ) - } - const flushStream = async (streamName: string) => { const buffer = streamBuffers.get(streamName) if (!buffer || buffer.length === 0) return - const keyColumns = streamKeyColumns.get(streamName) ?? ['id'] - await upsertMany(pool, config.schema, streamName, buffer, keyColumns) + await upsertMany(pool, config.schema, streamName, buffer) streamBuffers.set(streamName, []) } diff --git a/packages/source-stripe/src/catalog.ts b/packages/source-stripe/src/catalog.ts index a5bd6e3cd..27cd833c1 100644 --- a/packages/source-stripe/src/catalog.ts +++ b/packages/source-stripe/src/catalog.ts @@ -8,10 +8,10 @@ export function catalogFromRegistry(registry: Record): C const streams: Stream[] = Object.entries(registry) .filter(([, cfg]) => cfg.sync !== false) .sort(([, a], [, b]) => a.order - b.order) - .map(([, cfg]) => ({ + .map(([name, cfg]) => ({ name: cfg.tableName, primary_key: [['id'], ['_account_id']], - metadata: {}, + metadata: { resource_name: name }, })) return { streams } @@ -31,12 +31,12 @@ export function catalogFromOpenApi( const streams: Stream[] = Object.entries(registry) .filter(([, cfg]) => cfg.sync !== false) .sort(([, a], [, b]) => a.order - b.order) - .map(([, cfg]) => { + .map(([name, cfg]) => { const table = tableMap.get(cfg.tableName) const stream: Stream = { name: cfg.tableName, primary_key: [['id'], ['_account_id']], - metadata: {}, + metadata: { resource_name: name }, } if (table) { From 06ff31ac76c66ecf54aaeeb41509739976c6d89a Mon Sep 17 00:00:00 2001 From: Yostra Date: Fri, 10 Apr 2026 01:49:18 +0200 Subject: [PATCH 4/6] fix write --- packages/destination-postgres/src/index.ts | 16 ++++++++++++++-- packages/source-stripe/src/index.ts | 10 +++------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/packages/destination-postgres/src/index.ts b/packages/destination-postgres/src/index.ts index 076e8a7b6..84cb72c91 100644 --- a/packages/destination-postgres/src/index.ts +++ b/packages/destination-postgres/src/index.ts @@ -189,16 +189,28 @@ const destination = { } }, - async *write({ config, catalog: _catalog }, $stdin) { + async *write({ config, catalog }, $stdin) { const pool = withQueryLogging(createPool(await buildPoolConfig(config))) const batchSize = config.batch_size // eslint-disable-next-line @typescript-eslint/no-explicit-any const streamBuffers = new Map[]>() + const streamKeyColumns = new Map( + catalog.streams.map((cs) => [ + cs.stream.name, + cs.stream.primary_key?.map((pk) => pk[0]) ?? ['id'], + ]) + ) const flushStream = async (streamName: string) => { const buffer = streamBuffers.get(streamName) if (!buffer || buffer.length === 0) return - await upsertMany(pool, config.schema, streamName, buffer) + await upsertMany( + pool, + config.schema, + streamName, + buffer, + streamKeyColumns.get(streamName) ?? ['id'] + ) streamBuffers.set(streamName, []) } diff --git a/packages/source-stripe/src/index.ts b/packages/source-stripe/src/index.ts index 4fe05cdb9..584b9da03 100644 --- a/packages/source-stripe/src/index.ts +++ b/packages/source-stripe/src/index.ts @@ -27,7 +27,7 @@ import { pollEvents } from './src-events-api.js' import type { StripeWebSocketClient, StripeWebhookEvent } from './src-websocket.js' import { createStripeWebSocketClient } from './src-websocket.js' import type { ResourceConfig } from './types.js' -import { makeClient } from './client.js' +import { makeClient, type StripeClient } from './client.js' import type { RateLimiter } from './rate-limiter.js' import { createInMemoryRateLimiter, DEFAULT_MAX_RPS } from './rate-limiter.js' import { fetchWithProxy } from './transport.js' @@ -78,15 +78,11 @@ export type StripeStreamState = { // MARK: - Account ID resolution -export async function resolveAccountId(config: Config): Promise { +export async function resolveAccountId(config: Config, client: StripeClient): Promise { if (config.account_id) { return config.account_id } - const client = makeClient({ - ...config, - api_version: config.api_version ?? BUNDLED_API_VERSION, - }) const account = await client.getAccount() return account.id } @@ -250,7 +246,7 @@ export function createStripeSource( config.base_url ) const streamNames = new Set(catalog.streams.map((s) => s.stream.name)) - const accountId = await resolveAccountId(config) + const accountId = await resolveAccountId(config, client) // Event-driven mode: iterate over incoming webhook inputs if ($stdin) { From 4274a7ce5142c31b333a85bf9357c6879bd05feb Mon Sep 17 00:00:00 2001 From: Yostra Date: Fri, 10 Apr 2026 01:49:28 +0200 Subject: [PATCH 5/6] add e2e test --- e2e/test-server-sync.test.ts | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/e2e/test-server-sync.test.ts b/e2e/test-server-sync.test.ts index 2e31875a0..bc22686ec 100644 --- a/e2e/test-server-sync.test.ts +++ b/e2e/test-server-sync.test.ts @@ -668,6 +668,37 @@ describe('test-server sync via Docker engine', () => { expect((state.streams.customers as StripeStreamState).status).toBe('complete') }, 600_000) + it('multiple keys: concurrent syncs with different API keys do not interfere', async () => { + const COUNT = 5000 + const KEYS = ['sk_test_key_alpha', 'sk_test_key_bravo', 'sk_test_key_charlie'] + + await seedCustomers(generateCustomers(COUNT, 'cus_mk_')) + + const syncs = KEYS.map(async (apiKey) => { + const destSchema = uniqueSchema(`multikey_${apiKey.slice(-5)}`) + const { state } = await runSync({ + destSchema, + sourceOverrides: { api_key: apiKey, backfill_concurrency: 3 }, + }) + return { apiKey, destSchema, state } + }) + + const results = await Promise.all(syncs) + + for (const { apiKey, destSchema, state } of results) { + const ids = await listIds(destSchema, 'customers') + expect(ids.length, `key ${apiKey}: expected ${COUNT} rows`).toBe(COUNT) + + const destIds = new Set(ids) + for (let i = 0; i < COUNT; i++) { + const expected = `cus_mk_${String(i).padStart(5, '0')}` + expect(destIds.has(expected), `key ${apiKey}: missing ${expected}`).toBe(true) + } + + expect((state.streams.customers as StripeStreamState).status).toBe('complete') + } + }, 180_000) + it('v2 stream: syncs v2_core_event_destinations via cursor pagination', async () => { const destSchema = uniqueSchema('v2sync') const STREAM = 'v2_core_event_destinations' From 4bb5c289f5b652a8e626f5dccdaae104410d9518 Mon Sep 17 00:00:00 2001 From: Yostra Date: Fri, 10 Apr 2026 03:04:14 +0200 Subject: [PATCH 6/6] yield error --- packages/source-stripe/src/index.ts | 10 ++++-- packages/source-stripe/src/src-list-api.ts | 37 +++++++++++----------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/packages/source-stripe/src/index.ts b/packages/source-stripe/src/index.ts index 584b9da03..1704fb90e 100644 --- a/packages/source-stripe/src/index.ts +++ b/packages/source-stripe/src/index.ts @@ -22,7 +22,7 @@ import { } from '@stripe/sync-openapi' import { processStripeEvent } from './process-event.js' import { processWebhookInput, createInputQueue, startWebhookServer } from './src-webhook.js' -import { listApiBackfill } from './src-list-api.js' +import { listApiBackfill, errorToTrace } from './src-list-api.js' import { pollEvents } from './src-events-api.js' import type { StripeWebSocketClient, StripeWebhookEvent } from './src-websocket.js' import { createStripeWebSocketClient } from './src-websocket.js' @@ -246,7 +246,13 @@ export function createStripeSource( config.base_url ) const streamNames = new Set(catalog.streams.map((s) => s.stream.name)) - const accountId = await resolveAccountId(config, client) + let accountId: string + try { + accountId = await resolveAccountId(config, client) + } catch (err) { + yield errorToTrace(err, catalog.streams[0]?.stream.name ?? 'unknown') + return + } // Event-driven mode: iterate over incoming webhook inputs if ($stdin) { diff --git a/packages/source-stripe/src/src-list-api.ts b/packages/source-stripe/src/src-list-api.ts index d81bbad31..965335821 100644 --- a/packages/source-stripe/src/src-list-api.ts +++ b/packages/source-stripe/src/src-list-api.ts @@ -6,6 +6,23 @@ import type { RateLimiter } from './rate-limiter.js' import { StripeApiRequestError } from '@stripe/sync-openapi' import type { StripeClient } from './client.js' +export function errorToTrace(err: unknown, stream: string): TraceMessage { + const isRateLimit = err instanceof Error && err.message.includes('Rate limit') + const isAuth = err instanceof StripeApiRequestError && (err.status === 401 || err.status === 403) + return { + type: 'trace', + trace: { + trace_type: 'error', + error: { + failure_type: isRateLimit ? 'transient_error' : isAuth ? 'auth_error' : 'system_error', + message: err instanceof Error ? err.message : String(err), + stream, + ...(err instanceof Error ? { stack_trace: err.stack } : {}), + }, + }, + } +} + // Errors matching these patterns are silently skipped during backfill. // The stream is marked complete without yielding records. // NOTE: these are band-aids — the underlying issue is that the OpenAPI spec @@ -538,25 +555,7 @@ export async function* listApiBackfill(opts: { stream: stream.name, error: err instanceof Error ? err.message : String(err), }) - const isRateLimit = err instanceof Error && err.message.includes('Rate limit') - const isAuthError = - err instanceof StripeApiRequestError && (err.status === 401 || err.status === 403) - yield { - type: 'trace', - trace: { - trace_type: 'error', - error: { - failure_type: isRateLimit - ? 'transient_error' - : isAuthError - ? 'auth_error' - : 'system_error', - message: String(err), - stream: stream.name, - ...(err instanceof Error ? { stack_trace: err.stack } : {}), - }, - }, - } satisfies TraceMessage + yield errorToTrace(err, stream.name) } } }