From b640f9faff183e97dfb195e5f4c8571885468591 Mon Sep 17 00:00:00 2001 From: tx Date: Sun, 5 Apr 2026 06:45:26 +0000 Subject: [PATCH 01/13] Accept any api_version string, advertise known versions via anyOf enum Replace z.enum(SUPPORTED_API_VERSIONS) with z.string() so the engine accepts non-bundled versions (CDN-fetched). Advertise known versions in the JSON schema via anyOf so z.fromJSONSchema in the resolver produces a union that accepts any string rather than a strict enum. Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- packages/source-stripe/src/spec.test.ts | 23 ++++++++++++++--------- packages/source-stripe/src/spec.ts | 17 +++++++++++++++-- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/packages/source-stripe/src/spec.test.ts b/packages/source-stripe/src/spec.test.ts index f9ce0220..9204331e 100644 --- a/packages/source-stripe/src/spec.test.ts +++ b/packages/source-stripe/src/spec.test.ts @@ -1,27 +1,32 @@ import { describe, it, expect } from 'vitest' -import { z } from 'zod' -import { configSchema } from './spec.js' +import spec, { configSchema } from './spec.js' import { BUNDLED_API_VERSION, SUPPORTED_API_VERSIONS } from '@stripe/sync-openapi' describe('configSchema api_version field', () => { - it('exposes supported versions via JSON Schema enum', () => { - const jsonSchema = z.toJSONSchema(configSchema) as { - properties?: Record + it('accepts any version string at runtime (not just the enum)', () => { + expect(configSchema.shape.api_version.safeParse('2024-12-18.acacia').success).toBe(true) + expect(configSchema.shape.api_version.safeParse('2023-08-16').success).toBe(true) + }) + + it('exposes supported versions via JSON Schema anyOf enum', () => { + const jsonSchema = spec.config as { + properties?: Record; description?: string }> } const field = jsonSchema.properties?.api_version expect(field).toBeDefined() - expect(field!.enum).toEqual([...SUPPORTED_API_VERSIONS]) + // Enum is nested in anyOf[0] so z.fromJSONSchema produces a union that accepts any string + expect(field!.anyOf?.[0]?.enum).toEqual([...SUPPORTED_API_VERSIONS]) expect(field!.description).toContain(BUNDLED_API_VERSION) }) it('clients can extract supported API versions from config_schema', () => { // This is the pattern clients use: read config_schema from // GET /meta/sources/stripe, then inspect the api_version field. - const schema = z.toJSONSchema(configSchema) as { - properties?: Record + const schema = spec.config as { + properties?: Record }> } - const versions: string[] = schema.properties?.api_version?.enum ?? [] + const versions: string[] = schema.properties?.api_version?.anyOf?.[0]?.enum ?? [] expect(versions).toContain(BUNDLED_API_VERSION) expect(versions.length).toBeGreaterThan(0) diff --git a/packages/source-stripe/src/spec.ts b/packages/source-stripe/src/spec.ts index 4c799973..63f840b2 100644 --- a/packages/source-stripe/src/spec.ts +++ b/packages/source-stripe/src/spec.ts @@ -7,7 +7,7 @@ export const configSchema = z.object({ account_id: z.string().optional().describe('Stripe account ID (resolved from API if omitted)'), livemode: z.boolean().optional().describe('Whether this is a live mode sync'), api_version: z - .enum(SUPPORTED_API_VERSIONS) + .string() .optional() .describe(`Stripe API version (default: ${BUNDLED_API_VERSION})`), base_url: z @@ -128,8 +128,21 @@ export const stripeEventSchema = z.object({ export type StripeEvent = z.infer +const configJsonSchema = z.toJSONSchema(configSchema) as { + properties: Record> +} +// Advertise known versions via anyOf so clients can discover them, while still +// accepting any valid version string (for CDN-fetched non-bundled specs). +// z.fromJSONSchema turns { anyOf: [{ enum: [...] }, { type: "string" }] } into +// z.union([z.literal(...), z.string()]) which accepts any string. +configJsonSchema.properties.api_version.anyOf = [ + { enum: [...SUPPORTED_API_VERSIONS] }, + { type: 'string' }, +] +delete configJsonSchema.properties.api_version.type + export default { - config: z.toJSONSchema(configSchema), + config: configJsonSchema, source_state_stream: z.toJSONSchema(streamStateSpec), source_input: z.toJSONSchema(stripeEventSchema), } satisfies ConnectorSpecification From ac0607c885619af2e72639259c55718ae6426514 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Sun, 5 Apr 2026 00:01:04 -0700 Subject: [PATCH 02/13] feat(openapi): generate SUPPORTED_API_VERSIONS from oas/ directory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace hardcoded BUNDLED_API_VERSION / SUPPORTED_API_VERSIONS in specFetchHelper.ts with a generated src/versions.ts derived from the files present in packages/openapi/oas/*.json. - Add scripts/generate-versions.mjs — scans oas/, writes src/versions.ts - Hook into build: node scripts/generate-versions.mjs && tsc - specFetchHelper.ts imports from ./src/versions.js (no more hardcodes) - Revert api_version from z.string() back to z.enum(SUPPORTED_API_VERSIONS) - Remove anyOf JSON Schema hack; enum is now the direct source of truth - Update spec.test.ts to assert strict enum validation Adding a new bundled API version is now: drop the .json into oas/, run pnpm --filter @stripe/sync-openapi build. Same oas/ directory powers the docs CDN (docs/scripts/generate-stripe-specs.mjs). Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- apps/engine/src/api/app.ts | 4 +- docs/todos.md | 1 + packages/openapi/package.json | 2 +- .../openapi/scripts/generate-versions.mjs | 53 +++++++++++++++++++ packages/openapi/specFetchHelper.ts | 11 +--- packages/openapi/src/versions.ts | 9 ++++ packages/source-stripe/src/spec.test.ts | 18 +++---- packages/source-stripe/src/spec.ts | 17 +----- packages/source-stripe/src/transport.test.ts | 32 +++++++---- 9 files changed, 102 insertions(+), 45 deletions(-) create mode 100644 packages/openapi/scripts/generate-versions.mjs create mode 100644 packages/openapi/src/versions.ts diff --git a/apps/engine/src/api/app.ts b/apps/engine/src/api/app.ts index 182afcb6..390e3921 100644 --- a/apps/engine/src/api/app.ts +++ b/apps/engine/src/api/app.ts @@ -333,7 +333,9 @@ export async function createApp(resolver: ConnectorResolver) { app.openapi(sourceDiscoverRoute, (c) => { const source = c.req.valid('header')['x-source'] const context = { path: '/source_discover', sourceName: source.type } - return ndjsonResponse(logApiStream('Engine API /source_discover', engine.source_discover(source), context)) + return ndjsonResponse( + logApiStream('Engine API /source_discover', engine.source_discover(source), context) + ) }) const pipelineReadRoute = createRoute({ diff --git a/docs/todos.md b/docs/todos.md index db09180b..09f26c34 100644 --- a/docs/todos.md +++ b/docs/todos.md @@ -17,6 +17,7 @@ - The pipeline state machine.... maintain status - more resilient engine, not crash and do something else to recover from source / destination issues? +- making sure the google sheet thing works --- diff --git a/packages/openapi/package.json b/packages/openapi/package.json index 4d48f398..50b30f12 100644 --- a/packages/openapi/package.json +++ b/packages/openapi/package.json @@ -11,7 +11,7 @@ } }, "scripts": { - "build": "tsc && cp -r oas dist/", + "build": "node scripts/generate-versions.mjs && tsc && cp -r oas dist/", "test": "vitest --passWithNoTests" }, "files": [ diff --git a/packages/openapi/scripts/generate-versions.mjs b/packages/openapi/scripts/generate-versions.mjs new file mode 100644 index 00000000..aedd40fb --- /dev/null +++ b/packages/openapi/scripts/generate-versions.mjs @@ -0,0 +1,53 @@ +#!/usr/bin/env node +/** + * Generates src/versions.ts from the oas/ directory. + * + * SUPPORTED_API_VERSIONS = all *.json filenames in oas/ (sorted newest-first). + * BUNDLED_API_VERSION = the first (most recent) entry. + * + * Run directly: node scripts/generate-versions.mjs + * Run via build: pnpm --filter @stripe/sync-openapi build + * + * When adding a new bundled spec, drop the .json file into oas/ and re-run. + * The same oas/ directory is served as the CDN source for the docs site. + */ +import { readdirSync, writeFileSync, mkdirSync } from 'node:fs' +import { join, dirname } from 'node:path' +import { fileURLToPath } from 'node:url' + +const __dirname = dirname(fileURLToPath(import.meta.url)) +const oasDir = join(__dirname, '..', 'oas') +const outFile = join(__dirname, '..', 'src', 'versions.ts') + +const versions = readdirSync(oasDir) + .filter((f) => f.endsWith('.json') && f !== 'manifest.json' && f !== 'index.html') + .map((f) => f.replace(/\.json$/, '')) + .sort() + .reverse() // newest first + +if (versions.length === 0) { + console.error('No .json spec files found in oas/') + process.exit(1) +} + +const bundled = versions[0] +const lines = versions.map((v) => ` '${v}',`).join('\n') + +mkdirSync(dirname(outFile), { recursive: true }) +writeFileSync( + outFile, + `// Generated by scripts/generate-versions.mjs — do not edit manually. +// Source of truth: packages/openapi/oas/*.json +// Re-run \`pnpm --filter @stripe/sync-openapi build\` after adding a new bundled spec. + +/** The most recent Stripe API version bundled in this package. */ +export const BUNDLED_API_VERSION = '${bundled}' as const + +/** All Stripe API versions bundled in this package, newest first. */ +export const SUPPORTED_API_VERSIONS = [ +${lines} +] as const satisfies readonly string[] +` +) + +console.log(`Generated ${versions.length} version(s), bundled: ${bundled}`) diff --git a/packages/openapi/specFetchHelper.ts b/packages/openapi/specFetchHelper.ts index 6eabf1d7..9347a6aa 100644 --- a/packages/openapi/specFetchHelper.ts +++ b/packages/openapi/specFetchHelper.ts @@ -13,15 +13,8 @@ const DEFAULT_CACHE_DIR = path.join(os.tmpdir(), 'stripe-sync-openapi-cache') const STRIPE_SPEC_CDN_BASE_URL = process.env.STRIPE_SPEC_CDN_BASE_URL ?? 'https://stripe-sync.dev/stripe-api-specs' -// The spec bundled into this package at build time. -// Update this constant and the corresponding .oas.json file together when bumping. -export const BUNDLED_API_VERSION = '2026-03-25.dahlia' - -// Stripe API versions that this connector has been tested against and supports. -// Each entry must have a corresponding `{version}.json` file in this package. -// The bundled version is always first. Add older versions as they are confirmed. -// Clients discover this list via the config JSON Schema's `api_version.enum`. -export const SUPPORTED_API_VERSIONS = [BUNDLED_API_VERSION] as const +import { BUNDLED_API_VERSION, SUPPORTED_API_VERSIONS } from './src/versions.js' +export { BUNDLED_API_VERSION, SUPPORTED_API_VERSIONS } export async function resolveOpenApiSpec( config: ResolveSpecConfig, diff --git a/packages/openapi/src/versions.ts b/packages/openapi/src/versions.ts new file mode 100644 index 00000000..c5e2ad9f --- /dev/null +++ b/packages/openapi/src/versions.ts @@ -0,0 +1,9 @@ +// Generated by scripts/generate-versions.mjs — do not edit manually. +// Source of truth: packages/openapi/oas/*.json +// Re-run `pnpm --filter @stripe/sync-openapi build` after adding a new bundled spec. + +/** The most recent Stripe API version bundled in this package. */ +export const BUNDLED_API_VERSION = '2026-03-25.dahlia' as const + +/** All Stripe API versions bundled in this package, newest first. */ +export const SUPPORTED_API_VERSIONS = ['2026-03-25.dahlia'] as const satisfies readonly string[] diff --git a/packages/source-stripe/src/spec.test.ts b/packages/source-stripe/src/spec.test.ts index 9204331e..978953ab 100644 --- a/packages/source-stripe/src/spec.test.ts +++ b/packages/source-stripe/src/spec.test.ts @@ -3,20 +3,20 @@ import spec, { configSchema } from './spec.js' import { BUNDLED_API_VERSION, SUPPORTED_API_VERSIONS } from '@stripe/sync-openapi' describe('configSchema api_version field', () => { - it('accepts any version string at runtime (not just the enum)', () => { - expect(configSchema.shape.api_version.safeParse('2024-12-18.acacia').success).toBe(true) - expect(configSchema.shape.api_version.safeParse('2023-08-16').success).toBe(true) + it('only accepts known enum values', () => { + expect(configSchema.shape.api_version.safeParse(BUNDLED_API_VERSION).success).toBe(true) + expect(configSchema.shape.api_version.safeParse('2099-01-01.unknown').success).toBe(false) + expect(configSchema.shape.api_version.safeParse(undefined).success).toBe(true) }) - it('exposes supported versions via JSON Schema anyOf enum', () => { + it('exposes supported versions via JSON Schema enum', () => { const jsonSchema = spec.config as { - properties?: Record; description?: string }> + properties?: Record } const field = jsonSchema.properties?.api_version expect(field).toBeDefined() - // Enum is nested in anyOf[0] so z.fromJSONSchema produces a union that accepts any string - expect(field!.anyOf?.[0]?.enum).toEqual([...SUPPORTED_API_VERSIONS]) + expect(field!.enum).toEqual([...SUPPORTED_API_VERSIONS]) expect(field!.description).toContain(BUNDLED_API_VERSION) }) @@ -24,9 +24,9 @@ describe('configSchema api_version field', () => { // This is the pattern clients use: read config_schema from // GET /meta/sources/stripe, then inspect the api_version field. const schema = spec.config as { - properties?: Record }> + properties?: Record } - const versions: string[] = schema.properties?.api_version?.anyOf?.[0]?.enum ?? [] + const versions: string[] = schema.properties?.api_version?.enum ?? [] expect(versions).toContain(BUNDLED_API_VERSION) expect(versions.length).toBeGreaterThan(0) diff --git a/packages/source-stripe/src/spec.ts b/packages/source-stripe/src/spec.ts index 63f840b2..4c799973 100644 --- a/packages/source-stripe/src/spec.ts +++ b/packages/source-stripe/src/spec.ts @@ -7,7 +7,7 @@ export const configSchema = z.object({ account_id: z.string().optional().describe('Stripe account ID (resolved from API if omitted)'), livemode: z.boolean().optional().describe('Whether this is a live mode sync'), api_version: z - .string() + .enum(SUPPORTED_API_VERSIONS) .optional() .describe(`Stripe API version (default: ${BUNDLED_API_VERSION})`), base_url: z @@ -128,21 +128,8 @@ export const stripeEventSchema = z.object({ export type StripeEvent = z.infer -const configJsonSchema = z.toJSONSchema(configSchema) as { - properties: Record> -} -// Advertise known versions via anyOf so clients can discover them, while still -// accepting any valid version string (for CDN-fetched non-bundled specs). -// z.fromJSONSchema turns { anyOf: [{ enum: [...] }, { type: "string" }] } into -// z.union([z.literal(...), z.string()]) which accepts any string. -configJsonSchema.properties.api_version.anyOf = [ - { enum: [...SUPPORTED_API_VERSIONS] }, - { type: 'string' }, -] -delete configJsonSchema.properties.api_version.type - export default { - config: configJsonSchema, + config: z.toJSONSchema(configSchema), source_state_stream: z.toJSONSchema(streamStateSpec), source_input: z.toJSONSchema(stripeEventSchema), } satisfies ConnectorSpecification diff --git a/packages/source-stripe/src/transport.test.ts b/packages/source-stripe/src/transport.test.ts index 0e7aa738..06f87a1c 100644 --- a/packages/source-stripe/src/transport.test.ts +++ b/packages/source-stripe/src/transport.test.ts @@ -120,9 +120,13 @@ describe('fetchWithProxy', () => { const mockFetch = vi.fn().mockResolvedValue(new Response('ok', { status: 200 })) vi.stubGlobal('fetch', mockFetch) - await fetchWithProxy('https://api.stripe.com/v1/customers', {}, { - HTTPS_PROXY: 'http://proxy.example.test:8080', - }) + await fetchWithProxy( + 'https://api.stripe.com/v1/customers', + {}, + { + HTTPS_PROXY: 'http://proxy.example.test:8080', + } + ) expect(mockFetch).toHaveBeenCalledOnce() const [, init] = mockFetch.mock.calls[0] @@ -133,9 +137,13 @@ describe('fetchWithProxy', () => { const mockFetch = vi.fn().mockResolvedValue(new Response('ok', { status: 200 })) vi.stubGlobal('fetch', mockFetch) - await fetchWithProxy('http://localhost:12111/v1/customers', {}, { - HTTPS_PROXY: 'http://proxy.example.test:8080', - }) + await fetchWithProxy( + 'http://localhost:12111/v1/customers', + {}, + { + HTTPS_PROXY: 'http://proxy.example.test:8080', + } + ) expect(mockFetch).toHaveBeenCalledOnce() const [, init] = mockFetch.mock.calls[0] @@ -146,10 +154,14 @@ describe('fetchWithProxy', () => { const mockFetch = vi.fn().mockResolvedValue(new Response('ok', { status: 200 })) vi.stubGlobal('fetch', mockFetch) - await fetchWithProxy('https://stripe-sync.dev/stripe-api-specs/manifest.json', {}, { - HTTPS_PROXY: 'http://proxy.example.test:8080', - NO_PROXY: 'stripe-sync.dev', - }) + await fetchWithProxy( + 'https://stripe-sync.dev/stripe-api-specs/manifest.json', + {}, + { + HTTPS_PROXY: 'http://proxy.example.test:8080', + NO_PROXY: 'stripe-sync.dev', + } + ) expect(mockFetch).toHaveBeenCalledOnce() const [, init] = mockFetch.mock.calls[0] From 5978cbd932982939829b945b80d7e4ad0607d3a8 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Sun, 5 Apr 2026 00:08:23 -0700 Subject: [PATCH 03/13] feat(openapi): fetch SUPPORTED_API_VERSIONS from CDN manifest at build time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit generate-versions.mjs now fetches stripe-sync.dev/stripe-api-specs/manifest.json to populate SUPPORTED_API_VERSIONS (51 versions) while BUNDLED_API_VERSION stays derived from the single oas/*.json file. The CDN manifest is produced by docs/scripts/generate-stripe-specs.mjs — same source, shared contract. Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- .../openapi/scripts/generate-versions.mjs | 66 +++++++++++++------ packages/openapi/src/versions.ts | 63 ++++++++++++++++-- 2 files changed, 105 insertions(+), 24 deletions(-) diff --git a/packages/openapi/scripts/generate-versions.mjs b/packages/openapi/scripts/generate-versions.mjs index aedd40fb..e53eac18 100644 --- a/packages/openapi/scripts/generate-versions.mjs +++ b/packages/openapi/scripts/generate-versions.mjs @@ -1,15 +1,17 @@ #!/usr/bin/env node /** - * Generates src/versions.ts from the oas/ directory. + * Generates src/versions.ts by combining: + * - BUNDLED_API_VERSION — the single spec file in oas/ (local, no network) + * - SUPPORTED_API_VERSIONS — all versions in the CDN manifest (network fetch) * - * SUPPORTED_API_VERSIONS = all *.json filenames in oas/ (sorted newest-first). - * BUNDLED_API_VERSION = the first (most recent) entry. + * The CDN manifest is produced by docs/scripts/generate-stripe-specs.mjs and + * served at STRIPE_SPEC_CDN_BASE_URL/manifest.json. * * Run directly: node scripts/generate-versions.mjs * Run via build: pnpm --filter @stripe/sync-openapi build * - * When adding a new bundled spec, drop the .json file into oas/ and re-run. - * The same oas/ directory is served as the CDN source for the docs site. + * Override the CDN URL in tests/CI: + * STRIPE_SPEC_CDN_BASE_URL=http://localhost:3000 node scripts/generate-versions.mjs */ import { readdirSync, writeFileSync, mkdirSync } from 'node:fs' import { join, dirname } from 'node:path' @@ -19,35 +21,61 @@ const __dirname = dirname(fileURLToPath(import.meta.url)) const oasDir = join(__dirname, '..', 'oas') const outFile = join(__dirname, '..', 'src', 'versions.ts') -const versions = readdirSync(oasDir) - .filter((f) => f.endsWith('.json') && f !== 'manifest.json' && f !== 'index.html') - .map((f) => f.replace(/\.json$/, '')) - .sort() - .reverse() // newest first +const CDN_BASE_URL = + process.env.STRIPE_SPEC_CDN_BASE_URL ?? 'https://stripe-sync.dev/stripe-api-specs' -if (versions.length === 0) { - console.error('No .json spec files found in oas/') +// BUNDLED_API_VERSION — derived from the single .json file in oas/ +const bundledFiles = readdirSync(oasDir).filter( + (f) => f.endsWith('.json') && f !== 'manifest.json' && f !== 'index.html' +) +if (bundledFiles.length !== 1) { + console.error( + `Expected exactly 1 bundled spec in oas/, found: ${bundledFiles.join(', ') || 'none'}` + ) + process.exit(1) +} +const bundled = bundledFiles[0].replace(/\.json$/, '') + +// SUPPORTED_API_VERSIONS — all versions from the CDN manifest +console.error(`Fetching manifest from ${CDN_BASE_URL}/manifest.json ...`) +let manifest +try { + const res = await fetch(`${CDN_BASE_URL}/manifest.json`) + if (!res.ok) throw new Error(`HTTP ${res.status} ${res.statusText}`) + manifest = await res.json() +} catch (err) { + console.error(`Failed to fetch CDN manifest: ${err.message}`) + console.error( + 'Run with STRIPE_SPEC_CDN_BASE_URL pointing to a local mirror, or check connectivity.' + ) + process.exit(1) +} + +const allVersions = Object.keys(manifest).sort().reverse() // newest first + +if (allVersions.length === 0) { + console.error('CDN manifest has no versions') process.exit(1) } -const bundled = versions[0] -const lines = versions.map((v) => ` '${v}',`).join('\n') +const lines = allVersions.map((v) => ` '${v}',`).join('\n') mkdirSync(dirname(outFile), { recursive: true }) writeFileSync( outFile, `// Generated by scripts/generate-versions.mjs — do not edit manually. -// Source of truth: packages/openapi/oas/*.json -// Re-run \`pnpm --filter @stripe/sync-openapi build\` after adding a new bundled spec. +// BUNDLED_API_VERSION: derived from packages/openapi/oas/*.json (one file). +// SUPPORTED_API_VERSIONS: fetched from ${CDN_BASE_URL}/manifest.json at build time. +// Re-run \`pnpm --filter @stripe/sync-openapi build\` to pick up new CDN versions. -/** The most recent Stripe API version bundled in this package. */ +/** The single Stripe API spec bundled in this package (served without network). */ export const BUNDLED_API_VERSION = '${bundled}' as const -/** All Stripe API versions bundled in this package, newest first. */ +/** All Stripe API versions available via the CDN, newest first. */ export const SUPPORTED_API_VERSIONS = [ ${lines} ] as const satisfies readonly string[] ` ) -console.log(`Generated ${versions.length} version(s), bundled: ${bundled}`) +console.log(`Bundled: ${bundled} | Supported: ${allVersions.length} version(s) from CDN`) diff --git a/packages/openapi/src/versions.ts b/packages/openapi/src/versions.ts index c5e2ad9f..e2e690a0 100644 --- a/packages/openapi/src/versions.ts +++ b/packages/openapi/src/versions.ts @@ -1,9 +1,62 @@ // Generated by scripts/generate-versions.mjs — do not edit manually. -// Source of truth: packages/openapi/oas/*.json -// Re-run `pnpm --filter @stripe/sync-openapi build` after adding a new bundled spec. +// BUNDLED_API_VERSION: derived from packages/openapi/oas/*.json (one file). +// SUPPORTED_API_VERSIONS: fetched from https://stripe-sync.dev/stripe-api-specs/manifest.json at build time. +// Re-run `pnpm --filter @stripe/sync-openapi build` to pick up new CDN versions. -/** The most recent Stripe API version bundled in this package. */ +/** The single Stripe API spec bundled in this package (served without network). */ export const BUNDLED_API_VERSION = '2026-03-25.dahlia' as const -/** All Stripe API versions bundled in this package, newest first. */ -export const SUPPORTED_API_VERSIONS = ['2026-03-25.dahlia'] as const satisfies readonly string[] +/** All Stripe API versions available via the CDN, newest first. */ +export const SUPPORTED_API_VERSIONS = [ + '2026-03-25.dahlia', + '2026-02-25.clover', + '2026-01-28.clover', + '2025-12-15.clover', + '2025-11-17.clover', + '2025-10-29.clover', + '2025-09-30.clover', + '2025-08-27.basil', + '2025-07-30.basil', + '2025-06-30.basil', + '2025-05-28.basil', + '2025-04-30.basil', + '2025-03-31.basil', + '2025-02-24.acacia', + '2025-01-27.acacia', + '2024-12-18.acacia', + '2024-11-20.acacia', + '2024-10-28.acacia', + '2024-09-30.acacia', + '2024-06-20', + '2024-04-10', + '2024-04-03', + '2023-10-16', + '2023-08-16', + '2022-11-15', + '2022-08-01', + '2020-08-27', + '2020-03-02', + '2019-12-03', + '2019-11-05', + '2019-10-17', + '2019-10-08', + '2019-09-09', + '2019-08-14', + '2019-05-16', + '2019-03-14', + '2019-02-19', + '2019-02-11', + '2018-11-08', + '2018-10-31', + '2018-09-24', + '2018-09-06', + '2018-08-23', + '2018-07-27', + '2018-05-21', + '2018-02-28', + '2018-02-06', + '2018-02-05', + '2018-01-23', + '2017-12-14', + '2017-08-15', +] as const satisfies readonly string[] From 7a0928228f3bd2cab8b0fedf059e82623588710f Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Sun, 5 Apr 2026 00:08:43 -0700 Subject: [PATCH 04/13] chore: regenerate OpenAPI specs with 51-version api_version enum Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- apps/engine/src/__generated__/openapi.d.ts | 4 +- apps/engine/src/__generated__/openapi.json | 54 ++++++++++++++++++++- apps/service/src/__generated__/openapi.d.ts | 4 +- apps/service/src/__generated__/openapi.json | 54 ++++++++++++++++++++- 4 files changed, 110 insertions(+), 6 deletions(-) diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index 2e20e5eb..5b607e78 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -545,8 +545,8 @@ export interface components { account_id?: string; /** @description Whether this is a live mode sync */ livemode?: boolean; - /** @constant */ - api_version?: "2026-03-25.dahlia"; + /** @enum {string} */ + api_version?: "2026-03-25.dahlia" | "2026-02-25.clover" | "2026-01-28.clover" | "2025-12-15.clover" | "2025-11-17.clover" | "2025-10-29.clover" | "2025-09-30.clover" | "2025-08-27.basil" | "2025-07-30.basil" | "2025-06-30.basil" | "2025-05-28.basil" | "2025-04-30.basil" | "2025-03-31.basil" | "2025-02-24.acacia" | "2025-01-27.acacia" | "2024-12-18.acacia" | "2024-11-20.acacia" | "2024-10-28.acacia" | "2024-09-30.acacia" | "2024-06-20" | "2024-04-10" | "2024-04-03" | "2023-10-16" | "2023-08-16" | "2022-11-15" | "2022-08-01" | "2020-08-27" | "2020-03-02" | "2019-12-03" | "2019-11-05" | "2019-10-17" | "2019-10-08" | "2019-09-09" | "2019-08-14" | "2019-05-16" | "2019-03-14" | "2019-02-19" | "2019-02-11" | "2018-11-08" | "2018-10-31" | "2018-09-24" | "2018-09-06" | "2018-08-23" | "2018-07-27" | "2018-05-21" | "2018-02-28" | "2018-02-06" | "2018-02-05" | "2018-01-23" | "2017-12-14" | "2017-08-15"; /** * Format: uri * @description Override the Stripe API base URL (e.g. http://localhost:12111 for stripe-mock) diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index 3324bd90..29476cd9 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -1540,7 +1540,59 @@ }, "api_version": { "type": "string", - "const": "2026-03-25.dahlia" + "enum": [ + "2026-03-25.dahlia", + "2026-02-25.clover", + "2026-01-28.clover", + "2025-12-15.clover", + "2025-11-17.clover", + "2025-10-29.clover", + "2025-09-30.clover", + "2025-08-27.basil", + "2025-07-30.basil", + "2025-06-30.basil", + "2025-05-28.basil", + "2025-04-30.basil", + "2025-03-31.basil", + "2025-02-24.acacia", + "2025-01-27.acacia", + "2024-12-18.acacia", + "2024-11-20.acacia", + "2024-10-28.acacia", + "2024-09-30.acacia", + "2024-06-20", + "2024-04-10", + "2024-04-03", + "2023-10-16", + "2023-08-16", + "2022-11-15", + "2022-08-01", + "2020-08-27", + "2020-03-02", + "2019-12-03", + "2019-11-05", + "2019-10-17", + "2019-10-08", + "2019-09-09", + "2019-08-14", + "2019-05-16", + "2019-03-14", + "2019-02-19", + "2019-02-11", + "2018-11-08", + "2018-10-31", + "2018-09-24", + "2018-09-06", + "2018-08-23", + "2018-07-27", + "2018-05-21", + "2018-02-28", + "2018-02-06", + "2018-02-05", + "2018-01-23", + "2017-12-14", + "2017-08-15" + ] }, "base_url": { "type": "string", diff --git a/apps/service/src/__generated__/openapi.d.ts b/apps/service/src/__generated__/openapi.d.ts index ed78fd72..3a3c9b1c 100644 --- a/apps/service/src/__generated__/openapi.d.ts +++ b/apps/service/src/__generated__/openapi.d.ts @@ -128,8 +128,8 @@ export interface components { account_id?: string; /** @description Whether this is a live mode sync */ livemode?: boolean; - /** @constant */ - api_version?: "2026-03-25.dahlia"; + /** @enum {string} */ + api_version?: "2026-03-25.dahlia" | "2026-02-25.clover" | "2026-01-28.clover" | "2025-12-15.clover" | "2025-11-17.clover" | "2025-10-29.clover" | "2025-09-30.clover" | "2025-08-27.basil" | "2025-07-30.basil" | "2025-06-30.basil" | "2025-05-28.basil" | "2025-04-30.basil" | "2025-03-31.basil" | "2025-02-24.acacia" | "2025-01-27.acacia" | "2024-12-18.acacia" | "2024-11-20.acacia" | "2024-10-28.acacia" | "2024-09-30.acacia" | "2024-06-20" | "2024-04-10" | "2024-04-03" | "2023-10-16" | "2023-08-16" | "2022-11-15" | "2022-08-01" | "2020-08-27" | "2020-03-02" | "2019-12-03" | "2019-11-05" | "2019-10-17" | "2019-10-08" | "2019-09-09" | "2019-08-14" | "2019-05-16" | "2019-03-14" | "2019-02-19" | "2019-02-11" | "2018-11-08" | "2018-10-31" | "2018-09-24" | "2018-09-06" | "2018-08-23" | "2018-07-27" | "2018-05-21" | "2018-02-28" | "2018-02-06" | "2018-02-05" | "2018-01-23" | "2017-12-14" | "2017-08-15"; /** * Format: uri * @description Override the Stripe API base URL (e.g. http://localhost:12111 for stripe-mock) diff --git a/apps/service/src/__generated__/openapi.json b/apps/service/src/__generated__/openapi.json index 631fc426..5de34bc2 100644 --- a/apps/service/src/__generated__/openapi.json +++ b/apps/service/src/__generated__/openapi.json @@ -989,7 +989,59 @@ }, "api_version": { "type": "string", - "const": "2026-03-25.dahlia" + "enum": [ + "2026-03-25.dahlia", + "2026-02-25.clover", + "2026-01-28.clover", + "2025-12-15.clover", + "2025-11-17.clover", + "2025-10-29.clover", + "2025-09-30.clover", + "2025-08-27.basil", + "2025-07-30.basil", + "2025-06-30.basil", + "2025-05-28.basil", + "2025-04-30.basil", + "2025-03-31.basil", + "2025-02-24.acacia", + "2025-01-27.acacia", + "2024-12-18.acacia", + "2024-11-20.acacia", + "2024-10-28.acacia", + "2024-09-30.acacia", + "2024-06-20", + "2024-04-10", + "2024-04-03", + "2023-10-16", + "2023-08-16", + "2022-11-15", + "2022-08-01", + "2020-08-27", + "2020-03-02", + "2019-12-03", + "2019-11-05", + "2019-10-17", + "2019-10-08", + "2019-09-09", + "2019-08-14", + "2019-05-16", + "2019-03-14", + "2019-02-19", + "2019-02-11", + "2018-11-08", + "2018-10-31", + "2018-09-24", + "2018-09-06", + "2018-08-23", + "2018-07-27", + "2018-05-21", + "2018-02-28", + "2018-02-06", + "2018-02-05", + "2018-01-23", + "2017-12-14", + "2017-08-15" + ] }, "base_url": { "type": "string", From 89b89140c28824e21a58970920cfbea34dfcd523 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Sun, 5 Apr 2026 00:11:56 -0700 Subject: [PATCH 05/13] feat(openapi): build SUPPORTED_API_VERSIONS from committed oas/manifest.json Remove network fetch at build time. Instead: - oas/manifest.json is committed alongside the bundled spec - generate-versions.mjs reads it locally (reproducible, no network) - docs/scripts/generate-stripe-specs.mjs now also writes manifest to packages/openapi/oas/manifest.json when updating the CDN To pick up new Stripe API versions: 1. Run docs/scripts/generate-stripe-specs.mjs 2. Commit updated oas/manifest.json 3. Run pnpm --filter @stripe/sync-openapi build Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- docs/scripts/generate-stripe-specs.mjs | 6 +++ packages/openapi/oas/manifest.json | 53 +++++++++++++++++++ .../openapi/scripts/generate-versions.mjs | 49 ++++++++--------- 3 files changed, 80 insertions(+), 28 deletions(-) create mode 100644 packages/openapi/oas/manifest.json diff --git a/docs/scripts/generate-stripe-specs.mjs b/docs/scripts/generate-stripe-specs.mjs index 569506d8..3b76b457 100644 --- a/docs/scripts/generate-stripe-specs.mjs +++ b/docs/scripts/generate-stripe-specs.mjs @@ -139,4 +139,10 @@ ${rows} ` ) +// Also write manifest to packages/openapi/oas/ so generate-versions.mjs +// can build SUPPORTED_API_VERSIONS without a network fetch. +const pkgManifestPath = new URL('../../packages/openapi/oas/manifest.json', import.meta.url) +writeFileSync(pkgManifestPath, JSON.stringify(Object.fromEntries(seen), null, 2) + '\n') +console.error(`Updated packages/openapi/oas/manifest.json`) + console.error(`\nDone: ${seen.size} spec versions`) diff --git a/packages/openapi/oas/manifest.json b/packages/openapi/oas/manifest.json new file mode 100644 index 00000000..9613d434 --- /dev/null +++ b/packages/openapi/oas/manifest.json @@ -0,0 +1,53 @@ +{ + "2026-03-25.dahlia": "2026-03-25.dahlia.json", + "2026-02-25.clover": "2026-02-25.clover.json", + "2026-01-28.clover": "2026-01-28.clover.json", + "2025-12-15.clover": "2025-12-15.clover.json", + "2025-11-17.clover": "2025-11-17.clover.json", + "2025-10-29.clover": "2025-10-29.clover.json", + "2025-09-30.clover": "2025-09-30.clover.json", + "2025-08-27.basil": "2025-08-27.basil.json", + "2025-07-30.basil": "2025-07-30.basil.json", + "2025-06-30.basil": "2025-06-30.basil.json", + "2025-05-28.basil": "2025-05-28.basil.json", + "2025-04-30.basil": "2025-04-30.basil.json", + "2025-03-31.basil": "2025-03-31.basil.json", + "2025-02-24.acacia": "2025-02-24.acacia.json", + "2025-01-27.acacia": "2025-01-27.acacia.json", + "2024-12-18.acacia": "2024-12-18.acacia.json", + "2024-11-20.acacia": "2024-11-20.acacia.json", + "2024-10-28.acacia": "2024-10-28.acacia.json", + "2024-09-30.acacia": "2024-09-30.acacia.json", + "2024-06-20": "2024-06-20.json", + "2024-04-10": "2024-04-10.json", + "2024-04-03": "2024-04-03.json", + "2023-10-16": "2023-10-16.json", + "2023-08-16": "2023-08-16.json", + "2022-11-15": "2022-11-15.json", + "2022-08-01": "2022-08-01.json", + "2020-08-27": "2020-08-27.json", + "2020-03-02": "2020-03-02.json", + "2019-12-03": "2019-12-03.json", + "2019-11-05": "2019-11-05.json", + "2019-10-17": "2019-10-17.json", + "2019-10-08": "2019-10-08.json", + "2019-09-09": "2019-09-09.json", + "2019-08-14": "2019-08-14.json", + "2019-05-16": "2019-05-16.json", + "2019-03-14": "2019-03-14.json", + "2019-02-19": "2019-02-19.json", + "2019-02-11": "2019-02-11.json", + "2018-11-08": "2018-11-08.json", + "2018-10-31": "2018-10-31.json", + "2018-09-24": "2018-09-24.json", + "2018-09-06": "2018-09-06.json", + "2018-08-23": "2018-08-23.json", + "2018-07-27": "2018-07-27.json", + "2018-05-21": "2018-05-21.json", + "2018-02-28": "2018-02-28.json", + "2018-02-06": "2018-02-06.json", + "2018-02-05": "2018-02-05.json", + "2018-01-23": "2018-01-23.json", + "2017-12-14": "2017-12-14.json", + "2017-08-15": "2017-08-15.json" +} diff --git a/packages/openapi/scripts/generate-versions.mjs b/packages/openapi/scripts/generate-versions.mjs index e53eac18..08df841a 100644 --- a/packages/openapi/scripts/generate-versions.mjs +++ b/packages/openapi/scripts/generate-versions.mjs @@ -1,19 +1,18 @@ #!/usr/bin/env node /** - * Generates src/versions.ts by combining: - * - BUNDLED_API_VERSION — the single spec file in oas/ (local, no network) - * - SUPPORTED_API_VERSIONS — all versions in the CDN manifest (network fetch) + * Generates src/versions.ts from committed files in oas/. * - * The CDN manifest is produced by docs/scripts/generate-stripe-specs.mjs and - * served at STRIPE_SPEC_CDN_BASE_URL/manifest.json. + * BUNDLED_API_VERSION — the single *.json spec file in oas/ + * SUPPORTED_API_VERSIONS — all entries in oas/manifest.json + * + * Both are committed. oas/manifest.json is updated by + * docs/scripts/generate-stripe-specs.mjs (the CDN update process), which + * writes to the same manifest file. No network access at build time. * * Run directly: node scripts/generate-versions.mjs * Run via build: pnpm --filter @stripe/sync-openapi build - * - * Override the CDN URL in tests/CI: - * STRIPE_SPEC_CDN_BASE_URL=http://localhost:3000 node scripts/generate-versions.mjs */ -import { readdirSync, writeFileSync, mkdirSync } from 'node:fs' +import { readdirSync, readFileSync, writeFileSync, mkdirSync } from 'node:fs' import { join, dirname } from 'node:path' import { fileURLToPath } from 'node:url' @@ -21,10 +20,7 @@ const __dirname = dirname(fileURLToPath(import.meta.url)) const oasDir = join(__dirname, '..', 'oas') const outFile = join(__dirname, '..', 'src', 'versions.ts') -const CDN_BASE_URL = - process.env.STRIPE_SPEC_CDN_BASE_URL ?? 'https://stripe-sync.dev/stripe-api-specs' - -// BUNDLED_API_VERSION — derived from the single .json file in oas/ +// BUNDLED_API_VERSION — the single committed spec file in oas/ const bundledFiles = readdirSync(oasDir).filter( (f) => f.endsWith('.json') && f !== 'manifest.json' && f !== 'index.html' ) @@ -36,25 +32,22 @@ if (bundledFiles.length !== 1) { } const bundled = bundledFiles[0].replace(/\.json$/, '') -// SUPPORTED_API_VERSIONS — all versions from the CDN manifest -console.error(`Fetching manifest from ${CDN_BASE_URL}/manifest.json ...`) +// SUPPORTED_API_VERSIONS — from committed oas/manifest.json +// Updated by: docs/scripts/generate-stripe-specs.mjs +const manifestPath = join(oasDir, 'manifest.json') let manifest try { - const res = await fetch(`${CDN_BASE_URL}/manifest.json`) - if (!res.ok) throw new Error(`HTTP ${res.status} ${res.statusText}`) - manifest = await res.json() + manifest = JSON.parse(readFileSync(manifestPath, 'utf8')) } catch (err) { - console.error(`Failed to fetch CDN manifest: ${err.message}`) - console.error( - 'Run with STRIPE_SPEC_CDN_BASE_URL pointing to a local mirror, or check connectivity.' - ) + console.error(`Failed to read ${manifestPath}: ${err.message}`) + console.error('Run docs/scripts/generate-stripe-specs.mjs to regenerate it.') process.exit(1) } const allVersions = Object.keys(manifest).sort().reverse() // newest first if (allVersions.length === 0) { - console.error('CDN manifest has no versions') + console.error('oas/manifest.json has no versions') process.exit(1) } @@ -64,18 +57,18 @@ mkdirSync(dirname(outFile), { recursive: true }) writeFileSync( outFile, `// Generated by scripts/generate-versions.mjs — do not edit manually. -// BUNDLED_API_VERSION: derived from packages/openapi/oas/*.json (one file). -// SUPPORTED_API_VERSIONS: fetched from ${CDN_BASE_URL}/manifest.json at build time. -// Re-run \`pnpm --filter @stripe/sync-openapi build\` to pick up new CDN versions. +// BUNDLED_API_VERSION: the single spec file in packages/openapi/oas/. +// SUPPORTED_API_VERSIONS: all entries in packages/openapi/oas/manifest.json. +// To update: run docs/scripts/generate-stripe-specs.mjs, then rebuild. /** The single Stripe API spec bundled in this package (served without network). */ export const BUNDLED_API_VERSION = '${bundled}' as const -/** All Stripe API versions available via the CDN, newest first. */ +/** All Stripe API versions known to the CDN, newest first. */ export const SUPPORTED_API_VERSIONS = [ ${lines} ] as const satisfies readonly string[] ` ) -console.log(`Bundled: ${bundled} | Supported: ${allVersions.length} version(s) from CDN`) +console.log(`Bundled: ${bundled} | Supported: ${allVersions.length} version(s)`) From 6336a0c92e58214f6e8a3bb1ffca08cee032346b Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Sun, 5 Apr 2026 00:23:23 -0700 Subject: [PATCH 06/13] refactor(openapi): move CDN spec generation into packages/openapi/scripts/ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add scripts/generate-all-specs.mjs (heavyweight): moved from docs/scripts/generate-stripe-specs.mjs; walks stripe/openapi git history, writes spec files + manifest.json to for CDN, and updates src/versions.ts as a side effect - Keep scripts/generate-versions.mjs (lightweight): standalone utility to bootstrap src/versions.ts from just the bundled oas/ file - Remove build-time version generation from package.json — src/versions.ts is a committed file updated by the heavyweight script; build = tsc only - Remove oas/manifest.json — not needed in the package - docs/scripts/generate-stripe-specs.mjs → thin wrapper that delegates to packages/openapi/scripts/generate-all-specs.mjs packages/openapi is now the source of truth for Stripe API version discovery. CDN invokes generate-all-specs.mjs with an output directory. Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- docs/scripts/generate-stripe-specs.mjs | 155 ++-------------- packages/openapi/oas/manifest.json | 53 ------ packages/openapi/package.json | 2 +- .../openapi/scripts/generate-all-specs.mjs | 173 ++++++++++++++++++ .../openapi/scripts/generate-versions.mjs | 46 ++--- 5 files changed, 200 insertions(+), 229 deletions(-) delete mode 100644 packages/openapi/oas/manifest.json create mode 100644 packages/openapi/scripts/generate-all-specs.mjs diff --git a/docs/scripts/generate-stripe-specs.mjs b/docs/scripts/generate-stripe-specs.mjs index 3b76b457..f970c70c 100644 --- a/docs/scripts/generate-stripe-specs.mjs +++ b/docs/scripts/generate-stripe-specs.mjs @@ -1,148 +1,23 @@ #!/usr/bin/env node /** - * Fetches all published Stripe REST API spec versions from - * github.com/stripe/openapi and writes .json + manifest.json to . + * CDN spec generation — thin wrapper around packages/openapi/scripts/generate-all-specs.mjs. * * Usage: - * node generate-stripe-specs.mjs - * - * Clones stripe/openapi (single-branch) then walks the full history, collecting - * every unique API version (deduplicated by blob SHA, then by version string). - * Set STRIPE_OPENAPI_REPO to a pre-cloned path to skip the clone (e.g. CI cache). - * - * These are the official Stripe REST API specs (github.com/stripe/openapi), NOT - * the Sync Engine's own OpenAPI spec (which lives at /openapi/engine.json etc.). - * - * No npm dependencies. + * node docs/scripts/generate-stripe-specs.mjs */ -import { writeFileSync, mkdirSync, existsSync } from 'node:fs' -import { join } from 'node:path' -import { tmpdir } from 'node:os' import { execFileSync } from 'node:child_process' - -const [outputDir] = process.argv.slice(2) -if (!outputDir) { - console.error('Usage: node generate-stripe-specs.mjs ') - process.exit(1) -} - -const REPO_URL = 'https://github.com/stripe/openapi' -// stripe/openapi uses 'latest/openapi.spec3.sdk.json' for recent specs and -// 'openapi/spec3.json' for historic ones. -const SPEC_PATHS = ['latest/openapi.spec3.sdk.json', 'openapi/spec3.json'] - -function git(...args) { - // maxBuffer: Stripe specs are ~10 MB each; default 1 MB would silently truncate/throw. - return execFileSync('git', ['-C', repoDir, ...args], { - encoding: 'utf8', - maxBuffer: 50 * 1024 * 1024, - }) -} - -// Clone or use pre-cloned repo -const repoDir = process.env.STRIPE_OPENAPI_REPO ?? join(tmpdir(), 'stripe-openapi') -if (!existsSync(join(repoDir, '.git'))) { - console.error(`Cloning ${REPO_URL}...`) - execFileSync('git', ['clone', '--single-branch', REPO_URL, repoDir], { stdio: 'inherit' }) -} else { - console.error(`Using pre-cloned repo at ${repoDir}`) -} - -// Walk commits newest→oldest, collect all unique API versions. -console.error('Collecting all spec versions...') -const commits = git('log', '--format=%H', '--', ...SPEC_PATHS) - .trim() - .split('\n') - .filter(Boolean) - -mkdirSync(outputDir, { recursive: true }) - -const seen = new Map() // version -> filename -const seenBlobs = new Set() - -for (const commit of commits) { - let blobSha - for (const specPath of SPEC_PATHS) { - let ls - try { - ls = git('ls-tree', commit, specPath).trim() - } catch { - continue - } - if (!ls) continue - blobSha = ls.split(/\s+/)[2] - break - } - if (!blobSha || seenBlobs.has(blobSha)) continue - seenBlobs.add(blobSha) - - let raw - try { - raw = git('cat-file', 'blob', blobSha) - } catch { - continue - } - - let version - try { - version = JSON.parse(raw).info?.version - } catch { - continue - } - if (!version || seen.has(version)) continue - - writeFileSync(join(outputDir, `${version}.json`), raw) - seen.set(version, `${version}.json`) - console.error(` ${version}`) -} - -writeFileSync( - join(outputDir, 'manifest.json'), - JSON.stringify(Object.fromEntries(seen), null, 2) + '\n' -) - -// Generate an index page so https://stripe-sync.dev/stripe-api-specs/ is browsable -const versions = [...seen.keys()].sort().reverse() -const rows = versions.map((v) => `
  • ${v}
  • `).join('\n') -writeFileSync( - join(outputDir, 'index.html'), - ` - - - - Stripe REST API Specs — stripe-sync.dev CDN - - - -

    Stripe REST API OpenAPI Specs

    -

    - These are the official Stripe REST API specs from - github.com/stripe/openapi, - mirrored here to avoid GitHub API rate limits. - This is not the Sync Engine's own OpenAPI spec - (see engine.json for that). -

    -

    Machine-readable index: manifest.json — ${versions.length} versions available.

    -
      -${rows} -
    - - -` +import { dirname, join } from 'node:path' +import { fileURLToPath } from 'node:url' + +const __dirname = dirname(fileURLToPath(import.meta.url)) +const script = join( + __dirname, + '..', + '..', + 'packages', + 'openapi', + 'scripts', + 'generate-all-specs.mjs' ) -// Also write manifest to packages/openapi/oas/ so generate-versions.mjs -// can build SUPPORTED_API_VERSIONS without a network fetch. -const pkgManifestPath = new URL('../../packages/openapi/oas/manifest.json', import.meta.url) -writeFileSync(pkgManifestPath, JSON.stringify(Object.fromEntries(seen), null, 2) + '\n') -console.error(`Updated packages/openapi/oas/manifest.json`) - -console.error(`\nDone: ${seen.size} spec versions`) +execFileSync(process.execPath, [script, ...process.argv.slice(2)], { stdio: 'inherit' }) diff --git a/packages/openapi/oas/manifest.json b/packages/openapi/oas/manifest.json deleted file mode 100644 index 9613d434..00000000 --- a/packages/openapi/oas/manifest.json +++ /dev/null @@ -1,53 +0,0 @@ -{ - "2026-03-25.dahlia": "2026-03-25.dahlia.json", - "2026-02-25.clover": "2026-02-25.clover.json", - "2026-01-28.clover": "2026-01-28.clover.json", - "2025-12-15.clover": "2025-12-15.clover.json", - "2025-11-17.clover": "2025-11-17.clover.json", - "2025-10-29.clover": "2025-10-29.clover.json", - "2025-09-30.clover": "2025-09-30.clover.json", - "2025-08-27.basil": "2025-08-27.basil.json", - "2025-07-30.basil": "2025-07-30.basil.json", - "2025-06-30.basil": "2025-06-30.basil.json", - "2025-05-28.basil": "2025-05-28.basil.json", - "2025-04-30.basil": "2025-04-30.basil.json", - "2025-03-31.basil": "2025-03-31.basil.json", - "2025-02-24.acacia": "2025-02-24.acacia.json", - "2025-01-27.acacia": "2025-01-27.acacia.json", - "2024-12-18.acacia": "2024-12-18.acacia.json", - "2024-11-20.acacia": "2024-11-20.acacia.json", - "2024-10-28.acacia": "2024-10-28.acacia.json", - "2024-09-30.acacia": "2024-09-30.acacia.json", - "2024-06-20": "2024-06-20.json", - "2024-04-10": "2024-04-10.json", - "2024-04-03": "2024-04-03.json", - "2023-10-16": "2023-10-16.json", - "2023-08-16": "2023-08-16.json", - "2022-11-15": "2022-11-15.json", - "2022-08-01": "2022-08-01.json", - "2020-08-27": "2020-08-27.json", - "2020-03-02": "2020-03-02.json", - "2019-12-03": "2019-12-03.json", - "2019-11-05": "2019-11-05.json", - "2019-10-17": "2019-10-17.json", - "2019-10-08": "2019-10-08.json", - "2019-09-09": "2019-09-09.json", - "2019-08-14": "2019-08-14.json", - "2019-05-16": "2019-05-16.json", - "2019-03-14": "2019-03-14.json", - "2019-02-19": "2019-02-19.json", - "2019-02-11": "2019-02-11.json", - "2018-11-08": "2018-11-08.json", - "2018-10-31": "2018-10-31.json", - "2018-09-24": "2018-09-24.json", - "2018-09-06": "2018-09-06.json", - "2018-08-23": "2018-08-23.json", - "2018-07-27": "2018-07-27.json", - "2018-05-21": "2018-05-21.json", - "2018-02-28": "2018-02-28.json", - "2018-02-06": "2018-02-06.json", - "2018-02-05": "2018-02-05.json", - "2018-01-23": "2018-01-23.json", - "2017-12-14": "2017-12-14.json", - "2017-08-15": "2017-08-15.json" -} diff --git a/packages/openapi/package.json b/packages/openapi/package.json index 50b30f12..4d48f398 100644 --- a/packages/openapi/package.json +++ b/packages/openapi/package.json @@ -11,7 +11,7 @@ } }, "scripts": { - "build": "node scripts/generate-versions.mjs && tsc && cp -r oas dist/", + "build": "tsc && cp -r oas dist/", "test": "vitest --passWithNoTests" }, "files": [ diff --git a/packages/openapi/scripts/generate-all-specs.mjs b/packages/openapi/scripts/generate-all-specs.mjs new file mode 100644 index 00000000..e0b68fd8 --- /dev/null +++ b/packages/openapi/scripts/generate-all-specs.mjs @@ -0,0 +1,173 @@ +#!/usr/bin/env node +/** + * Heavyweight: fetches ALL published Stripe REST API spec versions from + * github.com/stripe/openapi, writes .json + manifest.json to + * (for CDN upload), and updates src/versions.ts with the full version list. + * + * Usage: + * node packages/openapi/scripts/generate-all-specs.mjs + * + * Clones stripe/openapi (single-branch) then walks the full history, collecting + * every unique API version (deduplicated by blob SHA, then by version string). + * Set STRIPE_OPENAPI_REPO to a pre-cloned path to skip the clone (e.g. CI cache). + * + * These are the official Stripe REST API specs (github.com/stripe/openapi), NOT + * the Sync Engine's own OpenAPI spec. + * + * No npm dependencies. + */ +import { writeFileSync, mkdirSync, existsSync } from 'node:fs' +import { join, dirname } from 'node:path' +import { tmpdir } from 'node:os' +import { execFileSync } from 'node:child_process' +import { fileURLToPath } from 'node:url' + +const __dirname = dirname(fileURLToPath(import.meta.url)) + +const [outputDir] = process.argv.slice(2) +if (!outputDir) { + console.error('Usage: node generate-all-specs.mjs ') + process.exit(1) +} + +const REPO_URL = 'https://github.com/stripe/openapi' +// stripe/openapi uses 'latest/openapi.spec3.sdk.json' for recent specs and +// 'openapi/spec3.json' for historic ones. +const SPEC_PATHS = ['latest/openapi.spec3.sdk.json', 'openapi/spec3.json'] + +function git(...args) { + // maxBuffer: Stripe specs are ~10 MB each; default 1 MB would silently truncate/throw. + return execFileSync('git', ['-C', repoDir, ...args], { + encoding: 'utf8', + maxBuffer: 50 * 1024 * 1024, + }) +} + +// Clone or use pre-cloned repo +const repoDir = process.env.STRIPE_OPENAPI_REPO ?? join(tmpdir(), 'stripe-openapi') +if (!existsSync(join(repoDir, '.git'))) { + console.error(`Cloning ${REPO_URL}...`) + execFileSync('git', ['clone', '--single-branch', REPO_URL, repoDir], { stdio: 'inherit' }) +} else { + console.error(`Using pre-cloned repo at ${repoDir}`) +} + +// Walk commits newest→oldest, collect all unique API versions. +console.error('Collecting all spec versions...') +const commits = git('log', '--format=%H', '--', ...SPEC_PATHS) + .trim() + .split('\n') + .filter(Boolean) + +mkdirSync(outputDir, { recursive: true }) + +const seen = new Map() // version -> filename +const seenBlobs = new Set() + +for (const commit of commits) { + let blobSha + for (const specPath of SPEC_PATHS) { + let ls + try { + ls = git('ls-tree', commit, specPath).trim() + } catch { + continue + } + if (!ls) continue + blobSha = ls.split(/\s+/)[2] + break + } + if (!blobSha || seenBlobs.has(blobSha)) continue + seenBlobs.add(blobSha) + + let raw + try { + raw = git('cat-file', 'blob', blobSha) + } catch { + continue + } + + let version + try { + version = JSON.parse(raw).info?.version + } catch { + continue + } + if (!version || seen.has(version)) continue + + writeFileSync(join(outputDir, `${version}.json`), raw) + seen.set(version, `${version}.json`) + console.error(` ${version}`) +} + +writeFileSync( + join(outputDir, 'manifest.json'), + JSON.stringify(Object.fromEntries(seen), null, 2) + '\n' +) + +// Update src/versions.ts with the full version list so build-time generation +// picks up all known versions without a network fetch. +const allVersions = [...seen.keys()].sort().reverse() // newest first +const bundledFiles = (await import('node:fs')) + .readdirSync(join(__dirname, '..', 'oas')) + .filter((f) => f.endsWith('.json') && f !== 'manifest.json' && f !== 'index.html') +const bundled = bundledFiles[0]?.replace(/\.json$/, '') ?? allVersions[0] +const lines = allVersions.map((v) => ` '${v}',`).join('\n') +const versionsFile = join(__dirname, '..', 'src', 'versions.ts') +writeFileSync( + versionsFile, + `// Generated by scripts/generate-all-specs.mjs — do not edit manually. +// BUNDLED_API_VERSION: the single spec file in packages/openapi/oas/. +// SUPPORTED_API_VERSIONS: all versions discovered from github.com/stripe/openapi. +// Re-run scripts/generate-all-specs.mjs to pick up newly published API versions. + +/** The single Stripe API spec bundled in this package (served without network). */ +export const BUNDLED_API_VERSION = '${bundled}' as const + +/** All Stripe API versions published by Stripe, newest first. */ +export const SUPPORTED_API_VERSIONS = [ +${lines} +] as const satisfies readonly string[] +` +) +console.error(`Updated src/versions.ts (${allVersions.length} versions, bundled: ${bundled})`) + +// Generate an index page so https://stripe-sync.dev/stripe-api-specs/ is browsable +const versions = [...seen.keys()].sort().reverse() +const rows = versions.map((v) => `
  • ${v}
  • `).join('\n') +writeFileSync( + join(outputDir, 'index.html'), + ` + + + + Stripe REST API Specs — stripe-sync.dev CDN + + + +

    Stripe REST API OpenAPI Specs

    +

    + These are the official Stripe REST API specs from + github.com/stripe/openapi, + mirrored here to avoid GitHub API rate limits. + This is not the Sync Engine's own OpenAPI spec + (see engine.json for that). +

    +

    Machine-readable index: manifest.json — ${versions.length} versions available.

    +
      +${rows} +
    + + +` +) + +console.error(`\nDone: ${seen.size} spec versions`) diff --git a/packages/openapi/scripts/generate-versions.mjs b/packages/openapi/scripts/generate-versions.mjs index 08df841a..6ef6c23a 100644 --- a/packages/openapi/scripts/generate-versions.mjs +++ b/packages/openapi/scripts/generate-versions.mjs @@ -1,18 +1,17 @@ #!/usr/bin/env node /** - * Generates src/versions.ts from committed files in oas/. + * Lightweight: generates src/versions.ts from the single bundled spec in oas/. * - * BUNDLED_API_VERSION — the single *.json spec file in oas/ - * SUPPORTED_API_VERSIONS — all entries in oas/manifest.json + * BUNDLED_API_VERSION — the single *.json file in oas/ + * SUPPORTED_API_VERSIONS — same as bundled (lightweight fallback) * - * Both are committed. oas/manifest.json is updated by - * docs/scripts/generate-stripe-specs.mjs (the CDN update process), which - * writes to the same manifest file. No network access at build time. + * For the full list of all published Stripe API versions, run the heavyweight + * script instead: node scripts/generate-all-specs.mjs * * Run directly: node scripts/generate-versions.mjs * Run via build: pnpm --filter @stripe/sync-openapi build */ -import { readdirSync, readFileSync, writeFileSync, mkdirSync } from 'node:fs' +import { readdirSync, writeFileSync, mkdirSync } from 'node:fs' import { join, dirname } from 'node:path' import { fileURLToPath } from 'node:url' @@ -20,7 +19,6 @@ const __dirname = dirname(fileURLToPath(import.meta.url)) const oasDir = join(__dirname, '..', 'oas') const outFile = join(__dirname, '..', 'src', 'versions.ts') -// BUNDLED_API_VERSION — the single committed spec file in oas/ const bundledFiles = readdirSync(oasDir).filter( (f) => f.endsWith('.json') && f !== 'manifest.json' && f !== 'index.html' ) @@ -32,43 +30,21 @@ if (bundledFiles.length !== 1) { } const bundled = bundledFiles[0].replace(/\.json$/, '') -// SUPPORTED_API_VERSIONS — from committed oas/manifest.json -// Updated by: docs/scripts/generate-stripe-specs.mjs -const manifestPath = join(oasDir, 'manifest.json') -let manifest -try { - manifest = JSON.parse(readFileSync(manifestPath, 'utf8')) -} catch (err) { - console.error(`Failed to read ${manifestPath}: ${err.message}`) - console.error('Run docs/scripts/generate-stripe-specs.mjs to regenerate it.') - process.exit(1) -} - -const allVersions = Object.keys(manifest).sort().reverse() // newest first - -if (allVersions.length === 0) { - console.error('oas/manifest.json has no versions') - process.exit(1) -} - -const lines = allVersions.map((v) => ` '${v}',`).join('\n') - mkdirSync(dirname(outFile), { recursive: true }) writeFileSync( outFile, `// Generated by scripts/generate-versions.mjs — do not edit manually. -// BUNDLED_API_VERSION: the single spec file in packages/openapi/oas/. -// SUPPORTED_API_VERSIONS: all entries in packages/openapi/oas/manifest.json. -// To update: run docs/scripts/generate-stripe-specs.mjs, then rebuild. +// For the full list of all published Stripe API versions, run: +// node scripts/generate-all-specs.mjs /** The single Stripe API spec bundled in this package (served without network). */ export const BUNDLED_API_VERSION = '${bundled}' as const -/** All Stripe API versions known to the CDN, newest first. */ +/** All Stripe API versions published by Stripe, newest first. */ export const SUPPORTED_API_VERSIONS = [ -${lines} + '${bundled}', ] as const satisfies readonly string[] ` ) -console.log(`Bundled: ${bundled} | Supported: ${allVersions.length} version(s)`) +console.log(`Bundled: ${bundled}`) From 8b1c7df862759f6219d1e063f12b6015fc3eab2c Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Sun, 5 Apr 2026 00:27:27 -0700 Subject: [PATCH 07/13] refactor(service): remove generic queue activities, fix global state replacement MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Delete read-into-queue.ts and write-from-queue.ts — unused generic queue activities; pipelineWorkflow and backfillPipelineWorkflow use syncImmediate directly, and googleSheetPipelineWorkflow has its own GS-specific queue activities - Fix global source_state handling: replace wholesale (state.global = data) instead of merging (Object.assign) — consistent with stream state which already does per-stream replacement Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- apps/service/src/__tests__/workflow.test.ts | 2 -- apps/service/src/api/app.test.ts | 2 -- .../src/temporal/activities/_shared.ts | 2 +- apps/service/src/temporal/activities/index.ts | 4 --- .../read-google-sheets-into-queue.ts | 2 +- .../temporal/activities/read-into-queue.ts | 34 ------------------- .../temporal/activities/write-from-queue.ts | 32 ----------------- .../write-google-sheets-from-queue.ts | 2 +- ...erate-all-specs.mjs => generate-specs.mjs} | 0 9 files changed, 3 insertions(+), 77 deletions(-) delete mode 100644 apps/service/src/temporal/activities/read-into-queue.ts delete mode 100644 apps/service/src/temporal/activities/write-from-queue.ts rename packages/openapi/scripts/{generate-all-specs.mjs => generate-specs.mjs} (100%) diff --git a/apps/service/src/__tests__/workflow.test.ts b/apps/service/src/__tests__/workflow.test.ts index c37f01ba..a49e9293 100644 --- a/apps/service/src/__tests__/workflow.test.ts +++ b/apps/service/src/__tests__/workflow.test.ts @@ -22,14 +22,12 @@ function stubActivities(overrides: Partial = {}): SyncActivities setup: async () => ({}), syncImmediate: async () => noErrors, readGoogleSheetsIntoQueue: async () => ({ count: 0, state: emptyState }), - readIntoQueue: async () => ({ count: 0, state: emptyState }), writeGoogleSheetsFromQueue: async () => ({ errors: [], state: emptyState, written: 0, rowAssignments: {}, }), - writeFromQueue: async () => ({ errors: [], state: emptyState, written: 0 }), teardown: async () => {}, ...overrides, } diff --git a/apps/service/src/api/app.test.ts b/apps/service/src/api/app.test.ts index ddd86971..3f223d3e 100644 --- a/apps/service/src/api/app.test.ts +++ b/apps/service/src/api/app.test.ts @@ -132,14 +132,12 @@ function stubActivities(): SyncActivities { setup: async () => ({}), syncImmediate: async () => noErrors, readGoogleSheetsIntoQueue: async () => ({ count: 0, state: {} }), - readIntoQueue: async () => ({ count: 0, state: {} }), writeGoogleSheetsFromQueue: async () => ({ errors: [], state: {}, written: 0, rowAssignments: {}, }), - writeFromQueue: async () => ({ errors: [], state: {}, written: 0 }), teardown: async () => {}, } } diff --git a/apps/service/src/temporal/activities/_shared.ts b/apps/service/src/temporal/activities/_shared.ts index da35d248..215f7bbd 100644 --- a/apps/service/src/temporal/activities/_shared.ts +++ b/apps/service/src/temporal/activities/_shared.ts @@ -159,7 +159,7 @@ export async function drainMessages(stream: AsyncIterable): Promise<{ errors.push(error) } else if (message.type === 'source_state') { if (message.source_state.state_type === 'global') { - Object.assign(state.global, message.source_state.data as Record) + state.global = message.source_state.data as Record } else { state.streams[message.source_state.stream] = message.source_state.data } diff --git a/apps/service/src/temporal/activities/index.ts b/apps/service/src/temporal/activities/index.ts index e423c2e3..cd96b661 100644 --- a/apps/service/src/temporal/activities/index.ts +++ b/apps/service/src/temporal/activities/index.ts @@ -1,11 +1,9 @@ import { createActivitiesContext } from './_shared.js' import { createDiscoverCatalogActivity } from './discover-catalog.js' -import { createReadIntoQueueActivity } from './read-into-queue.js' import { createReadGoogleSheetsIntoQueueActivity } from './read-google-sheets-into-queue.js' import { createSetupActivity } from './setup.js' import { createSyncImmediateActivity } from './sync-immediate.js' import { createTeardownActivity } from './teardown.js' -import { createWriteFromQueueActivity } from './write-from-queue.js' import { createWriteGoogleSheetsFromQueueActivity } from './write-google-sheets-from-queue.js' import type { PipelineStore } from '../../lib/stores.js' @@ -23,9 +21,7 @@ export function createActivities(opts: { setup: createSetupActivity(context), syncImmediate: createSyncImmediateActivity(context), readGoogleSheetsIntoQueue: createReadGoogleSheetsIntoQueueActivity(context), - readIntoQueue: createReadIntoQueueActivity(context), writeGoogleSheetsFromQueue: createWriteGoogleSheetsFromQueueActivity(context), - writeFromQueue: createWriteFromQueueActivity(context), teardown: createTeardownActivity(context), } } diff --git a/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts b/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts index dd2772aa..7b68fd90 100644 --- a/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts +++ b/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts @@ -57,7 +57,7 @@ export function createReadGoogleSheetsIntoQueueActivity(context: ActivitiesConte queued.push(withRowKey(raw, catalog)) } else if (raw.type === 'source_state') { if (raw.source_state.state_type === 'global') { - Object.assign(state.global, raw.source_state.data as Record) + state.global = raw.source_state.data as Record } else { state.streams[raw.source_state.stream] = raw.source_state.data } diff --git a/apps/service/src/temporal/activities/read-into-queue.ts b/apps/service/src/temporal/activities/read-into-queue.ts deleted file mode 100644 index deec31bf..00000000 --- a/apps/service/src/temporal/activities/read-into-queue.ts +++ /dev/null @@ -1,34 +0,0 @@ -import type { SourceReadOptions } from '@stripe/sync-engine' -import type { ActivitiesContext } from './_shared.js' - -type SourceInput = unknown -import { asIterable, drainMessages } from './_shared.js' - -export function createReadIntoQueueActivity(context: ActivitiesContext) { - return async function readIntoQueue( - pipelineId: string, - opts?: SourceReadOptions & { input?: SourceInput[] } - ): Promise<{ - count: number - state: import('@stripe/sync-engine').SyncState - eof?: { reason: string } - }> { - const pipeline = await context.pipelineStore.get(pipelineId) - const { id: _, ...config } = pipeline - const { input: inputArr, ...readOpts } = opts ?? {} - const input = inputArr?.length ? asIterable(inputArr) : undefined - const { records, state, eof } = await drainMessages( - context.engine.pipeline_read(config, readOpts, input) - ) - - if (context.kafkaBroker && records.length > 0) { - const producer = await context.getProducer() - await producer.send({ - topic: `pipeline.${pipelineId}`, - messages: records.map((record) => ({ value: JSON.stringify(record) })), - }) - } - - return { count: records.length, state, eof } - } -} diff --git a/apps/service/src/temporal/activities/write-from-queue.ts b/apps/service/src/temporal/activities/write-from-queue.ts deleted file mode 100644 index 9647eea4..00000000 --- a/apps/service/src/temporal/activities/write-from-queue.ts +++ /dev/null @@ -1,32 +0,0 @@ -import type { Message } from '@stripe/sync-engine' - -import type { ActivitiesContext } from './_shared.js' -import { asIterable, drainMessages, type RunResult } from './_shared.js' - -export function createWriteFromQueueActivity(context: ActivitiesContext) { - return async function writeFromQueue( - pipelineId: string, - opts?: { records?: unknown[]; maxBatch?: number } - ): Promise { - let records: unknown[] - - if (context.kafkaBroker) { - const maxBatch = opts?.maxBatch ?? 50 - records = await context.consumeQueueBatch(pipelineId, maxBatch) - } else { - records = opts?.records ?? [] - } - - if (records.length === 0) { - return { errors: [], state: { streams: {}, global: {} }, written: 0 } - } - - const pipeline = await context.pipelineStore.get(pipelineId) - const { id: _, ...config } = pipeline - const { errors, state } = await drainMessages( - context.engine.pipeline_write(config, asIterable(records) as AsyncIterable) - ) - - return { errors, state, written: records.length } - } -} diff --git a/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts index e77bc838..73bbae67 100644 --- a/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts +++ b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts @@ -164,7 +164,7 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont errors.push(error) } else if (raw.type === 'source_state') { if (raw.source_state.state_type === 'global') { - Object.assign(state.global, raw.source_state.data as Record) + state.global = raw.source_state.data as Record } else { state.streams[raw.source_state.stream] = raw.source_state.data } diff --git a/packages/openapi/scripts/generate-all-specs.mjs b/packages/openapi/scripts/generate-specs.mjs similarity index 100% rename from packages/openapi/scripts/generate-all-specs.mjs rename to packages/openapi/scripts/generate-specs.mjs From 7224b6c9a632f6375dce70518b884ec3863fd81f Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Sun, 5 Apr 2026 00:27:48 -0700 Subject: [PATCH 08/13] =?UTF-8?q?refactor(openapi):=20rename=20generate-al?= =?UTF-8?q?l-specs=20=E2=86=92=20generate-specs,=20add=20--versions=20filt?= =?UTF-8?q?er?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename generate-all-specs.mjs → generate-specs.mjs - Add --versions flag to generate specific versions only (e.g. for updating the bundled spec in oas/): node scripts/generate-specs.mjs oas --versions 2026-03-25.dahlia Without the flag, all versions are generated (CDN use case). - Update docs/scripts/generate-stripe-specs.mjs wrapper reference - Update generate-versions.mjs comment reference Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- docs/scripts/generate-stripe-specs.mjs | 12 +- packages/openapi/scripts/generate-specs.mjs | 104 ++++++++++++------ .../openapi/scripts/generate-versions.mjs | 2 +- 3 files changed, 75 insertions(+), 43 deletions(-) diff --git a/docs/scripts/generate-stripe-specs.mjs b/docs/scripts/generate-stripe-specs.mjs index f970c70c..2d466328 100644 --- a/docs/scripts/generate-stripe-specs.mjs +++ b/docs/scripts/generate-stripe-specs.mjs @@ -1,6 +1,6 @@ #!/usr/bin/env node /** - * CDN spec generation — thin wrapper around packages/openapi/scripts/generate-all-specs.mjs. + * CDN spec generation — thin wrapper around packages/openapi/scripts/generate-specs.mjs. * * Usage: * node docs/scripts/generate-stripe-specs.mjs @@ -10,14 +10,6 @@ import { dirname, join } from 'node:path' import { fileURLToPath } from 'node:url' const __dirname = dirname(fileURLToPath(import.meta.url)) -const script = join( - __dirname, - '..', - '..', - 'packages', - 'openapi', - 'scripts', - 'generate-all-specs.mjs' -) +const script = join(__dirname, '..', '..', 'packages', 'openapi', 'scripts', 'generate-specs.mjs') execFileSync(process.execPath, [script, ...process.argv.slice(2)], { stdio: 'inherit' }) diff --git a/packages/openapi/scripts/generate-specs.mjs b/packages/openapi/scripts/generate-specs.mjs index e0b68fd8..d1afc78f 100644 --- a/packages/openapi/scripts/generate-specs.mjs +++ b/packages/openapi/scripts/generate-specs.mjs @@ -1,22 +1,29 @@ #!/usr/bin/env node /** - * Heavyweight: fetches ALL published Stripe REST API spec versions from - * github.com/stripe/openapi, writes .json + manifest.json to - * (for CDN upload), and updates src/versions.ts with the full version list. + * Fetches Stripe REST API spec versions from github.com/stripe/openapi and + * writes .json + manifest.json to . * * Usage: - * node packages/openapi/scripts/generate-all-specs.mjs + * # All versions (CDN): + * node packages/openapi/scripts/generate-specs.mjs + * + * # Specific versions only (e.g. to update the bundled spec in oas/): + * node packages/openapi/scripts/generate-specs.mjs --versions 2026-03-25.dahlia + * node packages/openapi/scripts/generate-specs.mjs --versions 2026-03-25.dahlia,2026-02-25.clover * * Clones stripe/openapi (single-branch) then walks the full history, collecting - * every unique API version (deduplicated by blob SHA, then by version string). + * versions (deduplicated by blob SHA). When --versions is given, only those + * versions are written to . * Set STRIPE_OPENAPI_REPO to a pre-cloned path to skip the clone (e.g. CI cache). * + * Also updates src/versions.ts with the discovered version list. + * * These are the official Stripe REST API specs (github.com/stripe/openapi), NOT * the Sync Engine's own OpenAPI spec. * * No npm dependencies. */ -import { writeFileSync, mkdirSync, existsSync } from 'node:fs' +import { writeFileSync, mkdirSync, existsSync, readdirSync } from 'node:fs' import { join, dirname } from 'node:path' import { tmpdir } from 'node:os' import { execFileSync } from 'node:child_process' @@ -24,20 +31,29 @@ import { fileURLToPath } from 'node:url' const __dirname = dirname(fileURLToPath(import.meta.url)) -const [outputDir] = process.argv.slice(2) +const args = process.argv.slice(2) +const outputDir = args.find((a) => !a.startsWith('--')) if (!outputDir) { - console.error('Usage: node generate-all-specs.mjs ') + console.error('Usage: node generate-specs.mjs [--versions v1,v2,...]') process.exit(1) } +const versionsArg = args.find((a) => a.startsWith('--versions=') || a === '--versions') +const versionsIdx = args.indexOf('--versions') +const versionFilter = versionsArg?.startsWith('--versions=') + ? new Set(versionsArg.slice('--versions='.length).split(',').filter(Boolean)) + : versionsIdx !== -1 + ? new Set(args[versionsIdx + 1]?.split(',').filter(Boolean) ?? []) + : null // null = no filter, collect all + const REPO_URL = 'https://github.com/stripe/openapi' // stripe/openapi uses 'latest/openapi.spec3.sdk.json' for recent specs and // 'openapi/spec3.json' for historic ones. const SPEC_PATHS = ['latest/openapi.spec3.sdk.json', 'openapi/spec3.json'] -function git(...args) { +function git(...gitArgs) { // maxBuffer: Stripe specs are ~10 MB each; default 1 MB would silently truncate/throw. - return execFileSync('git', ['-C', repoDir, ...args], { + return execFileSync('git', ['-C', repoDir, ...gitArgs], { encoding: 'utf8', maxBuffer: 50 * 1024 * 1024, }) @@ -52,8 +68,11 @@ if (!existsSync(join(repoDir, '.git'))) { console.error(`Using pre-cloned repo at ${repoDir}`) } -// Walk commits newest→oldest, collect all unique API versions. -console.error('Collecting all spec versions...') +console.error( + versionFilter + ? `Collecting versions: ${[...versionFilter].join(', ')}` + : 'Collecting all spec versions...' +) const commits = git('log', '--format=%H', '--', ...SPEC_PATHS) .trim() .split('\n') @@ -65,6 +84,9 @@ const seen = new Map() // version -> filename const seenBlobs = new Set() for (const commit of commits) { + // Stop early if we've found all requested versions + if (versionFilter && versionFilter.size === seen.size) break + let blobSha for (const specPath of SPEC_PATHS) { let ls @@ -94,32 +116,48 @@ for (const commit of commits) { continue } if (!version || seen.has(version)) continue + if (versionFilter && !versionFilter.has(version)) continue writeFileSync(join(outputDir, `${version}.json`), raw) seen.set(version, `${version}.json`) console.error(` ${version}`) } +if (versionFilter) { + const missing = [...versionFilter].filter((v) => !seen.has(v)) + if (missing.length > 0) { + console.error(`Warning: versions not found in stripe/openapi history: ${missing.join(', ')}`) + } +} + writeFileSync( join(outputDir, 'manifest.json'), JSON.stringify(Object.fromEntries(seen), null, 2) + '\n' ) -// Update src/versions.ts with the full version list so build-time generation -// picks up all known versions without a network fetch. -const allVersions = [...seen.keys()].sort().reverse() // newest first -const bundledFiles = (await import('node:fs')) - .readdirSync(join(__dirname, '..', 'oas')) - .filter((f) => f.endsWith('.json') && f !== 'manifest.json' && f !== 'index.html') -const bundled = bundledFiles[0]?.replace(/\.json$/, '') ?? allVersions[0] -const lines = allVersions.map((v) => ` '${v}',`).join('\n') +// Update src/versions.ts — for a version-filtered run, only update BUNDLED_API_VERSION +// (the oas/ spec files); for a full run, update the complete SUPPORTED_API_VERSIONS list. +const oasDir = join(__dirname, '..', 'oas') +const bundledFiles = readdirSync(oasDir).filter( + (f) => f.endsWith('.json') && f !== 'manifest.json' && f !== 'index.html' +) +const bundled = bundledFiles[0]?.replace(/\.json$/, '') ?? [...seen.keys()].sort().reverse()[0] + +const supportedVersions = versionFilter + ? // Filtered run: read existing versions from src/versions.ts to preserve the full list + await import('../src/versions.ts', { with: { type: 'module' } }) + .then((m) => [...m.SUPPORTED_API_VERSIONS]) + .catch(() => [...seen.keys()]) + : [...seen.keys()].sort().reverse() + +const lines = supportedVersions.map((v) => ` '${v}',`).join('\n') const versionsFile = join(__dirname, '..', 'src', 'versions.ts') writeFileSync( versionsFile, - `// Generated by scripts/generate-all-specs.mjs — do not edit manually. + `// Generated by scripts/generate-specs.mjs — do not edit manually. // BUNDLED_API_VERSION: the single spec file in packages/openapi/oas/. // SUPPORTED_API_VERSIONS: all versions discovered from github.com/stripe/openapi. -// Re-run scripts/generate-all-specs.mjs to pick up newly published API versions. +// Re-run scripts/generate-specs.mjs to pick up newly published API versions. /** The single Stripe API spec bundled in this package (served without network). */ export const BUNDLED_API_VERSION = '${bundled}' as const @@ -130,14 +168,15 @@ ${lines} ] as const satisfies readonly string[] ` ) -console.error(`Updated src/versions.ts (${allVersions.length} versions, bundled: ${bundled})`) - -// Generate an index page so https://stripe-sync.dev/stripe-api-specs/ is browsable -const versions = [...seen.keys()].sort().reverse() -const rows = versions.map((v) => `
  • ${v}
  • `).join('\n') -writeFileSync( - join(outputDir, 'index.html'), - ` +console.error(`Updated src/versions.ts (${supportedVersions.length} versions, bundled: ${bundled})`) + +// Generate an index page (CDN use) — only when writing all versions +if (!versionFilter) { + const versions = [...seen.keys()].sort().reverse() + const rows = versions.map((v) => `
  • ${v}
  • `).join('\n') + writeFileSync( + join(outputDir, 'index.html'), + ` @@ -168,6 +207,7 @@ ${rows} ` -) + ) +} -console.error(`\nDone: ${seen.size} spec versions`) +console.error(`\nDone: ${seen.size} spec version(s)`) diff --git a/packages/openapi/scripts/generate-versions.mjs b/packages/openapi/scripts/generate-versions.mjs index 6ef6c23a..a435811b 100644 --- a/packages/openapi/scripts/generate-versions.mjs +++ b/packages/openapi/scripts/generate-versions.mjs @@ -35,7 +35,7 @@ writeFileSync( outFile, `// Generated by scripts/generate-versions.mjs — do not edit manually. // For the full list of all published Stripe API versions, run: -// node scripts/generate-all-specs.mjs +// node scripts/generate-specs.mjs /** The single Stripe API spec bundled in this package (served without network). */ export const BUNDLED_API_VERSION = '${bundled}' as const From 02d09665d743373159cb1d279c7f3144c36239c0 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Sun, 5 Apr 2026 01:01:47 -0700 Subject: [PATCH 09/13] fix(openapi): resolve [object Object] content key in header params - Fix getParamContentType() to extract the media type string from the content object rather than returning the whole object, which coerced to '[object Object]' as a property key - Rename x-state header to x-source-state for clarity - Add .meta({ id: 'SourceState' }) to SyncState for a named OAS schema - Regenerate openapi.json / openapi.d.ts Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- apps/engine/src/__generated__/openapi.d.ts | 18 +++- apps/engine/src/__generated__/openapi.json | 102 +++++++----------- apps/engine/src/__tests__/openapi.test.ts | 50 +++++++++ apps/engine/src/api/app.ts | 8 +- apps/engine/src/lib/remote-engine.ts | 2 +- .../src/__tests__/json-content-header.test.ts | 6 +- packages/hono-zod-openapi/src/index.ts | 10 +- packages/protocol/src/protocol.ts | 18 ++-- 8 files changed, 131 insertions(+), 83 deletions(-) diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index 5b607e78..398a995a 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -669,6 +669,16 @@ export interface components { backfill_limit?: number; }[]; }; + SourceState: { + /** @description Per-stream checkpoint data, keyed by stream name. */ + streams: { + [key: string]: unknown; + }; + /** @description Sync-wide state shared across all streams (e.g. a global events cursor). */ + global: { + [key: string]: unknown; + }; + }; }; responses: never; parameters: never; @@ -851,8 +861,8 @@ export interface operations { header: { /** @description JSON-encoded PipelineConfig */ "x-pipeline": string; - /** @description JSON-encoded SyncState ({ streams, global }) or legacy flat per-stream state */ - "x-state"?: string; + /** @description JSON-encoded SourceState ({ streams, global }) or legacy flat per-stream state */ + "x-source-state"?: string; }; path?: never; cookie?: never; @@ -934,8 +944,8 @@ export interface operations { header: { /** @description JSON-encoded PipelineConfig */ "x-pipeline": string; - /** @description JSON-encoded SyncState ({ streams, global }) or legacy flat per-stream state */ - "x-state"?: string; + /** @description JSON-encoded SourceState ({ streams, global }) or legacy flat per-stream state */ + "x-source-state"?: string; }; path?: never; cookie?: never; diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index 29476cd9..cef344fd 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -61,7 +61,7 @@ "required": true, "description": "JSON-encoded PipelineConfig", "content": { - "[object Object]": { + "application/json": { "schema": { "$ref": "#/components/schemas/PipelineConfig" } @@ -115,7 +115,7 @@ "required": true, "description": "JSON-encoded PipelineConfig", "content": { - "[object Object]": { + "application/json": { "schema": { "$ref": "#/components/schemas/PipelineConfig" } @@ -169,7 +169,7 @@ "required": true, "description": "JSON-encoded PipelineConfig", "content": { - "[object Object]": { + "application/json": { "schema": { "$ref": "#/components/schemas/PipelineConfig" } @@ -223,7 +223,7 @@ "required": true, "description": "JSON-encoded source config ({ type, ...config })", "content": { - "[object Object]": { + "application/json": { "schema": { "type": "object", "properties": { @@ -309,7 +309,7 @@ "required": true, "description": "JSON-encoded PipelineConfig", "content": { - "[object Object]": { + "application/json": { "schema": { "$ref": "#/components/schemas/PipelineConfig" } @@ -318,36 +318,13 @@ }, { "in": "header", - "name": "x-state", + "name": "x-source-state", "required": false, - "description": "JSON-encoded SyncState ({ streams, global }) or legacy flat per-stream state", + "description": "JSON-encoded SourceState ({ streams, global }) or legacy flat per-stream state", "content": { - "[object Object]": { + "application/json": { "schema": { - "type": "object", - "properties": { - "streams": { - "type": "object", - "propertyNames": { - "type": "string" - }, - "additionalProperties": {}, - "description": "Per-stream checkpoint data, keyed by stream name." - }, - "global": { - "type": "object", - "propertyNames": { - "type": "string" - }, - "additionalProperties": {}, - "description": "Sync-wide state shared across all streams (e.g. a global events cursor)." - } - }, - "required": [ - "streams", - "global" - ], - "additionalProperties": false + "$ref": "#/components/schemas/SourceState" } } } @@ -409,7 +386,7 @@ "required": true, "description": "JSON-encoded PipelineConfig", "content": { - "[object Object]": { + "application/json": { "schema": { "$ref": "#/components/schemas/PipelineConfig" } @@ -496,7 +473,7 @@ "required": true, "description": "JSON-encoded PipelineConfig", "content": { - "[object Object]": { + "application/json": { "schema": { "$ref": "#/components/schemas/PipelineConfig" } @@ -505,36 +482,13 @@ }, { "in": "header", - "name": "x-state", + "name": "x-source-state", "required": false, - "description": "JSON-encoded SyncState ({ streams, global }) or legacy flat per-stream state", + "description": "JSON-encoded SourceState ({ streams, global }) or legacy flat per-stream state", "content": { - "[object Object]": { + "application/json": { "schema": { - "type": "object", - "properties": { - "streams": { - "type": "object", - "propertyNames": { - "type": "string" - }, - "additionalProperties": {}, - "description": "Per-stream checkpoint data, keyed by stream name." - }, - "global": { - "type": "object", - "propertyNames": { - "type": "string" - }, - "additionalProperties": {}, - "description": "Sync-wide state shared across all streams (e.g. a global events cursor)." - } - }, - "required": [ - "streams", - "global" - ], - "additionalProperties": false + "$ref": "#/components/schemas/SourceState" } } } @@ -2057,6 +2011,32 @@ "destination" ], "additionalProperties": false + }, + "SourceState": { + "type": "object", + "properties": { + "streams": { + "type": "object", + "propertyNames": { + "type": "string" + }, + "additionalProperties": {}, + "description": "Per-stream checkpoint data, keyed by stream name." + }, + "global": { + "type": "object", + "propertyNames": { + "type": "string" + }, + "additionalProperties": {}, + "description": "Sync-wide state shared across all streams (e.g. a global events cursor)." + } + }, + "required": [ + "streams", + "global" + ], + "additionalProperties": false } } } diff --git a/apps/engine/src/__tests__/openapi.test.ts b/apps/engine/src/__tests__/openapi.test.ts index 6681f1e8..b662c7fd 100644 --- a/apps/engine/src/__tests__/openapi.test.ts +++ b/apps/engine/src/__tests__/openapi.test.ts @@ -51,4 +51,54 @@ describe('Engine OpenAPI spec', () => { expect(schema, `${name} should not have $schema`).not.toHaveProperty('$schema') } }) + + it('has SourceState as a named component schema', async () => { + const spec = await getSpec() + expect(spec.components.schemas).toHaveProperty('SourceState') + const sourceState = spec.components.schemas['SourceState'] as Record + expect(sourceState.type).toBe('object') + expect(sourceState).toHaveProperty('properties') + const props = sourceState.properties as Record + expect(props).toHaveProperty('streams') + expect(props).toHaveProperty('global') + }) + + it('header params use application/json content key, never [object Object]', async () => { + const spec = await getSpec() + for (const [path, pathItem] of Object.entries(spec.paths ?? {})) { + for (const [method, op] of Object.entries(pathItem as Record)) { + const operation = op as { parameters?: Array> } | undefined + for (const param of operation?.parameters ?? []) { + const content = param.content as Record | undefined + if (content) { + expect( + Object.keys(content), + `${method.toUpperCase()} ${path} param "${param.name}" has [object Object] content key` + ).not.toContain('[object Object]') + } + } + } + } + }) + + it('x-source-state header uses application/json content with $ref to SourceState', async () => { + const spec = await getSpec() + const allParams: Array> = [] + for (const pathItem of Object.values(spec.paths ?? {})) { + for (const op of Object.values(pathItem as Record)) { + const operation = op as { parameters?: Array> } | undefined + allParams.push(...(operation?.parameters ?? [])) + } + } + const stateParams = allParams.filter((p) => p.name === 'x-source-state') + expect(stateParams.length).toBeGreaterThan(0) + for (const param of stateParams) { + expect(param.schema).toBeUndefined() + const content = param.content as Record> | undefined + expect(content?.['application/json']).toBeDefined() + expect(content?.['application/json']?.schema).toMatchObject({ + $ref: '#/components/schemas/SourceState', + }) + } + }) }) diff --git a/apps/engine/src/api/app.ts b/apps/engine/src/api/app.ts index 390e3921..e3849134 100644 --- a/apps/engine/src/api/app.ts +++ b/apps/engine/src/api/app.ts @@ -164,7 +164,7 @@ export async function createApp(resolver: ConnectorResolver) { .pipe(SyncState) .optional() .meta({ - description: 'JSON-encoded SyncState ({ streams, global }) or legacy flat per-stream state', + description: 'JSON-encoded SourceState ({ streams, global }) or legacy flat per-stream state', param: { content: { 'application/json': {} } }, }) @@ -181,7 +181,7 @@ export async function createApp(resolver: ConnectorResolver) { const sourceHeaders = z.object({ 'x-source': xSourceHeader }) const allSyncHeaders = z.object({ 'x-pipeline': xPipelineHeader, - 'x-state': xStateHeader, + 'x-source-state': xStateHeader, }) const syncQueryParams = z.object({ @@ -365,7 +365,7 @@ export async function createApp(resolver: ConnectorResolver) { }) app.openapi(pipelineReadRoute, async (c) => { const pipeline = c.req.valid('header')['x-pipeline'] - const state = c.req.valid('header')['x-state'] + const state = c.req.valid('header')['x-source-state'] const { state_limit, time_limit } = c.req.valid('query') const inputPresent = hasBody(c) const context = { path: '/pipeline_read', inputPresent, ...syncRequestContext(pipeline) } @@ -460,7 +460,7 @@ export async function createApp(resolver: ConnectorResolver) { }) app.openapi(pipelineSyncRoute, async (c) => { const pipeline = c.req.valid('header')['x-pipeline'] - const state = c.req.valid('header')['x-state'] + const state = c.req.valid('header')['x-source-state'] const { state_limit, time_limit } = c.req.valid('query') const input = hasBody(c) ? parseNdjsonStream(c.req.raw.body!) : undefined const output = engine.pipeline_sync(pipeline, { state, state_limit, time_limit }, input) diff --git a/apps/engine/src/lib/remote-engine.ts b/apps/engine/src/lib/remote-engine.ts index f80dbdc3..5b18a581 100644 --- a/apps/engine/src/lib/remote-engine.ts +++ b/apps/engine/src/lib/remote-engine.ts @@ -58,7 +58,7 @@ export function createRemoteEngine(engineUrl: string): Engine { function stateHeaders(opts?: SourceReadOptions): Record { const h: Record = {} if (opts?.state && Object.keys(opts.state).length > 0) { - h['x-state'] = JSON.stringify(opts.state) + h['x-source-state'] = JSON.stringify(opts.state) } return h } diff --git a/packages/hono-zod-openapi/src/__tests__/json-content-header.test.ts b/packages/hono-zod-openapi/src/__tests__/json-content-header.test.ts index b13370ad..c57729ec 100644 --- a/packages/hono-zod-openapi/src/__tests__/json-content-header.test.ts +++ b/packages/hono-zod-openapi/src/__tests__/json-content-header.test.ts @@ -41,7 +41,7 @@ function createTestApp() { .string() .transform(jsonParse) .pipe(ItemSchema) - .meta({ param: { content: 'application/json' } }), + .meta({ param: { content: { 'application/json': {} } } }), }), }, responses: { @@ -120,7 +120,7 @@ describe('JSON content header — runtime', () => { .transform(jsonParse) .pipe(ItemSchema) .optional() - .meta({ param: { content: 'application/json' } }), + .meta({ param: { content: { 'application/json': {} } } }), }), }, responses: { 200: { description: 'ok' } }, @@ -157,7 +157,7 @@ describe('JSON content header — runtime', () => { .string() .transform(jsonParse) .pipe(ItemSchema) - .meta({ param: { content: 'application/json' } }), + .meta({ param: { content: { 'application/json': {} } } }), }), }, responses: { 200: { description: 'ok' } }, diff --git a/packages/hono-zod-openapi/src/index.ts b/packages/hono-zod-openapi/src/index.ts index 4b314a99..4790ae70 100644 --- a/packages/hono-zod-openapi/src/index.ts +++ b/packages/hono-zod-openapi/src/index.ts @@ -150,13 +150,19 @@ function getPipeOutput(schema: AnyZod): AnyZod | undefined { /** * Check if a header field schema has the JSON content meta annotation. - * Returns the content media type (e.g. 'application/json') or undefined. + * Returns the content media type string (e.g. 'application/json') or undefined. + * + * Accepts either object form `.meta({ param: { content: { 'application/json': {} } } })` + * (the OAS-typed form, preferred) or legacy string form `.meta({ param: { content: 'application/json' } })`. */ function getParamContentType(schema: AnyZod): string | undefined { // eslint-disable-next-line @typescript-eslint/no-explicit-any const meta = z.globalRegistry.get(schema as any) as Record | undefined // eslint-disable-next-line @typescript-eslint/no-explicit-any - return (meta?.param as any)?.content as string | undefined + const content = (meta?.param as any)?.content + if (typeof content === 'string') return content + if (content && typeof content === 'object') return Object.keys(content)[0] + return undefined } /** diff --git a/packages/protocol/src/protocol.ts b/packages/protocol/src/protocol.ts index 9a08e2d8..153e9bd5 100644 --- a/packages/protocol/src/protocol.ts +++ b/packages/protocol/src/protocol.ts @@ -17,14 +17,16 @@ import { z } from 'zod' * `streams` holds per-stream checkpoints (existing behavior). * `global` holds sync-wide state shared across all streams (new). */ -export const SyncState = z.object({ - streams: z - .record(z.string(), z.unknown()) - .describe('Per-stream checkpoint data, keyed by stream name.'), - global: z - .record(z.string(), z.unknown()) - .describe('Sync-wide state shared across all streams (e.g. a global events cursor).'), -}) +export const SyncState = z + .object({ + streams: z + .record(z.string(), z.unknown()) + .describe('Per-stream checkpoint data, keyed by stream name.'), + global: z + .record(z.string(), z.unknown()) + .describe('Sync-wide state shared across all streams (e.g. a global events cursor).'), + }) + .meta({ id: 'SourceState' }) export type SyncState = z.infer // MARK: - Data model From 21114f0636bd7e2c18b791cfce4fe43dc4458562 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Sun, 5 Apr 2026 01:14:45 -0700 Subject: [PATCH 10/13] refactor(state): activity owns state merge, workflow holds full copy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Activities now receive the current full SyncState as initial state and return the updated full SyncState. Workflows simply assign result.state directly — no merge logic in the workflow layer. - drainMessages accepts initialState and starts from it - syncImmediate passes readOpts.state to drainMessages - readGoogleSheetsIntoQueue starts state from readOpts.state - writeGoogleSheetsFromQueue accepts opts.state and starts from it - All workflows: syncState = result.state (no spread merge) Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- apps/service/src/temporal/activities/_shared.ts | 10 ++++++++-- .../activities/read-google-sheets-into-queue.ts | 5 ++++- .../src/temporal/activities/sync-immediate.ts | 3 ++- .../activities/write-google-sheets-from-queue.ts | 13 +++++++++++-- .../workflows/backfill-pipeline-workflow.ts | 5 +---- .../workflows/google-sheet-pipeline-workflow.ts | 11 +++-------- .../src/temporal/workflows/pipeline-workflow.ts | 5 +---- 7 files changed, 30 insertions(+), 22 deletions(-) diff --git a/apps/service/src/temporal/activities/_shared.ts b/apps/service/src/temporal/activities/_shared.ts index 215f7bbd..0e86254e 100644 --- a/apps/service/src/temporal/activities/_shared.ts +++ b/apps/service/src/temporal/activities/_shared.ts @@ -127,7 +127,10 @@ export function collectError(message: Message): RunResult['errors'][number] | nu return null } -export async function drainMessages(stream: AsyncIterable): Promise<{ +export async function drainMessages( + stream: AsyncIterable, + initialState?: SyncState +): Promise<{ errors: RunResult['errors'] state: SyncState records: Message[] @@ -136,7 +139,10 @@ export async function drainMessages(stream: AsyncIterable): Promise<{ eof?: { reason: string } }> { const errors: RunResult['errors'] = [] - const state: SyncState = { streams: {}, global: {} } + const state: SyncState = { + streams: { ...initialState?.streams }, + global: { ...initialState?.global }, + } const records: Message[] = [] let sourceConfig: Record | undefined let destConfig: Record | undefined diff --git a/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts b/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts index 7b68fd90..3d918a6a 100644 --- a/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts +++ b/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts @@ -44,7 +44,10 @@ export function createReadGoogleSheetsIntoQueueActivity(context: ActivitiesConte const input = inputArr?.length ? asIterable(inputArr) : undefined const queued: Message[] = [] - const state: import('@stripe/sync-engine').SyncState = { streams: {}, global: {} } + const state: import('@stripe/sync-engine').SyncState = { + streams: { ...readOpts.state?.streams }, + global: { ...readOpts.state?.global }, + } const errors: RunResult['errors'] = [] let seen = 0 diff --git a/apps/service/src/temporal/activities/sync-immediate.ts b/apps/service/src/temporal/activities/sync-immediate.ts index d9959058..bbdf1531 100644 --- a/apps/service/src/temporal/activities/sync-immediate.ts +++ b/apps/service/src/temporal/activities/sync-immediate.ts @@ -14,7 +14,8 @@ export function createSyncImmediateActivity(context: ActivitiesContext) { const { input: inputArr, ...readOpts } = opts ?? {} const input = inputArr?.length ? asIterable(inputArr) : undefined const { errors, state, sourceConfig, destConfig, eof } = await drainMessages( - context.engine.pipeline_sync(config, readOpts, input) + context.engine.pipeline_sync(config, readOpts, input), + readOpts.state ) // Full replacement — connector emits the complete updated config if (sourceConfig) { diff --git a/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts index 73bbae67..2b8d735b 100644 --- a/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts +++ b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts @@ -116,6 +116,7 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont maxBatch?: number rowIndex?: Record> catalog?: ConfiguredCatalog + state?: import('@stripe/sync-engine').SyncState } ): Promise< RunResult & { @@ -128,8 +129,13 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont const maxBatch = opts?.maxBatch ?? 50 const queued = await context.consumeQueueBatch(pipelineId, maxBatch) + const initialState: import('@stripe/sync-engine').SyncState = { + streams: { ...opts?.state?.streams }, + global: { ...opts?.state?.global }, + } + if (queued.length === 0) { - return { errors: [], state: { streams: {}, global: {} }, written: 0, rowAssignments: {} } + return { errors: [], state: initialState, written: 0, rowAssignments: {} } } const pipeline = await context.pipelineStore.get(pipelineId) @@ -146,7 +152,10 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont const filteredCatalog = augmentGoogleSheetsCatalog(opts.catalog) const destination = createGoogleSheetsDestination() const errors: RunResult['errors'] = [] - const state: import('@stripe/sync-engine').SyncState = { streams: {}, global: {} } + const state: import('@stripe/sync-engine').SyncState = { + streams: { ...initialState.streams }, + global: { ...initialState.global }, + } const rowAssignments: Record> = {} const input = enforceCatalog(filteredCatalog)( asIterable(writeBatch) diff --git a/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts b/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts index 91511789..416c47e6 100644 --- a/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts +++ b/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts @@ -61,10 +61,7 @@ export async function backfillPipelineWorkflow( state_limit: 100, time_limit: 10, }) - syncState = { - streams: { ...syncState.streams, ...result.state.streams }, - global: { ...syncState.global, ...result.state.global }, - } + syncState = result.state backfillComplete = result.eof?.reason === 'complete' await maybeContinueAsNew() } diff --git a/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts b/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts index e9b4f028..af96aacd 100644 --- a/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts +++ b/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts @@ -128,10 +128,7 @@ export async function googleSheetPipelineWorkflow( catalog, }) if (count > 0) pendingWrites = true - readState = { - streams: { ...readState.streams, ...nextReadState.streams }, - global: { ...readState.global, ...nextReadState.global }, - } + readState = nextReadState readComplete = deepEqual(readState, before) await tickIteration() continue @@ -152,12 +149,10 @@ export async function googleSheetPipelineWorkflow( maxBatch: 50, rowIndex, catalog, + state: sourceState, }) pendingWrites = result.written > 0 - sourceState = { - streams: { ...sourceState.streams, ...result.state.streams }, - global: { ...sourceState.global, ...result.state.global }, - } + sourceState = result.state for (const [stream, assignments] of Object.entries(result.rowAssignments)) { rowIndex[stream] ??= {} Object.assign(rowIndex[stream], assignments) diff --git a/apps/service/src/temporal/workflows/pipeline-workflow.ts b/apps/service/src/temporal/workflows/pipeline-workflow.ts index 4d541540..7725e2fb 100644 --- a/apps/service/src/temporal/workflows/pipeline-workflow.ts +++ b/apps/service/src/temporal/workflows/pipeline-workflow.ts @@ -86,10 +86,7 @@ export async function pipelineWorkflow( state_limit: 100, time_limit: 10, }) - syncState = { - streams: { ...syncState.streams, ...result.state.streams }, - global: { ...syncState.global, ...result.state.global }, - } + syncState = result.state readComplete = result.eof?.reason === 'complete' } From 3c8552fce818b74389a7b54fc6dcab07995fee05 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Sun, 5 Apr 2026 02:00:39 -0700 Subject: [PATCH 11/13] =?UTF-8?q?refactor(protocol):=20rename=20SyncState?= =?UTF-8?q?=20=E2=86=92=20SourceState,=20backfillComplete=20=E2=86=92=20re?= =?UTF-8?q?concileComplete?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rename SyncState Zod schema and type to SourceState across all packages (.meta({ id: 'SourceState' }) was already correct) - Rename backfillComplete → reconcileComplete in backfill workflow - Add reconcileComplete to BackfillPipelineWorkflowOpts so it survives continueAsNew Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- apps/engine/src/api/app.ts | 4 ++-- apps/engine/src/lib/engine.ts | 4 ++-- apps/engine/src/lib/state-store.ts | 6 ++--- .../src/temporal/activities/_shared.ts | 10 ++++----- .../read-google-sheets-into-queue.ts | 4 ++-- .../write-google-sheets-from-queue.ts | 6 ++--- .../service/src/temporal/workflows/_shared.ts | 4 ++-- .../workflows/backfill-pipeline-workflow.ts | 22 +++++++++++-------- .../google-sheet-pipeline-workflow.ts | 12 +++++----- .../temporal/workflows/pipeline-workflow.ts | 8 +++---- packages/protocol/src/__tests__/state.test.ts | 10 ++++----- packages/protocol/src/cli.ts | 2 +- packages/protocol/src/protocol.ts | 10 ++++----- packages/state-postgres/src/state-store.ts | 6 ++--- 14 files changed, 56 insertions(+), 52 deletions(-) diff --git a/apps/engine/src/api/app.ts b/apps/engine/src/api/app.ts index e3849134..0701185c 100644 --- a/apps/engine/src/api/app.ts +++ b/apps/engine/src/api/app.ts @@ -20,7 +20,7 @@ import { CheckOutput as CheckOutputSchema, SetupOutput as SetupOutputSchema, TeardownOutput as TeardownOutputSchema, - SyncState, + SourceState, } from '@stripe/sync-protocol' // Raw $refs for NDJSON content schemas — avoids zod-openapi generating *Output @@ -161,7 +161,7 @@ export async function createApp(resolver: ConnectorResolver) { // Accept both new format { streams, global } and old flat format { stream_name: data }. 'streams' in obj && 'global' in obj ? obj : { streams: obj, global: {} } ) - .pipe(SyncState) + .pipe(SourceState) .optional() .meta({ description: 'JSON-encoded SourceState ({ streams, global }) or legacy flat per-stream state', diff --git a/apps/engine/src/lib/engine.ts b/apps/engine/src/lib/engine.ts index 8bf931cd..5c9c63df 100644 --- a/apps/engine/src/lib/engine.ts +++ b/apps/engine/src/lib/engine.ts @@ -11,7 +11,7 @@ import { ConfiguredStream, ConfiguredCatalog, SyncOutput, - SyncState, + SourceState, RecordMessage, SourceStateMessage, collectFirst, @@ -29,7 +29,7 @@ import { logger } from '../logger.js' export const SourceReadOptions = z.object({ /** Aggregate state (per-stream + global) carried in from the previous sync run. */ - state: SyncState.optional(), + state: SourceState.optional(), /** Stop after emitting this many state messages (useful for paging). */ state_limit: z.number().int().positive().optional(), /** Wall-clock time limit in seconds; the stream stops after this duration. */ diff --git a/apps/engine/src/lib/state-store.ts b/apps/engine/src/lib/state-store.ts index a11fd1df..98a99169 100644 --- a/apps/engine/src/lib/state-store.ts +++ b/apps/engine/src/lib/state-store.ts @@ -1,10 +1,10 @@ -import type { SyncState } from '@stripe/sync-protocol' +import type { SourceState } from '@stripe/sync-protocol' // MARK: - Interface /** Pipeline-scoped state store — load prior state and persist checkpoints. */ export interface StateStore { - get(): Promise + get(): Promise set(stream: string, data: unknown): Promise setGlobal(data: unknown): Promise } @@ -15,7 +15,7 @@ export interface StateStore { * A StateStore that returns the provided initial state (if any) and discards all writes. * Use when the caller manages state externally (e.g., via HTTP headers or workflow state). */ -export function readonlyStateStore(state?: SyncState): StateStore { +export function readonlyStateStore(state?: SourceState): StateStore { return { async get() { return state diff --git a/apps/service/src/temporal/activities/_shared.ts b/apps/service/src/temporal/activities/_shared.ts index 0e86254e..b0ba710e 100644 --- a/apps/service/src/temporal/activities/_shared.ts +++ b/apps/service/src/temporal/activities/_shared.ts @@ -1,5 +1,5 @@ import { heartbeat } from '@temporalio/activity' -import type { Message, Engine, SyncState } from '@stripe/sync-engine' +import type { Message, Engine, SourceState } from '@stripe/sync-engine' import { createRemoteEngine } from '@stripe/sync-engine' import { Kafka } from 'kafkajs' import type { Producer } from 'kafkajs' @@ -105,7 +105,7 @@ export function createActivitiesContext(opts: { export interface RunResult { errors: Array<{ message: string; failure_type?: string; stream?: string }> - state: SyncState + state: SourceState } export async function* asIterable(items: T[]): AsyncIterable { @@ -129,17 +129,17 @@ export function collectError(message: Message): RunResult['errors'][number] | nu export async function drainMessages( stream: AsyncIterable, - initialState?: SyncState + initialState?: SourceState ): Promise<{ errors: RunResult['errors'] - state: SyncState + state: SourceState records: Message[] sourceConfig?: Record destConfig?: Record eof?: { reason: string } }> { const errors: RunResult['errors'] = [] - const state: SyncState = { + const state: SourceState = { streams: { ...initialState?.streams }, global: { ...initialState?.global }, } diff --git a/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts b/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts index 3d918a6a..6e89fa2a 100644 --- a/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts +++ b/apps/service/src/temporal/activities/read-google-sheets-into-queue.ts @@ -35,7 +35,7 @@ export function createReadGoogleSheetsIntoQueueActivity(context: ActivitiesConte input?: SourceInput[] catalog?: ConfiguredCatalog } - ): Promise<{ count: number; state: import('@stripe/sync-engine').SyncState }> { + ): Promise<{ count: number; state: import('@stripe/sync-engine').SourceState }> { if (!context.kafkaBroker) throw new Error('kafkaBroker is required for Google Sheets workflow') const pipeline = await context.pipelineStore.get(pipelineId) @@ -44,7 +44,7 @@ export function createReadGoogleSheetsIntoQueueActivity(context: ActivitiesConte const input = inputArr?.length ? asIterable(inputArr) : undefined const queued: Message[] = [] - const state: import('@stripe/sync-engine').SyncState = { + const state: import('@stripe/sync-engine').SourceState = { streams: { ...readOpts.state?.streams }, global: { ...readOpts.state?.global }, } diff --git a/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts index 2b8d735b..22bb0fec 100644 --- a/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts +++ b/apps/service/src/temporal/activities/write-google-sheets-from-queue.ts @@ -116,7 +116,7 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont maxBatch?: number rowIndex?: Record> catalog?: ConfiguredCatalog - state?: import('@stripe/sync-engine').SyncState + state?: import('@stripe/sync-engine').SourceState } ): Promise< RunResult & { @@ -129,7 +129,7 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont const maxBatch = opts?.maxBatch ?? 50 const queued = await context.consumeQueueBatch(pipelineId, maxBatch) - const initialState: import('@stripe/sync-engine').SyncState = { + const initialState: import('@stripe/sync-engine').SourceState = { streams: { ...opts?.state?.streams }, global: { ...opts?.state?.global }, } @@ -152,7 +152,7 @@ export function createWriteGoogleSheetsFromQueueActivity(context: ActivitiesCont const filteredCatalog = augmentGoogleSheetsCatalog(opts.catalog) const destination = createGoogleSheetsDestination() const errors: RunResult['errors'] = [] - const state: import('@stripe/sync-engine').SyncState = { + const state: import('@stripe/sync-engine').SourceState = { streams: { ...initialState.streams }, global: { ...initialState.global }, } diff --git a/apps/service/src/temporal/workflows/_shared.ts b/apps/service/src/temporal/workflows/_shared.ts index 2f3199e8..395c66c6 100644 --- a/apps/service/src/temporal/workflows/_shared.ts +++ b/apps/service/src/temporal/workflows/_shared.ts @@ -1,7 +1,7 @@ import { defineQuery, defineSignal, proxyActivities } from '@temporalio/workflow' import type { SyncActivities } from '../activities/index.js' -import type { SyncState } from '@stripe/sync-protocol' +import type { SourceState } from '@stripe/sync-protocol' import { retryPolicy } from '../../lib/utils.js' export interface WorkflowStatus { @@ -18,7 +18,7 @@ export const updateSignal = defineSignal<[{ paused?: boolean }]>('update') export const deleteSignal = defineSignal('delete') export const statusQuery = defineQuery('status') -export const stateQuery = defineQuery('state') +export const stateQuery = defineQuery('state') export const { setup, teardown } = proxyActivities({ startToCloseTimeout: '2m', diff --git a/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts b/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts index 416c47e6..170e9083 100644 --- a/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts +++ b/apps/service/src/temporal/workflows/backfill-pipeline-workflow.ts @@ -8,13 +8,14 @@ import { updateSignal, WorkflowStatus, } from './_shared.js' -import type { SyncState } from '@stripe/sync-protocol' +import type { SourceState } from '@stripe/sync-protocol' import { CONTINUE_AS_NEW_THRESHOLD } from '../../lib/utils.js' const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000 export interface BackfillPipelineWorkflowOpts { - state?: SyncState + state?: SourceState + reconcileComplete?: boolean } export async function backfillPipelineWorkflow( @@ -24,8 +25,8 @@ export async function backfillPipelineWorkflow( let paused = false let deleted = false let iteration = 0 - let syncState: SyncState = opts?.state ?? { streams: {}, global: {} } - let backfillComplete = false + let syncState: SourceState = opts?.state ?? { streams: {}, global: {} } + let reconcileComplete: boolean = opts?.reconcileComplete ?? false setHandler(updateSignal, (patch) => { if (patch.paused !== undefined) paused = patch.paused @@ -35,11 +36,14 @@ export async function backfillPipelineWorkflow( }) setHandler(statusQuery, (): WorkflowStatus => ({ phase: 'running', paused, iteration })) - setHandler(stateQuery, (): SyncState => syncState) + setHandler(stateQuery, (): SourceState => syncState) async function maybeContinueAsNew() { if (++iteration >= CONTINUE_AS_NEW_THRESHOLD) { - await continueAsNew(pipelineId, { state: syncState }) + await continueAsNew(pipelineId, { + state: syncState, + reconcileComplete, + }) } } @@ -49,10 +53,10 @@ export async function backfillPipelineWorkflow( continue } - if (backfillComplete) { + if (reconcileComplete) { // Idle — wait up to one week; timeout means recon is due. const timedOut = !(await condition(() => paused || deleted, ONE_WEEK_MS)) - if (timedOut) backfillComplete = false + if (timedOut) reconcileComplete = false continue } @@ -62,7 +66,7 @@ export async function backfillPipelineWorkflow( time_limit: 10, }) syncState = result.state - backfillComplete = result.eof?.reason === 'complete' + reconcileComplete = result.eof?.reason === 'complete' await maybeContinueAsNew() } } diff --git a/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts b/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts index af96aacd..d7f117e6 100644 --- a/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts +++ b/apps/service/src/temporal/workflows/google-sheet-pipeline-workflow.ts @@ -1,5 +1,5 @@ import { condition, continueAsNew, setHandler, sleep } from '@temporalio/workflow' -import type { ConfiguredCatalog, SyncState } from '@stripe/sync-engine' +import type { ConfiguredCatalog, SourceState } from '@stripe/sync-engine' import { deleteSignal, @@ -19,8 +19,8 @@ import { CONTINUE_AS_NEW_THRESHOLD, deepEqual, EVENT_BATCH_SIZE } from '../../li export interface GoogleSheetPipelineWorkflowOpts { phase?: string - sourceState?: SyncState - readState?: SyncState + sourceState?: SourceState + readState?: SourceState rowIndex?: RowIndex catalog?: ConfiguredCatalog pendingWrites?: boolean @@ -37,8 +37,8 @@ export async function googleSheetPipelineWorkflow( let deleted = false const inputQueue: unknown[] = [...(opts?.inputQueue ?? [])] let iteration = 0 - let sourceState: SyncState = opts?.sourceState ?? { streams: {}, global: {} } - let readState: SyncState = opts?.readState ?? { + let sourceState: SourceState = opts?.sourceState ?? { streams: {}, global: {} } + let readState: SourceState = opts?.readState ?? { streams: { ...sourceState.streams }, global: { ...sourceState.global }, } @@ -70,7 +70,7 @@ export async function googleSheetPipelineWorkflow( iteration, }) ) - setHandler(stateQuery, (): SyncState => sourceState) + setHandler(stateQuery, (): SourceState => sourceState) async function waitWhilePaused() { await condition(() => !paused || deleted) diff --git a/apps/service/src/temporal/workflows/pipeline-workflow.ts b/apps/service/src/temporal/workflows/pipeline-workflow.ts index 7725e2fb..27da4aad 100644 --- a/apps/service/src/temporal/workflows/pipeline-workflow.ts +++ b/apps/service/src/temporal/workflows/pipeline-workflow.ts @@ -15,10 +15,10 @@ import { CONTINUE_AS_NEW_THRESHOLD, EVENT_BATCH_SIZE } from '../../lib/utils.js' const ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000 -import type { SyncState } from '@stripe/sync-protocol' +import type { SourceState } from '@stripe/sync-protocol' export interface PipelineWorkflowOpts { - state?: SyncState + state?: SourceState inputQueue?: unknown[] } @@ -30,7 +30,7 @@ export async function pipelineWorkflow( let deleted = false const inputQueue: unknown[] = [...(opts?.inputQueue ?? [])] let iteration = 0 - let syncState: SyncState = opts?.state ?? { streams: {}, global: {} } + let syncState: SourceState = opts?.state ?? { streams: {}, global: {} } let readComplete = false setHandler(stripeEventSignal, (event: unknown) => { @@ -44,7 +44,7 @@ export async function pipelineWorkflow( }) setHandler(statusQuery, (): WorkflowStatus => ({ phase: 'running', paused, iteration })) - setHandler(stateQuery, (): SyncState => syncState) + setHandler(stateQuery, (): SourceState => syncState) async function maybeContinueAsNew() { if (++iteration >= CONTINUE_AS_NEW_THRESHOLD) { diff --git a/packages/protocol/src/__tests__/state.test.ts b/packages/protocol/src/__tests__/state.test.ts index 243b1058..9cff23c5 100644 --- a/packages/protocol/src/__tests__/state.test.ts +++ b/packages/protocol/src/__tests__/state.test.ts @@ -1,17 +1,17 @@ import { describe, it, expect } from 'vitest' -import { SyncState, StatePayload, StreamStatePayload, GlobalStatePayload } from '../protocol.js' +import { SourceState, StatePayload, StreamStatePayload, GlobalStatePayload } from '../protocol.js' import { stateMsg, stateStream, stateData } from '../helpers.js' -describe('SyncState', () => { - it('parses a full SyncState', () => { - expect(SyncState.parse({ streams: { orders: { cursor: 1 } }, global: {} })).toEqual({ +describe('SourceState', () => { + it('parses a full SourceState', () => { + expect(SourceState.parse({ streams: { orders: { cursor: 1 } }, global: {} })).toEqual({ streams: { orders: { cursor: 1 } }, global: {}, }) }) it('requires both fields', () => { - expect(() => SyncState.parse({ streams: {} })).toThrow() + expect(() => SourceState.parse({ streams: {} })).toThrow() }) }) diff --git a/packages/protocol/src/cli.ts b/packages/protocol/src/cli.ts index 5196cded..9ca7e3a8 100644 --- a/packages/protocol/src/cli.ts +++ b/packages/protocol/src/cli.ts @@ -176,7 +176,7 @@ export function createConnectorCli( const config = parseConfig(args.config, opts?.configSchema) const catalog = parseJsonOrFile(args.catalog) const rawState = args.state ? parseJsonOrFile(args.state) : undefined - // Accept both SyncState { streams, global } and legacy flat state + // Accept both SourceState { streams, global } and legacy flat state const state = rawState ? 'streams' in rawState ? (rawState as { streams: Record; global: Record }) diff --git a/packages/protocol/src/protocol.ts b/packages/protocol/src/protocol.ts index 153e9bd5..65d1354d 100644 --- a/packages/protocol/src/protocol.ts +++ b/packages/protocol/src/protocol.ts @@ -17,7 +17,7 @@ import { z } from 'zod' * `streams` holds per-stream checkpoints (existing behavior). * `global` holds sync-wide state shared across all streams (new). */ -export const SyncState = z +export const SourceState = z .object({ streams: z .record(z.string(), z.unknown()) @@ -27,7 +27,7 @@ export const SyncState = z .describe('Sync-wide state shared across all streams (e.g. a global events cursor).'), }) .meta({ id: 'SourceState' }) -export type SyncState = z.infer +export type SourceState = z.infer // MARK: - Data model @@ -124,7 +124,7 @@ export const ConnectorSpecification = z .record(z.string(), z.unknown()) .optional() .describe( - 'JSON Schema for per-stream state (cursor/checkpoint shape). See also SyncState.global for sync-wide cursors.' + 'JSON Schema for per-stream state (cursor/checkpoint shape). See also SourceState.global for sync-wide cursors.' ), source_input: z .record(z.string(), z.unknown()) @@ -392,7 +392,7 @@ export type PipelineConfig = z.infer /** The full set of parsed sync request params: pipeline config + cursor state + stream limits. */ export interface SyncParams { pipeline: PipelineConfig - state?: SyncState + state?: SourceState state_limit?: number time_limit?: number } @@ -510,7 +510,7 @@ export interface Source< params: { config: TConfig catalog: ConfiguredCatalog - state?: SyncState + state?: SourceState }, $stdin?: AsyncIterable ): AsyncIterable diff --git a/packages/state-postgres/src/state-store.ts b/packages/state-postgres/src/state-store.ts index 5b47a345..84e48917 100644 --- a/packages/state-postgres/src/state-store.ts +++ b/packages/state-postgres/src/state-store.ts @@ -5,13 +5,13 @@ import { stripSslParams, withPgConnectProxy, } from '@stripe/sync-util-postgres' -import type { SyncState } from '@stripe/sync-protocol' +import type { SourceState } from '@stripe/sync-protocol' /** Reserved stream name for global state in the _sync_state table. */ const GLOBAL_KEY = '_global' export interface StateStore { - get(syncId: string): Promise + get(syncId: string): Promise set(syncId: string, stream: string, data: unknown): Promise setGlobal(syncId: string, data: unknown): Promise clear(syncId: string): Promise @@ -78,7 +78,7 @@ export function createPgStateStore( /** Engine-compatible state store scoped to a single sync_id. */ export interface ScopedStateStore { - get(): Promise + get(): Promise set(stream: string, data: unknown): Promise setGlobal(data: unknown): Promise } From 75e3cc4170d054425b336ef31ace717fa8a8dd02 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Sun, 5 Apr 2026 02:00:59 -0700 Subject: [PATCH 12/13] chore: regenerate OpenAPI specs (SourceState description update) Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- apps/engine/src/__generated__/openapi.d.ts | 2 +- apps/engine/src/__generated__/openapi.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/engine/src/__generated__/openapi.d.ts b/apps/engine/src/__generated__/openapi.d.ts index 398a995a..09f7c8bd 100644 --- a/apps/engine/src/__generated__/openapi.d.ts +++ b/apps/engine/src/__generated__/openapi.d.ts @@ -424,7 +424,7 @@ export interface components { config: { [key: string]: unknown; }; - /** @description JSON Schema for per-stream state (cursor/checkpoint shape). See also SyncState.global for sync-wide cursors. */ + /** @description JSON Schema for per-stream state (cursor/checkpoint shape). See also SourceState.global for sync-wide cursors. */ source_state_stream?: { [key: string]: unknown; }; diff --git a/apps/engine/src/__generated__/openapi.json b/apps/engine/src/__generated__/openapi.json index cef344fd..83d27232 100644 --- a/apps/engine/src/__generated__/openapi.json +++ b/apps/engine/src/__generated__/openapi.json @@ -1154,7 +1154,7 @@ "description": "JSON Schema for the connector's configuration object." }, "source_state_stream": { - "description": "JSON Schema for per-stream state (cursor/checkpoint shape). See also SyncState.global for sync-wide cursors.", + "description": "JSON Schema for per-stream state (cursor/checkpoint shape). See also SourceState.global for sync-wide cursors.", "type": "object", "propertyNames": { "type": "string" From 0e34ee6e6f1ec526a8eac3ea3f10eafa38a7c354 Mon Sep 17 00:00:00 2001 From: Tony Xiao Date: Sun, 5 Apr 2026 02:06:49 -0700 Subject: [PATCH 13/13] docs(todos): clean up orphan items, promote active work to Now Co-Authored-By: Claude Sonnet 4.6 Committed-By-Agent: claude --- docs/todos.md | 26 ++++---------------------- 1 file changed, 4 insertions(+), 22 deletions(-) diff --git a/docs/todos.md b/docs/todos.md index 09f26c34..5d38f654 100644 --- a/docs/todos.md +++ b/docs/todos.md @@ -1,36 +1,17 @@ # Todos -- /setup - - finish getting setup working - - use the control message for persisting catalog updates in setiup -- /pipeline_sync - - is activity plain functions -- call pipeline_sync from webhook - ---- - -- [x] ~~_/discover endpoint passing stripe api version_~~ [2026-04-04] -- [x] ~~\*~~how to return a list of all stripe api versions~~ done (DDR-008: enum on config JSON Schema)\*~~ [2026-04-04] -- Consider renaming generated OpenAPI specs to `.oas.json` for consistency (e.g. `engine.oas.json`, `service.oas.json`) - -- /check (implmeent or remove for now) - -- The pipeline state machine.... maintain status -- more resilient engine, not crash and do something else to recover from source / destination issues? -- making sure the google sheet thing works - ---- - Short-term actionable items. Move to a dated plan in `docs/plans/` when scoped. ## Now - Land grouped Stripe tables in stream selector (`docs/plans/active/2026-04-03-grouped-tables.md`) - Structured stream state + per-stream reset signal (`docs/plans/active/2026-04-03-structured-stream-state.md`) +- Pipeline status state machine — implement `PipelineStatus`, `sub_status`, `backfill_complete` in workflow + API (`docs/plans/2026-04-03-pipeline-status-state-machine.md`) +- Google Sheets destination — fix global state flush, catalog reset on update, batch updateRows ## Soon -- Pipeline status state machine — implementation (`docs/plans/2026-04-03-pipeline-status-state-machine.md`) +- `/check` endpoint — implement or remove - Control messages → config store updates (unscoped) - CLI backfill progress display (`docs/plans/active/2026-03-20-plan-005-cli-progress-display.md`) — progress bars with row counts - Scope rename: `@stripe/` → `@stripe-sync/` (`docs/plans/active/2026-03-20-plan-008-scope-rename-stripe-sync.md`) — blocked on npm org approval @@ -60,6 +41,7 @@ Short-term actionable items. Move to a dated plan in `docs/plans/` when scoped. - OAuth credential refresh workflow (auto-renew expiring tokens) - Global state (per-pipeline state in addition to per-stream cursors) - Fan-in support (multiple sources → one destination) +- Resilient engine — recover from source/destination errors without crashing the workflow (auto-pause on permanent, retry on transient) ## Developer Experience