From 042cf3d36d94f50d18069fe224b3592dc94f64f1 Mon Sep 17 00:00:00 2001 From: dillonstreator Date: Mon, 4 May 2026 15:23:13 -0500 Subject: [PATCH 1/2] fix #78 standard schema event definition --- README.md | 129 ++++++++++++++++++++++++++------- examples/pg/events.ts | 25 +++++++ examples/pg/package.json | 3 +- examples/pg/processor.ts | 83 ++++++++++++--------- examples/pg/server.ts | 18 ++--- examples/pg/tsconfig.json | 2 +- examples/pg/yarn.lock | 5 ++ package.json | 1 + src/mongodb/client.ts | 46 ++++++++---- src/pg/client.ts | 43 +++++++---- src/processor.test.ts | 58 ++++++++++++++- src/processor.ts | 149 +++++++++++++++++++++++++++++--------- 12 files changed, 422 insertions(+), 140 deletions(-) create mode 100644 examples/pg/events.ts diff --git a/README.md b/README.md index d016124..ded7f2b 100644 --- a/README.md +++ b/README.md @@ -301,11 +301,11 @@ For a detailed architecture diagram, see [architecture.mmd](./architecture.mmd). Every event in txob follows this structure: ```typescript -interface TxOBEvent { +interface TxOBEvent> { id: string; // Unique event identifier (UUID recommended) timestamp: Date; // When the event was created type: EventType; // Event type (e.g., "UserCreated", "OrderPlaced") - data: Record; // Event payload - your custom data + data: EventData; // Event payload - can be strongly typed per event type correlation_id: string; // For tracing requests across services handler_results: Record; // Results from each handler errors: number; // Number of processing attempts @@ -326,8 +326,11 @@ interface TxOBEvent { Handlers are async functions that execute your side-effects: ```typescript -type TxOBEventHandler = ( - event: TxOBEvent, +type TxOBEventHandler< + EventType extends string = string, + EventData = Record, +> = ( + event: TxOBEvent, opts: { signal?: AbortSignal }, ) => Promise; ``` @@ -349,6 +352,54 @@ const incrementCounter: TxOBEventHandler = async (event) => { }; ``` +#### Typed payloads with Standard Schema (Zod/ArkType/Valibot/etc.) + +txob supports schema-driven event payload typing through the Standard Schema interface. + +```typescript +import { z } from "zod"; +import { + defineTxOBEventHandlerMap, + defineTxOBEventSchemas, + EventProcessor, + type TxOBEventDataMapFromSchemas, +} from "txob"; + +const eventSchemas = defineTxOBEventSchemas({ + UserCreated: z.object({ + userId: z.string().uuid(), + email: z.string().email(), + }), + OrderPlaced: z.object({ + orderId: z.string().uuid(), + amount: z.number().positive(), + }), +}); + +type EventType = keyof typeof eventSchemas; +type EventDataMap = TxOBEventDataMapFromSchemas; + +const handlers = defineTxOBEventHandlerMap(eventSchemas, { + UserCreated: { + sendWelcomeEmail: async (event) => { + await emailService.send(event.data.email); // typed as string + }, + }, + OrderPlaced: { + sendReceipt: async (event) => { + await receipts.send(event.data.orderId, event.data.amount); // strongly typed + }, + }, +}); + +const processor = new EventProcessor({ + client: createProcessorClient({ querier: client }), + handlerMap: handlers, +}); +``` + +`defineTxOBEventSchemas(...)` is optional. It is a convenience helper for cleaner type inference. You can also define schema maps without it using explicit types. + ### Handler Results Each handler's execution is tracked independently: @@ -1233,11 +1284,11 @@ If the database is not configured for Change Streams, an error will be emitted v ```typescript // Main event type -type TxOBEvent = { +type TxOBEvent> = { id: string; timestamp: Date; type: EventType; - data: Record; + data: EventData; correlation_id: string; handler_results: Record; errors: number; @@ -1246,16 +1297,30 @@ type TxOBEvent = { }; // Handler function signature -type TxOBEventHandler = ( - event: TxOBEvent, +type TxOBEventHandler< + EventType extends string = string, + EventData = Record, +> = ( + event: TxOBEvent, opts: { signal?: AbortSignal }, ) => Promise; // Handler map structure -type TxOBEventHandlerMap = Record< +type TxOBEventDataMap = Record< EventType, - Record + Record >; +type TxOBEventHandlerMap< + EventType extends string, + EventDataMap extends TxOBEventDataMap, +> = { + [TType in EventType]: Record>; +}; + +// Standard Schema helpers +type TxOBEventDataMapFromSchemas = { + [TType in keyof TSchemas & string]: /* schema output for TType */; +}; // Handler result tracking type TxOBEventHandlerResult = { @@ -1767,29 +1832,41 @@ onEventMaxErrorsReached: async ({ event, txClient }) => { ### Can I use this with TypeScript? -Yes! txob is written in TypeScript and provides full type safety: +Yes! txob is written in TypeScript and provides full type safety, including typed event payloads: ```typescript -// Define your event types -const eventTypes = { - UserCreated: "UserCreated", - OrderPlaced: "OrderPlaced", -} as const; +import { z } from "zod"; +import { + defineTxOBEventHandlerMap, + defineTxOBEventSchemas, + type TxOBEventDataMapFromSchemas, +} from "txob"; + +const eventSchemas = defineTxOBEventSchemas({ + UserCreated: z.object({ userId: z.string().uuid(), email: z.string().email() }), + OrderPlaced: z.object({ orderId: z.string().uuid(), amount: z.number() }), +}); -type EventType = keyof typeof eventTypes; +type EventType = keyof typeof eventSchemas; +type EventDataMap = TxOBEventDataMapFromSchemas; -// TypeScript will enforce all event types have handlers -const processor = new EventProcessor({ - client: createProcessorClient({ querier: client }), - handlerMap: { - UserCreated: { - /* handlers */ +const handlers = defineTxOBEventHandlerMap(eventSchemas, { + UserCreated: { + sendWelcomeEmail: async (event) => { + event.data.email; // string }, - OrderPlaced: { - /* handlers */ + }, + OrderPlaced: { + sendReceipt: async (event) => { + event.data.amount; // number }, - // Missing an event type? TypeScript error! }, + // Missing an event type? TypeScript error +}); + +const processor = new EventProcessor({ + client: createProcessorClient({ querier: client }), + handlerMap: handlers, }); ``` diff --git a/examples/pg/events.ts b/examples/pg/events.ts new file mode 100644 index 0000000..c70c424 --- /dev/null +++ b/examples/pg/events.ts @@ -0,0 +1,25 @@ +import { z } from "zod"; +import { + defineTxOBEventSchemas, + type TxOBEventDataMapFromSchemas, +} from "../../src/index.js"; + +export const eventTypes = { + ResourceSaved: "ResourceSaved", + EventMaxErrorsReached: "EventMaxErrorsReached", +} as const; + +export const eventSchemas = defineTxOBEventSchemas({ + [eventTypes.ResourceSaved]: z.object({ + type: z.literal("activity"), + id: z.string().uuid(), + }), + [eventTypes.EventMaxErrorsReached]: z.object({ + failedEventId: z.string().uuid(), + failedEventType: z.string(), + failedEventCorrelationId: z.string().uuid(), + }), +}); + +export type EventType = keyof typeof eventSchemas; +export type EventDataMap = TxOBEventDataMapFromSchemas; diff --git a/examples/pg/package.json b/examples/pg/package.json index 6711b2e..f3389be 100644 --- a/examples/pg/package.json +++ b/examples/pg/package.json @@ -11,7 +11,8 @@ }, "dependencies": { "http-graceful-shutdown": "^3.1.13", - "pg": "^8.11.3" + "pg": "^8.11.3", + "zod": "^4.1.12" }, "devDependencies": { "@types/node": "^20.10.5", diff --git a/examples/pg/processor.ts b/examples/pg/processor.ts index 9bb72b0..98dbfe4 100644 --- a/examples/pg/processor.ts +++ b/examples/pg/processor.ts @@ -1,27 +1,31 @@ import pg from "pg"; import { randomUUID } from "node:crypto"; import { - ErrorUnprocessableEventHandler, + defineTxOBEventHandlerMap, EventProcessor, + type TxOBEventDataMapFromSchemas, WakeupEmitter, } from "../../src/index.js"; import { createProcessorClient, createWakeupEmitter, } from "../../src/pg/client.js"; -import { migrate, type EventType, eventTypes } from "./server.js"; +import { eventSchemas, eventTypes, type EventType } from "./events.js"; +import { migrate } from "./server.js"; import dotenv from "dotenv"; import { sleep } from "../../src/sleep.js"; dotenv.config(); -let processor: EventProcessor | undefined = undefined; +type EventDataMap = TxOBEventDataMapFromSchemas; + +let processor: EventProcessor | undefined = undefined; let wakeupEmitter: WakeupEmitter | undefined = undefined; (async () => { const clientConfig: pg.ClientConfig = { - user: process.env.POSTGRES_USER, - password: process.env.POSTGRES_PASSWORD, - database: process.env.POSTGRES_DB, + user: process.env.POSTGRES_USER || 'outbox', + password: process.env.POSTGRES_PASSWORD || 'outbox', + database: process.env.POSTGRES_DB || 'outbox', port: parseInt(process.env.POSTGRES_PORT || "5434"), }; const client = new pg.Client(clientConfig); @@ -34,37 +38,44 @@ let wakeupEmitter: WakeupEmitter | undefined = undefined; querier: client, }); - processor = new EventProcessor({ - maxEventConcurrency: 50, - client: createProcessorClient({ querier: client }), - wakeupEmitter, - handlerMap: { - ResourceSaved: { - thing1: async (event) => { - console.log(`${event.id} thing1 ${event.correlation_id}`); - if (Math.random() > 0.99) throw new Error("some issue"); - - return; - }, - thing2: async (event) => { - console.log(`${event.id} thing2 ${event.correlation_id}`); - if (Math.random() > 0.96) throw new Error("some issue"); - - return; - }, - thing3: async (event) => { - await sleep(Math.random() * 1_000); - console.log(`${event.id} thing3 ${event.correlation_id}`); - if (Math.random() > 0.8) throw new Error("some issue"); - - return; - }, + const handlerMap = defineTxOBEventHandlerMap(eventSchemas, { + ResourceSaved: { + thing1: async (event) => { + console.log( + `${event.id} thing1 ${event.correlation_id} activity=${event.data.id}`, + ); + if (Math.random() > 0.99) throw new Error("some issue"); + }, + thing2: async (event) => { + console.log( + `${event.id} thing2 ${event.correlation_id} kind=${event.data.type}`, + ); + if (Math.random() > 0.96) throw new Error("some issue"); }, - EventMaxErrorsReached: { - // Optional: add handlers for EventMaxErrorsReached events if needed - // For example, you might want to send alerts or log to external systems + thing3: async (event) => { + await sleep(Math.random() * 1_000); + console.log(`${event.id} thing3 ${event.correlation_id}`); + if (Math.random() > 0.8) throw new Error("some issue"); }, }, + EventMaxErrorsReached: { + // Optional: add handlers for EventMaxErrorsReached events if needed + // For example, you might want to send alerts or log to external systems + notify: async (event) => { + console.log( + "Event max errors reached", + event.data.failedEventType, + event.data.failedEventId, + ); + }, + }, + }); + + processor = new EventProcessor({ + maxEventConcurrency: 50, + client: createProcessorClient({ querier: client }), + wakeupEmitter, + handlerMap, pollingIntervalMs: 5000, logger: console, onEventMaxErrorsReached: async ({ event, txClient }) => { @@ -78,11 +89,11 @@ let wakeupEmitter: WakeupEmitter | undefined = undefined; id: randomUUID(), timestamp: new Date(), type: eventTypes.EventMaxErrorsReached, - data: { + data: eventSchemas.EventMaxErrorsReached.parse({ failedEventId: event.id, failedEventType: event.type, failedEventCorrelationId: event.correlation_id, - }, + }), correlation_id: event.correlation_id, handler_results: {}, errors: 0, diff --git a/examples/pg/server.ts b/examples/pg/server.ts index ee63cda..826b30c 100644 --- a/examples/pg/server.ts +++ b/examples/pg/server.ts @@ -5,15 +5,9 @@ import { resolve } from "node:path"; import pg from "pg"; import dotenv from "dotenv"; import gracefulShutdown from "http-graceful-shutdown"; +import { eventSchemas, eventTypes } from "./events.js"; dotenv.config(); -export const eventTypes = { - ResourceSaved: "ResourceSaved", - EventMaxErrorsReached: "EventMaxErrorsReached", -} as const; - -export type EventType = keyof typeof eventTypes; - export async function migrate(client: pg.Client): Promise { await client.query(`CREATE TABLE IF NOT EXISTS events ( id UUID, @@ -52,9 +46,9 @@ export async function migrate(client: pg.Client): Promise { const main = async (): Promise => { const client = new pg.Client({ - user: process.env.POSTGRES_USER, - password: process.env.POSTGRES_PASSWORD, - database: process.env.POSTGRES_DB, + user: process.env.POSTGRES_USER || 'outbox', + password: process.env.POSTGRES_PASSWORD || 'outbox', + database: process.env.POSTGRES_DB || 'outbox', port: parseInt(process.env.POSTGRES_PORT || "5434"), }); await client.connect(); @@ -86,10 +80,10 @@ const main = async (): Promise => { [ randomUUID(), eventTypes.ResourceSaved, - { + eventSchemas.ResourceSaved.parse({ type: "activity", id: activityId, - }, + }), correlationId, {}, 0, diff --git a/examples/pg/tsconfig.json b/examples/pg/tsconfig.json index 0c7834f..6a51bf8 100644 --- a/examples/pg/tsconfig.json +++ b/examples/pg/tsconfig.json @@ -13,5 +13,5 @@ "skipLibCheck": true, "noEmit": true }, - "include": ["server.ts", "processor.ts"] + "include": ["server.ts", "processor.ts", "events.ts"] } diff --git a/examples/pg/yarn.lock b/examples/pg/yarn.lock index 7c580c9..8fd6d0d 100644 --- a/examples/pg/yarn.lock +++ b/examples/pg/yarn.lock @@ -501,3 +501,8 @@ yn@3.1.1: version "3.1.1" resolved "https://registry.yarnpkg.com/yn/-/yn-3.1.1.tgz#1e87401a09d767c1d5eab26a6e4c185182d2eb50" integrity sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q== + +zod@^4.1.12: + version "4.4.3" + resolved "https://registry.yarnpkg.com/zod/-/zod-4.4.3.tgz#b680f172885d18bbebf21a834ea25e55a1bbf356" + integrity sha512-ytENFjIJFl2UwYglde2jchW2Hwm4GJFLDiSXWdTrJQBIN9Fcyp7n4DhxJEiWNAJMV1/BqWfW/kkg71UDcHJyTQ== diff --git a/package.json b/package.json index 0879fc9..63c4066 100644 --- a/package.json +++ b/package.json @@ -58,6 +58,7 @@ "format": "prettier -w ." }, "dependencies": { + "@standard-schema/spec": "^1.1.0", "p-limit": "^7.2.0", "p-queue": "^9.0.1", "throttle-debounce": "^5.0.2" diff --git a/src/mongodb/client.ts b/src/mongodb/client.ts index bc3ea20..f295563 100644 --- a/src/mongodb/client.ts +++ b/src/mongodb/client.ts @@ -1,6 +1,7 @@ import { EventEmitter } from "node:events"; import { MongoClient, ObjectId, type ChangeStream } from "mongodb"; import type { + TxOBEventDataMap, TxOBEvent, TxOBProcessorClient, TxOBProcessorClientOpts, @@ -22,20 +23,28 @@ const createReadyToProcessFilter = (maxErrors: number) => ({ errors: { $lt: maxErrors }, }); -export type CreateProcessorClientOpts = { +export type CreateProcessorClientOpts< + EventType extends string, + TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, +> = { mongo: MongoClient; db: string; collection?: string; limit?: number; }; -export const createProcessorClient = ( - opts: CreateProcessorClientOpts, -): TxOBProcessorClient => { +export const createProcessorClient = < + EventType extends string, + TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, +>( + opts: CreateProcessorClientOpts, +): TxOBProcessorClient => { const { mongo, db, collection = "events", limit = 100 } = opts; const getEventsToProcess = async ( opts: TxOBProcessorClientOpts, - ): Promise, "id" | "errors">[]> => { + ): Promise< + Pick, "id" | "errors">[] + > => { const filter = createReadyToProcessFilter(opts.maxErrors); const events = (await mongo @@ -45,23 +54,26 @@ export const createProcessorClient = ( .project({ id: 1, errors: 1 }) .limit(limit) .sort("timestamp", "asc") - .toArray()) as Pick, "id" | "errors">[]; + .toArray()) as Pick< + TxOBEvent, + "id" | "errors" + >[]; return events; }; - const transaction: TxOBProcessorClient["transaction"] = async ( + const transaction: TxOBProcessorClient["transaction"] = async ( fn: ( - txProcessorClient: TxOBTransactionProcessorClient, + txProcessorClient: TxOBTransactionProcessorClient, ) => Promise, ): Promise => { await mongo.withSession(async (session): Promise => { await session.withTransaction(async (): Promise => { await fn({ getEventByIdForUpdateSkipLocked: async ( - eventId: TxOBEvent["id"], + eventId: TxOBEvent["id"], opts: TxOBProcessorClientOpts, - ): Promise | null> => { + ): Promise | null> => { // https://www.mongodb.com/blog/post/how-to-select--for-update-inside-mongodb-transactions // Note: findOneAndUpdate returns null (not an error) when document not found, // so any thrown error is unexpected and will propagate to the transaction handler @@ -94,9 +106,14 @@ export const createProcessorClient = ( if (!result || !result.value) return null; - return result.value as TxOBEvent; + return result.value as TxOBEvent< + EventType, + TEventDataMap[EventType] + >; }, - updateEvent: async (event: TxOBEvent): Promise => { + updateEvent: async ( + event: TxOBEvent, + ): Promise => { await mongo .db(db) .collection(collection) @@ -119,7 +136,10 @@ export const createProcessorClient = ( ); }, createEvent: async ( - event: Omit, "processed_at" | "backoff_until">, + event: Omit< + TxOBEvent, + "processed_at" | "backoff_until" + >, ): Promise => { await mongo.db(db).collection(collection).insertOne( { diff --git a/src/pg/client.ts b/src/pg/client.ts index 2e33567..cdc652e 100644 --- a/src/pg/client.ts +++ b/src/pg/client.ts @@ -3,9 +3,9 @@ import { escapeIdentifier, escapeLiteral, type ClientConfig, - DatabaseError, } from "pg"; import type { + TxOBEventDataMap, TxOBEvent, TxOBProcessorClient, TxOBProcessorClientOpts, @@ -21,23 +21,31 @@ interface Querier { // TODO: leverage the signal option that comes in on options for `getEventsToProcess` and `getEventByIdForUpdateSkipLocked` // to cancel queries if/when supported by `pg` https://github.com/brianc/node-postgres/issues/2774 -export type CreateProcessorClientOpts = { +export type CreateProcessorClientOpts< + EventType extends string, + TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, +> = { querier: Querier; table?: string; limit?: number; }; -export const createProcessorClient = ( - opts: CreateProcessorClientOpts, -): TxOBProcessorClient => { +export const createProcessorClient = < + EventType extends string, + TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, +>( + opts: CreateProcessorClientOpts, +): TxOBProcessorClient => { const { querier, table = "events", limit = 100 } = opts; const _table = table; const _limit = limit; const getEventsToProcess = async ( opts: TxOBProcessorClientOpts, - ): Promise, "id" | "errors">[]> => { + ): Promise< + Pick, "id" | "errors">[] + > => { const events = await querier.query< - Pick, "id" | "errors"> + Pick, "id" | "errors"> >( `SELECT id, errors FROM ${escapeIdentifier(_table)} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1 ORDER BY timestamp ASC LIMIT ${_limit}`, [opts.maxErrors], @@ -45,19 +53,21 @@ export const createProcessorClient = ( return events.rows; }; - const transaction: TxOBProcessorClient["transaction"] = async ( + const transaction: TxOBProcessorClient["transaction"] = async ( fn: ( - txProcessorClient: TxOBTransactionProcessorClient, + txProcessorClient: TxOBTransactionProcessorClient, ) => Promise, ): Promise => { try { await querier.query("BEGIN"); await fn({ getEventByIdForUpdateSkipLocked: async ( - eventId: TxOBEvent["id"], + eventId: TxOBEvent["id"], opts: TxOBProcessorClientOpts, - ): Promise | null> => { - const event = await querier.query>( + ): Promise | null> => { + const event = await querier.query< + TxOBEvent + >( `SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${escapeIdentifier(_table)} WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED`, [eventId, opts.maxErrors], ); @@ -67,7 +77,9 @@ export const createProcessorClient = ( return event.rows[0]; }, - updateEvent: async (event: TxOBEvent): Promise => { + updateEvent: async ( + event: TxOBEvent, + ): Promise => { await querier.query( `UPDATE ${escapeIdentifier(_table)} SET handler_results = $1, errors = $2, processed_at = $3, backoff_until = $4 WHERE id = $5`, [ @@ -80,7 +92,10 @@ export const createProcessorClient = ( ); }, createEvent: async ( - event: Omit, "processed_at" | "backoff_until">, + event: Omit< + TxOBEvent, + "processed_at" | "backoff_until" + >, ): Promise => { await querier.query( `INSERT INTO ${escapeIdentifier(_table)} (id, timestamp, type, data, correlation_id, handler_results, errors) VALUES ($1, $2, $3, $4, $5, $6, $7)`, diff --git a/src/processor.test.ts b/src/processor.test.ts index 420e97b..375306c 100644 --- a/src/processor.test.ts +++ b/src/processor.test.ts @@ -1,5 +1,12 @@ -import { describe, it, expect, vi, afterEach } from "vitest"; -import { EventProcessor, TxOBEvent, defaultBackoff } from "./processor.js"; +import { describe, it, expect, vi, afterEach, expectTypeOf } from "vitest"; +import type { StandardSchemaV1 } from "@standard-schema/spec"; +import { + defineTxOBEventHandlerMap, + defineTxOBEventSchemas, + EventProcessor, + TxOBEvent, + defaultBackoff, +} from "./processor.js"; import { TxOBError, ErrorUnprocessableEventHandler } from "./error.js"; import { sleep } from "./sleep.js"; import { @@ -33,6 +40,53 @@ afterEach(() => { vi.clearAllMocks(); }); +describe("EventProcessor - schema typing", () => { + it("infers handler event data from Standard Schema outputs", () => { + const createSchema = >(): StandardSchemaV1< + unknown, + TOutput + > => ({ + "~standard": { + version: 1, + vendor: "test", + validate: (value) => ({ value: value as TOutput }), + }, + }); + + const eventSchemas = defineTxOBEventSchemas({ + UserCreated: createSchema<{ userId: string; email: string }>(), + SubscriptionCancelled: createSchema<{ + subscriptionId: string; + reason?: string; + }>(), + }); + + const handlerMap = defineTxOBEventHandlerMap(eventSchemas, { + UserCreated: { + sendWelcomeEmail: async (event) => { + expectTypeOf(event.type).toEqualTypeOf<"UserCreated">(); + expectTypeOf(event.data).toEqualTypeOf<{ + userId: string; + email: string; + }>(); + }, + }, + SubscriptionCancelled: { + syncBilling: async (event) => { + expectTypeOf(event.type).toEqualTypeOf<"SubscriptionCancelled">(); + expectTypeOf(event.data).toEqualTypeOf<{ + subscriptionId: string; + reason?: string; + }>(); + }, + }, + }); + + expectTypeOf(handlerMap.UserCreated.sendWelcomeEmail).toBeFunction(); + expectTypeOf(handlerMap.SubscriptionCancelled.syncBilling).toBeFunction(); + }); +}); + describe("EventProcessor - processEvents", () => { it("does nothing when no events to process", async () => { const opts = { diff --git a/src/processor.ts b/src/processor.ts index 1ac4f36..0a2af02 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -5,6 +5,7 @@ import { deepClone } from "./clone.js"; import PQueue from "p-queue"; import { ErrorUnprocessableEventHandler, TxOBError } from "./error.js"; import { throttle } from "throttle-debounce"; +import type { StandardSchemaV1 } from "@standard-schema/spec"; import { createTelemetryInstruments, endTelemetrySpan, @@ -29,11 +30,13 @@ type TxOBEventHandlerResult = { errors?: { error: unknown; timestamp: Date }[]; }; -export type TxOBEvent = { +export type TxOBEventData = Record; + +export type TxOBEvent = { id: string; timestamp: Date; type: TxOBEventType; - data: Record; + data: TData; correlation_id: string; handler_results: Record; errors: number; @@ -41,21 +44,74 @@ export type TxOBEvent = { processed_at?: Date; }; +export type TxOBEventDataMap = Record< + TxOBEventType, + TxOBEventData +>; + +type TxOBEventByType< + TxOBEventType extends string, + TEventDataMap extends TxOBEventDataMap, +> = { + [TType in TxOBEventType]: TxOBEvent; +}[TxOBEventType]; + +export type TxOBStandardSchema = StandardSchemaV1; + +export type TxOBEventSchemaMap = Record< + TxOBEventType, + TxOBStandardSchema +>; + +export type TxOBSchemaOutput = + TSchema extends StandardSchemaV1 ? TOutput : never; + +export type TxOBEventDataMapFromSchemas< + TEventSchemas extends TxOBEventSchemaMap, +> = { + [TType in keyof TEventSchemas & string]: TxOBSchemaOutput; +}; + +export type TxOBEventHandlerMapFromSchemas< + TEventSchemas extends TxOBEventSchemaMap, +> = TxOBEventHandlerMap< + keyof TEventSchemas & string, + TxOBEventDataMapFromSchemas +>; + +export const defineTxOBEventSchemas = < + const TEventSchemas extends TxOBEventSchemaMap, +>( + eventSchemas: TEventSchemas, +): TEventSchemas => eventSchemas; + type TxOBEventHandlerOpts = { signal?: AbortSignal; }; -export type TxOBEventHandler = ( - event: TxOBEvent, +export type TxOBEventHandler< + TxOBEventType extends string = string, + TData = TxOBEventData, +> = ( + event: TxOBEvent, opts: TxOBEventHandlerOpts, ) => Promise; -export type TxOBEventHandlerMap = Record< - TxOBEventType, - { - [key: string]: TxOBEventHandler; - } ->; +export type TxOBEventHandlerMap< + TxOBEventType extends string, + TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, +> = { + [TType in TxOBEventType]: { + [key: string]: TxOBEventHandler; + }; +}; + +export const defineTxOBEventHandlerMap = < + const TEventSchemas extends TxOBEventSchemaMap, +>( + _eventSchemas: TEventSchemas, + handlerMap: TxOBEventHandlerMapFromSchemas, +): TxOBEventHandlerMapFromSchemas => handlerMap; export type TxOBProcessorClientOpts = { signal?: AbortSignal; @@ -68,25 +124,34 @@ export interface WakeupEmitter { close(): Promise; } -export interface TxOBProcessorClient { +export interface TxOBProcessorClient< + TxOBEventType extends string, + TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, +> { getEventsToProcess( opts: TxOBProcessorClientOpts, - ): Promise, "id" | "errors">[]>; + ): Promise, "id" | "errors">[]>; transaction( fn: ( - txProcessorClient: TxOBTransactionProcessorClient, + txProcessorClient: TxOBTransactionProcessorClient, ) => Promise, ): Promise; } -export interface TxOBTransactionProcessorClient { +export interface TxOBTransactionProcessorClient< + TxOBEventType extends string, + TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, +> { getEventByIdForUpdateSkipLocked( - eventId: TxOBEvent["id"], + eventId: TxOBEventByType["id"], opts: TxOBProcessorClientOpts, - ): Promise | null>; - updateEvent(event: TxOBEvent): Promise; + ): Promise | null>; + updateEvent(event: TxOBEventByType): Promise; createEvent( - event: Omit, "processed_at" | "backoff_until">, + event: Omit< + TxOBEventByType, + "processed_at" | "backoff_until" + >, ): Promise; } @@ -106,7 +171,10 @@ const defaultMaxQueuedEvents = 500; const defaultWakeupTimeoutMs = 60_000; const defaultWakeupThrottleMs = 1_000; -type TxOBProcessEventsOpts = { +type TxOBProcessEventsOpts< + TxOBEventType extends string, + TEventDataMap extends TxOBEventDataMap, +> = { maxErrors: number; backoff: (count: number) => Date; signal?: AbortSignal; @@ -115,23 +183,26 @@ type TxOBProcessEventsOpts = { maxHandlerConcurrency?: number; maxQueuedEvents?: number; onEventMaxErrorsReached?: (opts: { - event: Readonly>; - txClient: TxOBTransactionProcessorClient; + event: Readonly>; + txClient: TxOBTransactionProcessorClient; signal?: AbortSignal; }) => Promise; telemetry?: TxOBTelemetryInstruments; }; -const processEvent = async ({ +const processEvent = async < + TxOBEventType extends string, + TEventDataMap extends TxOBEventDataMap, +>({ client, handlerMap, unlockedEvent, opts, }: { - client: TxOBProcessorClient; - handlerMap: TxOBEventHandlerMap; - unlockedEvent: Pick, "id" | "errors">; - opts?: Partial>; + client: TxOBProcessorClient; + handlerMap: TxOBEventHandlerMap; + unlockedEvent: Pick, "id" | "errors">; + opts?: Partial>; }): Promise<{ backoffUntil?: Date }> => { const { logger, @@ -435,7 +506,9 @@ const processEvent = async ({ if (onEventMaxErrorsReached) { try { await onEventMaxErrorsReached({ - event: deepClone(lockedEvent), + event: deepClone(lockedEvent) as Readonly< + TxOBEventByType + >, txClient, signal, }); @@ -506,10 +579,13 @@ export interface Logger { error(message?: unknown, ...optionalParams: unknown[]): void; } -export class EventProcessor { - private client: TxOBProcessorClient; - private handlerMap: TxOBEventHandlerMap; - private opts: Omit, "signal"> & { +export class EventProcessor< + TxOBEventType extends string, + TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, +> { + private client: TxOBProcessorClient; + private handlerMap: TxOBEventHandlerMap; + private opts: Omit, "signal"> & { pollingIntervalMs: number; maxQueuedEvents: number; wakeupTimeoutMs: number; @@ -531,7 +607,7 @@ export class EventProcessor { telemetry, ...opts }: Omit< - Partial>, + Partial>, "signal" | "telemetry" > & { pollingIntervalMs?: number; @@ -540,8 +616,8 @@ export class EventProcessor { wakeupEmitter?: WakeupEmitter; telemetry?: TxOBTelemetry; } & { - client: TxOBProcessorClient; - handlerMap: TxOBEventHandlerMap; + client: TxOBProcessorClient; + handlerMap: TxOBEventHandlerMap; }) { const _opts = { pollingIntervalMs: defaultPollingIntervalMs, @@ -645,7 +721,10 @@ export class EventProcessor { .add( async () => { try { - const { backoffUntil } = await processEvent({ + const { backoffUntil } = await processEvent< + TxOBEventType, + TEventDataMap + >({ client: this.client, handlerMap: this.handlerMap, unlockedEvent: event, From d7107ae76c48d0f6f982f8d7e52d9d518167830a Mon Sep 17 00:00:00 2001 From: dillonstreator Date: Mon, 4 May 2026 15:52:00 -0500 Subject: [PATCH 2/2] stronger coupling to standard schema --- README.md | 121 +++++++++++++++++++++------------------ examples/pg/events.ts | 20 +++---- examples/pg/processor.ts | 84 +++++++++++++++------------ src/mongodb/client.ts | 83 +++++++++++++++++++++------ src/pg/client.test.ts | 37 ++++++++---- src/pg/client.ts | 82 ++++++++++++++++++++------ src/processor.test.ts | 42 +++++++------- src/processor.ts | 71 ++++++++++++++++++----- 8 files changed, 354 insertions(+), 186 deletions(-) diff --git a/README.md b/README.md index ded7f2b..c8a819a 100644 --- a/README.md +++ b/README.md @@ -130,7 +130,7 @@ const client = new pg.Client({ await client.connect(); const processor = new EventProcessor({ - client: createProcessorClient({ querier: client }), + client: createProcessorClient({ querier: client, eventSchemas }), handlerMap: { UserCreated: { // Handlers are processed concurrently and independently with retries @@ -354,18 +354,20 @@ const incrementCounter: TxOBEventHandler = async (event) => { #### Typed payloads with Standard Schema (Zod/ArkType/Valibot/etc.) -txob supports schema-driven event payload typing through the Standard Schema interface. +txob uses schema-driven event payload typing through the Standard Schema interface. +`eventSchemas` is required by both `createProcessorClient(...)` and `createEventProcessor(...)`. +Use any validator that implements Standard Schema (for example Zod, ArkType, or Valibot). ```typescript import { z } from "zod"; import { - defineTxOBEventHandlerMap, - defineTxOBEventSchemas, - EventProcessor, - type TxOBEventDataMapFromSchemas, + createEventHandlerMap, + createEventProcessor, + type TxOBEventSchemaMap, } from "txob"; +import { createProcessorClient } from "txob/pg"; -const eventSchemas = defineTxOBEventSchemas({ +const eventSchemas = { UserCreated: z.object({ userId: z.string().uuid(), email: z.string().email(), @@ -374,31 +376,32 @@ const eventSchemas = defineTxOBEventSchemas({ orderId: z.string().uuid(), amount: z.number().positive(), }), -}); +} satisfies TxOBEventSchemaMap<"UserCreated" | "OrderPlaced">; -type EventType = keyof typeof eventSchemas; -type EventDataMap = TxOBEventDataMapFromSchemas; - -const handlers = defineTxOBEventHandlerMap(eventSchemas, { - UserCreated: { - sendWelcomeEmail: async (event) => { - await emailService.send(event.data.email); // typed as string +const handlerMap = createEventHandlerMap({ + eventSchemas, + handlerMap: { + UserCreated: { + sendWelcomeEmail: async (event) => { + await emailService.send(event.data.email); // typed as string + }, }, - }, - OrderPlaced: { - sendReceipt: async (event) => { - await receipts.send(event.data.orderId, event.data.amount); // strongly typed + OrderPlaced: { + sendReceipt: async (event) => { + await receipts.send(event.data.orderId, event.data.amount); // strongly typed + }, }, }, }); -const processor = new EventProcessor({ - client: createProcessorClient({ querier: client }), - handlerMap: handlers, +const processor = createEventProcessor({ + eventSchemas, + client: createProcessorClient({ querier: client, eventSchemas }), + handlerMap, }); ``` -`defineTxOBEventSchemas(...)` is optional. It is a convenience helper for cleaner type inference. You can also define schema maps without it using explicit types. +Schema-first inference via `eventSchemas` is the standard and required approach. ### Handler Results @@ -843,7 +846,7 @@ await client.connect(); // 3. Create and start the processor const processor = new EventProcessor({ - client: createProcessorClient({ querier: client }), + client: createProcessorClient({ querier: client, eventSchemas }), handlerMap: { UserCreated: { sendEmail: async (event, { signal }) => { @@ -963,7 +966,7 @@ gracefulShutdown(server, { ```typescript const processor = new EventProcessor({ - client: createProcessorClient({ querier: client }), + client: createProcessorClient({ querier: client, eventSchemas }), handlerMap: { UserCreated: { sendWelcomeEmail: async (event) => { @@ -1061,7 +1064,7 @@ const producer = kafka.producer(); await producer.connect(); const processor = new EventProcessor({ - client: createProcessorClient({ querier: client }), + client: createProcessorClient({ querier: client, eventSchemas }), handlerMap: { UserCreated: { // Publish to Kafka with guaranteed consistency @@ -1118,7 +1121,7 @@ const client = new pg.Client({ await client.connect(); const processor = new EventProcessor({ - client: createProcessorClient({ querier: client }), + client: createProcessorClient({ querier: client, eventSchemas }), handlerMap: { // All your handlers... }, @@ -1191,11 +1194,12 @@ Creates a PostgreSQL processor client. ```typescript import { createProcessorClient } from "txob/pg"; -createProcessorClient(opts: { +createProcessorClient(opts: { querier: pg.Client; + eventSchemas: Record>; table?: string; // Default: "events" limit?: number; // Default: 100 -}): TxOBProcessorClient +}): TxOBProcessorClient<...inferred from eventSchemas...> ``` ### `createProcessorClient` (MongoDB) @@ -1205,12 +1209,13 @@ Creates a MongoDB processor client. ```typescript import { createProcessorClient } from "txob/mongodb"; -createProcessorClient(opts: { +createProcessorClient(opts: { mongo: mongodb.MongoClient; db: string; // Database name + eventSchemas: Record>; collection?: string; // Default: "events" limit?: number; // Default: 100 -}): TxOBProcessorClient +}): TxOBProcessorClient<...inferred from eventSchemas...> ``` ### `TxOBError` @@ -1317,10 +1322,12 @@ type TxOBEventHandlerMap< [TType in EventType]: Record>; }; -// Standard Schema helpers -type TxOBEventDataMapFromSchemas = { - [TType in keyof TSchemas & string]: /* schema output for TType */; -}; +// Schema-first convenience API +createEventProcessor({ + eventSchemas, + client: createProcessorClient({ querier, eventSchemas }), + handlerMap, +}); // Handler result tracking type TxOBEventHandlerResult = { @@ -1460,7 +1467,7 @@ If using `FOR UPDATE SKIP LOCKED` properly (which txob does), stuck events are n - Lower `maxEventConcurrency` - Profile handlers for memory leaks - Archive old events -- Reduce `limit` in `createProcessorClient({ querier: client, table, limit })` +- Reduce `limit` in `createProcessorClient({ querier: client, eventSchemas, table, limit })` ### Duplicate handler executions @@ -1765,7 +1772,7 @@ txob can emit spans and metrics without depending on a specific telemetry SDK. I import { metrics, trace } from "@opentelemetry/api"; const processor = new EventProcessor({ - client: createProcessorClient({ querier: client }), + client: createProcessorClient({ querier: client, eventSchemas }), handlerMap: handlers, telemetry: { tracer: trace.getTracer("txob"), @@ -1837,35 +1844,37 @@ Yes! txob is written in TypeScript and provides full type safety, including type ```typescript import { z } from "zod"; import { - defineTxOBEventHandlerMap, - defineTxOBEventSchemas, - type TxOBEventDataMapFromSchemas, + createEventHandlerMap, + createEventProcessor, + type TxOBEventSchemaMap, } from "txob"; +import { createProcessorClient } from "txob/pg"; -const eventSchemas = defineTxOBEventSchemas({ +const eventSchemas = { UserCreated: z.object({ userId: z.string().uuid(), email: z.string().email() }), OrderPlaced: z.object({ orderId: z.string().uuid(), amount: z.number() }), -}); - -type EventType = keyof typeof eventSchemas; -type EventDataMap = TxOBEventDataMapFromSchemas; +} satisfies TxOBEventSchemaMap<"UserCreated" | "OrderPlaced">; -const handlers = defineTxOBEventHandlerMap(eventSchemas, { - UserCreated: { - sendWelcomeEmail: async (event) => { - event.data.email; // string +const handlers = createEventHandlerMap({ + eventSchemas, + handlerMap: { + UserCreated: { + sendWelcomeEmail: async (event) => { + event.data.email; // string + }, }, - }, - OrderPlaced: { - sendReceipt: async (event) => { - event.data.amount; // number + OrderPlaced: { + sendReceipt: async (event) => { + event.data.amount; // number + }, }, + // Missing an event type? TypeScript error }, - // Missing an event type? TypeScript error }); -const processor = new EventProcessor({ - client: createProcessorClient({ querier: client }), +const processor = createEventProcessor({ + eventSchemas, + client: createProcessorClient({ querier: client, eventSchemas }), handlerMap: handlers, }); ``` diff --git a/examples/pg/events.ts b/examples/pg/events.ts index c70c424..744904d 100644 --- a/examples/pg/events.ts +++ b/examples/pg/events.ts @@ -1,25 +1,21 @@ import { z } from "zod"; -import { - defineTxOBEventSchemas, - type TxOBEventDataMapFromSchemas, -} from "../../src/index.js"; +import type { TxOBEventSchemaMap } from "../../src/index.js"; export const eventTypes = { ResourceSaved: "ResourceSaved", EventMaxErrorsReached: "EventMaxErrorsReached", } as const; -export const eventSchemas = defineTxOBEventSchemas({ +export type EventType = keyof typeof eventTypes; + +export const eventSchemas = { [eventTypes.ResourceSaved]: z.object({ type: z.literal("activity"), - id: z.string().uuid(), + id: z.uuid(), }), [eventTypes.EventMaxErrorsReached]: z.object({ - failedEventId: z.string().uuid(), + failedEventId: z.uuid(), failedEventType: z.string(), - failedEventCorrelationId: z.string().uuid(), + failedEventCorrelationId: z.uuid(), }), -}); - -export type EventType = keyof typeof eventSchemas; -export type EventDataMap = TxOBEventDataMapFromSchemas; +} satisfies TxOBEventSchemaMap; diff --git a/examples/pg/processor.ts b/examples/pg/processor.ts index 98dbfe4..6017336 100644 --- a/examples/pg/processor.ts +++ b/examples/pg/processor.ts @@ -1,24 +1,22 @@ import pg from "pg"; import { randomUUID } from "node:crypto"; import { - defineTxOBEventHandlerMap, - EventProcessor, - type TxOBEventDataMapFromSchemas, + createEventHandlerMap, + createEventProcessor, + type TxOBProcessor, WakeupEmitter, } from "../../src/index.js"; import { createProcessorClient, createWakeupEmitter, } from "../../src/pg/client.js"; -import { eventSchemas, eventTypes, type EventType } from "./events.js"; +import { eventSchemas, eventTypes } from "./events.js"; import { migrate } from "./server.js"; import dotenv from "dotenv"; import { sleep } from "../../src/sleep.js"; dotenv.config(); -type EventDataMap = TxOBEventDataMapFromSchemas; - -let processor: EventProcessor | undefined = undefined; +let processor: TxOBProcessor | undefined = undefined; let wakeupEmitter: WakeupEmitter | undefined = undefined; (async () => { @@ -38,46 +36,55 @@ let wakeupEmitter: WakeupEmitter | undefined = undefined; querier: client, }); - const handlerMap = defineTxOBEventHandlerMap(eventSchemas, { - ResourceSaved: { - thing1: async (event) => { - console.log( - `${event.id} thing1 ${event.correlation_id} activity=${event.data.id}`, - ); - if (Math.random() > 0.99) throw new Error("some issue"); - }, - thing2: async (event) => { - console.log( - `${event.id} thing2 ${event.correlation_id} kind=${event.data.type}`, - ); - if (Math.random() > 0.96) throw new Error("some issue"); + const handlerMap = createEventHandlerMap({ + eventSchemas, + handlerMap: { + ResourceSaved: { + thing1: async (event) => { + console.log( + `${event.id} thing1 ${event.correlation_id} activity=${event.data.id}`, + ); + if (Math.random() > 0.99) throw new Error("some issue"); + }, + thing2: async (event) => { + console.log( + `${event.id} thing2 ${event.correlation_id} kind=${event.data.type}`, + ); + if (Math.random() > 0.96) throw new Error("some issue"); + }, + thing3: async (event) => { + await sleep(Math.random() * 1_000); + console.log(`${event.id} thing3 ${event.correlation_id}`); + if (Math.random() > 0.8) throw new Error("some issue"); + }, }, - thing3: async (event) => { - await sleep(Math.random() * 1_000); - console.log(`${event.id} thing3 ${event.correlation_id}`); - if (Math.random() > 0.8) throw new Error("some issue"); - }, - }, - EventMaxErrorsReached: { - // Optional: add handlers for EventMaxErrorsReached events if needed - // For example, you might want to send alerts or log to external systems - notify: async (event) => { - console.log( - "Event max errors reached", - event.data.failedEventType, - event.data.failedEventId, - ); + EventMaxErrorsReached: { + // Optional: add handlers for EventMaxErrorsReached events if needed + // For example, you might want to send alerts or log to external systems + notify: async (event) => { + console.log( + "Event max errors reached", + event.data.failedEventType, + event.data.failedEventId, + ); + }, }, }, }); - processor = new EventProcessor({ + processor = createEventProcessor({ + eventSchemas, maxEventConcurrency: 50, - client: createProcessorClient({ querier: client }), + client: createProcessorClient({ querier: client, eventSchemas }), wakeupEmitter, handlerMap, pollingIntervalMs: 5000, - logger: console, + logger: { + info: console.log, + error: console.error, + warn: console.warn, + debug: () => { }, + }, onEventMaxErrorsReached: async ({ event, txClient }) => { // Transactionally persist an 'event max errors reached' event // This hook is called when: @@ -115,6 +122,7 @@ const shutdown = (() => { shutdownStarted = true; try { + await processor?.stop(); await wakeupEmitter?.close(); } catch (err) { console.error(err); diff --git a/src/mongodb/client.ts b/src/mongodb/client.ts index f295563..ce3f879 100644 --- a/src/mongodb/client.ts +++ b/src/mongodb/client.ts @@ -1,7 +1,8 @@ import { EventEmitter } from "node:events"; import { MongoClient, ObjectId, type ChangeStream } from "mongodb"; import type { - TxOBEventDataMap, + TxOBEventSchemaMap, + TxOBSchemaOutput, TxOBEvent, TxOBProcessorClient, TxOBProcessorClientOpts, @@ -24,26 +25,42 @@ const createReadyToProcessFilter = (maxErrors: number) => ({ }); export type CreateProcessorClientOpts< - EventType extends string, - TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, + TEventSchemas extends TxOBEventSchemaMap, > = { mongo: MongoClient; db: string; collection?: string; limit?: number; + eventSchemas: TEventSchemas; }; export const createProcessorClient = < - EventType extends string, - TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, + const TEventSchemas extends TxOBEventSchemaMap, >( - opts: CreateProcessorClientOpts, -): TxOBProcessorClient => { - const { mongo, db, collection = "events", limit = 100 } = opts; + opts: CreateProcessorClientOpts, +): TxOBProcessorClient< + keyof TEventSchemas & string, + { + [TType in keyof TEventSchemas & string]: TxOBSchemaOutput; + } +> => { + const { + mongo, + db, + collection = "events", + limit = 100, + eventSchemas: _eventSchemas, + } = opts; const getEventsToProcess = async ( opts: TxOBProcessorClientOpts, ): Promise< - Pick, "id" | "errors">[] + Pick< + TxOBEvent< + keyof TEventSchemas & string, + TxOBSchemaOutput + >, + "id" | "errors" + >[] > => { const filter = createReadyToProcessFilter(opts.maxErrors); @@ -55,25 +72,49 @@ export const createProcessorClient = < .limit(limit) .sort("timestamp", "asc") .toArray()) as Pick< - TxOBEvent, + TxOBEvent< + keyof TEventSchemas & string, + TxOBSchemaOutput + >, "id" | "errors" >[]; return events; }; - const transaction: TxOBProcessorClient["transaction"] = async ( + const transaction: TxOBProcessorClient< + keyof TEventSchemas & string, + { + [TType in keyof TEventSchemas & string]: TxOBSchemaOutput; + } + >["transaction"] = async ( fn: ( - txProcessorClient: TxOBTransactionProcessorClient, + txProcessorClient: TxOBTransactionProcessorClient< + keyof TEventSchemas & string, + { + [TType in keyof TEventSchemas & string]: TxOBSchemaOutput< + TEventSchemas[TType] + >; + } + >, ) => Promise, ): Promise => { await mongo.withSession(async (session): Promise => { await session.withTransaction(async (): Promise => { await fn({ getEventByIdForUpdateSkipLocked: async ( - eventId: TxOBEvent["id"], + eventId: TxOBEvent< + keyof TEventSchemas & string, + TxOBSchemaOutput + >["id"], opts: TxOBProcessorClientOpts, - ): Promise | null> => { + ): Promise< + | TxOBEvent< + keyof TEventSchemas & string, + TxOBSchemaOutput + > + | null + > => { // https://www.mongodb.com/blog/post/how-to-select--for-update-inside-mongodb-transactions // Note: findOneAndUpdate returns null (not an error) when document not found, // so any thrown error is unexpected and will propagate to the transaction handler @@ -107,12 +148,15 @@ export const createProcessorClient = < if (!result || !result.value) return null; return result.value as TxOBEvent< - EventType, - TEventDataMap[EventType] + keyof TEventSchemas & string, + TxOBSchemaOutput >; }, updateEvent: async ( - event: TxOBEvent, + event: TxOBEvent< + keyof TEventSchemas & string, + TxOBSchemaOutput + >, ): Promise => { await mongo .db(db) @@ -137,7 +181,10 @@ export const createProcessorClient = < }, createEvent: async ( event: Omit< - TxOBEvent, + TxOBEvent< + keyof TEventSchemas & string, + TxOBSchemaOutput + >, "processed_at" | "backoff_until" >, ): Promise => { diff --git a/src/pg/client.test.ts b/src/pg/client.test.ts index 1a8fda8..435634f 100644 --- a/src/pg/client.test.ts +++ b/src/pg/client.test.ts @@ -1,12 +1,27 @@ import { vi, describe, it, expect } from "vitest"; import { createProcessorClient } from "./client.js"; +const eventSchemas = { + TestEvent: { + "~standard": { + version: 1 as const, + vendor: "test", + validate: (value: unknown) => ({ + value: + typeof value === "object" && value !== null + ? (value as Record) + : {}, + }), + }, + }, +}; + describe("createProcessorClient", () => { it("should create a client with the correct functions", async () => { const pgClient = { query: vi.fn(), } as any; - const client = createProcessorClient({ querier: pgClient }); + const client = createProcessorClient({ querier: pgClient, eventSchemas }); expect(typeof client.getEventsToProcess).toBe("function"); expect(typeof client.transaction).toBe("function"); }); @@ -25,7 +40,7 @@ describe("getEventsToProcess", () => { const opts = { maxErrors: 10, }; - const client = createProcessorClient({ querier: pgClient }); + const client = createProcessorClient({ querier: pgClient, eventSchemas }); const result = await client.getEventsToProcess(opts); expect(pgClient.query).toHaveBeenCalledOnce(); expect(pgClient.query).toHaveBeenCalledWith( @@ -41,7 +56,7 @@ describe("transaction", () => { const pgClient = { query: vi.fn(() => Promise.resolve()), } as any; - const client = createProcessorClient({ querier: pgClient }); + const client = createProcessorClient({ querier: pgClient, eventSchemas }); await client.transaction(async () => {}); expect(pgClient.query).toHaveBeenCalledTimes(2); expect(pgClient.query).toHaveBeenNthCalledWith(1, "BEGIN"); @@ -51,7 +66,7 @@ describe("transaction", () => { const pgClient = { query: vi.fn(() => Promise.resolve()), } as any; - const client = createProcessorClient({ querier: pgClient }); + const client = createProcessorClient({ querier: pgClient, eventSchemas }); await client .transaction(async () => { throw new Error("error"); @@ -74,7 +89,7 @@ describe("transaction", () => { ), } as any; const eventId = "123"; - const client = createProcessorClient({ querier: pgClient }); + const client = createProcessorClient({ querier: pgClient, eventSchemas }); let result: any; await client.transaction(async (txClient) => { result = await txClient.getEventByIdForUpdateSkipLocked(eventId, { @@ -101,7 +116,7 @@ describe("transaction", () => { ), } as any; const eventId = "123"; - const client = createProcessorClient({ querier: pgClient }); + const client = createProcessorClient({ querier: pgClient, eventSchemas }); let result: any; await client.transaction(async (txClient) => { result = await txClient.getEventByIdForUpdateSkipLocked(eventId, { @@ -135,13 +150,13 @@ describe("transaction", () => { processed_at: new Date(), backoff_until: new Date(), timestamp: new Date(), - type: "type", + type: "TestEvent" as const, data: { thing1: "something", }, correlation_id: "abc123", }; - const client = createProcessorClient({ querier: pgClient }); + const client = createProcessorClient({ querier: pgClient, eventSchemas }); await client.transaction(async (txClient) => { await txClient.updateEvent(event); }); @@ -168,7 +183,7 @@ describe("transaction", () => { const event = { id: "1", timestamp: new Date(), - type: "test_event", + type: "TestEvent" as const, data: { thing1: "something", }, @@ -176,7 +191,7 @@ describe("transaction", () => { handler_results: {}, errors: 0, }; - const client = createProcessorClient({ querier: pgClient }); + const client = createProcessorClient({ querier: pgClient, eventSchemas }); await client.transaction(async (txClient) => { await txClient.createEvent(event); }); @@ -219,7 +234,7 @@ describe("transaction", () => { }), } as any; - const client = createProcessorClient({ querier: pgClient }); + const client = createProcessorClient({ querier: pgClient, eventSchemas }); try { await client.transaction(async () => { diff --git a/src/pg/client.ts b/src/pg/client.ts index cdc652e..023b972 100644 --- a/src/pg/client.ts +++ b/src/pg/client.ts @@ -5,7 +5,8 @@ import { type ClientConfig, } from "pg"; import type { - TxOBEventDataMap, + TxOBEventSchemaMap, + TxOBSchemaOutput, TxOBEvent, TxOBProcessorClient, TxOBProcessorClientOpts, @@ -22,30 +23,47 @@ interface Querier { // to cancel queries if/when supported by `pg` https://github.com/brianc/node-postgres/issues/2774 export type CreateProcessorClientOpts< - EventType extends string, - TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, + TEventSchemas extends TxOBEventSchemaMap, > = { querier: Querier; table?: string; limit?: number; + eventSchemas: TEventSchemas; }; export const createProcessorClient = < - EventType extends string, - TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, + const TEventSchemas extends TxOBEventSchemaMap, >( - opts: CreateProcessorClientOpts, -): TxOBProcessorClient => { - const { querier, table = "events", limit = 100 } = opts; + opts: CreateProcessorClientOpts, +): TxOBProcessorClient< + keyof TEventSchemas & string, + { + [TType in keyof TEventSchemas & string]: TxOBSchemaOutput; + } +> => { + const { querier, table = "events", limit = 100, eventSchemas: _eventSchemas } = + opts; const _table = table; const _limit = limit; const getEventsToProcess = async ( opts: TxOBProcessorClientOpts, ): Promise< - Pick, "id" | "errors">[] + Pick< + TxOBEvent< + keyof TEventSchemas & string, + TxOBSchemaOutput + >, + "id" | "errors" + >[] > => { const events = await querier.query< - Pick, "id" | "errors"> + Pick< + TxOBEvent< + keyof TEventSchemas & string, + TxOBSchemaOutput + >, + "id" | "errors" + > >( `SELECT id, errors FROM ${escapeIdentifier(_table)} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1 ORDER BY timestamp ASC LIMIT ${_limit}`, [opts.maxErrors], @@ -53,20 +71,44 @@ export const createProcessorClient = < return events.rows; }; - const transaction: TxOBProcessorClient["transaction"] = async ( + const transaction: TxOBProcessorClient< + keyof TEventSchemas & string, + { + [TType in keyof TEventSchemas & string]: TxOBSchemaOutput; + } + >["transaction"] = async ( fn: ( - txProcessorClient: TxOBTransactionProcessorClient, + txProcessorClient: TxOBTransactionProcessorClient< + keyof TEventSchemas & string, + { + [TType in keyof TEventSchemas & string]: TxOBSchemaOutput< + TEventSchemas[TType] + >; + } + >, ) => Promise, ): Promise => { try { await querier.query("BEGIN"); await fn({ getEventByIdForUpdateSkipLocked: async ( - eventId: TxOBEvent["id"], + eventId: TxOBEvent< + keyof TEventSchemas & string, + TxOBSchemaOutput + >["id"], opts: TxOBProcessorClientOpts, - ): Promise | null> => { + ): Promise< + | TxOBEvent< + keyof TEventSchemas & string, + TxOBSchemaOutput + > + | null + > => { const event = await querier.query< - TxOBEvent + TxOBEvent< + keyof TEventSchemas & string, + TxOBSchemaOutput + > >( `SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${escapeIdentifier(_table)} WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED`, [eventId, opts.maxErrors], @@ -78,7 +120,10 @@ export const createProcessorClient = < return event.rows[0]; }, updateEvent: async ( - event: TxOBEvent, + event: TxOBEvent< + keyof TEventSchemas & string, + TxOBSchemaOutput + >, ): Promise => { await querier.query( `UPDATE ${escapeIdentifier(_table)} SET handler_results = $1, errors = $2, processed_at = $3, backoff_until = $4 WHERE id = $5`, @@ -93,7 +138,10 @@ export const createProcessorClient = < }, createEvent: async ( event: Omit< - TxOBEvent, + TxOBEvent< + keyof TEventSchemas & string, + TxOBSchemaOutput + >, "processed_at" | "backoff_until" >, ): Promise => { diff --git a/src/processor.test.ts b/src/processor.test.ts index 375306c..125b113 100644 --- a/src/processor.test.ts +++ b/src/processor.test.ts @@ -1,8 +1,7 @@ import { describe, it, expect, vi, afterEach, expectTypeOf } from "vitest"; import type { StandardSchemaV1 } from "@standard-schema/spec"; import { - defineTxOBEventHandlerMap, - defineTxOBEventSchemas, + createEventHandlerMap, EventProcessor, TxOBEvent, defaultBackoff, @@ -53,31 +52,34 @@ describe("EventProcessor - schema typing", () => { }, }); - const eventSchemas = defineTxOBEventSchemas({ + const eventSchemas = { UserCreated: createSchema<{ userId: string; email: string }>(), SubscriptionCancelled: createSchema<{ subscriptionId: string; reason?: string; }>(), - }); + }; - const handlerMap = defineTxOBEventHandlerMap(eventSchemas, { - UserCreated: { - sendWelcomeEmail: async (event) => { - expectTypeOf(event.type).toEqualTypeOf<"UserCreated">(); - expectTypeOf(event.data).toEqualTypeOf<{ - userId: string; - email: string; - }>(); + const handlerMap = createEventHandlerMap({ + eventSchemas, + handlerMap: { + UserCreated: { + sendWelcomeEmail: async (event) => { + expectTypeOf(event.type).toEqualTypeOf<"UserCreated">(); + expectTypeOf(event.data).toEqualTypeOf<{ + userId: string; + email: string; + }>(); + }, }, - }, - SubscriptionCancelled: { - syncBilling: async (event) => { - expectTypeOf(event.type).toEqualTypeOf<"SubscriptionCancelled">(); - expectTypeOf(event.data).toEqualTypeOf<{ - subscriptionId: string; - reason?: string; - }>(); + SubscriptionCancelled: { + syncBilling: async (event) => { + expectTypeOf(event.type).toEqualTypeOf<"SubscriptionCancelled">(); + expectTypeOf(event.data).toEqualTypeOf<{ + subscriptionId: string; + reason?: string; + }>(); + }, }, }, }); diff --git a/src/processor.ts b/src/processor.ts index 0a2af02..80a40ca 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -66,24 +66,21 @@ export type TxOBEventSchemaMap = Record< export type TxOBSchemaOutput = TSchema extends StandardSchemaV1 ? TOutput : never; -export type TxOBEventDataMapFromSchemas< +type TxOBEventDataMapFromSchemas< TEventSchemas extends TxOBEventSchemaMap, > = { [TType in keyof TEventSchemas & string]: TxOBSchemaOutput; }; -export type TxOBEventHandlerMapFromSchemas< +type TxOBEventHandlerMapFromSchemas< TEventSchemas extends TxOBEventSchemaMap, > = TxOBEventHandlerMap< keyof TEventSchemas & string, TxOBEventDataMapFromSchemas >; -export const defineTxOBEventSchemas = < - const TEventSchemas extends TxOBEventSchemaMap, ->( - eventSchemas: TEventSchemas, -): TEventSchemas => eventSchemas; +type TxOBEventTypeFromSchemas> = + keyof TEventSchemas & string; type TxOBEventHandlerOpts = { signal?: AbortSignal; @@ -106,12 +103,29 @@ export type TxOBEventHandlerMap< }; }; -export const defineTxOBEventHandlerMap = < - const TEventSchemas extends TxOBEventSchemaMap, ->( - _eventSchemas: TEventSchemas, - handlerMap: TxOBEventHandlerMapFromSchemas, -): TxOBEventHandlerMapFromSchemas => handlerMap; +export type CreateEventProcessorOptsFromSchemas< + TEventSchemas extends TxOBEventSchemaMap, +> = Omit< + Partial< + TxOBProcessEventsOpts< + TxOBEventTypeFromSchemas, + TxOBEventDataMapFromSchemas + > + >, + "signal" | "telemetry" +> & { + pollingIntervalMs?: number; + wakeupTimeoutMs?: number; + wakeupThrottleMs?: number; + wakeupEmitter?: WakeupEmitter; + telemetry?: TxOBTelemetry; + eventSchemas: TEventSchemas; + client: TxOBProcessorClient< + TxOBEventTypeFromSchemas, + TxOBEventDataMapFromSchemas + >; + handlerMap: TxOBEventHandlerMapFromSchemas; +}; export type TxOBProcessorClientOpts = { signal?: AbortSignal; @@ -579,10 +593,15 @@ export interface Logger { error(message?: unknown, ...optionalParams: unknown[]): void; } +export interface TxOBProcessor { + start(): void; + stop(opts?: { timeoutMs?: number }): Promise; +} + export class EventProcessor< TxOBEventType extends string, TEventDataMap extends TxOBEventDataMap = TxOBEventDataMap, -> { +> implements TxOBProcessor { private client: TxOBProcessorClient; private handlerMap: TxOBEventHandlerMap; private opts: Omit, "signal"> & { @@ -939,3 +958,27 @@ export class EventProcessor< } } } + +export const createEventProcessor = < + const TEventSchemas extends TxOBEventSchemaMap, +>( + opts: CreateEventProcessorOptsFromSchemas, +): EventProcessor< + TxOBEventTypeFromSchemas, + TxOBEventDataMapFromSchemas +> => { + const { eventSchemas: _eventSchemas, ...processorOpts } = opts; + return new EventProcessor(processorOpts); +}; + +export const createEventHandlerMap = < + const TEventSchemas extends TxOBEventSchemaMap, +>( + opts: { + eventSchemas: TEventSchemas; + handlerMap: TxOBEventHandlerMapFromSchemas; + }, +): TxOBEventHandlerMapFromSchemas => { + const { eventSchemas: _eventSchemas, handlerMap } = opts; + return handlerMap; +};