Skip to content

feat: pipeline state machine, SourceState schema, protocol + service refactors#250

Merged
tonyxiao merged 13 commits intov2from
pipeline-state-machine
Apr 5, 2026
Merged

feat: pipeline state machine, SourceState schema, protocol + service refactors#250
tonyxiao merged 13 commits intov2from
pipeline-state-machine

Conversation

@tonyxiao
Copy link
Copy Markdown
Collaborator

@tonyxiao tonyxiao commented Apr 5, 2026

Summary

A broad refactor establishing clean ownership of sync state and protocol types across engine, service, and activities.

State ownership

  • SourceState replaces SyncState — renamed throughout + added .meta({ id: 'SourceState' }) so it appears as a named $ref component in the OAS spec
  • Activities own state merging — activities receive the current full SourceState as initial state, accumulate source_state messages on top of it, and return the updated full state; workflows simply do syncState = result.state (no spread-merge logic in workflow layer)
  • x-source-state header (renamed from x-state) carries the full SourceState blob into the engine on each call

OpenAPI fixes

  • Fix [object Object] content key bug — getParamContentType() now extracts the media type string from the content object instead of coercing the whole object to a string key
  • packages/hono-zod-openapi: header param.content accepts both object form { 'application/json': {} } (OAS-typed, preferred) and legacy string form
  • New OAS spec tests: no [object Object] keys, SourceState schema present, x-source-state uses $ref: '#/components/schemas/SourceState'

OpenAPI spec tooling

  • packages/openapi/scripts/generate-specs.mjs — canonical script for fetching Stripe API spec versions; --versions flag for targeted runs vs full CDN generation
  • BUNDLED_API_VERSION and SUPPORTED_API_VERSIONS derived from committed oas/*.json files (no runtime fetching)
  • docs/scripts/generate-stripe-specs.mjs → thin wrapper delegating to package script

Protocol refactors

  • Typed control messages, full config replacement, SourceInput envelope
  • eof terminal message + state_limit/time_limit query params
  • everything-is-a-stream — all connector output through unified stream
  • snake_case wire format throughout

Service

  • FS pipeline store + ID-only workflows
  • Backfill workflow with reconcileComplete flag (renamed from backfillComplete)
  • Remove generic queue activities; separate read/write Kafka activities for Google Sheets
  • x-source-state header in remote-engine.ts

🤖 Generated with Claude Code

tonyxiao and others added 13 commits April 4, 2026 23:53
Replace z.enum(SUPPORTED_API_VERSIONS) with z.string() so the engine
accepts non-bundled versions (CDN-fetched). Advertise known versions
in the JSON schema via anyOf so z.fromJSONSchema in the resolver
produces a union that accepts any string rather than a strict enum.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Replace hardcoded BUNDLED_API_VERSION / SUPPORTED_API_VERSIONS in
specFetchHelper.ts with a generated src/versions.ts derived from the
files present in packages/openapi/oas/*.json.

- Add scripts/generate-versions.mjs — scans oas/, writes src/versions.ts
- Hook into build: node scripts/generate-versions.mjs && tsc
- specFetchHelper.ts imports from ./src/versions.js (no more hardcodes)
- Revert api_version from z.string() back to z.enum(SUPPORTED_API_VERSIONS)
- Remove anyOf JSON Schema hack; enum is now the direct source of truth
- Update spec.test.ts to assert strict enum validation

Adding a new bundled API version is now: drop the .json into oas/,
run pnpm --filter @stripe/sync-openapi build. Same oas/ directory
powers the docs CDN (docs/scripts/generate-stripe-specs.mjs).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…d time

generate-versions.mjs now fetches stripe-sync.dev/stripe-api-specs/manifest.json
to populate SUPPORTED_API_VERSIONS (51 versions) while BUNDLED_API_VERSION
stays derived from the single oas/*.json file.

The CDN manifest is produced by docs/scripts/generate-stripe-specs.mjs —
same source, shared contract.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…st.json

Remove network fetch at build time. Instead:
- oas/manifest.json is committed alongside the bundled spec
- generate-versions.mjs reads it locally (reproducible, no network)
- docs/scripts/generate-stripe-specs.mjs now also writes manifest to
  packages/openapi/oas/manifest.json when updating the CDN

To pick up new Stripe API versions:
  1. Run docs/scripts/generate-stripe-specs.mjs
  2. Commit updated oas/manifest.json
  3. Run pnpm --filter @stripe/sync-openapi build

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…ipts/

- Add scripts/generate-all-specs.mjs (heavyweight): moved from
  docs/scripts/generate-stripe-specs.mjs; walks stripe/openapi git
  history, writes spec files + manifest.json to <outputDir> for CDN,
  and updates src/versions.ts as a side effect
- Keep scripts/generate-versions.mjs (lightweight): standalone utility
  to bootstrap src/versions.ts from just the bundled oas/ file
- Remove build-time version generation from package.json — src/versions.ts
  is a committed file updated by the heavyweight script; build = tsc only
- Remove oas/manifest.json — not needed in the package
- docs/scripts/generate-stripe-specs.mjs → thin wrapper that delegates
  to packages/openapi/scripts/generate-all-specs.mjs

packages/openapi is now the source of truth for Stripe API version
discovery. CDN invokes generate-all-specs.mjs with an output directory.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…replacement

- Delete read-into-queue.ts and write-from-queue.ts — unused generic
  queue activities; pipelineWorkflow and backfillPipelineWorkflow use
  syncImmediate directly, and googleSheetPipelineWorkflow has its own
  GS-specific queue activities
- Fix global source_state handling: replace wholesale (state.global = data)
  instead of merging (Object.assign) — consistent with stream state
  which already does per-stream replacement

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…versions filter

- Rename generate-all-specs.mjs → generate-specs.mjs
- Add --versions flag to generate specific versions only (e.g. for
  updating the bundled spec in oas/):
    node scripts/generate-specs.mjs oas --versions 2026-03-25.dahlia
  Without the flag, all versions are generated (CDN use case).
- Update docs/scripts/generate-stripe-specs.mjs wrapper reference
- Update generate-versions.mjs comment reference

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
- Fix getParamContentType() to extract the media type string from the
  content object rather than returning the whole object, which coerced
  to '[object Object]' as a property key
- Rename x-state header to x-source-state for clarity
- Add .meta({ id: 'SourceState' }) to SyncState for a named OAS schema
- Regenerate openapi.json / openapi.d.ts

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Activities now receive the current full SyncState as initial state and
return the updated full SyncState. Workflows simply assign result.state
directly — no merge logic in the workflow layer.

- drainMessages accepts initialState and starts from it
- syncImmediate passes readOpts.state to drainMessages
- readGoogleSheetsIntoQueue starts state from readOpts.state
- writeGoogleSheetsFromQueue accepts opts.state and starts from it
- All workflows: syncState = result.state (no spread merge)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
…→ reconcileComplete

- Rename SyncState Zod schema and type to SourceState across all packages
  (.meta({ id: 'SourceState' }) was already correct)
- Rename backfillComplete → reconcileComplete in backfill workflow
- Add reconcileComplete to BackfillPipelineWorkflowOpts so it survives continueAsNew

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Committed-By-Agent: claude
@tonyxiao tonyxiao changed the title feat(openapi): generate API versions from oas/ + CDN script in packages/openapi feat: pipeline state machine, SourceState schema, protocol + service refactors Apr 5, 2026
@tonyxiao tonyxiao merged commit c5d3aa6 into v2 Apr 5, 2026
13 checks passed
@tonyxiao tonyxiao deleted the pipeline-state-machine branch April 5, 2026 09:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant