diff --git a/apps/engine/package.json b/apps/engine/package.json index 2d54c8ef..b4e97c35 100644 --- a/apps/engine/package.json +++ b/apps/engine/package.json @@ -50,7 +50,6 @@ "pg": "^8.16.3", "pino": "^10", "pino-pretty": "^13", - "stripe": "^17.7.0", "ws": "^8.18.0", "zod": "^4.3.6", "zod-openapi": "^5" diff --git a/apps/supabase/package.json b/apps/supabase/package.json index 02d9c8ba..0077fbdf 100644 --- a/apps/supabase/package.json +++ b/apps/supabase/package.json @@ -33,7 +33,6 @@ "devDependencies": { "@types/node": "^24.10.1", "esbuild": "^0.27.2", - "stripe": "^18.1.0", "vitest": "^3.2.4" } } diff --git a/apps/supabase/src/__tests__/supabase.e2e.test.ts b/apps/supabase/src/__tests__/supabase.e2e.test.ts index facd0092..a9c7f172 100644 --- a/apps/supabase/src/__tests__/supabase.e2e.test.ts +++ b/apps/supabase/src/__tests__/supabase.e2e.test.ts @@ -9,9 +9,9 @@ * STRIPE_API_KEY */ import { describe, it, expect, beforeAll, afterAll } from 'vitest' -import Stripe from 'stripe' import { SupabaseSetupClient } from '../supabase.js' import { describeWithEnv } from '../../../../e2e/test-helpers.js' +import { stripeDelete, stripePost } from '../../../../e2e/stripe-helpers.js' function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)) @@ -22,14 +22,12 @@ describeWithEnv( ['SUPABASE_PROJECT_ID', 'SUPABASE_PERSONAL_ACCESS_TOKEN', 'STRIPE_API_KEY'], ({ SUPABASE_PROJECT_ID, SUPABASE_PERSONAL_ACCESS_TOKEN, STRIPE_API_KEY }) => { let client: SupabaseSetupClient - let stripe: Stripe beforeAll(async () => { client = new SupabaseSetupClient({ accessToken: SUPABASE_PERSONAL_ACCESS_TOKEN, projectRef: SUPABASE_PROJECT_ID, }) - stripe = new Stripe(STRIPE_API_KEY) // Ensure clean slate try { @@ -60,7 +58,7 @@ describeWithEnv( // Clean up test customer if (customerId) { try { - await stripe.customers.del(customerId) + await stripeDelete(STRIPE_API_KEY, `/v1/customers/${customerId}`) } catch {} } }) @@ -89,7 +87,7 @@ describeWithEnv( it('should receive customer.created webhook', async () => { const testName = `Supabase E2E ${Date.now()}` - const customer = await stripe.customers.create({ + const customer = await stripePost<{ id: string }>(STRIPE_API_KEY, '/v1/customers', { name: testName, email: 'supabase-e2e@test.local', }) @@ -116,7 +114,7 @@ describeWithEnv( expect(customerId).toBeDefined() const updatedName = `Updated Supabase E2E ${Date.now()}` - await stripe.customers.update(customerId!, { name: updatedName }) + await stripePost(STRIPE_API_KEY, `/v1/customers/${customerId!}`, { name: updatedName }) // Poll until the update arrives (up to 60s) let found = false diff --git a/docs/slides/architecture.md b/docs/slides/architecture.md index f37c11e8..5f2f4307 100644 --- a/docs/slides/architecture.md +++ b/docs/slides/architecture.md @@ -64,7 +64,7 @@ interface Source { discover(): Promise setup(): Promise read(params: SyncParams, $stdin?: AsyncIterable): AsyncIterable - teardown(): Promise + teardown(opts?: { remove_shared_resources?: boolean }): Promise } interface Destination { diff --git a/e2e/package.json b/e2e/package.json index 188ad3e5..8d49f3da 100644 --- a/e2e/package.json +++ b/e2e/package.json @@ -17,7 +17,6 @@ "@temporalio/worker": "^1", "googleapis": "^144", "pg": "^8.16.0", - "stripe": "^17.7.0", "vitest": "^3.2.1" } } diff --git a/e2e/stripe-helpers.ts b/e2e/stripe-helpers.ts new file mode 100644 index 00000000..db3c9db8 --- /dev/null +++ b/e2e/stripe-helpers.ts @@ -0,0 +1,52 @@ +export const DEFAULT_BASE = 'https://api.stripe.com' + +export async function stripeGet(apiKey: string, path: string, baseUrl?: string): Promise { + const base = baseUrl ?? DEFAULT_BASE + const url = path.startsWith('http') ? path : `${base}${path}` + const res = await fetch(url, { headers: { Authorization: `Bearer ${apiKey}` } }) + if (!res.ok) { + const body = await res.text() + throw new Error(`Stripe API ${res.status}: ${body}`) + } + return (await res.json()) as T +} + +export async function stripePost( + apiKey: string, + path: string, + params?: Record, + baseUrl?: string +): Promise { + const base = baseUrl ?? DEFAULT_BASE + const body = params ? new URLSearchParams(params).toString() : undefined + const res = await fetch(`${base}${path}`, { + method: 'POST', + headers: { + Authorization: `Bearer ${apiKey}`, + 'Content-Type': 'application/x-www-form-urlencoded', + }, + body, + }) + if (!res.ok) { + const text = await res.text() + throw new Error(`Stripe API ${res.status}: ${text}`) + } + return (await res.json()) as T +} + +export async function stripeDelete( + apiKey: string, + path: string, + baseUrl?: string +): Promise { + const base = baseUrl ?? DEFAULT_BASE + const res = await fetch(`${base}${path}`, { + method: 'DELETE', + headers: { Authorization: `Bearer ${apiKey}` }, + }) + if (!res.ok) { + const text = await res.text() + throw new Error(`Stripe API ${res.status}: ${text}`) + } + return (await res.json()) as T +} diff --git a/e2e/stripe-to-postgres.test.ts b/e2e/stripe-to-postgres.test.ts index da6476fd..51edbf5f 100644 --- a/e2e/stripe-to-postgres.test.ts +++ b/e2e/stripe-to-postgres.test.ts @@ -1,11 +1,11 @@ import pg from 'pg' -import Stripe from 'stripe' import { afterAll, beforeAll, expect, it } from 'vitest' import source from '@stripe/sync-source-stripe' import destination from '@stripe/sync-destination-postgres' import { createEngine, noopStateStore } from '@stripe/sync-engine' import type { StateMessage, DestinationOutput } from '@stripe/sync-protocol' import { describeWithEnv } from './test-helpers.js' +import { stripeGet, stripePost } from './stripe-helpers.js' // --------------------------------------------------------------------------- // Config @@ -39,7 +39,6 @@ async function collectStates(iter: AsyncIterable): Promise { let pool: pg.Pool - let stripe: Stripe function makeEngine(opts: { websocket?: boolean } = {}) { return createEngine( @@ -61,8 +60,7 @@ describeWithEnv('stripe → postgres e2e', ['STRIPE_API_KEY'], ({ STRIPE_API_KEY beforeAll(async () => { pool = new pg.Pool({ connectionString: POSTGRES_URL }) await pool.query('SELECT 1') - stripe = new Stripe(STRIPE_API_KEY) - const account = await stripe.accounts.retrieve() + const account = await stripeGet<{ id: string }>(STRIPE_API_KEY, '/v1/account') const isTest = STRIPE_API_KEY.startsWith('sk_test_') const dashPrefix = isTest ? 'dashboard.stripe.com/test' : 'dashboard.stripe.com' console.log(`\n Stripe: ${account.id} → https://${dashPrefix}/developers`) @@ -112,11 +110,14 @@ describeWithEnv('stripe → postgres e2e', ['STRIPE_API_KEY'], ({ STRIPE_API_KEY console.log(' Backfill complete, sending product update…') // Phase 2: update a product via Stripe API - const products = await stripe.products.list({ limit: 1 }) + const products = await stripeGet<{ data: { id: string }[] }>( + STRIPE_API_KEY, + '/v1/products?limit=1' + ) expect(products.data.length).toBeGreaterThan(0) const product = products.data[0] const newName = `e2e-test-${Date.now()}` - await stripe.products.update(product.id, { name: newName }) + await stripePost(STRIPE_API_KEY, `/v1/products/${product.id}`, { name: newName }) // Phase 3: consume until we see a live event state for product const deadline = Date.now() + 30_000 diff --git a/e2e/temporal.test.ts b/e2e/temporal.test.ts index 1bc1b565..214f76ec 100644 --- a/e2e/temporal.test.ts +++ b/e2e/temporal.test.ts @@ -4,8 +4,8 @@ import { NativeConnection, Worker } from '@temporalio/worker' import { serve } from '@hono/node-server' import type { ServerType } from '@hono/node-server' import pg from 'pg' -import Stripe from 'stripe' import { google } from 'googleapis' +import { stripeGet, stripePost } from './stripe-helpers.js' import fs from 'node:fs' import path from 'node:path' import os from 'node:os' @@ -162,11 +162,9 @@ function createTestInfra() { describeWithEnv('temporal e2e: stripe → postgres', ['STRIPE_API_KEY'], ({ STRIPE_API_KEY }) => { const infra = createTestInfra() const schema = schemaName() - let stripe: Stripe beforeAll(async () => { await infra.setup() - stripe = new Stripe(STRIPE_API_KEY) console.log(` Schema: ${schema}`) }, 60_000) @@ -235,14 +233,20 @@ describeWithEnv('temporal e2e: stripe → postgres', ['STRIPE_API_KEY'], ({ STRI console.log(` Sample: ${sampleRows[0].id} → ${sampleRows[0].name}`) // --- Live event via stripe_event signal --- - const products = await stripe.products.list({ limit: 1 }) + const products = await stripeGet<{ data: { id: string }[] }>( + STRIPE_API_KEY, + '/v1/products?limit=1' + ) const product = products.data[0] const newName = `temporal-e2e-${Date.now()}` - await stripe.products.update(product.id, { name: newName }) + await stripePost(STRIPE_API_KEY, `/v1/products/${product.id}`, { name: newName }) console.log(` Updated product ${product.id} → "${newName}"`) await new Promise((r) => setTimeout(r, 2000)) - const events = await stripe.events.list({ limit: 5, type: 'product.updated' }) + const events = await stripeGet<{ data: { id: string; type: string }[] }>( + STRIPE_API_KEY, + '/v1/events?limit=5&type=product.updated' + ) const event = events.data[0] console.log(` Fetched event ${event.id} (${event.type})`) await handle.signal('stripe_event', event) diff --git a/packages/protocol/src/protocol.ts b/packages/protocol/src/protocol.ts index 7c47e12d..9cef2e81 100644 --- a/packages/protocol/src/protocol.ts +++ b/packages/protocol/src/protocol.ts @@ -283,7 +283,7 @@ export interface Source< setup?(params: { config: TConfig; catalog: ConfiguredCatalog }): Promise /** Clean up external resources. Called when a sync is deleted. */ - teardown?(params: { config: TConfig }): Promise + teardown?(params: { config: TConfig; remove_shared_resources?: boolean }): Promise } // MARK: - Destination diff --git a/packages/source-stripe/package.json b/packages/source-stripe/package.json index 849477ea..e45c93dd 100644 --- a/packages/source-stripe/package.json +++ b/packages/source-stripe/package.json @@ -30,7 +30,6 @@ "@stripe/sync-openapi": "workspace:*", "@stripe/sync-protocol": "workspace:*", "https-proxy-agent": "^7.0.6", - "stripe": "^17.7.0", "undici": "^7.16.0", "ws": "^8.18.0", "zod": "^4.3.6" diff --git a/packages/source-stripe/src/client.test.ts b/packages/source-stripe/src/client.test.ts index 505bffe1..75115cb0 100644 --- a/packages/source-stripe/src/client.test.ts +++ b/packages/source-stripe/src/client.test.ts @@ -1,6 +1,5 @@ -import { Agent } from 'node:http' import { describe, expect, it } from 'vitest' -import { buildStripeClientOptions, type StripeClientConfigInput } from './client.js' +import { buildTransportOptions, makeClientConfig, type StripeClientConfigInput } from './client.js' import { getProxyUrl } from './transport.js' const config: StripeClientConfigInput = { @@ -22,61 +21,61 @@ describe('getProxyUrl', () => { }) }) -describe('buildStripeClientOptions', () => { - it('adds a proxy agent and default timeout when HTTPS_PROXY is set', () => { - const options = buildStripeClientOptions(config, { - HTTPS_PROXY: 'http://proxy.example.test:8080', - }) +describe('buildTransportOptions', () => { + it('returns default timeout and api.stripe.com base when no overrides', () => { + const options = buildTransportOptions(config, {}) - expect(options.timeout).toBe(10_000) - expect(options.httpAgent).toBeInstanceOf(Agent) + expect(options.timeout_ms).toBe(10_000) + expect(options.base_url).toBe('https://api.stripe.com') + expect(options.host).toBe('api.stripe.com') + expect(options.port).toBe(443) + expect(options.protocol).toBe('https') }) it('uses the configured timeout override', () => { - const options = buildStripeClientOptions(config, { - HTTPS_PROXY: 'http://proxy.example.test:8080', + const options = buildTransportOptions(config, { STRIPE_REQUEST_TIMEOUT_MS: '2500', }) - expect(options.timeout).toBe(2500) + expect(options.timeout_ms).toBe(2500) }) - it('bypasses the proxy for localhost base_url overrides', () => { - const options = buildStripeClientOptions( - { - ...config, - base_url: 'http://localhost:12111', - }, - { - HTTPS_PROXY: 'http://proxy.example.test:8080', - } - ) + it('decomposes a localhost base_url', () => { + const options = buildTransportOptions({ ...config, base_url: 'http://localhost:12111' }, {}) expect(options.host).toBe('localhost') expect(options.port).toBe(12111) expect(options.protocol).toBe('http') - expect(options.httpAgent).toBeUndefined() - }) - - it('keeps the proxy for external base_url overrides', () => { - const options = buildStripeClientOptions( - { - ...config, - base_url: 'https://api.stripe.com', - }, - { - HTTPS_PROXY: 'http://proxy.example.test:8080', - } - ) - - expect(options.httpAgent).toBeInstanceOf(Agent) }) it('throws on an invalid timeout override', () => { expect(() => - buildStripeClientOptions(config, { + buildTransportOptions(config, { STRIPE_REQUEST_TIMEOUT_MS: '0', }) ).toThrow('STRIPE_REQUEST_TIMEOUT_MS must be a positive integer') }) }) + +describe('makeClientConfig', () => { + it('maps snake_case input to camelCase StripeClientConfig', () => { + const result = makeClientConfig({ + api_key: 'sk_test_123', + base_url: 'http://localhost:12111', + }) + + expect(result).toEqual({ + apiKey: 'sk_test_123', + baseUrl: 'http://localhost:12111', + }) + }) + + it('omits baseUrl when base_url is not provided', () => { + const result = makeClientConfig({ api_key: 'sk_test_123' }) + + expect(result).toEqual({ + apiKey: 'sk_test_123', + baseUrl: undefined, + }) + }) +}) diff --git a/packages/source-stripe/src/client.ts b/packages/source-stripe/src/client.ts index 50051601..7bd074a7 100644 --- a/packages/source-stripe/src/client.ts +++ b/packages/source-stripe/src/client.ts @@ -1,9 +1,5 @@ -import Stripe from 'stripe' -import { - getHttpsProxyAgentForTarget, - parsePositiveInteger, - type TransportEnv, -} from './transport.js' +import type { StripeClientConfig } from './stripe-api.js' +import { parsePositiveInteger, type TransportEnv } from './transport.js' export type StripeClientConfigInput = { api_key: string @@ -13,79 +9,36 @@ export { getProxyUrl as getStripeProxyUrl } from './transport.js' const DEFAULT_STRIPE_API_BASE = 'https://api.stripe.com' -function buildBaseUrlOptions( - baseUrl: string -): Pick { - const url = new URL(baseUrl) - return { - host: url.hostname, - port: url.port ? parseInt(url.port, 10) : url.protocol === 'https:' ? 443 : 80, - protocol: url.protocol.replace(':', '') as Stripe.HttpProtocol, - } +export interface TransportOptions { + timeout_ms: number + base_url: string + host: string + port: number + protocol: string } -export function buildStripeClientOptions( +export function buildTransportOptions( config: StripeClientConfigInput, env: TransportEnv = process.env -): Stripe.StripeConfig { - const options: Stripe.StripeConfig = { - timeout: parsePositiveInteger( +): TransportOptions { + const base = config.base_url ?? DEFAULT_STRIPE_API_BASE + const url = new URL(base) + return { + timeout_ms: parsePositiveInteger( 'STRIPE_REQUEST_TIMEOUT_MS', env.STRIPE_REQUEST_TIMEOUT_MS, 10_000 ), + base_url: base, + host: url.hostname, + port: url.port ? parseInt(url.port, 10) : url.protocol === 'https:' ? 443 : 80, + protocol: url.protocol.replace(':', ''), } - - if (config.base_url) { - const httpAgent = getHttpsProxyAgentForTarget(config.base_url, env) - return { - ...options, - ...buildBaseUrlOptions(config.base_url), - ...(httpAgent ? { httpAgent } : {}), - } - } - - const httpAgent = getHttpsProxyAgentForTarget(DEFAULT_STRIPE_API_BASE, env) - if (httpAgent) { - options.httpAgent = httpAgent - } - - return options } -function attachStripeRequestLogging(stripe: Stripe, env: TransportEnv = process.env): void { - if (env.STRIPE_LOG_REQUESTS !== '1') { - return +export function makeClientConfig(config: StripeClientConfigInput): StripeClientConfig { + return { + apiKey: config.api_key, + baseUrl: config.base_url, } - - stripe.on('request', (event) => { - console.info({ - msg: 'Stripe API request started', - method: event.method, - path: event.path, - apiVersion: event.api_version, - requestStartTime: event.request_start_time, - }) - }) - - stripe.on('response', (event) => { - console.info({ - msg: 'Stripe API request completed', - method: event.method, - path: event.path, - status: event.status, - elapsed: event.elapsed, - requestId: event.request_id, - apiVersion: event.api_version, - }) - }) -} - -export function makeClient( - config: StripeClientConfigInput, - env: TransportEnv = process.env -): Stripe { - const stripe = new Stripe(config.api_key, buildStripeClientOptions(config, env)) - attachStripeRequestLogging(stripe, env) - return stripe } diff --git a/packages/source-stripe/src/index.test.ts b/packages/source-stripe/src/index.test.ts index 075d01a0..182f1764 100644 --- a/packages/source-stripe/src/index.test.ts +++ b/packages/source-stripe/src/index.test.ts @@ -1,7 +1,7 @@ import fs from 'node:fs' import path from 'node:path' import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' -import type Stripe from 'stripe' +import type { StripeEvent } from './stripe-types.js' import type { ConfiguredCatalog, ErrorMessage, @@ -90,13 +90,13 @@ function getAllTsFiles(dir: string): string[] { return results } -/** Create a minimal Stripe.Event for testing fromWebhookEvent(). */ +/** Create a minimal StripeEvent for testing fromWebhookEvent(). */ function makeEvent(overrides: { id?: string type?: string created?: number dataObject: Record -}): Stripe.Event { +}): StripeEvent { return { id: overrides.id ?? 'evt_test_123', object: 'event', @@ -107,9 +107,9 @@ function makeEvent(overrides: { pending_webhooks: 0, request: null, data: { - object: overrides.dataObject as Stripe.Event.Data['object'], + object: overrides.dataObject as StripeEvent['data']['object'], }, - } as Stripe.Event + } } const config = { api_key: 'sk_test_fake' } @@ -472,8 +472,8 @@ describe('StripeSource', () => { it('WebSocket mode uses same fromWebhookEvent conversion as webhook mode', () => { // WebSocket is a transport concern — the conversion is identical. - // The same Stripe.Event structure is received regardless of transport. - // This test verifies fromWebhookEvent works for any Stripe.Event input. + // The same StripeEvent structure is received regardless of transport. + // This test verifies fromWebhookEvent works for any StripeEvent input. const registry: Record = { invoices: makeConfig({ order: 1, tableName: 'invoices' }), } @@ -703,7 +703,7 @@ describe('StripeSource', () => { }) it('stream via websocket (input): same code path as webhook', async () => { - // WebSocket is a transport concern — the Stripe.Event is identical. + // WebSocket is a transport concern — the StripeEvent is identical. // read() with input= behaves the same regardless of transport. vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) const event = makeEvent({ @@ -1145,7 +1145,7 @@ describe('StripeSource', () => { } /** Push a synthetic event through the captured onEvent callback. */ - function pushWsEvent(event: Stripe.Event) { + function pushWsEvent(event: StripeEvent) { capturedOnEvent!({ type: 'webhook_event', webhook_id: 'wh_' + event.id, diff --git a/packages/source-stripe/src/index.ts b/packages/source-stripe/src/index.ts index 006ee8fb..27c14da1 100644 --- a/packages/source-stripe/src/index.ts +++ b/packages/source-stripe/src/index.ts @@ -4,7 +4,6 @@ import type { Message, Source, } from '@stripe/sync-protocol' -import Stripe from 'stripe' import { z } from 'zod' import { buildResourceRegistry } from './resourceRegistry.js' import { catalogFromRegistry, catalogFromOpenApi } from './catalog.js' @@ -20,9 +19,16 @@ import { pollEvents } from './src-events-api.js' import type { StripeWebSocketClient, StripeWebhookEvent } from './src-websocket.js' import { createStripeWebSocketClient } from './src-websocket.js' import type { ResourceConfig } from './types.js' -import { makeClient } from './client.js' import type { RateLimiter } from './rate-limiter.js' import { createInMemoryRateLimiter, DEFAULT_MAX_RPS } from './rate-limiter.js' +import type { StripeEvent } from './stripe-types.js' +import type { StripeClientConfig } from './stripe-api.js' +import { + retrieveAccount, + listWebhookEndpoints, + createWebhookEndpoint, + deleteWebhookEndpoint, +} from './stripe-api.js' // MARK: - Spec @@ -80,6 +86,10 @@ export const spec = z.object({ export type Config = z.infer +function clientConfig(config: Config): StripeClientConfig { + return { apiKey: config.api_key, baseUrl: config.base_url } +} + /** Raw webhook payload requiring signature verification. */ export type WebhookInput = { body: string | Buffer @@ -126,7 +136,7 @@ export type StripeSourceDeps = { export function createStripeSource( deps?: StripeSourceDeps -): Source { +): Source { const externalRateLimiter = deps?.rateLimiter return { @@ -139,8 +149,7 @@ export function createStripeSource( async check({ config }) { try { - const s = makeClient(config) - await s.accounts.retrieve() + await retrieveAccount(clientConfig(config)) return { status: 'succeeded' } } catch (err: any) { return { status: 'failed', message: err.message } @@ -170,8 +179,8 @@ export function createStripeSource( async setup({ config, catalog }) { if (config.webhook_url) { - const stripe = makeClient(config) - const existing = await stripe.webhookEndpoints.list({ limit: 100 }) + const sc = clientConfig(config) + const existing = await listWebhookEndpoints(sc, { limit: 100 }) const managed = existing.data.find( (wh) => wh.url === config.webhook_url && wh.metadata?.managed_by === 'stripe-sync' ) @@ -185,7 +194,7 @@ export function createStripeSource( // for the same account, each sync filters events by its own catalog // inside processStripeEvent(), keeping endpoint usage constant // regardless of how many syncs are configured. - await stripe.webhookEndpoints.create({ + await createWebhookEndpoint(sc, { url: config.webhook_url, enabled_events: ['*'], metadata: { managed_by: 'stripe-sync' }, @@ -196,11 +205,11 @@ export function createStripeSource( async teardown({ config }) { if (config.webhook_url) { - const stripe = makeClient(config) - const existing = await stripe.webhookEndpoints.list({ limit: 100 }) + const sc = clientConfig(config) + const existing = await listWebhookEndpoints(sc, { limit: 100 }) for (const wh of existing.data) { if (wh.metadata?.managed_by === 'stripe-sync') { - await stripe.webhookEndpoints.del(wh.id) + await deleteWebhookEndpoint(sc, wh.id) } } } @@ -209,7 +218,6 @@ export function createStripeSource( async *read({ config, catalog, state }, $stdin?) { const rateLimiter = externalRateLimiter ?? createInMemoryRateLimiter(config.rate_limit ?? DEFAULT_MAX_RPS) - const stripe = makeClient(config) const resolved = await resolveOpenApiSpec({ apiVersion: config.api_version ?? '2020-08-27', }) @@ -225,23 +233,9 @@ export function createStripeSource( if ($stdin) { for await (const input of $stdin) { if ('body' in (input as object)) { - yield* processWebhookInput( - input as WebhookInput, - config, - stripe, - catalog, - registry, - streamNames - ) + yield* processWebhookInput(input as WebhookInput, config, catalog, registry, streamNames) } else { - yield* processStripeEvent( - input as Stripe.Event, - config, - stripe, - catalog, - registry, - streamNames - ) + yield* processStripeEvent(input as StripeEvent, config, catalog, registry, streamNames) } } return @@ -254,7 +248,7 @@ export function createStripeSource( wsClient = await createStripeWebSocketClient({ stripeApiKey: config.api_key, onEvent: (wsEvent: StripeWebhookEvent) => { - const event = JSON.parse(wsEvent.event_payload) as Stripe.Event + const event = JSON.parse(wsEvent.event_payload) as StripeEvent inputQueue.push({ data: event }) }, }) @@ -270,17 +264,14 @@ export function createStripeSource( catalog, state, registry, - stripe, - rateLimiter, backfillLimit: config.backfill_limit, - backfillConcurrency: config.backfill_concurrency, drainQueue: wsClient - ? () => inputQueue.drain(config, stripe, catalog, registry, streamNames) + ? () => inputQueue.drain(config, catalog, registry, streamNames) : undefined, }) // Events polling: incremental sync via /v1/events after backfill - yield* pollEvents({ config, stripe, catalog, registry, streamNames, state, startTimestamp }) + yield* pollEvents({ config, catalog, registry, streamNames, state, startTimestamp }) // Start HTTP server for live mode if configured if (config.webhook_port) { @@ -290,30 +281,16 @@ export function createStripeSource( // After backfill: stream live events from WebSocket and/or HTTP if (wsClient || httpServer) { // Drain anything that arrived during backfill - yield* inputQueue.drain(config, stripe, catalog, registry, streamNames) + yield* inputQueue.drain(config, catalog, registry, streamNames) // Block on new events (infinite loop until all live sources close) while (wsClient || httpServer) { const queued = await inputQueue.wait() try { if ('body' in queued.data) { - yield* processWebhookInput( - queued.data, - config, - stripe, - catalog, - registry, - streamNames - ) + yield* processWebhookInput(queued.data, config, catalog, registry, streamNames) } else { - yield* processStripeEvent( - queued.data, - config, - stripe, - catalog, - registry, - streamNames - ) + yield* processStripeEvent(queued.data, config, catalog, registry, streamNames) } queued.resolve?.() } catch (err) { @@ -345,3 +322,11 @@ export { SpecParser, OPENAPI_RESOURCE_TABLE_ALIASES } from './openapi/specParser export type { ParsedResourceTable, ParsedOpenApiSpec } from './openapi/types.js' export type { RateLimiter } from './rate-limiter.js' export { createInMemoryRateLimiter, DEFAULT_MAX_RPS } from './rate-limiter.js' +export type { + StripeEvent, + StripeList, + StripeAccount, + StripeWebhookEndpoint, +} from './stripe-types.js' +export type { StripeClientConfig } from './stripe-api.js' +export { verifyWebhookSignature, retrieveAccount, listAllEvents } from './stripe-api.js' diff --git a/packages/source-stripe/src/process-event.ts b/packages/source-stripe/src/process-event.ts index 9ce197bc..fcb92ebe 100644 --- a/packages/source-stripe/src/process-event.ts +++ b/packages/source-stripe/src/process-event.ts @@ -1,6 +1,6 @@ import type { ConfiguredCatalog, Message, RecordMessage, StateMessage } from '@stripe/sync-protocol' import { toRecordMessage } from '@stripe/sync-protocol' -import type Stripe from 'stripe' +import type { StripeEvent } from './stripe-types.js' import type { Config } from './index.js' import type { ResourceConfig } from './types.js' import { normalizeStripeObjectName } from './resourceRegistry.js' @@ -17,7 +17,7 @@ const RESOURCE_DELETE_EVENTS: ReadonlySet = new Set([ 'customer.tax_id.deleted', ]) -function isDeleteEvent(event: Stripe.Event): boolean { +function isDeleteEvent(event: StripeEvent): boolean { if ( 'deleted' in event.data.object && (event.data.object as { deleted?: boolean }).deleted === true @@ -39,7 +39,7 @@ function isDeleteEvent(event: Stripe.Event): boolean { * pushes events in; this method converts them to protocol messages. */ export function fromWebhookEvent( - event: Stripe.Event, + event: StripeEvent, registry: Record ): { record: RecordMessage; state: StateMessage } | null { const dataObject = event.data?.object as unknown as @@ -70,17 +70,16 @@ export function fromWebhookEvent( // MARK: - processStripeEvent /** - * Process a single verified Stripe.Event through the full pipeline: + * Process a single verified StripeEvent through the full pipeline: * entitlements, registry filter, delete detection, revalidation, * subscription items. * * This is the canonical function — all event paths (webhook, events API, - * WebSocket) converge here once a Stripe.Event is in hand. + * WebSocket) converge here once a StripeEvent is in hand. */ export async function* processStripeEvent( - event: Stripe.Event, + event: StripeEvent, config: Config, - stripe: Stripe, catalog: ConfiguredCatalog, registry: Record, streamNames: Set diff --git a/packages/source-stripe/src/src-events-api.ts b/packages/source-stripe/src/src-events-api.ts index 03a408f8..65b701db 100644 --- a/packages/source-stripe/src/src-events-api.ts +++ b/packages/source-stripe/src/src-events-api.ts @@ -1,5 +1,6 @@ import type { ConfiguredCatalog, LogMessage, Message, StateMessage } from '@stripe/sync-protocol' -import type Stripe from 'stripe' +import type { StripeEvent } from './stripe-types.js' +import { listAllEvents } from './stripe-api.js' import type { Config, StripeStreamState } from './index.js' import type { ResourceConfig } from './types.js' import { processStripeEvent } from './process-event.js' @@ -10,14 +11,13 @@ const EVENTS_MAX_AGE_DAYS = 25 export async function* pollEvents(opts: { config: Config - stripe: Stripe catalog: ConfiguredCatalog registry: Record streamNames: Set state: Record | undefined startTimestamp: number }): AsyncGenerator { - const { config, stripe, catalog, registry, streamNames, state, startTimestamp } = opts + const { config, catalog, registry, streamNames, state, startTimestamp } = opts if (!config.poll_events) return @@ -62,8 +62,9 @@ export async function* pollEvents(opts: { } // Fetch events since cursor (API returns newest-first) - const events: Stripe.Event[] = [] - for await (const event of stripe.events.list({ created: { gt: cursor } })) { + const events: StripeEvent[] = [] + const sc = { apiKey: config.api_key, baseUrl: config.base_url } + for await (const event of listAllEvents(sc, { created: { gt: cursor } })) { events.push(event) } @@ -71,14 +72,7 @@ export async function* pollEvents(opts: { events.reverse() for (const event of events) { - for await (const msg of processStripeEvent( - event, - config, - stripe, - catalog, - registry, - streamNames - )) { + for await (const msg of processStripeEvent(event, config, catalog, registry, streamNames)) { if (msg.type === 'state') { // Intercept state messages to preserve complete status + update events_cursor const existing = state?.[msg.stream] diff --git a/packages/source-stripe/src/src-list-api.ts b/packages/source-stripe/src/src-list-api.ts index cde66f3a..274ee28c 100644 --- a/packages/source-stripe/src/src-list-api.ts +++ b/packages/source-stripe/src/src-list-api.ts @@ -6,9 +6,6 @@ import type { } from '@stripe/sync-protocol' import { toRecordMessage } from '@stripe/sync-protocol' import type { ResourceConfig } from './types.js' -import type { SegmentState } from './index.js' -import type { RateLimiter } from './rate-limiter.js' -import type Stripe from 'stripe' const SKIPPABLE_ERROR_PATTERNS = [ 'only available in testmode', @@ -18,8 +15,6 @@ const SKIPPABLE_ERROR_PATTERNS = [ 'Missing required param', ] -const DEFAULT_BACKFILL_CONCURRENCY = 200 - function isSkippableError(err: unknown): boolean { const msg = err instanceof Error ? err.message : String(err) return SKIPPABLE_ERROR_PATTERNS.some((p) => msg.includes(p)) @@ -32,260 +27,14 @@ function findConfigByTableName( return Object.values(registry).find((cfg) => cfg.tableName === tableName) } -// MARK: - mergeAsync - -type IndexedResult = { index: number; result: IteratorResult } - -async function* mergeAsync( - generators: AsyncGenerator[], - concurrency: number -): AsyncGenerator { - const active = new Map>>() - let nextIndex = 0 - - function pull(gen: AsyncGenerator, index: number) { - active.set( - index, - gen.next().then((result) => ({ index, result: result as IteratorResult })) - ) - } - - const limit = Math.min(concurrency, generators.length) - for (let i = 0; i < limit; i++) { - pull(generators[i], i) - nextIndex = i + 1 - } - - while (active.size > 0) { - const { index, result } = await Promise.race(active.values()) - active.delete(index) - - if (result.done) { - if (nextIndex < generators.length) { - pull(generators[nextIndex], nextIndex) - nextIndex++ - } - } else { - yield result.value - pull(generators[index], index) - } - } -} - -// MARK: - Account created timestamp - -async function getAccountCreatedTimestamp(stripe: Stripe): Promise { - const account = await stripe.accounts.retrieve() - return account.created ?? 1293840000 -} - -// MARK: - Segment creation - -function buildSegments( - startTimestamp: number, - endTimestamp: number, - numSegments = DEFAULT_BACKFILL_CONCURRENCY -): SegmentState[] { - const range = endTimestamp - startTimestamp - const segmentSize = Math.max(1, Math.ceil(range / numSegments)) - const segments: SegmentState[] = [] - - for (let i = 0; i < numSegments; i++) { - const gte = startTimestamp + i * segmentSize - const lt = i === numSegments - 1 ? endTimestamp + 1 : startTimestamp + (i + 1) * segmentSize - if (gte >= endTimestamp + 1) break - segments.push({ index: i, gte, lt, pageCursor: null, status: 'pending' }) - } - - return segments -} - -// MARK: - Segment pagination - -async function* paginateSegment(opts: { - listFn: NonNullable - segment: SegmentState - segments: SegmentState[] - streamName: string - supportsLimit: boolean - backfillLimit?: number - totalEmitted: { count: number } - rateLimiter: RateLimiter -}): AsyncGenerator { - const { - listFn, - segment, - segments, - streamName, - supportsLimit, - backfillLimit, - totalEmitted, - rateLimiter, - } = opts - - let pageCursor: string | null = segment.pageCursor - let hasMore = true - - while (hasMore) { - const params: Record = { - created: { gte: segment.gte, lt: segment.lt }, - } - if (supportsLimit !== false) { - params.limit = 100 - } - if (pageCursor) { - params.starting_after = pageCursor - } - - const wait = await rateLimiter() - if (wait > 0) await new Promise((r) => setTimeout(r, wait * 1000)) - console.error({ - msg: 'Starting Stripe list page', - stream: streamName, - segment: segment.index, - pageCursor, - created: params.created, - }) - const response = await listFn(params as Parameters[0]) - console.error({ - msg: 'Completed Stripe list page', - stream: streamName, - segment: segment.index, - pageCursor, - recordCount: response.data.length, - hasMore: response.has_more, - }) - - for (const item of response.data) { - yield toRecordMessage(streamName, item as Record) - totalEmitted.count++ - } - - hasMore = response.has_more - if (response.pageCursor) { - pageCursor = response.pageCursor - } else if (response.data.length > 0) { - pageCursor = (response.data[response.data.length - 1] as { id: string }).id - } - - if (backfillLimit && totalEmitted.count >= backfillLimit) { - hasMore = false - } - - // Update shared segment state and emit checkpoint - segment.pageCursor = hasMore ? pageCursor : null - segment.status = hasMore ? 'pending' : 'complete' - - const allComplete = segments.every((s) => s.status === 'complete') - yield { - type: 'state', - stream: streamName, - data: { - pageCursor: null, - status: allComplete ? 'complete' : 'pending', - segments: segments.map((s) => ({ ...s })), - }, - } satisfies StateMessage - } -} - -// MARK: - Sequential fallback (original logic) - -async function* sequentialBackfillStream(opts: { - resourceConfig: ResourceConfig - streamName: string - pageCursor: string | null - backfillLimit?: number - rateLimiter: RateLimiter - drainQueue?: () => AsyncGenerator -}): AsyncGenerator { - const { resourceConfig, streamName, backfillLimit, rateLimiter, drainQueue } = opts - let pageCursor = opts.pageCursor - let hasMore = true - let totalEmitted = 0 - - while (hasMore) { - if (drainQueue) yield* drainQueue() - - const params: Record = {} - if (resourceConfig.supportsLimit !== false) { - params.limit = 100 - } - if (pageCursor) { - params.starting_after = pageCursor - } - - const wait = await rateLimiter() - if (wait > 0) await new Promise((r) => setTimeout(r, wait * 1000)) - console.error({ - msg: 'Starting Stripe list page', - stream: streamName, - pageCursor, - }) - const response = await resourceConfig.listFn!( - params as Parameters>[0] - ) - console.error({ - msg: 'Completed Stripe list page', - stream: streamName, - pageCursor, - recordCount: response.data.length, - hasMore: response.has_more, - }) - - for (const item of response.data) { - yield toRecordMessage(streamName, item as Record) - totalEmitted++ - } - - hasMore = response.has_more - if (response.pageCursor) { - pageCursor = response.pageCursor - } else if (response.data.length > 0) { - pageCursor = (response.data[response.data.length - 1] as { id: string }).id - } - - if (backfillLimit && totalEmitted >= backfillLimit) { - hasMore = false - } - - yield { - type: 'state', - stream: streamName, - data: { - pageCursor: hasMore ? pageCursor : null, - status: hasMore ? 'pending' : 'complete', - }, - } satisfies StateMessage - } -} - -// MARK: - Main entry point - export async function* listApiBackfill(opts: { catalog: { streams: Array<{ stream: { name: string } }> } - state: - | Record - | undefined + state: Record | undefined registry: Record - stripe: Stripe - rateLimiter: RateLimiter backfillLimit?: number - backfillConcurrency?: number drainQueue?: () => AsyncGenerator }): AsyncGenerator { - const { - catalog, - state, - registry, - stripe, - rateLimiter, - backfillLimit, - backfillConcurrency = DEFAULT_BACKFILL_CONCURRENCY, - drainQueue, - } = opts - - let accountCreated: number | null = null + const { catalog, state, registry, backfillLimit, drainQueue } = opts for (const configuredStream of catalog.streams) { const stream = configuredStream.stream @@ -302,6 +51,7 @@ export async function* listApiBackfill(opts: { if (!resourceConfig.listFn) continue + // Skip already-complete streams (e.g. resuming after full backfill for events polling) const streamState = state?.[stream.name] if (streamState?.status === 'complete') continue @@ -311,52 +61,69 @@ export async function* listApiBackfill(opts: { status: 'started', } satisfies StreamStatusMessage + // Restore cursor from combined state if available + let pageCursor: string | null = streamState?.pageCursor ?? null + try { - // Parallel path: streams that support created filter - if (resourceConfig.supportsCreatedFilter) { - let segments: SegmentState[] + let hasMore = true + let totalEmitted = 0 + while (hasMore) { + // Drain any queued events before each page + if (drainQueue) yield* drainQueue() + + const params: Record = {} + if (resourceConfig.supportsLimit !== false) { + params.limit = 100 + } + if (pageCursor) { + params.starting_after = pageCursor + } + + // TODO: replace with structured logger once one is wired into the source connector; + // console.error (stderr) is used here intentionally — console.log/info would write + // to stdout and corrupt the NDJSON output stream. + console.error({ + msg: 'Starting Stripe list page', + stream: stream.name, + pageCursor, + }) + const response = await resourceConfig.listFn( + params as Parameters[0] + ) + console.error({ + msg: 'Completed Stripe list page', + stream: stream.name, + pageCursor, + recordCount: response.data.length, + hasMore: response.has_more, + }) - if (streamState?.segments) { - // Resume from prior segment state — only run incomplete segments - segments = streamState.segments.map((s) => ({ ...s })) - } else { - // First run: fetch account creation date and build segments - if (accountCreated === null) { - accountCreated = await getAccountCreatedTimestamp(stripe) - } - const now = Math.floor(Date.now() / 1000) - segments = buildSegments(accountCreated, now, backfillConcurrency) + for (const item of response.data) { + yield toRecordMessage(stream.name, item as Record) + totalEmitted++ } - const incompleteSegments = segments.filter((s) => s.status !== 'complete') - if (incompleteSegments.length > 0) { - const totalEmitted = { count: 0 } - const generators = incompleteSegments.map((segment) => - paginateSegment({ - listFn: resourceConfig.listFn!, - segment, - segments, - streamName: stream.name, - supportsLimit: resourceConfig.supportsLimit !== false, - backfillLimit, - totalEmitted, - rateLimiter, - }) - ) + hasMore = response.has_more + if (response.pageCursor) { + pageCursor = response.pageCursor + } else if (response.data.length > 0) { + pageCursor = (response.data[response.data.length - 1] as { id: string }).id + } - yield* mergeAsync(generators, backfillConcurrency) + // Stop early if backfill limit reached + if (backfillLimit && totalEmitted >= backfillLimit) { + hasMore = false } - } else { - // Sequential path: no created filter support - const pageCursor: string | null = streamState?.pageCursor ?? null - yield* sequentialBackfillStream({ - resourceConfig, - streamName: stream.name, - pageCursor, - backfillLimit, - rateLimiter, - drainQueue, - }) + + // Emit state checkpoint after each page + yield { + type: 'state', + stream: stream.name, + data: { + pageCursor: hasMore ? pageCursor : null, + status: hasMore ? 'pending' : 'complete', + }, + } satisfies StateMessage } yield { @@ -376,6 +143,7 @@ export async function* listApiBackfill(opts: { console.error({ msg: 'Stripe list page failed', stream: stream.name, + pageCursor, error: err instanceof Error ? err.message : String(err), }) const isRateLimit = err instanceof Error && err.message.includes('Rate limit') diff --git a/packages/source-stripe/src/src-webhook.ts b/packages/source-stripe/src/src-webhook.ts index 407ab13b..1d97f190 100644 --- a/packages/source-stripe/src/src-webhook.ts +++ b/packages/source-stripe/src/src-webhook.ts @@ -1,6 +1,7 @@ import type { ConfiguredCatalog, Message } from '@stripe/sync-protocol' import http from 'node:http' -import type Stripe from 'stripe' +import type { StripeEvent } from './stripe-types.js' +import { verifyWebhookSignature } from './stripe-api.js' import type { Config, WebhookInput } from './index.js' import type { ResourceConfig } from './types.js' import { processStripeEvent } from './process-event.js' @@ -9,13 +10,12 @@ import { processStripeEvent } from './process-event.js' /** * Verify a raw webhook body+signature and delegate to processStripeEvent. - * Use this at the HTTP transport boundary. For already-verified Stripe.Event + * Use this at the HTTP transport boundary. For already-verified StripeEvent * objects (WebSocket, events API), call processStripeEvent directly. */ export async function* processWebhookInput( input: WebhookInput, config: Config, - stripe: Stripe, catalog: ConfiguredCatalog, registry: Record, streamNames: Set @@ -24,15 +24,15 @@ export async function* processWebhookInput( throw new Error('webhook_secret is required for raw webhook signature verification') } const signature = (input.headers['stripe-signature'] as string) ?? '' - const event = await stripe.webhooks.constructEvent(input.body, signature, config.webhook_secret) - yield* processStripeEvent(event, config, stripe, catalog, registry, streamNames) + const event = verifyWebhookSignature(input.body, signature, config.webhook_secret) + yield* processStripeEvent(event, config, catalog, registry, streamNames) } // MARK: - LiveInput queue /** An item in the live input queue. HTTP webhooks include resolve/reject for backpressure. */ export type LiveInput = { - data: WebhookInput | Stripe.Event + data: WebhookInput | StripeEvent resolve?: () => void reject?: (err: Error) => void } @@ -60,21 +60,13 @@ export function createInputQueue() { async function* drain( config: Config, - stripe: Stripe, catalog: ConfiguredCatalog, registry: Record, streamNames: Set ): AsyncGenerator { while (queue.length > 0) { const queued = queue.shift()! - yield* processStripeEvent( - queued.data as Stripe.Event, - config, - stripe, - catalog, - registry, - streamNames - ) + yield* processStripeEvent(queued.data as StripeEvent, config, catalog, registry, streamNames) } } diff --git a/packages/source-stripe/src/stripe-api.ts b/packages/source-stripe/src/stripe-api.ts new file mode 100644 index 00000000..5ec78227 --- /dev/null +++ b/packages/source-stripe/src/stripe-api.ts @@ -0,0 +1,170 @@ +import { createHmac, timingSafeEqual } from 'node:crypto' +import type { + StripeAccount, + StripeEvent, + StripeList, + StripeWebhookEndpoint, +} from './stripe-types.js' +import { fetchWithProxy } from './transport.js' + +export interface StripeClientConfig { + apiKey: string + baseUrl?: string +} + +const DEFAULT_BASE = 'https://api.stripe.com' + +function authHeaders(apiKey: string): Record { + return { Authorization: `Bearer ${apiKey}` } +} + +async function stripeGet( + config: StripeClientConfig, + path: string, + qs?: URLSearchParams +): Promise { + const base = config.baseUrl ?? DEFAULT_BASE + const url = qs?.toString() ? `${base}${path}?${qs}` : `${base}${path}` + const res = await fetchWithProxy(url, { headers: authHeaders(config.apiKey) }) + if (!res.ok) { + const body = await res.text() + throw new Error(`Stripe API error ${res.status}: ${body}`) + } + return (await res.json()) as T +} + +async function stripePost( + config: StripeClientConfig, + path: string, + body?: URLSearchParams +): Promise { + const base = config.baseUrl ?? DEFAULT_BASE + const res = await fetchWithProxy(`${base}${path}`, { + method: 'POST', + headers: { + ...authHeaders(config.apiKey), + 'Content-Type': 'application/x-www-form-urlencoded', + }, + body: body?.toString(), + }) + if (!res.ok) { + const text = await res.text() + throw new Error(`Stripe API error ${res.status}: ${text}`) + } + return (await res.json()) as T +} + +async function stripeDelete(config: StripeClientConfig, path: string): Promise { + const base = config.baseUrl ?? DEFAULT_BASE + const res = await fetchWithProxy(`${base}${path}`, { + method: 'DELETE', + headers: authHeaders(config.apiKey), + }) + if (!res.ok) { + const text = await res.text() + throw new Error(`Stripe API error ${res.status}: ${text}`) + } +} + +export async function retrieveAccount(config: StripeClientConfig): Promise { + return stripeGet(config, '/v1/account') +} + +export async function listWebhookEndpoints( + config: StripeClientConfig, + params?: { limit?: number } +): Promise> { + const qs = new URLSearchParams() + if (params?.limit) qs.set('limit', String(params.limit)) + return stripeGet(config, '/v1/webhook_endpoints', qs) +} + +export async function createWebhookEndpoint( + config: StripeClientConfig, + params: { url: string; enabled_events: string[]; metadata?: Record } +): Promise { + const body = new URLSearchParams() + body.set('url', params.url) + for (const event of params.enabled_events) { + body.append('enabled_events[]', event) + } + if (params.metadata) { + for (const [key, value] of Object.entries(params.metadata)) { + body.set(`metadata[${key}]`, value) + } + } + return stripePost(config, '/v1/webhook_endpoints', body) +} + +export async function deleteWebhookEndpoint(config: StripeClientConfig, id: string): Promise { + return stripeDelete(config, `/v1/webhook_endpoints/${id}`) +} + +export async function* listAllEvents( + config: StripeClientConfig, + params: { created?: { gt?: number } } +): AsyncGenerator { + let startingAfter: string | undefined + let hasMore = true + + while (hasMore) { + const qs = new URLSearchParams() + qs.set('limit', '100') + if (startingAfter) qs.set('starting_after', startingAfter) + if (params.created?.gt != null) qs.set('created[gt]', String(params.created.gt)) + + const result = await stripeGet>(config, '/v1/events', qs) + + for (const event of result.data) { + yield event + } + + hasMore = result.has_more + if (result.data.length > 0) { + startingAfter = result.data[result.data.length - 1].id + } + } +} + +const DEFAULT_TOLERANCE = 300 + +export function verifyWebhookSignature( + payload: string | Buffer, + signature: string, + secret: string, + tolerance = DEFAULT_TOLERANCE +): StripeEvent { + const pairs: Record = {} + for (const element of signature.split(',')) { + const idx = element.indexOf('=') + if (idx > 0) { + pairs[element.slice(0, idx)] = element.slice(idx + 1) + } + } + + const timestamp = parseInt(pairs.t, 10) + const sig = pairs.v1 + + if (!timestamp || !sig) { + throw new Error('Unable to extract timestamp and signatures from header') + } + + const now = Math.floor(Date.now() / 1000) + if (tolerance > 0 && Math.abs(now - timestamp) > tolerance) { + throw new Error('Webhook timestamp outside the tolerance zone') + } + + const payloadStr = typeof payload === 'string' ? payload : payload.toString('utf8') + const expectedSig = createHmac('sha256', secret) + .update(`${timestamp}.${payloadStr}`) + .digest('hex') + + const a = Buffer.from(sig, 'utf8') + const b = Buffer.from(expectedSig, 'utf8') + + if (a.length !== b.length || !timingSafeEqual(a, b)) { + throw new Error('No signatures found matching the expected signature for payload') + } + + return JSON.parse(payloadStr) as StripeEvent +} diff --git a/packages/source-stripe/src/stripe-types.ts b/packages/source-stripe/src/stripe-types.ts new file mode 100644 index 00000000..408a2237 --- /dev/null +++ b/packages/source-stripe/src/stripe-types.ts @@ -0,0 +1,38 @@ +/** Wire shapes for Stripe JSON (no SDK). */ + +export interface StripeEvent { + id: string + object: 'event' + type: string + created: number + api_version: string | null + livemode: boolean + pending_webhooks: number + request: { id: string | null; idempotency_key: string | null } | null + data: { + object: Record & { id?: string; object?: string; deleted?: boolean } + } +} + +export interface StripeList { + object: 'list' + data: T[] + has_more: boolean + url?: string +} + +export interface StripeAccount { + id: string + object: 'account' + [key: string]: unknown +} + +export interface StripeWebhookEndpoint { + id: string + object: 'webhook_endpoint' + url: string + status: string + enabled_events: string[] + metadata: Record | null + [key: string]: unknown +} diff --git a/packages/source-stripe/src/transforms/expandLists.ts b/packages/source-stripe/src/transforms/expandLists.ts index 1fe5a166..b350a4fa 100644 --- a/packages/source-stripe/src/transforms/expandLists.ts +++ b/packages/source-stripe/src/transforms/expandLists.ts @@ -1,9 +1,9 @@ -import Stripe from 'stripe' +import type { StripeList } from '../stripe-types.js' import { expandEntity } from '../utils/expandEntity.js' export async function expandLists(opts: { items: Record[] // eslint-disable-line @typescript-eslint/no-explicit-any - listExpands: Record Promise>>[] + listExpands: Record Promise>>[] }): Promise { for (const expandEntry of opts.listExpands) { for (const [property, expandFn] of Object.entries(expandEntry)) { diff --git a/packages/source-stripe/src/transforms/subscriptionItems.ts b/packages/source-stripe/src/transforms/subscriptionItems.ts index da0dd98e..42bf413c 100644 --- a/packages/source-stripe/src/transforms/subscriptionItems.ts +++ b/packages/source-stripe/src/transforms/subscriptionItems.ts @@ -1,11 +1,27 @@ -import Stripe from 'stripe' +/** + * Subscription JSON shapes (SDK-free). Used by legacy sync helpers only. + */ +interface SubscriptionItemRow { + id: string + object: 'subscription_item' + price: { id: string; [key: string]: unknown } | string + deleted?: boolean + quantity?: number | null + [key: string]: unknown +} + +interface SubscriptionWithItems { + id: string + items: { data: SubscriptionItemRow[] } + [key: string]: unknown +} export async function syncSubscriptionItems(opts: { - subscriptions: Stripe.Subscription[] + subscriptions: SubscriptionWithItems[] accountId: string syncTimestamp?: string upsertItems: ( - items: Stripe.SubscriptionItem[], + items: SubscriptionItemRow[], accountId: string, syncTimestamp?: string ) => Promise @@ -19,18 +35,16 @@ export async function syncSubscriptionItems(opts: { const allSubscriptionItems = subscriptionsWithItems.flatMap((s) => s.items.data) await opts.upsertItems(allSubscriptionItems, opts.accountId, opts.syncTimestamp) - // Mark existing subscription items in db as deleted - // if they don't exist in the current subscriptionItems list await Promise.all( subscriptionsWithItems.map((subscription) => { - const subItemIds = subscription.items.data.map((x: Stripe.SubscriptionItem) => x.id) + const subItemIds = subscription.items.data.map((x) => x.id) return opts.markDeleted(subscription.id, subItemIds) }) ) } export async function upsertSubscriptionItems( - subscriptionItems: Stripe.SubscriptionItem[], + subscriptionItems: SubscriptionItemRow[], accountId: string, upsertMany: ( // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -43,7 +57,10 @@ export async function upsertSubscriptionItems( ): Promise { const modifiedSubscriptionItems = subscriptionItems.map((subscriptionItem) => ({ ...subscriptionItem, - price: subscriptionItem.price.id.toString(), + price: + typeof subscriptionItem.price === 'string' + ? subscriptionItem.price + : subscriptionItem.price.id.toString(), deleted: subscriptionItem.deleted ?? false, quantity: subscriptionItem.quantity ?? null, })) diff --git a/packages/source-stripe/src/types.ts b/packages/source-stripe/src/types.ts index 91b1a781..cb0f7023 100644 --- a/packages/source-stripe/src/types.ts +++ b/packages/source-stripe/src/types.ts @@ -1,4 +1,3 @@ -import type Stripe from 'stripe' import type { ListFn, RetrieveFn } from '@stripe/sync-openapi' import type { RevalidateEntityName } from './resourceRegistry.js' @@ -49,7 +48,7 @@ export type ResourceConfig = BaseResourceConfig & { export type RevalidateEntity = RevalidateEntityName -export const SUPPORTED_WEBHOOK_EVENTS: Stripe.WebhookEndpointCreateParams.EnabledEvent[] = [ +export const SUPPORTED_WEBHOOK_EVENTS: string[] = [ 'charge.captured', 'charge.expired', 'charge.failed', diff --git a/packages/source-stripe/src/utils/expandEntity.ts b/packages/source-stripe/src/utils/expandEntity.ts index 199d1be7..bb364fd0 100644 --- a/packages/source-stripe/src/utils/expandEntity.ts +++ b/packages/source-stripe/src/utils/expandEntity.ts @@ -1,17 +1,14 @@ -import Stripe from 'stripe' +import type { StripeList } from '../stripe-types.js' -/** - * Stripe only sends the first 10 entries by default, the option will actively fetch all entries. - * Uses manual pagination - each fetch() gets automatic retry protection. - */ +/** Expand truncated list fields by paginating until `has_more` is false. */ export async function expandEntity< K extends { id?: string }, P extends keyof T, - T extends { id?: string } & { [key in P]?: Stripe.ApiList | null }, + T extends { id?: string } & { [key in P]?: StripeList | null }, >( entities: T[], property: P, - listFn: (id: string, params?: { starting_after?: string }) => Promise> + listFn: (id: string, params?: { starting_after?: string }) => Promise> ) { for (const entity of entities) { const existingList = entity[property] diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8a20bc17..c29baf30 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -103,9 +103,6 @@ importers: pino-pretty: specifier: ^13 version: 13.1.3 - stripe: - specifier: ^17.7.0 - version: 17.7.0 ws: specifier: ^8.18.0 version: 8.18.3 @@ -212,9 +209,6 @@ importers: esbuild: specifier: ^0.27.2 version: 0.27.3 - stripe: - specifier: ^18.1.0 - version: 18.5.0(@types/node@24.10.1) vitest: specifier: ^3.2.4 version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) @@ -309,9 +303,6 @@ importers: pg: specifier: ^8.16.0 version: 8.16.3 - stripe: - specifier: ^17.7.0 - version: 17.7.0 vitest: specifier: ^3.2.1 version: 3.2.4(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) @@ -400,9 +391,6 @@ importers: https-proxy-agent: specifier: ^7.0.6 version: 7.0.6 - stripe: - specifier: ^17.7.0 - version: 17.7.0 undici: specifier: ^7.16.0 version: 7.24.6 @@ -3577,19 +3565,6 @@ packages: strip-literal@3.0.0: resolution: {integrity: sha512-TcccoMhJOM3OebGhSBEmp3UZ2SfDMZUEBdRA/9ynfLi8yYajyWX3JiXArcJt4Umh4vISpspkQIY8ZZoCqjbviA==} - stripe@17.7.0: - resolution: {integrity: sha512-aT2BU9KkizY9SATf14WhhYVv2uOapBWX0OFWF4xvcj1mPaNotlSc2CsxpS4DS46ZueSppmCF5BX1sNYBtwBvfw==} - engines: {node: '>=12.*'} - - stripe@18.5.0: - resolution: {integrity: sha512-Hp+wFiEQtCB0LlNgcFh5uVyKznpDjzyUZ+CNVEf+I3fhlYvh7rZruIg+jOwzJRCpy0ZTPMjlzm7J2/M2N6d+DA==} - engines: {node: '>=12.*'} - peerDependencies: - '@types/node': '>=12.x.x' - peerDependenciesMeta: - '@types/node': - optional: true - strnum@2.2.1: resolution: {integrity: sha512-BwRvNd5/QoAtyW1na1y1LsJGQNvRlkde6Q/ipqqEaivoMdV+B1OMOTVdwR+N/cwVUcIt9PYyHmV8HyexCZSupg==} @@ -7310,17 +7285,6 @@ snapshots: dependencies: js-tokens: 9.0.1 - stripe@17.7.0: - dependencies: - '@types/node': 24.10.1 - qs: 6.14.0 - - stripe@18.5.0(@types/node@24.10.1): - dependencies: - qs: 6.14.0 - optionalDependencies: - '@types/node': 24.10.1 - strnum@2.2.1: {} style-mod@4.1.3: {}