Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion apps/engine/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
"pg": "^8.16.3",
"pino": "^10",
"pino-pretty": "^13",
"stripe": "^17.7.0",
"ws": "^8.18.0",
"zod": "^4.3.6",
"zod-openapi": "^5"
Expand Down
1 change: 0 additions & 1 deletion apps/supabase/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
"devDependencies": {
"@types/node": "^24.10.1",
"esbuild": "^0.27.2",
"stripe": "^18.1.0",
"vitest": "^3.2.4"
}
}
10 changes: 4 additions & 6 deletions apps/supabase/src/__tests__/supabase.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
* STRIPE_API_KEY
*/
import { describe, it, expect, beforeAll, afterAll } from 'vitest'
import Stripe from 'stripe'
import { SupabaseSetupClient } from '../supabase.js'
import { describeWithEnv } from '../../../../e2e/test-helpers.js'
import { stripeDelete, stripePost } from '../../../../e2e/stripe-helpers.js'

function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms))
Expand All @@ -22,14 +22,12 @@ describeWithEnv(
['SUPABASE_PROJECT_ID', 'SUPABASE_PERSONAL_ACCESS_TOKEN', 'STRIPE_API_KEY'],
({ SUPABASE_PROJECT_ID, SUPABASE_PERSONAL_ACCESS_TOKEN, STRIPE_API_KEY }) => {
let client: SupabaseSetupClient
let stripe: Stripe

beforeAll(async () => {
client = new SupabaseSetupClient({
accessToken: SUPABASE_PERSONAL_ACCESS_TOKEN,
projectRef: SUPABASE_PROJECT_ID,
})
stripe = new Stripe(STRIPE_API_KEY)

// Ensure clean slate
try {
Expand Down Expand Up @@ -60,7 +58,7 @@ describeWithEnv(
// Clean up test customer
if (customerId) {
try {
await stripe.customers.del(customerId)
await stripeDelete(STRIPE_API_KEY, `/v1/customers/${customerId}`)
} catch {}
}
})
Expand Down Expand Up @@ -89,7 +87,7 @@ describeWithEnv(

it('should receive customer.created webhook', async () => {
const testName = `Supabase E2E ${Date.now()}`
const customer = await stripe.customers.create({
const customer = await stripePost<{ id: string }>(STRIPE_API_KEY, '/v1/customers', {
name: testName,
email: 'supabase-e2e@test.local',
})
Expand All @@ -116,7 +114,7 @@ describeWithEnv(
expect(customerId).toBeDefined()

const updatedName = `Updated Supabase E2E ${Date.now()}`
await stripe.customers.update(customerId!, { name: updatedName })
await stripePost(STRIPE_API_KEY, `/v1/customers/${customerId!}`, { name: updatedName })

// Poll until the update arrives (up to 60s)
let found = false
Expand Down
2 changes: 1 addition & 1 deletion docs/slides/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ interface Source<TConfig, TState, TInput = never> {
discover(): Promise<ConfiguredCatalog>
setup(): Promise<void>
read(params: SyncParams, $stdin?: AsyncIterable<TInput>): AsyncIterable<Message>
teardown(): Promise<void>
teardown(opts?: { remove_shared_resources?: boolean }): Promise<void>
}

interface Destination<TConfig, TState> {
Expand Down
1 change: 0 additions & 1 deletion e2e/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
"@temporalio/worker": "^1",
"googleapis": "^144",
"pg": "^8.16.0",
"stripe": "^17.7.0",
"vitest": "^3.2.1"
}
}
52 changes: 52 additions & 0 deletions e2e/stripe-helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
export const DEFAULT_BASE = 'https://api.stripe.com'

export async function stripeGet<T>(apiKey: string, path: string, baseUrl?: string): Promise<T> {
const base = baseUrl ?? DEFAULT_BASE
const url = path.startsWith('http') ? path : `${base}${path}`
const res = await fetch(url, { headers: { Authorization: `Bearer ${apiKey}` } })
if (!res.ok) {
const body = await res.text()
throw new Error(`Stripe API ${res.status}: ${body}`)
}
return (await res.json()) as T
}

export async function stripePost<T = unknown>(
apiKey: string,
path: string,
params?: Record<string, string>,
baseUrl?: string
): Promise<T> {
const base = baseUrl ?? DEFAULT_BASE
const body = params ? new URLSearchParams(params).toString() : undefined
const res = await fetch(`${base}${path}`, {
method: 'POST',
headers: {
Authorization: `Bearer ${apiKey}`,
'Content-Type': 'application/x-www-form-urlencoded',
},
body,
})
if (!res.ok) {
const text = await res.text()
throw new Error(`Stripe API ${res.status}: ${text}`)
}
return (await res.json()) as T
}

export async function stripeDelete<T = unknown>(
apiKey: string,
path: string,
baseUrl?: string
): Promise<T> {
const base = baseUrl ?? DEFAULT_BASE
const res = await fetch(`${base}${path}`, {
method: 'DELETE',
headers: { Authorization: `Bearer ${apiKey}` },
})
if (!res.ok) {
const text = await res.text()
throw new Error(`Stripe API ${res.status}: ${text}`)
}
return (await res.json()) as T
}
13 changes: 7 additions & 6 deletions e2e/stripe-to-postgres.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import pg from 'pg'
import Stripe from 'stripe'
import { afterAll, beforeAll, expect, it } from 'vitest'
import source from '@stripe/sync-source-stripe'
import destination from '@stripe/sync-destination-postgres'
import { createEngine, noopStateStore } from '@stripe/sync-engine'
import type { StateMessage, DestinationOutput } from '@stripe/sync-protocol'
import { describeWithEnv } from './test-helpers.js'
import { stripeGet, stripePost } from './stripe-helpers.js'

// ---------------------------------------------------------------------------
// Config
Expand Down Expand Up @@ -39,7 +39,6 @@ async function collectStates(iter: AsyncIterable<DestinationOutput>): Promise<St

describeWithEnv('stripe → postgres e2e', ['STRIPE_API_KEY'], ({ STRIPE_API_KEY }) => {
let pool: pg.Pool
let stripe: Stripe

function makeEngine(opts: { websocket?: boolean } = {}) {
return createEngine(
Expand All @@ -61,8 +60,7 @@ describeWithEnv('stripe → postgres e2e', ['STRIPE_API_KEY'], ({ STRIPE_API_KEY
beforeAll(async () => {
pool = new pg.Pool({ connectionString: POSTGRES_URL })
await pool.query('SELECT 1')
stripe = new Stripe(STRIPE_API_KEY)
const account = await stripe.accounts.retrieve()
const account = await stripeGet<{ id: string }>(STRIPE_API_KEY, '/v1/account')
const isTest = STRIPE_API_KEY.startsWith('sk_test_')
const dashPrefix = isTest ? 'dashboard.stripe.com/test' : 'dashboard.stripe.com'
console.log(`\n Stripe: ${account.id} → https://${dashPrefix}/developers`)
Expand Down Expand Up @@ -112,11 +110,14 @@ describeWithEnv('stripe → postgres e2e', ['STRIPE_API_KEY'], ({ STRIPE_API_KEY
console.log(' Backfill complete, sending product update…')

// Phase 2: update a product via Stripe API
const products = await stripe.products.list({ limit: 1 })
const products = await stripeGet<{ data: { id: string }[] }>(
STRIPE_API_KEY,
'/v1/products?limit=1'
)
expect(products.data.length).toBeGreaterThan(0)
const product = products.data[0]
const newName = `e2e-test-${Date.now()}`
await stripe.products.update(product.id, { name: newName })
await stripePost(STRIPE_API_KEY, `/v1/products/${product.id}`, { name: newName })

// Phase 3: consume until we see a live event state for product
const deadline = Date.now() + 30_000
Expand Down
16 changes: 10 additions & 6 deletions e2e/temporal.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { NativeConnection, Worker } from '@temporalio/worker'
import { serve } from '@hono/node-server'
import type { ServerType } from '@hono/node-server'
import pg from 'pg'
import Stripe from 'stripe'
import { google } from 'googleapis'
import { stripeGet, stripePost } from './stripe-helpers.js'
import fs from 'node:fs'
import path from 'node:path'
import os from 'node:os'
Expand Down Expand Up @@ -162,11 +162,9 @@ function createTestInfra() {
describeWithEnv('temporal e2e: stripe → postgres', ['STRIPE_API_KEY'], ({ STRIPE_API_KEY }) => {
const infra = createTestInfra()
const schema = schemaName()
let stripe: Stripe

beforeAll(async () => {
await infra.setup()
stripe = new Stripe(STRIPE_API_KEY)
console.log(` Schema: ${schema}`)
}, 60_000)

Expand Down Expand Up @@ -235,14 +233,20 @@ describeWithEnv('temporal e2e: stripe → postgres', ['STRIPE_API_KEY'], ({ STRI
console.log(` Sample: ${sampleRows[0].id} → ${sampleRows[0].name}`)

// --- Live event via stripe_event signal ---
const products = await stripe.products.list({ limit: 1 })
const products = await stripeGet<{ data: { id: string }[] }>(
STRIPE_API_KEY,
'/v1/products?limit=1'
)
const product = products.data[0]
const newName = `temporal-e2e-${Date.now()}`
await stripe.products.update(product.id, { name: newName })
await stripePost(STRIPE_API_KEY, `/v1/products/${product.id}`, { name: newName })
console.log(` Updated product ${product.id} → "${newName}"`)

await new Promise((r) => setTimeout(r, 2000))
const events = await stripe.events.list({ limit: 5, type: 'product.updated' })
const events = await stripeGet<{ data: { id: string; type: string }[] }>(
STRIPE_API_KEY,
'/v1/events?limit=5&type=product.updated'
)
const event = events.data[0]
console.log(` Fetched event ${event.id} (${event.type})`)
await handle.signal('stripe_event', event)
Expand Down
2 changes: 1 addition & 1 deletion packages/protocol/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ export interface Source<
setup?(params: { config: TConfig; catalog: ConfiguredCatalog }): Promise<void>

/** Clean up external resources. Called when a sync is deleted. */
teardown?(params: { config: TConfig }): Promise<void>
teardown?(params: { config: TConfig; remove_shared_resources?: boolean }): Promise<void>
}

// MARK: - Destination
Expand Down
1 change: 0 additions & 1 deletion packages/source-stripe/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
"@stripe/sync-openapi": "workspace:*",
"@stripe/sync-protocol": "workspace:*",
"https-proxy-agent": "^7.0.6",
"stripe": "^17.7.0",
"undici": "^7.16.0",
"ws": "^8.18.0",
"zod": "^4.3.6"
Expand Down
75 changes: 37 additions & 38 deletions packages/source-stripe/src/client.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Agent } from 'node:http'
import { describe, expect, it } from 'vitest'
import { buildStripeClientOptions, type StripeClientConfigInput } from './client.js'
import { buildTransportOptions, makeClientConfig, type StripeClientConfigInput } from './client.js'
import { getProxyUrl } from './transport.js'

const config: StripeClientConfigInput = {
Expand All @@ -22,61 +21,61 @@ describe('getProxyUrl', () => {
})
})

describe('buildStripeClientOptions', () => {
it('adds a proxy agent and default timeout when HTTPS_PROXY is set', () => {
const options = buildStripeClientOptions(config, {
HTTPS_PROXY: 'http://proxy.example.test:8080',
})
describe('buildTransportOptions', () => {
it('returns default timeout and api.stripe.com base when no overrides', () => {
const options = buildTransportOptions(config, {})

expect(options.timeout).toBe(10_000)
expect(options.httpAgent).toBeInstanceOf(Agent)
expect(options.timeout_ms).toBe(10_000)
expect(options.base_url).toBe('https://api.stripe.com')
expect(options.host).toBe('api.stripe.com')
expect(options.port).toBe(443)
expect(options.protocol).toBe('https')
})

it('uses the configured timeout override', () => {
const options = buildStripeClientOptions(config, {
HTTPS_PROXY: 'http://proxy.example.test:8080',
const options = buildTransportOptions(config, {
STRIPE_REQUEST_TIMEOUT_MS: '2500',
})

expect(options.timeout).toBe(2500)
expect(options.timeout_ms).toBe(2500)
})

it('bypasses the proxy for localhost base_url overrides', () => {
const options = buildStripeClientOptions(
{
...config,
base_url: 'http://localhost:12111',
},
{
HTTPS_PROXY: 'http://proxy.example.test:8080',
}
)
it('decomposes a localhost base_url', () => {
const options = buildTransportOptions({ ...config, base_url: 'http://localhost:12111' }, {})

expect(options.host).toBe('localhost')
expect(options.port).toBe(12111)
expect(options.protocol).toBe('http')
expect(options.httpAgent).toBeUndefined()
})

it('keeps the proxy for external base_url overrides', () => {
const options = buildStripeClientOptions(
{
...config,
base_url: 'https://api.stripe.com',
},
{
HTTPS_PROXY: 'http://proxy.example.test:8080',
}
)

expect(options.httpAgent).toBeInstanceOf(Agent)
})

it('throws on an invalid timeout override', () => {
expect(() =>
buildStripeClientOptions(config, {
buildTransportOptions(config, {
STRIPE_REQUEST_TIMEOUT_MS: '0',
})
).toThrow('STRIPE_REQUEST_TIMEOUT_MS must be a positive integer')
})
})

describe('makeClientConfig', () => {
it('maps snake_case input to camelCase StripeClientConfig', () => {
const result = makeClientConfig({
api_key: 'sk_test_123',
base_url: 'http://localhost:12111',
})

expect(result).toEqual({
apiKey: 'sk_test_123',
baseUrl: 'http://localhost:12111',
})
})

it('omits baseUrl when base_url is not provided', () => {
const result = makeClientConfig({ api_key: 'sk_test_123' })

expect(result).toEqual({
apiKey: 'sk_test_123',
baseUrl: undefined,
})
})
})
Loading