Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/engine/src/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export { destinationTest, destinationTestSpec } from './destination-test.js'
export type { DestinationTestConfig } from './destination-test.js'
export { readonlyStateStore } from './state-store.js'
export type { StateStore } from './state-store.js'
export { maybeDestinationStateStore } from './select-state-store.js'
export { maybeDestinationStateStore, selectStateStore } from './select-state-store.js'
export {
createConnectorSchemas,
connectorSchemaName,
Expand Down
19 changes: 12 additions & 7 deletions apps/engine/src/lib/select-state-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import type { PipelineConfig } from '@stripe/sync-protocol'
* Tries to resolve a destination-colocated state store.
*
* Imports `@stripe/sync-state-${destination.type}` and calls its
* `createStateStore(destConfig)`. Not all destinations support this
* Postgres does (state table alongside synced data), Google Sheets doesn't.
* Falls back to a read-only no-op store when unavailable.
* `createStateStore(destConfig, pipelineId)`. Not all destinations support this
* convention; Postgres does (state table alongside synced data), Google Sheets
* doesn't. Falls back to a read-only store when unavailable.
*
* If the package exports a `setupStateStore(destConfig)` function,
* it is called first to ensure the state table exists (runs migrations).
Expand All @@ -20,19 +20,22 @@ import type { PipelineConfig } from '@stripe/sync-protocol'
* e.g. the HTTP API (state flows in via X-State header, out via NDJSON stream)
* or Temporal workflows (workflow memory is the source of truth).
* Writing state to the destination DB in those cases creates unexpected tables.
*
* @param pipelineId Identifies the sync slot. Defaults to `'default'` downstream.
* Pass a unique value per pipeline (e.g. the Stripe account ID) to isolate cursor state.
*/
export async function maybeDestinationStateStore(
params: PipelineConfig
export async function selectStateStore(
params: PipelineConfig,
pipelineId?: string
): Promise<StateStore & { close?(): Promise<void> }> {
try {
const { type: destType, ...destConfig } = params.destination
const pkg = await import(`@stripe/sync-state-${destType}`)
if (typeof pkg.createStateStore === 'function') {
// Run migrations if the package provides a setup function
if (typeof pkg.setupStateStore === 'function') {
await pkg.setupStateStore(destConfig)
}
return pkg.createStateStore(destConfig) as StateStore & {
return pkg.createStateStore(destConfig, pipelineId) as StateStore & {
close?(): Promise<void>
}
}
Expand All @@ -41,3 +44,5 @@ export async function maybeDestinationStateStore(
}
return readonlyStateStore()
}

/**
 * Exposes `selectStateStore` under the name `maybeDestinationStateStore`.
 * Both names refer to the same function, so the optional `pipelineId`
 * argument is accepted through either one.
 * NOTE(review): presumably kept so existing imports of the earlier name keep
 * working — confirm before removing.
 */
export const maybeDestinationStateStore = selectStateStore
88 changes: 88 additions & 0 deletions packages/destination-postgres/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,94 @@ describe('destination default export', () => {
})
})

/**
 * Multi-org coverage: a single `customers` stream whose primary key is the
 * pair (id, _account_id), so two accounts may hold the same object id.
 */
describe('multi-org sync (two account IDs)', () => {
  const compositeKeyCatalog: ConfiguredCatalog = {
    streams: [
      {
        stream: { name: 'customers', primary_key: [['id'], ['_account_id']], metadata: {} },
        sync_mode: 'full_refresh',
        destination_sync_mode: 'overwrite',
      },
    ],
  }

  // Wraps the records with toAsyncIter, hands them to destination.write and
  // awaits collectOutputs on the result.
  const writeRecords = async (records: ReturnType<typeof makeRecord>[]) => {
    const input = toAsyncIter(records)
    await collectOutputs(
      destination.write({ config: makeConfig(), catalog: compositeKeyCatalog }, input)
    )
  }

  // SQL is assembled line by line so the query text does not depend on how
  // this file happens to be indented.
  const namesByAccountSql = [
    `SELECT _account_id, _raw_data->>'name' AS name`,
    `FROM "${SCHEMA}".customers ORDER BY _account_id`,
  ].join('\n')

  beforeEach(async () => {
    const setupRun = destination.setup!({ config: makeConfig(), catalog: compositeKeyCatalog })
    await drain(setupRun)
  })

  it('creates table with composite primary key (id, _account_id)', async () => {
    // Both key fields must exist as columns on the table.
    const columnsSql = [
      'SELECT column_name FROM information_schema.columns',
      "WHERE table_schema = $1 AND table_name = 'customers'",
      'ORDER BY ordinal_position',
    ].join('\n')
    const columnResult = await pool.query(columnsSql, [SCHEMA])
    const columnNames = columnResult.rows.map((row) => row.column_name)
    expect(columnNames).toContain('id')
    expect(columnNames).toContain('_account_id')

    // The primary-key index must list the two columns in declaration order.
    const primaryKeySql = [
      'SELECT a.attname',
      'FROM pg_index i',
      'JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)',
      'WHERE i.indrelid = $1::regclass AND i.indisprimary',
      'ORDER BY array_position(i.indkey, a.attnum)',
    ].join('\n')
    const primaryKeyResult = await pool.query(primaryKeySql, [`"${SCHEMA}"."customers"`])
    const primaryKeyColumns = primaryKeyResult.rows.map((row) => row.attname)
    expect(primaryKeyColumns).toEqual(['id', '_account_id'])
  })

  it('stores rows from two accounts with the same object id as separate rows', async () => {
    await writeRecords([
      makeRecord('customers', { id: 'cus_1', name: 'Alice (Acct A)', _account_id: 'acct_AAA' }),
      makeRecord('customers', { id: 'cus_1', name: 'Alice (Acct B)', _account_id: 'acct_BBB' }),
    ])

    const storedSql = [
      `SELECT id, _account_id, _raw_data->>'name' AS name`,
      `FROM "${SCHEMA}".customers ORDER BY _account_id`,
    ].join('\n')
    const stored = await pool.query(storedSql)
    expect(stored.rows).toEqual([
      { id: 'cus_1', _account_id: 'acct_AAA', name: 'Alice (Acct A)' },
      { id: 'cus_1', _account_id: 'acct_BBB', name: 'Alice (Acct B)' },
    ])
  })

  it('upserts per-account: same id + same account updates, different account inserts', async () => {
    // First pass: the same object id arrives once per account.
    await writeRecords([
      makeRecord('customers', { id: 'cus_1', name: 'Alice v1', _account_id: 'acct_AAA' }),
      makeRecord('customers', { id: 'cus_1', name: 'Alice v1', _account_id: 'acct_BBB' }),
    ])

    // Second pass: only account AAA sends a newer version.
    await writeRecords([
      makeRecord('customers', { id: 'cus_1', name: 'Alice v2', _account_id: 'acct_AAA' }),
    ])

    const stored = await pool.query(namesByAccountSql)
    expect(stored.rows).toEqual([
      { _account_id: 'acct_AAA', name: 'Alice v2' },
      { _account_id: 'acct_BBB', name: 'Alice v1' },
    ])
  })
})

describe('upsertMany standalone', () => {
beforeEach(async () => {
await drain(destination.setup!({ config: makeConfig(), catalog }))
Expand Down
41 changes: 20 additions & 21 deletions packages/destination-postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ export async function upsertMany(
schema: string,
table: string,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
entries: Record<string, any>[]
entries: Record<string, any>[],
keyColumns: string[] = ['id']
): Promise<void> {
if (!entries.length) return
await upsert(
Expand All @@ -77,7 +78,7 @@ export async function upsertMany(
{
schema,
table,
keyColumns: ['id'],
keyColumns,
}
)
}
Expand Down Expand Up @@ -160,23 +161,12 @@ const destination = {
`)
await Promise.all(
catalog.streams.map(async (cs) => {
if (cs.stream.json_schema) {
await pool.query(
buildCreateTableDDL(config.schema, cs.stream.name, cs.stream.json_schema, {
system_columns: cs.system_columns,
})
)
} else {
await pool.query(sql`
CREATE TABLE IF NOT EXISTS "${config.schema}"."${cs.stream.name}" (
"_raw_data" jsonb NOT NULL,
"_last_synced_at" timestamptz,
"_updated_at" timestamptz NOT NULL DEFAULT now(),
"id" text GENERATED ALWAYS AS (("_raw_data"->>'id')::text) STORED,
PRIMARY KEY ("id")
)
`)
}
await pool.query(
buildCreateTableDDL(config.schema, cs.stream.name, cs.stream.json_schema ?? {}, {
system_columns: cs.system_columns,
primary_key: cs.stream.primary_key,
})
)
})
)
} finally {
Expand All @@ -199,16 +189,25 @@ const destination = {
}
},

async *write({ config, catalog: _catalog }, $stdin) {
async *write({ config, catalog }, $stdin) {
const pool = withQueryLogging(createPool(await buildPoolConfig(config)))
const batchSize = config.batch_size
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const streamBuffers = new Map<string, Record<string, any>[]>()

const streamKeyColumns = new Map<string, string[]>()
for (const cs of catalog.streams) {
streamKeyColumns.set(
cs.stream.name,
cs.stream.primary_key.map((pk) => pk[0])
)
}

const flushStream = async (streamName: string) => {
const buffer = streamBuffers.get(streamName)
if (!buffer || buffer.length === 0) return
await upsertMany(pool, config.schema, streamName, buffer)
const keyColumns = streamKeyColumns.get(streamName) ?? ['id']
await upsertMany(pool, config.schema, streamName, buffer, keyColumns)
streamBuffers.set(streamName, [])
}

Expand Down
19 changes: 19 additions & 0 deletions packages/destination-postgres/src/schemaProjection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,25 @@ describe('buildCreateTableWithSchema', () => {
expect(stmts[0]).toContain("WHEN jsonb_typeof(_raw_data->'customer') = 'object'")
})

it('generates composite primary key with _account_id when primary_key option is set', () => {
  const statements = buildCreateTableWithSchema('stripe', 'customers', SAMPLE_JSON_SCHEMA, {
    primary_key: [['id'], ['_account_id']],
  })
  const createTable = statements[0]

  // Each key field is declared as a text column generated from _raw_data.
  const expectedKeyColumns = [
    `"id" text GENERATED ALWAYS AS ((_raw_data->>'id')::text) STORED`,
    `"_account_id" text GENERATED ALWAYS AS ((_raw_data->>'_account_id')::text) STORED`,
  ]
  for (const columnDef of expectedKeyColumns) {
    expect(createTable).toContain(columnDef)
  }

  // The two fields are combined into a single PRIMARY KEY clause.
  expect(createTable).toContain('PRIMARY KEY ("id", "_account_id")')

  // No ADD COLUMN statement may mention _account_id.
  const addColumnMentionsAccountId = statements
    .filter((s) => s.includes('ADD COLUMN IF NOT EXISTS'))
    .some((s) => s.includes('"_account_id"'))
  expect(addColumnMentionsAccountId).toBe(false)
})

it('produces stable output across repeated calls', () => {
const first = buildCreateTableWithSchema('mydata', 'customers', SAMPLE_JSON_SCHEMA)
const second = buildCreateTableWithSchema('mydata', 'customers', SAMPLE_JSON_SCHEMA)
Expand Down
19 changes: 16 additions & 3 deletions packages/destination-postgres/src/schemaProjection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ export type SystemColumn = {
/** Optional settings accepted by the table-DDL builders in this module. */
export type BuildTableOptions = {
/** Extra system columns to add to the table (e.g. _account_id). */
system_columns?: SystemColumn[]
/**
 * Primary key paths from the stream (e.g. [['id'], ['_account_id']]). Defaults to [['id']].
 *
 * As consumed by `buildCreateTableWithSchema`: only the first segment of each
 * path is read; each such field becomes a `text` column generated from
 * `_raw_data->>'<field>'`, is left out of the columns projected from the JSON
 * schema, and all of them are listed together in one PRIMARY KEY clause.
 */
primary_key?: string[][]
}

/**
Expand All @@ -114,7 +116,10 @@ export function buildCreateTableWithSchema(
const quotedSchema = quoteIdent(schema)
const quotedTable = quoteIdent(tableName)

const columns = jsonSchemaToColumns(jsonSchema)
const pkFields = (options.primary_key ?? [['id']]).map((pk) => pk[0])
const pkSet = new Set(pkFields)

const columns = jsonSchemaToColumns(jsonSchema).filter((c) => !pkSet.has(c.name))

const generatedColumnDefs = columns.map(
(col) => `${quoteIdent(col.name)} ${col.pgType} GENERATED ALWAYS AS (${col.expression}) STORED`
Expand All @@ -124,14 +129,19 @@ export function buildCreateTableWithSchema(
(col) => `${quoteIdent(col.name)} ${col.type}`
)

const pkColumnDefs = pkFields.map((field) => {
const escapedField = field.replace(/'/g, "''")
return `${quoteIdent(field)} text GENERATED ALWAYS AS ((_raw_data->>'${escapedField}')::text) STORED`
})

const columnDefs = [
'"_raw_data" jsonb NOT NULL',
'"_last_synced_at" timestamptz',
'"_updated_at" timestamptz NOT NULL DEFAULT now()',
...systemColumnDefs,
`"id" text GENERATED ALWAYS AS ((_raw_data->>'id')::text) STORED`,
...pkColumnDefs,
...generatedColumnDefs,
'PRIMARY KEY ("id")',
`PRIMARY KEY (${pkFields.map((f) => quoteIdent(f)).join(', ')})`,
]

const stmts: string[] = [
Expand Down Expand Up @@ -305,6 +315,8 @@ export type ApplySchemaFromCatalogConfig = {
syncSchema?: string
/** Extra system columns to add to each table. */
system_columns?: SystemColumn[]
/** Primary key paths (e.g. [['id'], ['_account_id']]). Defaults to [['id']]. */
primary_key?: string[][]
apiVersion?: string
/** Progress callback — emitting logs signals liveness to the orchestrator. */
onLog?: (message: string) => void
Expand Down Expand Up @@ -371,6 +383,7 @@ export async function applySchemaFromCatalog(
await client.query(
buildCreateTableDDL(dataSchema, stream.name, stream.json_schema!, {
system_columns: config.system_columns,
primary_key: config.primary_key,
})
)
config.onLog?.(
Expand Down
42 changes: 31 additions & 11 deletions packages/source-stripe/src/catalog.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,65 @@
import type { CatalogPayload, Stream } from '@stripe/sync-protocol'
import type { ResourceConfig } from './types.js'
import type { ParsedResourceTable } from '@stripe/sync-openapi'
import { parsedTableToJsonSchema } from '@stripe/sync-openapi'
import type { ResourceConfig } from './types.js'

/** Derive a CatalogPayload from the existing resource registry (no json_schema). */
export function catalogFromRegistry(registry: Record<string, ResourceConfig>): CatalogPayload {
export function catalogFromRegistry(
registry: Record<string, ResourceConfig>,
_accountId: string
): CatalogPayload {
const streams: Stream[] = Object.entries(registry)
.filter(([, cfg]) => cfg.sync !== false)
.sort(([, a], [, b]) => a.order - b.order)
.map(([name, cfg]) => ({
.map(([, cfg]) => ({
name: cfg.tableName,
primary_key: [['id']],
metadata: { resource_name: name },
primary_key: [['id'], ['_account_id']],
metadata: {},
}))

return { streams }
}

/**
* Derive a CatalogPayload by merging OpenAPI-parsed tables with registry metadata.
* Each stream gets json_schema from the parsed OpenAPI spec.
* Each stream gets json_schema from the parsed OpenAPI spec, with `_account_id`
* injected so downstream consumers see the full data shape.
*/
export function catalogFromOpenApi(
tables: ParsedResourceTable[],
registry: Record<string, ResourceConfig>
registry: Record<string, ResourceConfig>,
_accountId: string
): CatalogPayload {
const tableMap = new Map(tables.map((t) => [t.tableName, t]))

const streams: Stream[] = Object.entries(registry)
.filter(([, cfg]) => cfg.sync !== false)
.sort(([, a], [, b]) => a.order - b.order)
.map(([name, cfg]) => {
.map(([, cfg]) => {
const table = tableMap.get(cfg.tableName)
const stream: Stream = {
name: cfg.tableName,
primary_key: [['id']],
metadata: { resource_name: name },
primary_key: [['id'], ['_account_id']],
metadata: {},
}

if (table) {
stream.json_schema = parsedTableToJsonSchema(table)
const jsonSchema = parsedTableToJsonSchema(table)
const properties = (jsonSchema.properties ?? {}) as Record<string, unknown>
properties._account_id = { type: 'string' }
jsonSchema.properties = properties

const required = Array.isArray(jsonSchema.required)
? [...jsonSchema.required]
: []
if (!required.includes('_account_id')) {
required.push('_account_id')
}
jsonSchema.required = required

stream.json_schema = jsonSchema
}

return stream
})

Expand Down
Loading
Loading