diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index 2e20e5eb..09f7c8bd 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -424,7 +424,7 @@ export interface components { config: { [key: string]: unknown; }; - /** @description JSON Schema for per-stream state (cursor/checkpoint shape). See also SyncState.global for sync-wide cursors. */ + /** @description JSON Schema for per-stream state (cursor/checkpoint shape). See also SourceState.global for sync-wide cursors. */ source_state_stream?: { [key: string]: unknown; }; @@ -545,8 +545,8 @@ export interface components { account_id?: string; /** @description Whether this is a live mode sync */ livemode?: boolean; - /** @constant */ - api_version?: "2026-03-25.dahlia"; + /** @enum {string} */ + api_version?: "2026-03-25.dahlia" | "2026-02-25.clover" | "2026-01-28.clover" | "2025-12-15.clover" | "2025-11-17.clover" | "2025-10-29.clover" | "2025-09-30.clover" | "2025-08-27.basil" | "2025-07-30.basil" | "2025-06-30.basil" | "2025-05-28.basil" | "2025-04-30.basil" | "2025-03-31.basil" | "2025-02-24.acacia" | "2025-01-27.acacia" | "2024-12-18.acacia" | "2024-11-20.acacia" | "2024-10-28.acacia" | "2024-09-30.acacia" | "2024-06-20" | "2024-04-10" | "2024-04-03" | "2023-10-16" | "2023-08-16" | "2022-11-15" | "2022-08-01" | "2020-08-27" | "2020-03-02" | "2019-12-03" | "2019-11-05" | "2019-10-17" | "2019-10-08" | "2019-09-09" | "2019-08-14" | "2019-05-16" | "2019-03-14" | "2019-02-19" | "2019-02-11" | "2018-11-08" | "2018-10-31" | "2018-09-24" | "2018-09-06" | "2018-08-23" | "2018-07-27" | "2018-05-21" | "2018-02-28" | "2018-02-06" | "2018-02-05" | "2018-01-23" | "2017-12-14" | "2017-08-15"; /** * Format: uri * @description Override the Stripe API base URL (e.g. http://localhost:12111 for stripe-mock) @@ -669,6 +669,16 @@ export interface components { backfill_limit?: number; }[]; }; + SourceState: { + /** @description Per-stream checkpoint data, keyed by stream name. */ + streams: { + [key: string]: unknown; + }; + /** @description Sync-wide state shared across all streams (e.g. a global events cursor). */ + global: { + [key: string]: unknown; + }; + }; }; responses: never; parameters: never; @@ -851,8 +861,8 @@ export interface operations { header: { /** @description JSON-encoded PipelineConfig */ "x-pipeline": string; - /** @description JSON-encoded SyncState ({ streams, global }) or legacy flat per-stream state */ - "x-state"?: string; + /** @description JSON-encoded SourceState ({ streams, global }) or legacy flat per-stream state */ + "x-source-state"?: string; }; path?: never; cookie?: never; @@ -934,8 +944,8 @@ export interface operations { header: { /** @description JSON-encoded PipelineConfig */ "x-pipeline": string; - /** @description JSON-encoded SyncState ({ streams, global }) or legacy flat per-stream state */ - "x-state"?: string; + /** @description JSON-encoded SourceState ({ streams, global }) or legacy flat per-stream state */ + "x-source-state"?: string; }; path?: never; cookie?: never; diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index 3324bd90..83d27232 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -61,7 +61,7 @@ "required": true, "description": "JSON-encoded PipelineConfig", "content": { - "[object Object]": { + "application/json": { "schema": { "$ref": "#/components/schemas/PipelineConfig" } @@ -115,7 +115,7 @@ "required": true, "description": "JSON-encoded PipelineConfig", "content": { - "[object Object]": { + "application/json": { "schema": { "$ref": "#/components/schemas/PipelineConfig" } @@ -169,7 +169,7 @@ "required": true, "description": "JSON-encoded PipelineConfig", "content": { - "[object Object]": { + "application/json": { "schema": { "$ref": "#/components/schemas/PipelineConfig" } @@ -223,7 +223,7 @@ "required": true, "description": "JSON-encoded source config ({ type, ...config })", "content": { - "[object Object]": { + "application/json": { "schema": { "type": "object", "properties": { @@ -309,7 +309,7 @@ "required": true, "description": "JSON-encoded PipelineConfig", "content": { - "[object Object]": { + "application/json": { "schema": { "$ref": "#/components/schemas/PipelineConfig" } @@ -318,36 +318,13 @@ }, { "in": "header", - "name": "x-state", + "name": "x-source-state", "required": false, - "description": "JSON-encoded SyncState ({ streams, global }) or legacy flat per-stream state", + "description": "JSON-encoded SourceState ({ streams, global }) or legacy flat per-stream state", "content": { - "[object Object]": { + "application/json": { "schema": { - "type": "object", - "properties": { - "streams": { - "type": "object", - "propertyNames": { - "type": "string" - }, - "additionalProperties": {}, - "description": "Per-stream checkpoint data, keyed by stream name." - }, - "global": { - "type": "object", - "propertyNames": { - "type": "string" - }, - "additionalProperties": {}, - "description": "Sync-wide state shared across all streams (e.g. a global events cursor)." - } - }, - "required": [ - "streams", - "global" - ], - "additionalProperties": false + "$ref": "#/components/schemas/SourceState" } } } @@ -409,7 +386,7 @@ "required": true, "description": "JSON-encoded PipelineConfig", "content": { - "[object Object]": { + "application/json": { "schema": { "$ref": "#/components/schemas/PipelineConfig" } @@ -496,7 +473,7 @@ "required": true, "description": "JSON-encoded PipelineConfig", "content": { - "[object Object]": { + "application/json": { "schema": { "$ref": "#/components/schemas/PipelineConfig" } @@ -505,36 +482,13 @@ }, { "in": "header", - "name": "x-state", + "name": "x-source-state", "required": false, - "description": "JSON-encoded SyncState ({ streams, global }) or legacy flat per-stream state", + "description": "JSON-encoded SourceState ({ streams, global }) or legacy flat per-stream state", "content": { - "[object Object]": { + "application/json": { "schema": { - "type": "object", - "properties": { - "streams": { - "type": "object", - "propertyNames": { - "type": "string" - }, - "additionalProperties": {}, - "description": "Per-stream checkpoint data, keyed by stream name." - }, - "global": { - "type": "object", - "propertyNames": { - "type": "string" - }, - "additionalProperties": {}, - "description": "Sync-wide state shared across all streams (e.g. a global events cursor)." - } - }, - "required": [ - "streams", - "global" - ], - "additionalProperties": false + "$ref": "#/components/schemas/SourceState" } } } @@ -1200,7 +1154,7 @@ "description": "JSON Schema for the connector's configuration object." }, "source_state_stream": { - "description": "JSON Schema for per-stream state (cursor/checkpoint shape). See also SyncState.global for sync-wide cursors.", + "description": "JSON Schema for per-stream state (cursor/checkpoint shape). See also SourceState.global for sync-wide cursors.", "type": "object", "propertyNames": { "type": "string" @@ -1540,7 +1494,59 @@ }, "api_version": { "type": "string", - "const": "2026-03-25.dahlia" + "enum": [ + "2026-03-25.dahlia", + "2026-02-25.clover", + "2026-01-28.clover", + "2025-12-15.clover", + "2025-11-17.clover", + "2025-10-29.clover", + "2025-09-30.clover", + "2025-08-27.basil", + "2025-07-30.basil", + "2025-06-30.basil", + "2025-05-28.basil", + "2025-04-30.basil", + "2025-03-31.basil", + "2025-02-24.acacia", + "2025-01-27.acacia", + "2024-12-18.acacia", + "2024-11-20.acacia", + "2024-10-28.acacia", + "2024-09-30.acacia", + "2024-06-20", + "2024-04-10", + "2024-04-03", + "2023-10-16", + "2023-08-16", + "2022-11-15", + "2022-08-01", + "2020-08-27", + "2020-03-02", + "2019-12-03", + "2019-11-05", + "2019-10-17", + "2019-10-08", + "2019-09-09", + "2019-08-14", + "2019-05-16", + "2019-03-14", + "2019-02-19", + "2019-02-11", + "2018-11-08", + "2018-10-31", + "2018-09-24", + "2018-09-06", + "2018-08-23", + "2018-07-27", + "2018-05-21", + "2018-02-28", + "2018-02-06", + "2018-02-05", + "2018-01-23", + "2017-12-14", + "2017-08-15" + ] }, "base_url": { "type": "string", @@ -2005,6 +2011,32 @@ "destination" ], "additionalProperties": false + }, + "SourceState": { + "type": "object", + "properties": { + "streams": { + "type": "object", + "propertyNames": { + "type": "string" + }, + "additionalProperties": {}, + "description": "Per-stream checkpoint data, keyed by stream name." + }, + "global": { + "type": "object", + "propertyNames": { + "type": "string" + }, + "additionalProperties": {}, + "description": "Sync-wide state shared across all streams (e.g. a global events cursor)." + } + }, + "required": [ + "streams", + "global" + ], + "additionalProperties": false } } } diff --git a/apps/engine/src/__tests__/openapi.test.ts b/apps/engine/src/__tests__/openapi.test.ts index 6681f1e8..b662c7fd 100644 --- a/apps/engine/src/__tests__/openapi.test.ts +++ b/apps/engine/src/__tests__/openapi.test.ts @@ -51,4 +51,54 @@ describe('Engine OpenAPI spec', () => { expect(schema, `${name} should not have $schema`).not.toHaveProperty('$schema') } }) + + it('has SourceState as a named component schema', async () => { + const spec = await getSpec() + expect(spec.components.schemas).toHaveProperty('SourceState') + const sourceState = spec.components.schemas['SourceState'] as Record + expect(sourceState.type).toBe('object') + expect(sourceState).toHaveProperty('properties') + const props = sourceState.properties as Record + expect(props).toHaveProperty('streams') + expect(props).toHaveProperty('global') + }) + + it('header params use application/json content key, never [object Object]', async () => { + const spec = await getSpec() + for (const [path, pathItem] of Object.entries(spec.paths ?? {})) { + for (const [method, op] of Object.entries(pathItem as Record)) { + const operation = op as { parameters?: Array> } | undefined + for (const param of operation?.parameters ?? []) { + const content = param.content as Record | undefined + if (content) { + expect( + Object.keys(content), + `${method.toUpperCase()} ${path} param "${param.name}" has [object Object] content key` + ).not.toContain('[object Object]') + } + } + } + } + }) + + it('x-source-state header uses application/json content with $ref to SourceState', async () => { + const spec = await getSpec() + const allParams: Array> = [] + for (const pathItem of Object.values(spec.paths ?? {})) { + for (const op of Object.values(pathItem as Record)) { + const operation = op as { parameters?: Array> } | undefined + allParams.push(...(operation?.parameters ?? [])) + } + } + const stateParams = allParams.filter((p) => p.name === 'x-source-state') + expect(stateParams.length).toBeGreaterThan(0) + for (const param of stateParams) { + expect(param.schema).toBeUndefined() + const content = param.content as Record> | undefined + expect(content?.['application/json']).toBeDefined() + expect(content?.['application/json']?.schema).toMatchObject({ + $ref: '#/components/schemas/SourceState', + }) + } + }) }) diff --git a/apps/engine/src/api/app.ts b/apps/engine/src/api/app.ts index 182afcb6..0701185c 100644 --- a/apps/engine/src/api/app.ts +++ b/apps/engine/src/api/app.ts @@ -20,7 +20,7 @@ import { CheckOutput as CheckOutputSchema, SetupOutput as SetupOutputSchema, TeardownOutput as TeardownOutputSchema, - SyncState, + SourceState, } from '@stripe/sync-protocol' // Raw $refs for NDJSON content schemas — avoids zod-openapi generating *Output @@ -161,10 +161,10 @@ export async function createApp(resolver: ConnectorResolver) { // Accept both new format { streams, global } and old flat format { stream_name: data }. 'streams' in obj && 'global' in obj ? obj : { streams: obj, global: {} } ) - .pipe(SyncState) + .pipe(SourceState) .optional() .meta({ - description: 'JSON-encoded SyncState ({ streams, global }) or legacy flat per-stream state', + description: 'JSON-encoded SourceState ({ streams, global }) or legacy flat per-stream state', param: { content: { 'application/json': {} } }, }) @@ -181,7 +181,7 @@ export async function createApp(resolver: ConnectorResolver) { const sourceHeaders = z.object({ 'x-source': xSourceHeader }) const allSyncHeaders = z.object({ 'x-pipeline': xPipelineHeader, - 'x-state': xStateHeader, + 'x-source-state': xStateHeader, }) const syncQueryParams = z.object({ @@ -333,7 +333,9 @@ export async function createApp(resolver: ConnectorResolver) { app.openapi(sourceDiscoverRoute, (c) => { const source = c.req.valid('header')['x-source'] const context = { path: '/source_discover', sourceName: source.type } - return ndjsonResponse(logApiStream('Engine API /source_discover', engine.source_discover(source), context)) + return ndjsonResponse( + logApiStream('Engine API /source_discover', engine.source_discover(source), context) + ) }) const pipelineReadRoute = createRoute({ @@ -363,7 +365,7 @@ export async function createApp(resolver: ConnectorResolver) { }) app.openapi(pipelineReadRoute, async (c) => { const pipeline = c.req.valid('header')['x-pipeline'] - const state = c.req.valid('header')['x-state'] + const state = c.req.valid('header')['x-source-state'] const { state_limit, time_limit } = c.req.valid('query') const inputPresent = hasBody(c) const context = { path: '/pipeline_read', inputPresent, ...syncRequestContext(pipeline) } @@ -458,7 +460,7 @@ export async function createApp(resolver: ConnectorResolver) { }) app.openapi(pipelineSyncRoute, async (c) => { const pipeline = c.req.valid('header')['x-pipeline'] - const state = c.req.valid('header')['x-state'] + const state = c.req.valid('header')['x-source-state'] const { state_limit, time_limit } = c.req.valid('query') const input = hasBody(c) ? parseNdjsonStream(c.req.raw.body!) : undefined const output = engine.pipeline_sync(pipeline, { state, state_limit, time_limit }, input) diff --git a/apps/engine/src/lib/engine.ts b/apps/engine/src/lib/engine.ts index 8bf931cd..5c9c63df 100644 --- a/apps/engine/src/lib/engine.ts +++ b/apps/engine/src/lib/engine.ts @@ -11,7 +11,7 @@ import { ConfiguredStream, ConfiguredCatalog, SyncOutput, - SyncState, + SourceState, RecordMessage, SourceStateMessage, collectFirst, @@ -29,7 +29,7 @@ import { logger } from '../logger.js' export const SourceReadOptions = z.object({ /** Aggregate state (per-stream + global) carried in from the previous sync run. */ - state: SyncState.optional(), + state: SourceState.optional(), /** Stop after emitting this many state messages (useful for paging). */ state_limit: z.number().int().positive().optional(), /** Wall-clock time limit in seconds; the stream stops after this duration. */ diff --git a/apps/engine/src/lib/remote-engine.ts b/apps/engine/src/lib/remote-engine.ts index f80dbdc3..5b18a581 100644 --- a/apps/engine/src/lib/remote-engine.ts +++ b/apps/engine/src/lib/remote-engine.ts @@ -58,7 +58,7 @@ export function createRemoteEngine(engineUrl: string): Engine { function stateHeaders(opts?: SourceReadOptions): Record { const h: Record = {} if (opts?.state && Object.keys(opts.state).length > 0) { - h['x-state'] = JSON.stringify(opts.state) + h['x-source-state'] = JSON.stringify(opts.state) } return h } diff --git a/apps/engine/src/lib/state-store.ts b/apps/engine/src/lib/state-store.ts index a11fd1df..98a99169 100644 --- a/apps/engine/src/lib/state-store.ts +++ b/apps/engine/src/lib/state-store.ts @@ -1,10 +1,10 @@ -import type { SyncState } from '@stripe/sync-protocol' +import type { SourceState } from '@stripe/sync-protocol' // MARK: - Interface /** Pipeline-scoped state store — load prior state and persist checkpoints. */ export interface StateStore { - get(): Promise + get(): Promise set(stream: string, data: unknown): Promise setGlobal(data: unknown): Promise } @@ -15,7 +15,7 @@ export interface StateStore { * A StateStore that returns the provided initial state (if any) and discards all writes. * Use when the caller manages state externally (e.g., via HTTP headers or workflow state). */ -export function readonlyStateStore(state?: SyncState): StateStore { +export function readonlyStateStore(state?: SourceState): StateStore { return { async get() { return state diff --git a/apps/service/src/__generated__/openapi.d.ts b/apps/service/src/__generated__/openapi.d.ts index ed78fd72..3a3c9b1c 100644 --- a/apps/service/src/__generated__/openapi.d.ts +++ b/apps/service/src/__generated__/openapi.d.ts @@ -128,8 +128,8 @@ export interface components { account_id?: string; /** @description Whether this is a live mode sync */ livemode?: boolean; - /** @constant */ - api_version?: "2026-03-25.dahlia"; + /** @enum {string} */ + api_version?: "2026-03-25.dahlia" | "2026-02-25.clover" | "2026-01-28.clover" | "2025-12-15.clover" | "2025-11-17.clover" | "2025-10-29.clover" | "2025-09-30.clover" | "2025-08-27.basil" | "2025-07-30.basil" | "2025-06-30.basil" | "2025-05-28.basil" | "2025-04-30.basil" | "2025-03-31.basil" | "2025-02-24.acacia" | "2025-01-27.acacia" | "2024-12-18.acacia" | "2024-11-20.acacia" | "2024-10-28.acacia" | "2024-09-30.acacia" | "2024-06-20" | "2024-04-10" | "2024-04-03" | "2023-10-16" | "2023-08-16" | "2022-11-15" | "2022-08-01" | "2020-08-27" | "2020-03-02" | "2019-12-03" | "2019-11-05" | "2019-10-17" | "2019-10-08" | "2019-09-09" | "2019-08-14" | "2019-05-16" | "2019-03-14" | "2019-02-19" | "2019-02-11" | "2018-11-08" | "2018-10-31" | "2018-09-24" | "2018-09-06" | "2018-08-23" | "2018-07-27" | "2018-05-21" | "2018-02-28" | "2018-02-06" | "2018-02-05" | "2018-01-23" | "2017-12-14" | "2017-08-15"; /** * Format: uri * @description Override the Stripe API base URL (e.g. http://localhost:12111 for stripe-mock) diff --git a/apps/service/src/__generated__/openapi.json b/apps/service/src/__generated__/openapi.json index 631fc426..5de34bc2 100644 --- a/apps/service/src/__generated__/openapi.json +++ b/apps/service/src/__generated__/openapi.json @@ -989,7 +989,59 @@ }, "api_version": { "type": "string", - "const": "2026-03-25.dahlia" + "enum": [ + "2026-03-25.dahlia", + "2026-02-25.clover", + "2026-01-28.clover", + "2025-12-15.clover", + "2025-11-17.clover", + "2025-10-29.clover", + "2025-09-30.clover", + "2025-08-27.basil", + "2025-07-30.basil", + "2025-06-30.basil", + "2025-05-28.basil", + "2025-04-30.basil", + "2025-03-31.basil", + "2025-02-24.acacia", + "2025-01-27.acacia", + "2024-12-18.acacia", + "2024-11-20.acacia", + "2024-10-28.acacia", + "2024-09-30.acacia", + "2024-06-20", + "2024-04-10", + "2024-04-03", + "2023-10-16", + "2023-08-16", + "2022-11-15", + "2022-08-01", + "2020-08-27", + "2020-03-02", + "2019-12-03", + "2019-11-05", + "2019-10-17", + "2019-10-08", + "2019-09-09", + "2019-08-14", + "2019-05-16", + "2019-03-14", + "2019-02-19", + "2019-02-11", + "2018-11-08", + "2018-10-31", + "2018-09-24", + "2018-09-06", + "2018-08-23", + "2018-07-27", + "2018-05-21", + "2018-02-28", + "2018-02-06", + "2018-02-05", + "2018-01-23", + "2017-12-14", + "2017-08-15" + ] }, "base_url": { "type": "string", diff --git a/apps/service/src/__tests__/workflow.test.ts b/apps/service/src/__tests__/workflow.test.ts index c37f01ba..a49e9293 100644 --- a/apps/service/src/__tests__/workflow.test.ts +++ b/apps/service/src/__tests__/workflow.test.ts @@ -22,14 +22,12 @@ function stubActivities(overrides: Partial = {}): SyncActivities setup: async () => ({}), syncImmediate: async () => noErrors, readGoogleSheetsIntoQueue: async () => ({ count: 0, state: emptyState }), - readIntoQueue: async () => ({ count: 0, state: emptyState }), writeGoogleSheetsFromQueue: async () => ({ errors: [], state: emptyState, written: 0, rowAssignments: {}, }), - writeFromQueue: async () => ({ errors: [], state: emptyState, written: 0 }), teardown: async () => {}, ...overrides, } diff --git a/apps/service/src/api/app.test.ts b/apps/service/src/api/app.test.ts index ddd86971..3f223d3e 100644 --- a/apps/service/src/api/app.test.ts +++ b/apps/service/src/api/app.test.ts @@ -132,14 +132,12 @@ function stubActivities(): SyncActivities { setup: async () => ({}), syncImmediate: async () => noErrors, readGoogleSheetsIntoQueue: async () => ({ count: 0, state: {} }), - readIntoQueue: async () => ({ count: 0, state: {} }), writeGoogleSheetsFromQueue: async () => ({ errors: [], state: {}, written: 0, rowAssignments: {}, }), - writeFromQueue: async () => ({ errors: [], state: {}, written: 0 }), teardown: async () => {}, } } diff --git a/apps/service/src/temporal/activities/_shared.ts b/apps/service/src/temporal/activities/_shared.ts index da35d248..b0ba710e 100644 --- a/apps/service/src/temporal/activities/_shared.ts +++ b/apps/service/src/temporal/activities/_shared.ts @@ -1,5 +1,5 @@ import { heartbeat } from '@temporalio/activity' -import type { Message, Engine, SyncState } from '@stripe/sync-engine' +import type { Message, Engine, SourceState } from '@stripe/sync-engine' import { createRemoteEngine } from '@stripe/sync-engine' import { Kafka } from 'kafkajs' import type { Producer } from 'kafkajs' @@ -105,7 +105,7 @@ export function createActivitiesContext(opts: { export interface RunResult { errors: Array<{ message: string; failure_type?: string; stream?: string }> - state: SyncState + state: SourceState } export async function* asIterable(items: T[]): AsyncIterable { @@ -127,16 +127,22 @@ export function collectError(message: Message): RunResult['errors'][number] | nu return null } -export async function drainMessages(stream: AsyncIterable): Promise<{ +export async function drainMessages( + stream: AsyncIterable, + initialState?: SourceState +): Promise<{ errors: RunResult['errors'] - state: SyncState + state: SourceState records: Message[] sourceConfig?: Record destConfig?: Record eof?: { reason: string } }> { const errors: RunResult['errors'] = [] - const state: SyncState = { streams: {}, global: {} } + const state: SourceState = { + streams: { ...initialState?.streams }, + global: { ...initialState?.global }, + } const records: Message[] = [] let sourceConfig: Record | undefined let destConfig: Record | undefined @@ -159,7 +165,7 @@ export async function drainMessages(stream: AsyncIterable): Promise<{ errors.push(error) } else if (message.type === 'source_state') { if (message.source_state.state_type === 'global') { - Object.assign(state.global, message.source_state.data as Record) + state.global = message.source_state.data as Record } else { state.streams[message.source_state.stream] = message.source_state.data } diff --git a/apps/service/src/temporal/activities/index.ts b/apps/service/src/temporal/activities/index.ts index e423c2e3..cd96b661 100644 --- a/apps/service/src/temporal/activities/index.ts +++ b/apps/service/src/temporal/activities/index.ts @@ -1,11 +1,9 @@ import { createActivitiesContext } from './_shared.js' import { createDiscoverCatalogActivity } from './discover-catalog.js' -import { createReadIntoQueueActivity } from './read-into-queue.js' import { createReadGoogleSheetsIntoQueueActivity } from './read-google-sheets-into-queue.js' import { createSetupActivity } from './setup.js' import { createSyncImmediateActivity } from './sync-immediate.js' import { createTeardownActivity } from './teardown.js' -import { createWriteFromQueueActivity } from './write-from-queue.js' import { createWriteGoogleSheetsFromQueueActivity } from './write-google-sheets-from-queue.js' import type { PipelineStore } from '../../lib/stores.js' @@ -23,9 +21,7 @@ export function createActivities(opts: { setup: createSetupActivity(context), syncImmediate: createSyncImmediateActivity(context), readGoogleSheetsIntoQueue: createReadGoogleSheetsIntoQueueActivity(context), - readIntoQueue: createReadIntoQueueActivity(context), writeGoogleSheetsFromQueue: createWriteGoogleSheetsFromQueueActivity(context), - writeFromQueue: createWriteFromQueueActivity(context), teardown: createTeardownActivity(context), } } diff --git a/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts b/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts index dd2772aa..6e89fa2a 100644 --- a/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts +++ b/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts @@ -35,7 +35,7 @@ export function createReadGoogleSheetsIntoQueueActivity(context: ActivitiesConte input?: SourceInput[] catalog?: ConfiguredCatalog } - ): Promise<{ count: number; state: import('@stripe/sync-engine').SyncState }> { + ): Promise<{ count: number; state: import('@stripe/sync-engine').SourceState }> { if (!context.kafkaBroker) throw new Error('kafkaBroker is required for Google Sheets workflow') const pipeline = await context.pipelineStore.get(pipelineId) @@ -44,7 +44,10 @@ export function createReadGoogleSheetsIntoQueueActivity(context: ActivitiesConte const input = inputArr?.length ? asIterable(inputArr) : undefined const queued: Message[] = [] - const state: import('@stripe/sync-engine').SyncState = { streams: {}, global: {} } + const state: import('@stripe/sync-engine').SourceState = { + streams: { ...readOpts.state?.streams }, + global: { ...readOpts.state?.global }, + } const errors: RunResult['errors'] = [] let seen = 0 @@ -57,7 +60,7 @@ export function createReadGoogleSheetsIntoQueueActivity(context: ActivitiesConte queued.push(withRowKey(raw, catalog)) } else if (raw.type === 'source_state') { if (raw.source_state.state_type === 'global') { - Object.assign(state.global, raw.source_state.data as Record) + state.global = raw.source_state.data as Record } else { state.streams[raw.source_state.stream] = raw.source_state.data } diff --git a/apps/service/src/temporal/activities/read-into-queue.ts b/apps/service/src/temporal/activities/read-into-queue.ts deleted file mode 100644 index deec31bf..00000000 --- a/apps/service/src/temporal/activities/read-into-queue.ts +++ /dev/null @@ -1,34 +0,0 @@ -import type { SourceReadOptions } from '@stripe/sync-engine' -import type { ActivitiesContext } from './_shared.js' - -type SourceInput = unknown -import { asIterable, drainMessages } from './_shared.js' - -export function createReadIntoQueueActivity(context: ActivitiesContext) { - return async function readIntoQueue( - pipelineId: string, - opts?: SourceReadOptions & { input?: SourceInput[] } - ): Promise<{ - count: number - state: import('@stripe/sync-engine').SyncState - eof?: { reason: string } - }> { - const pipeline = await context.pipelineStore.get(pipelineId) - const { id: _, ...config } = pipeline - const { input: inputArr, ...readOpts } = opts ?? {} - const input = inputArr?.length ? asIterable(inputArr) : undefined - const { records, state, eof } = await drainMessages( - context.engine.pipeline_read(config, readOpts, input) - ) - - if (context.kafkaBroker && records.length > 0) { - const producer = await context.getProducer() - await producer.send({ - topic: `pipeline.${pipelineId}`, - messages: records.map((record) => ({ value: JSON.stringify(record) })), - }) - } - - return { count: records.length, state, eof } - } -} diff --git a/apps/service/src/temporal/activities/sync-immediate.ts b/apps/service/src/temporal/activities/sync-immediate.ts index d9959058..bbdf1531 100644 --- a/apps/service/src/temporal/activities/sync-immediate.ts +++ b/apps/service/src/temporal/activities/sync-immediate.ts @@ -14,7 +14,8 @@ export function createSyncImmediateActivity(context: ActivitiesContext) { const { input: inputArr, ...readOpts } = opts ?? {} const input = inputArr?.length ? asIterable(inputArr) : undefined const { errors, state, sourceConfig, destConfig, eof } = await drainMessages( - context.engine.pipeline_sync(config, readOpts, input) + context.engine.pipeline_sync(config, readOpts, input), + readOpts.state ) // Full replacement — connector emits the complete updated config if (sourceConfig) { diff --git a/apps/service/src/temporal/activities/write-from-queue.ts b/apps/service/src/temporal/activities/write-from-queue.ts deleted file mode 100644 index 9647eea4..00000000 --- a/apps/service/src/temporal/activities/write-from-queue.ts +++ /dev/null @@ -1,32 +0,0 @@ -import type { Message } from '@stripe/sync-engine' - -import type { ActivitiesContext } from './_shared.js' -import { asIterable, drainMessages, type RunResult } from './_shared.js' - -export function createWriteFromQueueActivity(context: ActivitiesContext) { - return async function writeFromQueue( - pipelineId: string, - opts?: { records?: unknown[]; maxBatch?: number } - ): Promise { - let records: unknown[] - - if (context.kafkaBroker) { - const maxBatch = opts?.maxBatch ?? 50 - records = await context.consumeQueueBatch(pipelineId, maxBatch) - } else { - records = opts?.records ?? [] - } - - if (records.length === 0) { - return { errors: [], state: { streams: {}, global: {} }, written: 0 } - } - - const pipeline = await context.pipelineStore.get(pipelineId) - const { id: _, ...config } = pipeline - const { errors, state } = await drainMessages( - context.engine.pipeline_write(config, asIterable(records) as AsyncIterable) - ) - - return { errors, state, written: records.length } - } -} diff --git a/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts index e77bc838..22bb0fec 100644 --- a/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts +++ b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts @@ -116,6 +116,7 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont maxBatch?: number rowIndex?: Record> catalog?: ConfiguredCatalog + state?: import('@stripe/sync-engine').SourceState } ): Promise< RunResult & { @@ -128,8 +129,13 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont const maxBatch = opts?.maxBatch ?? 50 const queued = await context.consumeQueueBatch(pipelineId, maxBatch) + const initialState: import('@stripe/sync-engine').SourceState = { + streams: { ...opts?.state?.streams }, + global: { ...opts?.state?.global }, + } + if (queued.length === 0) { - return { errors: [], state: { streams: {}, global: {} }, written: 0, rowAssignments: {} } + return { errors: [], state: initialState, written: 0, rowAssignments: {} } } const pipeline = await context.pipelineStore.get(pipelineId) @@ -146,7 +152,10 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont const filteredCatalog = augmentGoogleSheetsCatalog(opts.catalog) const destination = createGoogleSheetsDestination() const errors: RunResult['errors'] = [] - const state: import('@stripe/sync-engine').SyncState = { streams: {}, global: {} } + const state: import('@stripe/sync-engine').SourceState = { + streams: { ...initialState.streams }, + global: { ...initialState.global }, + } const rowAssignments: Record> = {} const input = enforceCatalog(filteredCatalog)( asIterable(writeBatch) @@ -164,7 +173,7 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont errors.push(error) } else if (raw.type === 'source_state') { if (raw.source_state.state_type === 'global') { - Object.assign(state.global, raw.source_state.data as Record) + state.global = raw.source_state.data as Record } else { state.streams[raw.source_state.stream] = raw.source_state.data } diff --git a/apps/service/src/temporal/workflows/_shared.ts b/apps/service/src/temporal/workflows/_shared.ts index 2f3199e8..395c66c6 100644 --- a/apps/service/src/temporal/workflows/_shared.ts +++ b/apps/service/src/temporal/workflows/_shared.ts @@ -1,7 +1,7 @@ import { defineQuery, defineSignal, proxyActivities } from '@temporalio/workflow' import type { SyncActivities } from '../activities/index.js' -import type { SyncState } from '@stripe/sync-protocol' +import type { SourceState } from '@stripe/sync-protocol' import { retryPolicy } from '../../lib/utils.js' export interface WorkflowStatus { @@ -18,7 +18,7 @@ export const updateSignal = defineSignal<[{ paused?: boolean }]>('update') export const deleteSignal = defineSignal('delete') export const statusQuery = defineQuery('status') -export const stateQuery = defineQuery('state') +export const stateQuery = defineQuery('state') export const { setup, teardown } = proxyActivities({ startToCloseTimeout: '2m', diff --git a/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts b/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts index 91511789..170e9083 100644 --- a/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts +++ b/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts @@ -8,13 +8,14 @@ import { updateSignal, WorkflowStatus, } from './_shared.js' -import type { SyncState } from '@stripe/sync-protocol' +import type { SourceState } from '@stripe/sync-protocol' import { CONTINUE_AS_NEW_THRESHOLD } from '../../lib/utils.js' const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000 export interface BackfillPipelineWorkflowOpts { - state?: SyncState + state?: SourceState + reconcileComplete?: boolean } export async function backfillPipelineWorkflow( @@ -24,8 +25,8 @@ export async function backfillPipelineWorkflow( let paused = false let deleted = false let iteration = 0 - let syncState: SyncState = opts?.state ?? { streams: {}, global: {} } - let backfillComplete = false + let syncState: SourceState = opts?.state ?? { streams: {}, global: {} } + let reconcileComplete: boolean = opts?.reconcileComplete ?? false setHandler(updateSignal, (patch) => { if (patch.paused !== undefined) paused = patch.paused @@ -35,11 +36,14 @@ export async function backfillPipelineWorkflow( }) setHandler(statusQuery, (): WorkflowStatus => ({ phase: 'running', paused, iteration })) - setHandler(stateQuery, (): SyncState => syncState) + setHandler(stateQuery, (): SourceState => syncState) async function maybeContinueAsNew() { if (++iteration >= CONTINUE_AS_NEW_THRESHOLD) { - await continueAsNew(pipelineId, { state: syncState }) + await continueAsNew(pipelineId, { + state: syncState, + reconcileComplete, + }) } } @@ -49,10 +53,10 @@ export async function backfillPipelineWorkflow( continue } - if (backfillComplete) { + if (reconcileComplete) { // Idle — wait up to one week; timeout means recon is due. const timedOut = !(await condition(() => paused || deleted, ONE_WEEK_MS)) - if (timedOut) backfillComplete = false + if (timedOut) reconcileComplete = false continue } @@ -61,11 +65,8 @@ export async function backfillPipelineWorkflow( state_limit: 100, time_limit: 10, }) - syncState = { - streams: { ...syncState.streams, ...result.state.streams }, - global: { ...syncState.global, ...result.state.global }, - } - backfillComplete = result.eof?.reason === 'complete' + syncState = result.state + reconcileComplete = result.eof?.reason === 'complete' await maybeContinueAsNew() } } diff --git a/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts b/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts index e9b4f028..d7f117e6 100644 --- a/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts +++ b/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts @@ -1,5 +1,5 @@ import { condition, continueAsNew, setHandler, sleep } from '@temporalio/workflow' -import type { ConfiguredCatalog, SyncState } from '@stripe/sync-engine' +import type { ConfiguredCatalog, SourceState } from '@stripe/sync-engine' import { deleteSignal, @@ -19,8 +19,8 @@ import { CONTINUE_AS_NEW_THRESHOLD, deepEqual, EVENT_BATCH_SIZE } from '../../li export interface GoogleSheetPipelineWorkflowOpts { phase?: string - sourceState?: SyncState - readState?: SyncState + sourceState?: SourceState + readState?: SourceState rowIndex?: RowIndex catalog?: ConfiguredCatalog pendingWrites?: boolean @@ -37,8 +37,8 @@ export async function googleSheetPipelineWorkflow( let deleted = false const inputQueue: unknown[] = [...(opts?.inputQueue ?? [])] let iteration = 0 - let sourceState: SyncState = opts?.sourceState ?? { streams: {}, global: {} } - let readState: SyncState = opts?.readState ?? { + let sourceState: SourceState = opts?.sourceState ?? { streams: {}, global: {} } + let readState: SourceState = opts?.readState ?? { streams: { ...sourceState.streams }, global: { ...sourceState.global }, } @@ -70,7 +70,7 @@ export async function googleSheetPipelineWorkflow( iteration, }) ) - setHandler(stateQuery, (): SyncState => sourceState) + setHandler(stateQuery, (): SourceState => sourceState) async function waitWhilePaused() { await condition(() => !paused || deleted) @@ -128,10 +128,7 @@ export async function googleSheetPipelineWorkflow( catalog, }) if (count > 0) pendingWrites = true - readState = { - streams: { ...readState.streams, ...nextReadState.streams }, - global: { ...readState.global, ...nextReadState.global }, - } + readState = nextReadState readComplete = deepEqual(readState, before) await tickIteration() continue @@ -152,12 +149,10 @@ export async function googleSheetPipelineWorkflow( maxBatch: 50, rowIndex, catalog, + state: sourceState, }) pendingWrites = result.written > 0 - sourceState = { - streams: { ...sourceState.streams, ...result.state.streams }, - global: { ...sourceState.global, ...result.state.global }, - } + sourceState = result.state for (const [stream, assignments] of Object.entries(result.rowAssignments)) { rowIndex[stream] ??= {} Object.assign(rowIndex[stream], assignments) diff --git a/apps/service/src/temporal/workflows/pipeline-workflow.ts b/apps/service/src/temporal/workflows/pipeline-workflow.ts index 4d541540..27da4aad 100644 --- a/apps/service/src/temporal/workflows/pipeline-workflow.ts +++ b/apps/service/src/temporal/workflows/pipeline-workflow.ts @@ -15,10 +15,10 @@ import { CONTINUE_AS_NEW_THRESHOLD, EVENT_BATCH_SIZE } from '../../lib/utils.js' const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000 -import type { SyncState } from '@stripe/sync-protocol' +import type { SourceState } from '@stripe/sync-protocol' export interface PipelineWorkflowOpts { - state?: SyncState + state?: SourceState inputQueue?: unknown[] } @@ -30,7 +30,7 @@ export async function pipelineWorkflow( let deleted = false const inputQueue: unknown[] = [...(opts?.inputQueue ?? [])] let iteration = 0 - let syncState: SyncState = opts?.state ?? { streams: {}, global: {} } + let syncState: SourceState = opts?.state ?? { streams: {}, global: {} } let readComplete = false setHandler(stripeEventSignal, (event: unknown) => { @@ -44,7 +44,7 @@ export async function pipelineWorkflow( }) setHandler(statusQuery, (): WorkflowStatus => ({ phase: 'running', paused, iteration })) - setHandler(stateQuery, (): SyncState => syncState) + setHandler(stateQuery, (): SourceState => syncState) async function maybeContinueAsNew() { if (++iteration >= CONTINUE_AS_NEW_THRESHOLD) { @@ -86,10 +86,7 @@ export async function pipelineWorkflow( state_limit: 100, time_limit: 10, }) - syncState = { - streams: { ...syncState.streams, ...result.state.streams }, - global: { ...syncState.global, ...result.state.global }, - } + syncState = result.state readComplete = result.eof?.reason === 'complete' } diff --git a/docs/scripts/generate-stripe-specs.mjs b/docs/scripts/generate-stripe-specs.mjs index 569506d8..2d466328 100644 --- a/docs/scripts/generate-stripe-specs.mjs +++ b/docs/scripts/generate-stripe-specs.mjs @@ -1,142 +1,15 @@ #!/usr/bin/env node /** - * Fetches all published Stripe REST API spec versions from - * github.com/stripe/openapi and writes .json + manifest.json to . + * CDN spec generation — thin wrapper around packages/openapi/scripts/generate-specs.mjs. * * Usage: - * node generate-stripe-specs.mjs - * - * Clones stripe/openapi (single-branch) then walks the full history, collecting - * every unique API version (deduplicated by blob SHA, then by version string). - * Set STRIPE_OPENAPI_REPO to a pre-cloned path to skip the clone (e.g. CI cache). - * - * These are the official Stripe REST API specs (github.com/stripe/openapi), NOT - * the Sync Engine's own OpenAPI spec (which lives at /openapi/engine.json etc.). - * - * No npm dependencies. + * node docs/scripts/generate-stripe-specs.mjs */ -import { writeFileSync, mkdirSync, existsSync } from 'node:fs' -import { join } from 'node:path' -import { tmpdir } from 'node:os' import { execFileSync } from 'node:child_process' +import { dirname, join } from 'node:path' +import { fileURLToPath } from 'node:url' -const [outputDir] = process.argv.slice(2) -if (!outputDir) { - console.error('Usage: node generate-stripe-specs.mjs ') - process.exit(1) -} - -const REPO_URL = 'https://github.com/stripe/openapi' -// stripe/openapi uses 'latest/openapi.spec3.sdk.json' for recent specs and -// 'openapi/spec3.json' for historic ones. -const SPEC_PATHS = ['latest/openapi.spec3.sdk.json', 'openapi/spec3.json'] - -function git(...args) { - // maxBuffer: Stripe specs are ~10 MB each; default 1 MB would silently truncate/throw. - return execFileSync('git', ['-C', repoDir, ...args], { - encoding: 'utf8', - maxBuffer: 50 * 1024 * 1024, - }) -} - -// Clone or use pre-cloned repo -const repoDir = process.env.STRIPE_OPENAPI_REPO ?? join(tmpdir(), 'stripe-openapi') -if (!existsSync(join(repoDir, '.git'))) { - console.error(`Cloning ${REPO_URL}...`) - execFileSync('git', ['clone', '--single-branch', REPO_URL, repoDir], { stdio: 'inherit' }) -} else { - console.error(`Using pre-cloned repo at ${repoDir}`) -} - -// Walk commits newest→oldest, collect all unique API versions. -console.error('Collecting all spec versions...') -const commits = git('log', '--format=%H', '--', ...SPEC_PATHS) - .trim() - .split('\n') - .filter(Boolean) - -mkdirSync(outputDir, { recursive: true }) - -const seen = new Map() // version -> filename -const seenBlobs = new Set() - -for (const commit of commits) { - let blobSha - for (const specPath of SPEC_PATHS) { - let ls - try { - ls = git('ls-tree', commit, specPath).trim() - } catch { - continue - } - if (!ls) continue - blobSha = ls.split(/\s+/)[2] - break - } - if (!blobSha || seenBlobs.has(blobSha)) continue - seenBlobs.add(blobSha) - - let raw - try { - raw = git('cat-file', 'blob', blobSha) - } catch { - continue - } - - let version - try { - version = JSON.parse(raw).info?.version - } catch { - continue - } - if (!version || seen.has(version)) continue - - writeFileSync(join(outputDir, `${version}.json`), raw) - seen.set(version, `${version}.json`) - console.error(` ${version}`) -} - -writeFileSync( - join(outputDir, 'manifest.json'), - JSON.stringify(Object.fromEntries(seen), null, 2) + '\n' -) - -// Generate an index page so https://stripe-sync.dev/stripe-api-specs/ is browsable -const versions = [...seen.keys()].sort().reverse() -const rows = versions.map((v) => `
  • ${v}
  • `).join('\n') -writeFileSync( - join(outputDir, 'index.html'), - ` - - - - Stripe REST API Specs — stripe-sync.dev CDN - - - -

    Stripe REST API OpenAPI Specs

    -

    - These are the official Stripe REST API specs from - github.com/stripe/openapi, - mirrored here to avoid GitHub API rate limits. - This is not the Sync Engine's own OpenAPI spec - (see engine.json for that). -

    -

    Machine-readable index: manifest.json — ${versions.length} versions available.

    -
      -${rows} -
    - - -` -) +const __dirname = dirname(fileURLToPath(import.meta.url)) +const script = join(__dirname, '..', '..', 'packages', 'openapi', 'scripts', 'generate-specs.mjs') -console.error(`\nDone: ${seen.size} spec versions`) +execFileSync(process.execPath, [script, ...process.argv.slice(2)], { stdio: 'inherit' }) diff --git a/docs/todos.md b/docs/todos.md index db09180b..5d38f654 100644 --- a/docs/todos.md +++ b/docs/todos.md @@ -1,35 +1,17 @@ # Todos -- /setup - - finish getting setup working - - use the control message for persisting catalog updates in setiup -- /pipeline_sync - - is activity plain functions -- call pipeline_sync from webhook - ---- - -- [x] ~~_/discover endpoint passing stripe api version_~~ [2026-04-04] -- [x] ~~\*~~how to return a list of all stripe api versions~~ done (DDR-008: enum on config JSON Schema)\*~~ [2026-04-04] -- Consider renaming generated OpenAPI specs to `.oas.json` for consistency (e.g. `engine.oas.json`, `service.oas.json`) - -- /check (implmeent or remove for now) - -- The pipeline state machine.... maintain status -- more resilient engine, not crash and do something else to recover from source / destination issues? - ---- - Short-term actionable items. Move to a dated plan in `docs/plans/` when scoped. ## Now - Land grouped Stripe tables in stream selector (`docs/plans/active/2026-04-03-grouped-tables.md`) - Structured stream state + per-stream reset signal (`docs/plans/active/2026-04-03-structured-stream-state.md`) +- Pipeline status state machine — implement `PipelineStatus`, `sub_status`, `backfill_complete` in workflow + API (`docs/plans/2026-04-03-pipeline-status-state-machine.md`) +- Google Sheets destination — fix global state flush, catalog reset on update, batch updateRows ## Soon -- Pipeline status state machine — implementation (`docs/plans/2026-04-03-pipeline-status-state-machine.md`) +- `/check` endpoint — implement or remove - Control messages → config store updates (unscoped) - CLI backfill progress display (`docs/plans/active/2026-03-20-plan-005-cli-progress-display.md`) — progress bars with row counts - Scope rename: `@stripe/` → `@stripe-sync/` (`docs/plans/active/2026-03-20-plan-008-scope-rename-stripe-sync.md`) — blocked on npm org approval @@ -59,6 +41,7 @@ Short-term actionable items. Move to a dated plan in `docs/plans/` when scoped. - OAuth credential refresh workflow (auto-renew expiring tokens) - Global state (per-pipeline state in addition to per-stream cursors) - Fan-in support (multiple sources → one destination) +- Resilient engine — recover from source/destination errors without crashing the workflow (auto-pause on permanent, retry on transient) ## Developer Experience diff --git a/packages/hono-zod-openapi/src/__tests__/json-content-header.test.ts b/packages/hono-zod-openapi/src/__tests__/json-content-header.test.ts index b13370ad..c57729ec 100644 --- a/packages/hono-zod-openapi/src/__tests__/json-content-header.test.ts +++ b/packages/hono-zod-openapi/src/__tests__/json-content-header.test.ts @@ -41,7 +41,7 @@ function createTestApp() { .string() .transform(jsonParse) .pipe(ItemSchema) - .meta({ param: { content: 'application/json' } }), + .meta({ param: { content: { 'application/json': {} } } }), }), }, responses: { @@ -120,7 +120,7 @@ describe('JSON content header — runtime', () => { .transform(jsonParse) .pipe(ItemSchema) .optional() - .meta({ param: { content: 'application/json' } }), + .meta({ param: { content: { 'application/json': {} } } }), }), }, responses: { 200: { description: 'ok' } }, @@ -157,7 +157,7 @@ describe('JSON content header — runtime', () => { .string() .transform(jsonParse) .pipe(ItemSchema) - .meta({ param: { content: 'application/json' } }), + .meta({ param: { content: { 'application/json': {} } } }), }), }, responses: { 200: { description: 'ok' } }, diff --git a/packages/hono-zod-openapi/src/index.ts b/packages/hono-zod-openapi/src/index.ts index 4b314a99..4790ae70 100644 --- a/packages/hono-zod-openapi/src/index.ts +++ b/packages/hono-zod-openapi/src/index.ts @@ -150,13 +150,19 @@ function getPipeOutput(schema: AnyZod): AnyZod | undefined { /** * Check if a header field schema has the JSON content meta annotation. - * Returns the content media type (e.g. 'application/json') or undefined. + * Returns the content media type string (e.g. 'application/json') or undefined. + * + * Accepts either object form `.meta({ param: { content: { 'application/json': {} } } })` + * (the OAS-typed form, preferred) or legacy string form `.meta({ param: { content: 'application/json' } })`. */ function getParamContentType(schema: AnyZod): string | undefined { // eslint-disable-next-line @typescript-eslint/no-explicit-any const meta = z.globalRegistry.get(schema as any) as Record | undefined // eslint-disable-next-line @typescript-eslint/no-explicit-any - return (meta?.param as any)?.content as string | undefined + const content = (meta?.param as any)?.content + if (typeof content === 'string') return content + if (content && typeof content === 'object') return Object.keys(content)[0] + return undefined } /** diff --git a/packages/openapi/scripts/generate-specs.mjs b/packages/openapi/scripts/generate-specs.mjs new file mode 100644 index 00000000..d1afc78f --- /dev/null +++ b/packages/openapi/scripts/generate-specs.mjs @@ -0,0 +1,213 @@ +#!/usr/bin/env node +/** + * Fetches Stripe REST API spec versions from github.com/stripe/openapi and + * writes .json + manifest.json to . + * + * Usage: + * # All versions (CDN): + * node packages/openapi/scripts/generate-specs.mjs + * + * # Specific versions only (e.g. to update the bundled spec in oas/): + * node packages/openapi/scripts/generate-specs.mjs --versions 2026-03-25.dahlia + * node packages/openapi/scripts/generate-specs.mjs --versions 2026-03-25.dahlia,2026-02-25.clover + * + * Clones stripe/openapi (single-branch) then walks the full history, collecting + * versions (deduplicated by blob SHA). When --versions is given, only those + * versions are written to . + * Set STRIPE_OPENAPI_REPO to a pre-cloned path to skip the clone (e.g. CI cache). + * + * Also updates src/versions.ts with the discovered version list. + * + * These are the official Stripe REST API specs (github.com/stripe/openapi), NOT + * the Sync Engine's own OpenAPI spec. + * + * No npm dependencies. + */ +import { writeFileSync, mkdirSync, existsSync, readdirSync } from 'node:fs' +import { join, dirname } from 'node:path' +import { tmpdir } from 'node:os' +import { execFileSync } from 'node:child_process' +import { fileURLToPath } from 'node:url' + +const __dirname = dirname(fileURLToPath(import.meta.url)) + +const args = process.argv.slice(2) +const outputDir = args.find((a) => !a.startsWith('--')) +if (!outputDir) { + console.error('Usage: node generate-specs.mjs [--versions v1,v2,...]') + process.exit(1) +} + +const versionsArg = args.find((a) => a.startsWith('--versions=') || a === '--versions') +const versionsIdx = args.indexOf('--versions') +const versionFilter = versionsArg?.startsWith('--versions=') + ? new Set(versionsArg.slice('--versions='.length).split(',').filter(Boolean)) + : versionsIdx !== -1 + ? new Set(args[versionsIdx + 1]?.split(',').filter(Boolean) ?? []) + : null // null = no filter, collect all + +const REPO_URL = 'https://github.com/stripe/openapi' +// stripe/openapi uses 'latest/openapi.spec3.sdk.json' for recent specs and +// 'openapi/spec3.json' for historic ones. +const SPEC_PATHS = ['latest/openapi.spec3.sdk.json', 'openapi/spec3.json'] + +function git(...gitArgs) { + // maxBuffer: Stripe specs are ~10 MB each; default 1 MB would silently truncate/throw. + return execFileSync('git', ['-C', repoDir, ...gitArgs], { + encoding: 'utf8', + maxBuffer: 50 * 1024 * 1024, + }) +} + +// Clone or use pre-cloned repo +const repoDir = process.env.STRIPE_OPENAPI_REPO ?? join(tmpdir(), 'stripe-openapi') +if (!existsSync(join(repoDir, '.git'))) { + console.error(`Cloning ${REPO_URL}...`) + execFileSync('git', ['clone', '--single-branch', REPO_URL, repoDir], { stdio: 'inherit' }) +} else { + console.error(`Using pre-cloned repo at ${repoDir}`) +} + +console.error( + versionFilter + ? `Collecting versions: ${[...versionFilter].join(', ')}` + : 'Collecting all spec versions...' +) +const commits = git('log', '--format=%H', '--', ...SPEC_PATHS) + .trim() + .split('\n') + .filter(Boolean) + +mkdirSync(outputDir, { recursive: true }) + +const seen = new Map() // version -> filename +const seenBlobs = new Set() + +for (const commit of commits) { + // Stop early if we've found all requested versions + if (versionFilter && versionFilter.size === seen.size) break + + let blobSha + for (const specPath of SPEC_PATHS) { + let ls + try { + ls = git('ls-tree', commit, specPath).trim() + } catch { + continue + } + if (!ls) continue + blobSha = ls.split(/\s+/)[2] + break + } + if (!blobSha || seenBlobs.has(blobSha)) continue + seenBlobs.add(blobSha) + + let raw + try { + raw = git('cat-file', 'blob', blobSha) + } catch { + continue + } + + let version + try { + version = JSON.parse(raw).info?.version + } catch { + continue + } + if (!version || seen.has(version)) continue + if (versionFilter && !versionFilter.has(version)) continue + + writeFileSync(join(outputDir, `${version}.json`), raw) + seen.set(version, `${version}.json`) + console.error(` ${version}`) +} + +if (versionFilter) { + const missing = [...versionFilter].filter((v) => !seen.has(v)) + if (missing.length > 0) { + console.error(`Warning: versions not found in stripe/openapi history: ${missing.join(', ')}`) + } +} + +writeFileSync( + join(outputDir, 'manifest.json'), + JSON.stringify(Object.fromEntries(seen), null, 2) + '\n' +) + +// Update src/versions.ts — for a version-filtered run, only update BUNDLED_API_VERSION +// (the oas/ spec files); for a full run, update the complete SUPPORTED_API_VERSIONS list. +const oasDir = join(__dirname, '..', 'oas') +const bundledFiles = readdirSync(oasDir).filter( + (f) => f.endsWith('.json') && f !== 'manifest.json' && f !== 'index.html' +) +const bundled = bundledFiles[0]?.replace(/\.json$/, '') ?? [...seen.keys()].sort().reverse()[0] + +const supportedVersions = versionFilter + ? // Filtered run: read existing versions from src/versions.ts to preserve the full list + await import('../src/versions.ts', { with: { type: 'module' } }) + .then((m) => [...m.SUPPORTED_API_VERSIONS]) + .catch(() => [...seen.keys()]) + : [...seen.keys()].sort().reverse() + +const lines = supportedVersions.map((v) => ` '${v}',`).join('\n') +const versionsFile = join(__dirname, '..', 'src', 'versions.ts') +writeFileSync( + versionsFile, + `// Generated by scripts/generate-specs.mjs — do not edit manually. +// BUNDLED_API_VERSION: the single spec file in packages/openapi/oas/. +// SUPPORTED_API_VERSIONS: all versions discovered from github.com/stripe/openapi. +// Re-run scripts/generate-specs.mjs to pick up newly published API versions. + +/** The single Stripe API spec bundled in this package (served without network). */ +export const BUNDLED_API_VERSION = '${bundled}' as const + +/** All Stripe API versions published by Stripe, newest first. */ +export const SUPPORTED_API_VERSIONS = [ +${lines} +] as const satisfies readonly string[] +` +) +console.error(`Updated src/versions.ts (${supportedVersions.length} versions, bundled: ${bundled})`) + +// Generate an index page (CDN use) — only when writing all versions +if (!versionFilter) { + const versions = [...seen.keys()].sort().reverse() + const rows = versions.map((v) => `
  • ${v}
  • `).join('\n') + writeFileSync( + join(outputDir, 'index.html'), + ` + + + + Stripe REST API Specs — stripe-sync.dev CDN + + + +

    Stripe REST API OpenAPI Specs

    +

    + These are the official Stripe REST API specs from + github.com/stripe/openapi, + mirrored here to avoid GitHub API rate limits. + This is not the Sync Engine's own OpenAPI spec + (see engine.json for that). +

    +

    Machine-readable index: manifest.json — ${versions.length} versions available.

    +
      +${rows} +
    + + +` + ) +} + +console.error(`\nDone: ${seen.size} spec version(s)`) diff --git a/packages/openapi/scripts/generate-versions.mjs b/packages/openapi/scripts/generate-versions.mjs new file mode 100644 index 00000000..a435811b --- /dev/null +++ b/packages/openapi/scripts/generate-versions.mjs @@ -0,0 +1,50 @@ +#!/usr/bin/env node +/** + * Lightweight: generates src/versions.ts from the single bundled spec in oas/. + * + * BUNDLED_API_VERSION — the single *.json file in oas/ + * SUPPORTED_API_VERSIONS — same as bundled (lightweight fallback) + * + * For the full list of all published Stripe API versions, run the heavyweight + * script instead: node scripts/generate-all-specs.mjs + * + * Run directly: node scripts/generate-versions.mjs + * Run via build: pnpm --filter @stripe/sync-openapi build + */ +import { readdirSync, writeFileSync, mkdirSync } from 'node:fs' +import { join, dirname } from 'node:path' +import { fileURLToPath } from 'node:url' + +const __dirname = dirname(fileURLToPath(import.meta.url)) +const oasDir = join(__dirname, '..', 'oas') +const outFile = join(__dirname, '..', 'src', 'versions.ts') + +const bundledFiles = readdirSync(oasDir).filter( + (f) => f.endsWith('.json') && f !== 'manifest.json' && f !== 'index.html' +) +if (bundledFiles.length !== 1) { + console.error( + `Expected exactly 1 bundled spec in oas/, found: ${bundledFiles.join(', ') || 'none'}` + ) + process.exit(1) +} +const bundled = bundledFiles[0].replace(/\.json$/, '') + +mkdirSync(dirname(outFile), { recursive: true }) +writeFileSync( + outFile, + `// Generated by scripts/generate-versions.mjs — do not edit manually. +// For the full list of all published Stripe API versions, run: +// node scripts/generate-specs.mjs + +/** The single Stripe API spec bundled in this package (served without network). */ +export const BUNDLED_API_VERSION = '${bundled}' as const + +/** All Stripe API versions published by Stripe, newest first. */ +export const SUPPORTED_API_VERSIONS = [ + '${bundled}', +] as const satisfies readonly string[] +` +) + +console.log(`Bundled: ${bundled}`) diff --git a/packages/openapi/specFetchHelper.ts b/packages/openapi/specFetchHelper.ts index 6eabf1d7..9347a6aa 100644 --- a/packages/openapi/specFetchHelper.ts +++ b/packages/openapi/specFetchHelper.ts @@ -13,15 +13,8 @@ const DEFAULT_CACHE_DIR = path.join(os.tmpdir(), 'stripe-sync-openapi-cache') const STRIPE_SPEC_CDN_BASE_URL = process.env.STRIPE_SPEC_CDN_BASE_URL ?? 'https://stripe-sync.dev/stripe-api-specs' -// The spec bundled into this package at build time. -// Update this constant and the corresponding .oas.json file together when bumping. -export const BUNDLED_API_VERSION = '2026-03-25.dahlia' - -// Stripe API versions that this connector has been tested against and supports. -// Each entry must have a corresponding `{version}.json` file in this package. -// The bundled version is always first. Add older versions as they are confirmed. -// Clients discover this list via the config JSON Schema's `api_version.enum`. -export const SUPPORTED_API_VERSIONS = [BUNDLED_API_VERSION] as const +import { BUNDLED_API_VERSION, SUPPORTED_API_VERSIONS } from './src/versions.js' +export { BUNDLED_API_VERSION, SUPPORTED_API_VERSIONS } export async function resolveOpenApiSpec( config: ResolveSpecConfig, diff --git a/packages/openapi/src/versions.ts b/packages/openapi/src/versions.ts new file mode 100644 index 00000000..e2e690a0 --- /dev/null +++ b/packages/openapi/src/versions.ts @@ -0,0 +1,62 @@ +// Generated by scripts/generate-versions.mjs — do not edit manually. +// BUNDLED_API_VERSION: derived from packages/openapi/oas/*.json (one file). +// SUPPORTED_API_VERSIONS: fetched from https://stripe-sync.dev/stripe-api-specs/manifest.json at build time. +// Re-run `pnpm --filter @stripe/sync-openapi build` to pick up new CDN versions. + +/** The single Stripe API spec bundled in this package (served without network). */ +export const BUNDLED_API_VERSION = '2026-03-25.dahlia' as const + +/** All Stripe API versions available via the CDN, newest first. */ +export const SUPPORTED_API_VERSIONS = [ + '2026-03-25.dahlia', + '2026-02-25.clover', + '2026-01-28.clover', + '2025-12-15.clover', + '2025-11-17.clover', + '2025-10-29.clover', + '2025-09-30.clover', + '2025-08-27.basil', + '2025-07-30.basil', + '2025-06-30.basil', + '2025-05-28.basil', + '2025-04-30.basil', + '2025-03-31.basil', + '2025-02-24.acacia', + '2025-01-27.acacia', + '2024-12-18.acacia', + '2024-11-20.acacia', + '2024-10-28.acacia', + '2024-09-30.acacia', + '2024-06-20', + '2024-04-10', + '2024-04-03', + '2023-10-16', + '2023-08-16', + '2022-11-15', + '2022-08-01', + '2020-08-27', + '2020-03-02', + '2019-12-03', + '2019-11-05', + '2019-10-17', + '2019-10-08', + '2019-09-09', + '2019-08-14', + '2019-05-16', + '2019-03-14', + '2019-02-19', + '2019-02-11', + '2018-11-08', + '2018-10-31', + '2018-09-24', + '2018-09-06', + '2018-08-23', + '2018-07-27', + '2018-05-21', + '2018-02-28', + '2018-02-06', + '2018-02-05', + '2018-01-23', + '2017-12-14', + '2017-08-15', +] as const satisfies readonly string[] diff --git a/packages/protocol/src/__tests__/state.test.ts b/packages/protocol/src/__tests__/state.test.ts index 243b1058..9cff23c5 100644 --- a/packages/protocol/src/__tests__/state.test.ts +++ b/packages/protocol/src/__tests__/state.test.ts @@ -1,17 +1,17 @@ import { describe, it, expect } from 'vitest' -import { SyncState, StatePayload, StreamStatePayload, GlobalStatePayload } from '../protocol.js' +import { SourceState, StatePayload, StreamStatePayload, GlobalStatePayload } from '../protocol.js' import { stateMsg, stateStream, stateData } from '../helpers.js' -describe('SyncState', () => { - it('parses a full SyncState', () => { - expect(SyncState.parse({ streams: { orders: { cursor: 1 } }, global: {} })).toEqual({ +describe('SourceState', () => { + it('parses a full SourceState', () => { + expect(SourceState.parse({ streams: { orders: { cursor: 1 } }, global: {} })).toEqual({ streams: { orders: { cursor: 1 } }, global: {}, }) }) it('requires both fields', () => { - expect(() => SyncState.parse({ streams: {} })).toThrow() + expect(() => SourceState.parse({ streams: {} })).toThrow() }) }) diff --git a/packages/protocol/src/cli.ts b/packages/protocol/src/cli.ts index 5196cded..9ca7e3a8 100644 --- a/packages/protocol/src/cli.ts +++ b/packages/protocol/src/cli.ts @@ -176,7 +176,7 @@ export function createConnectorCli( const config = parseConfig(args.config, opts?.configSchema) const catalog = parseJsonOrFile(args.catalog) const rawState = args.state ? parseJsonOrFile(args.state) : undefined - // Accept both SyncState { streams, global } and legacy flat state + // Accept both SourceState { streams, global } and legacy flat state const state = rawState ? 'streams' in rawState ? (rawState as { streams: Record; global: Record }) diff --git a/packages/protocol/src/protocol.ts b/packages/protocol/src/protocol.ts index 9a08e2d8..65d1354d 100644 --- a/packages/protocol/src/protocol.ts +++ b/packages/protocol/src/protocol.ts @@ -17,15 +17,17 @@ import { z } from 'zod' * `streams` holds per-stream checkpoints (existing behavior). * `global` holds sync-wide state shared across all streams (new). */ -export const SyncState = z.object({ - streams: z - .record(z.string(), z.unknown()) - .describe('Per-stream checkpoint data, keyed by stream name.'), - global: z - .record(z.string(), z.unknown()) - .describe('Sync-wide state shared across all streams (e.g. a global events cursor).'), -}) -export type SyncState = z.infer +export const SourceState = z + .object({ + streams: z + .record(z.string(), z.unknown()) + .describe('Per-stream checkpoint data, keyed by stream name.'), + global: z + .record(z.string(), z.unknown()) + .describe('Sync-wide state shared across all streams (e.g. a global events cursor).'), + }) + .meta({ id: 'SourceState' }) +export type SourceState = z.infer // MARK: - Data model @@ -122,7 +124,7 @@ export const ConnectorSpecification = z .record(z.string(), z.unknown()) .optional() .describe( - 'JSON Schema for per-stream state (cursor/checkpoint shape). See also SyncState.global for sync-wide cursors.' + 'JSON Schema for per-stream state (cursor/checkpoint shape). See also SourceState.global for sync-wide cursors.' ), source_input: z .record(z.string(), z.unknown()) @@ -390,7 +392,7 @@ export type PipelineConfig = z.infer /** The full set of parsed sync request params: pipeline config + cursor state + stream limits. */ export interface SyncParams { pipeline: PipelineConfig - state?: SyncState + state?: SourceState state_limit?: number time_limit?: number } @@ -508,7 +510,7 @@ export interface Source< params: { config: TConfig catalog: ConfiguredCatalog - state?: SyncState + state?: SourceState }, $stdin?: AsyncIterable ): AsyncIterable diff --git a/packages/source-stripe/src/spec.test.ts b/packages/source-stripe/src/spec.test.ts index f9ce0220..978953ab 100644 --- a/packages/source-stripe/src/spec.test.ts +++ b/packages/source-stripe/src/spec.test.ts @@ -1,11 +1,16 @@ import { describe, it, expect } from 'vitest' -import { z } from 'zod' -import { configSchema } from './spec.js' +import spec, { configSchema } from './spec.js' import { BUNDLED_API_VERSION, SUPPORTED_API_VERSIONS } from '@stripe/sync-openapi' describe('configSchema api_version field', () => { + it('only accepts known enum values', () => { + expect(configSchema.shape.api_version.safeParse(BUNDLED_API_VERSION).success).toBe(true) + expect(configSchema.shape.api_version.safeParse('2099-01-01.unknown').success).toBe(false) + expect(configSchema.shape.api_version.safeParse(undefined).success).toBe(true) + }) + it('exposes supported versions via JSON Schema enum', () => { - const jsonSchema = z.toJSONSchema(configSchema) as { + const jsonSchema = spec.config as { properties?: Record } const field = jsonSchema.properties?.api_version @@ -18,7 +23,7 @@ describe('configSchema api_version field', () => { it('clients can extract supported API versions from config_schema', () => { // This is the pattern clients use: read config_schema from // GET /meta/sources/stripe, then inspect the api_version field. - const schema = z.toJSONSchema(configSchema) as { + const schema = spec.config as { properties?: Record } const versions: string[] = schema.properties?.api_version?.enum ?? [] diff --git a/packages/source-stripe/src/transport.test.ts b/packages/source-stripe/src/transport.test.ts index 0e7aa738..06f87a1c 100644 --- a/packages/source-stripe/src/transport.test.ts +++ b/packages/source-stripe/src/transport.test.ts @@ -120,9 +120,13 @@ describe('fetchWithProxy', () => { const mockFetch = vi.fn().mockResolvedValue(new Response('ok', { status: 200 })) vi.stubGlobal('fetch', mockFetch) - await fetchWithProxy('https://api.stripe.com/v1/customers', {}, { - HTTPS_PROXY: 'http://proxy.example.test:8080', - }) + await fetchWithProxy( + 'https://api.stripe.com/v1/customers', + {}, + { + HTTPS_PROXY: 'http://proxy.example.test:8080', + } + ) expect(mockFetch).toHaveBeenCalledOnce() const [, init] = mockFetch.mock.calls[0] @@ -133,9 +137,13 @@ describe('fetchWithProxy', () => { const mockFetch = vi.fn().mockResolvedValue(new Response('ok', { status: 200 })) vi.stubGlobal('fetch', mockFetch) - await fetchWithProxy('http://localhost:12111/v1/customers', {}, { - HTTPS_PROXY: 'http://proxy.example.test:8080', - }) + await fetchWithProxy( + 'http://localhost:12111/v1/customers', + {}, + { + HTTPS_PROXY: 'http://proxy.example.test:8080', + } + ) expect(mockFetch).toHaveBeenCalledOnce() const [, init] = mockFetch.mock.calls[0] @@ -146,10 +154,14 @@ describe('fetchWithProxy', () => { const mockFetch = vi.fn().mockResolvedValue(new Response('ok', { status: 200 })) vi.stubGlobal('fetch', mockFetch) - await fetchWithProxy('https://stripe-sync.dev/stripe-api-specs/manifest.json', {}, { - HTTPS_PROXY: 'http://proxy.example.test:8080', - NO_PROXY: 'stripe-sync.dev', - }) + await fetchWithProxy( + 'https://stripe-sync.dev/stripe-api-specs/manifest.json', + {}, + { + HTTPS_PROXY: 'http://proxy.example.test:8080', + NO_PROXY: 'stripe-sync.dev', + } + ) expect(mockFetch).toHaveBeenCalledOnce() const [, init] = mockFetch.mock.calls[0] diff --git a/packages/state-postgres/src/state-store.ts b/packages/state-postgres/src/state-store.ts index 5b47a345..84e48917 100644 --- a/packages/state-postgres/src/state-store.ts +++ b/packages/state-postgres/src/state-store.ts @@ -5,13 +5,13 @@ import { stripSslParams, withPgConnectProxy, } from '@stripe/sync-util-postgres' -import type { SyncState } from '@stripe/sync-protocol' +import type { SourceState } from '@stripe/sync-protocol' /** Reserved stream name for global state in the _sync_state table. */ const GLOBAL_KEY = '_global' export interface StateStore { - get(syncId: string): Promise + get(syncId: string): Promise set(syncId: string, stream: string, data: unknown): Promise setGlobal(syncId: string, data: unknown): Promise clear(syncId: string): Promise @@ -78,7 +78,7 @@ export function createPgStateStore( /** Engine-compatible state store scoped to a single sync_id. */ export interface ScopedStateStore { - get(): Promise + get(): Promise set(stream: string, data: unknown): Promise setGlobal(data: unknown): Promise }