Reliably process side-effects in your Node.js applications without data loss
When building applications, you often need to perform multiple operations together: update your database AND send an email, publish an event, trigger a webhook, or notify another service. This creates a critical challenge:
// ❌ The problem: What if the email fails after the database commit?
await db.createUser(user);
await db.commit();
await emailService.sendWelcomeEmail(user.email); // 💥 Fails! User created but no email sent

// ❌ Also problematic: What if the database fails after sending the email?
await emailService.sendWelcomeEmail(user.email); // ✅ Email sent
await db.createUser(user);
await db.commit(); // 💥 Fails! Email sent but no user record

// ❌ What about using a message queue?
await db.createUser(user);
await messageQueue.publish("user.created", user); // ✅ Message queued
await db.commit(); // 💥 Fails! Message is in queue but no user record
// The queue and database are separate systems - you can't make them atomic!

The Transactional Outbox Pattern solves this by storing both the business data and events in a single database transaction, then processing events asynchronously with guaranteed delivery.
// ✅ Solution: Save both user and event in the same transaction
await db.query("BEGIN");
// Save your business data
await db.query("INSERT INTO users (id, email, name) VALUES ($1, $2, $3)", [
userId,
email,
name,
]);
// Save the event in the SAME transaction
await db.query(
"INSERT INTO events (id, type, data, correlation_id, handler_results, errors) VALUES ($1, $2, $3, $4, $5, $6)",
[randomUUID(), "UserCreated", { userId, email }, correlationId, {}, 0],
);
await db.query("COMMIT");
// ✅ Both user and event are saved atomically!
// If commit fails, neither is saved. If it succeeds, both are saved.
// The processor will pick up the event and send the email (and any other side effects that you register) asynchronously

- ✅ At-least-once delivery - Events are never lost, even during failures or crashes
- ✅ Graceful shutdown - Finish processing in-flight events before shutting down
- ✅ Horizontal scalability - Run multiple processors without conflicts using row-level locking
- ✅ Database agnostic - Built-in support for PostgreSQL and MongoDB, or implement your own
- ✅ Near-realtime delivery - Optional wakeup signals (Postgres NOTIFY, MongoDB Change Streams) trigger immediate processing when events are inserted, with polling as a fallback
- ✅ Configurable error handling - Exponential backoff, max retries, and custom error hooks
- ✅ TypeScript-first - Full type safety and autocompletion
- ✅ Handler result tracking - Track the execution status of each handler independently
- ✅ Minimal dependencies - Only p-limit and p-queue (plus your database driver)
# For PostgreSQL
npm install txob pg
# For MongoDB
npm install txob mongodb

1. Create the events table:
CREATE TABLE events (
id UUID PRIMARY KEY,
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
type VARCHAR(255) NOT NULL,
data JSONB,
correlation_id UUID,
handler_results JSONB DEFAULT '{}',
errors INTEGER DEFAULT 0,
backoff_until TIMESTAMPTZ,
processed_at TIMESTAMPTZ
);
-- Critical index for performance
CREATE INDEX idx_events_processing ON events(processed_at, backoff_until, errors)
WHERE processed_at IS NULL;

2. Set up the event processor:
import { EventProcessor } from "txob";
import { createProcessorClient } from "txob/pg";
import pg from "pg";
const client = new pg.Client({
/* your config */
});
await client.connect();
const processor = new EventProcessor({
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: {
UserCreated: {
// Handlers are processed concurrently and independently with retries
// If one handler fails, others continue processing
sendWelcomeEmail: async (event, { signal }) => {
await emailService.send({
to: event.data.email,
subject: "Welcome!",
template: "welcome",
});
},
createStripeCustomer: async (event, { signal }) => {
await stripe.customers.create({
email: event.data.email,
metadata: { userId: event.data.userId },
});
},
},
},
});
processor.start();
// Graceful shutdown
process.on("SIGTERM", () => processor.stop());

3. Save events transactionally with your business logic:
import { randomUUID } from "crypto";
// Inside your application code
await client.query("BEGIN");
// Save your business data
await client.query("INSERT INTO users (id, email, name) VALUES ($1, $2, $3)", [
userId,
email,
name,
]);
// Save the event in the SAME transaction
await client.query(
"INSERT INTO events (id, type, data, correlation_id, handler_results, errors) VALUES ($1, $2, $3, $4, $5, $6)",
[randomUUID(), "UserCreated", { userId, email }, correlationId, {}, 0],
);
await client.query("COMMIT");
// ✅ Both user and event are saved atomically!
// The processor will pick up the event and send the email

That's it! The processor will automatically poll for new events and execute your handlers.
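The shape of each poll cycle can be sketched as below. This is an illustrative model, not txob internals: `pollOnce`, `fetchUnprocessed`, and `handle` are hypothetical names, and txob actually dispatches events concurrently rather than serially.

```typescript
// Minimal sketch of one poll cycle: fetch unprocessed events, then
// dispatch each to its handlers. Serialized here for clarity only.
async function pollOnce<E>(
  fetchUnprocessed: () => Promise<E[]>,
  handle: (event: E) => Promise<void>,
): Promise<number> {
  const events = await fetchUnprocessed();
  for (const event of events) {
    await handle(event); // txob runs handlers concurrently; simplified here
  }
  return events.length; // events handled this cycle
}
```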
Optional: Near-realtime delivery with wakeup signals
By default, the processor polls on a fixed interval (pollingIntervalMs, default 5s). Add a wakeup emitter to trigger immediate processing the moment an event is inserted, dropping latency from seconds to milliseconds while keeping polling as a safety net.
// PostgreSQL: Use Postgres NOTIFY
import { createWakeupEmitter } from "txob/pg";
const wakeupEmitter = await createWakeupEmitter({
listenClientConfig: clientConfig,
createTrigger: true,
querier: client,
});
// MongoDB: Use Change Streams (requires replica set or sharded cluster)
import { createWakeupEmitter } from "txob/mongodb";
const wakeupEmitter = await createWakeupEmitter({
mongo: mongoClient,
db: "myapp",
});
// Use with EventProcessor
const processor = new EventProcessor({
client: processorClient,
wakeupEmitter, // Processor polls immediately on each wakeup signal
handlerMap: {
/* ... */
},
});

When a wakeup emitter is provided, the processor will:
- Poll immediately when new events are inserted (via wakeup signal)
- Throttle wakeup signals (wakeupThrottleMs) to coalesce bursts
- Fall back to interval polling (pollingIntervalMs) if no wakeup signal is received within wakeupTimeoutMs
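The throttle-plus-fallback timing above can be sketched as follows. The `ThrottledPoller` class and its method names are illustrative, not part of txob's API; timestamps are passed in explicitly so the logic stays deterministic.

```typescript
// Sketch of wakeup throttling with fallback polling.
class ThrottledPoller {
  private lastPollAt = -Infinity;
  constructor(
    private throttleMs: number, // coalesce wakeup bursts
    private timeoutMs: number,  // fallback poll window
  ) {}
  // Called on each wakeup signal; returns true if a poll should fire now.
  onWakeup(now: number): boolean {
    if (now - this.lastPollAt < this.throttleMs) return false; // coalesced
    this.lastPollAt = now;
    return true;
  }
  // Called by an interval timer; polls only if no recent poll happened.
  onFallbackTick(now: number): boolean {
    if (now - this.lastPollAt < this.timeoutMs) return false;
    this.lastPollAt = now;
    return true;
  }
}
```

A burst of inserts within the throttle window produces a single poll, while the fallback tick only fires after a quiet period.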
┌─────────────────────────────────────────────────────────────────┐
│ Your Application │
├─────────────────────────────────────────────────────────────────┤
│ BEGIN TRANSACTION │
│ 1. Insert/Update business data (users, orders, etc.) │
│ 2. Insert event record │
│ COMMIT TRANSACTION │
└─────────────────────────────────────────────────────────────────┘
│
│ Both saved atomically ✅
▼
┌─────────────────────────────────────────────────────────────────┐
│ Events Table │
│ [id] [type] [data] [processed_at] [errors] [backoff_until] │
└─────────────────────────────────────────────────────────────────┘
│
┌──────────────────┴──────────────────┐
│ │
│ (Optional) Wakeup Signal │
│ (Postgres NOTIFY / MongoDB Stream) │
│ │
▼ ▼
┌──────────────────────────────┐ ┌──────────────────────────────┐
│ Polling Component │ │ Fallback Polling Loop │
│ (Decoupled from Processing) │ │ (If wakeup signals missed)│
├──────────────────────────────┤ ├──────────────────────────────┤
│ • Listens for wakeup signals│ │ • Polls periodically │
│ • Throttles rapid signals │ │ • Only if no recent wakeup │
│ • Triggers immediate poll │ │ • Uses same throttled poll │
└──────────────────────────────┘ └──────────────────────────────┘
│ │
└──────────────────┬──────────────────┘
│
│ SELECT unprocessed events
│ (FOR UPDATE SKIP LOCKED)
▼
┌─────────────────────────────────────────────────────────────────┐
│ Processing Queue │
│ (Concurrency-controlled event queue) │
└─────────────────────────────────────────────────────────────────┘
│
│ Process events concurrently
▼
┌─────────────────────────────────────────────────────────────────┐
│ Event Processor │
├─────────────────────────────────────────────────────────────────┤
│ 1. Lock event in transaction │
│ 2. Execute handlers concurrently (send email, webhook, etc.) │
│ 3. UPDATE event with results and processed_at │
│ 4. On failure: increment errors, set backoff_until │
└─────────────────────────────────────────────────────────────────┘
Key Points:
- Events are saved in the same transaction as your business data
- If the transaction fails, neither the data nor event is saved
- The processor runs independently and guarantees at-least-once delivery
- Multiple processors can run concurrently using database row locking
- Failed events are retried with exponential backoff
- Polling and processing are decoupled - polling finds events, processing queue handles execution
- Wakeup signals (Postgres NOTIFY or MongoDB Change Streams) reduce polling latency
- Throttled polling prevents excessive database queries during event bursts
- Fallback polling ensures events are processed even if wakeup signals are missed
For a detailed architecture diagram, see architecture.mmd.
Every event in txob follows this structure:
interface TxOBEvent<EventType extends string, EventData = Record<string, unknown>> {
id: string; // Unique event identifier (UUID recommended)
timestamp: Date; // When the event was created
type: EventType; // Event type (e.g., "UserCreated", "OrderPlaced")
data: EventData; // Event payload - can be strongly typed per event type
correlation_id: string; // For tracing requests across services
handler_results: Record<string, TxOBEventHandlerResult>; // Results from each handler
errors: number; // Number of processing attempts
backoff_until?: Date; // When to retry (null if not backing off)
processed_at?: Date; // When fully processed (null if pending)
}

Field Explanations:

- handler_results: Tracks each handler's status independently. If one handler fails, others can still succeed
- errors: Global error count. When it reaches maxErrors, the event is marked as processed (failed)
- backoff_until: Prevents immediate retries. Set to a future timestamp after failures
- correlation_id: Essential for distributed tracing and debugging
Handlers are async functions that execute your side-effects:
type TxOBEventHandler<
EventType extends string = string,
EventData = Record<string, unknown>,
> = (
event: TxOBEvent<EventType, EventData>,
opts: { signal?: AbortSignal },
) => Promise<void>;

Important: Handlers should be idempotent because they may be called multiple times for the same event (at-least-once delivery).
// ✅ Good: Idempotent handler
const sendEmail: TxOBEventHandler = async (event) => {
const alreadySent = await checkIfEmailSent(event.data.userId);
if (alreadySent) return; // Safe to retry
await emailService.send(event.data.email);
};
// ❌ Bad: Not idempotent
const incrementCounter: TxOBEventHandler = async (event) => {
await db.query("UPDATE counters SET count = count + 1"); // Will increment multiple times!
};

txob uses schema-driven event payload typing through the Standard Schema interface.
eventSchemas is required by both createProcessorClient(...) and createEventProcessor(...).
Use any validator that implements Standard Schema (for example Zod, ArkType, or Valibot).
import { z } from "zod";
import {
createEventHandlerMap,
createEventProcessor,
type TxOBEventSchemaMap,
} from "txob";
import { createProcessorClient } from "txob/pg";
const eventSchemas = {
UserCreated: z.object({
userId: z.string().uuid(),
email: z.string().email(),
}),
OrderPlaced: z.object({
orderId: z.string().uuid(),
amount: z.number().positive(),
}),
} satisfies TxOBEventSchemaMap<"UserCreated" | "OrderPlaced">;
const handlerMap = createEventHandlerMap({
eventSchemas,
handlerMap: {
UserCreated: {
sendWelcomeEmail: async (event) => {
await emailService.send(event.data.email); // typed as string
},
},
OrderPlaced: {
sendReceipt: async (event) => {
await receipts.send(event.data.orderId, event.data.amount); // strongly typed
},
},
},
});
const processor = createEventProcessor({
eventSchemas,
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap,
});

Schema-first inference via eventSchemas is the standard and required approach.
Each handler's execution is tracked independently:
type TxOBEventHandlerResult = {
processed_at?: Date; // When this handler succeeded
unprocessable_at?: Date; // When this handler was marked unprocessable
errors?: Array<{
// Error history for this handler
error: unknown;
timestamp: Date;
}>;
};

This means if you have 3 handlers and 1 fails, the other 2 won't be re-executed on retry.
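That retry-skipping behavior can be sketched with a simple filter over the per-handler results; `handlersToRun` and `HandlerResult` below are illustrative names, not txob exports.

```typescript
// Sketch: on retry, only handlers without a recorded processed_at re-run.
type HandlerResult = { processed_at?: Date };

function handlersToRun(
  all: string[],
  results: Record<string, HandlerResult>,
): string[] {
  return all.filter((name) => !results[name]?.processed_at);
}
```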
txob implements at-least-once delivery:
- ✅ Events are never lost even if the processor crashes
- ⚠️ Handlers may be called multiple times for the same event
- ⚠️ If updateEvent fails after handlers succeed, they will be re-invoked
Why this matters:
// This handler will be called again if the event update fails
UserCreated: {
sendEmail: async (event) => {
await emailService.send(event.data.email); // ✅ Sent successfully
// 💥 But if updateEvent() fails here, this will run again!
};
}

Solution: Make your handlers idempotent (check if work was already done before doing it again).
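One common idempotency shape is a "run once per key" guard. The sketch below uses an in-memory Set purely for illustration; in a real handler the check would hit a durable store (e.g. a table with a unique key per event-id/handler pair). `runOnce` and `processedKeys` are hypothetical names.

```typescript
// Sketch of an idempotency guard: skip work whose key was already completed.
const processedKeys = new Set<string>();

async function runOnce(
  key: string,
  work: () => Promise<void>,
): Promise<boolean> {
  if (processedKeys.has(key)) return false; // already done; safe under retries
  await work();
  processedKeys.add(key); // record completion only after work succeeds
  return true;
}
```

Keying on `${event.id}:${handlerName}` makes at-least-once redelivery harmless.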
txob provides sophisticated error handling:
1. Automatic Retries with Backoff
new EventProcessor({
client,
handlerMap: handlers,
maxErrors: 5, // Retry up to 5 times
backoff: ({ attempt, error, event, errors, maxErrors }) => {
// Custom backoff strategy
// error is the latest handler error, errors contains all handler errors for this attempt
// event is the event snapshot after this attempt's handlers finished
const delayMs = 1000 * 2 ** attempt; // Exponential: 1s, 2s, 4s, 8s, 16s
return new Date(Date.now() + delayMs);
},
});

2. Custom Backoff with TxOBError
You can throw TxOBError to specify a custom backoff time for retries:
import { TxOBError } from "txob";
UserCreated: {
sendEmail: async (event) => {
try {
await emailService.send(event.data.email);
} catch (err) {
if (err.code === "RATE_LIMIT_EXCEEDED") {
// Retry after 1 minute instead of using default backoff
throw new TxOBError("Rate limit exceeded", {
cause: err,
backoffUntil: new Date(Date.now() + 60000),
});
}
throw err; // Use default backoff for other errors
}
};
}

Note: If multiple handlers throw TxOBError with different backoffUntil dates, the processor will use the latest (maximum) backoff time.
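The "latest backoff wins" rule amounts to taking the maximum of the requested dates; the helper below is an illustrative sketch, not a txob export.

```typescript
// Sketch: pick the latest (maximum) of several requested backoffUntil dates.
function effectiveBackoff(backoffs: Date[]): Date | undefined {
  if (backoffs.length === 0) return undefined;
  return backoffs.reduce((latest, d) => (d > latest ? d : latest));
}
```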
3. Unprocessable Events
Sometimes an event cannot be processed (e.g., invalid data). Mark it as unprocessable to stop retrying:
import { ErrorUnprocessableEventHandler } from "txob";
UserCreated: {
sendEmail: async (event) => {
if (!isValidEmail(event.data.email)) {
throw new ErrorUnprocessableEventHandler(
new Error("Invalid email address"),
);
}
await emailService.send(event.data.email);
};
}

4. Max Errors Hook
When an event reaches max errors, you can create a "dead letter" event:
new EventProcessor({
client,
handlerMap: handlers,
onEventMaxErrorsReached: async ({ event, txClient, signal }) => {
// Save a failure event in the same transaction
await txClient.createEvent({
id: randomUUID(),
type: "EventFailed",
data: {
originalEventId: event.id,
originalEventType: event.type,
reason: "Max errors reached",
},
correlation_id: event.correlation_id,
handler_results: {},
errors: 0,
});
// Send alert, log to monitoring system, etc.
},
});

1. Create the events table:
CREATE TABLE events (
id UUID PRIMARY KEY,
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
type VARCHAR(255) NOT NULL,
data JSONB,
correlation_id UUID,
handler_results JSONB DEFAULT '{}',
errors INTEGER DEFAULT 0,
backoff_until TIMESTAMPTZ,
processed_at TIMESTAMPTZ
);

2. Create indexes for optimal performance:
-- Critical: Partial index for unprocessed events (keeps index small and fast)
CREATE INDEX idx_events_processing ON events(processed_at, backoff_until, errors)
WHERE processed_at IS NULL;
-- Unique index on id (if not using id as PRIMARY KEY)
CREATE UNIQUE INDEX idx_events_id ON events(id);
-- Optional: For querying by correlation_id
CREATE INDEX idx_events_correlation_id ON events(correlation_id);

Why these indexes?

The idx_events_processing partial index is critical for performance. It:

- Only indexes unprocessed events (WHERE processed_at IS NULL)
- Stays small as events are processed
- Covers the main query pattern: processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < maxErrors
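That query pattern corresponds to the predicate sketched below. `isProcessable` and `EventRow` are illustrative names for the selection logic, not part of txob's API.

```typescript
// Sketch of the "processable event" predicate the partial index serves.
interface EventRow {
  processed_at: Date | null;
  backoff_until: Date | null;
  errors: number;
}

function isProcessable(e: EventRow, maxErrors: number, now: Date): boolean {
  return (
    e.processed_at === null &&                            // not yet processed
    (e.backoff_until === null || e.backoff_until < now) && // not backing off
    e.errors < maxErrors                                   // retries remain
  );
}
```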
3. Use the PostgreSQL client:
import { createProcessorClient } from "txob/pg";
import pg from "pg";
const client = new pg.Client({
user: process.env.POSTGRES_USER,
password: process.env.POSTGRES_PASSWORD,
database: process.env.POSTGRES_DB,
});
await client.connect();
const processorClient = createProcessorClient({
querier: client,
table: "events", // Optional: table name (default: "events")
limit: 100, // Optional: max events per poll (default: 100)
});

4. (Optional) Set up wakeup signals to reduce polling:
import { createWakeupEmitter } from "txob/pg";
// Create a wakeup emitter using Postgres NOTIFY
// This will automatically create a trigger that sends NOTIFY on INSERT
const wakeupEmitter = await createWakeupEmitter({
listenClientConfig: clientConfig,
createTrigger: true,
querier: client,
table: "events", // Optional: table name (default: "events")
channel: "txob_events", // Optional: NOTIFY channel (default: "txob_events")
});
// Use with EventProcessor
const processor = new EventProcessor({
client: processorClient,
wakeupEmitter, // Reduces polling frequency when new events arrive
handlerMap: {
/* ... */
},
pollingIntervalMs: 5000, // Still used as fallback if wakeup signals are missed
wakeupTimeoutMs: 60000, // Fallback poll if no wakeup signal received in 60s
wakeupThrottleMs: 1000, // Throttle wakeup signals to prevent excessive polling
});

1. Create the events collection with indexes:
import { MongoClient } from "mongodb";
const mongoClient = new MongoClient(process.env.MONGO_URL);
await mongoClient.connect();
const db = mongoClient.db("myapp");
// Create collection
const eventsCollection = db.collection("events");
// Create indexes
await eventsCollection.createIndex(
{ processed_at: 1, backoff_until: 1, errors: 1 },
{ partialFilterExpression: { processed_at: null } },
);
await eventsCollection.createIndex({ id: 1 }, { unique: true });
await eventsCollection.createIndex({ correlation_id: 1 });

2. Use the MongoDB client:
import { createProcessorClient } from "txob/mongodb";
const processorClient = createProcessorClient({
mongo: mongoClient,
db: "myapp", // Database name
collection: "events", // Optional: collection name (default: "events")
limit: 100, // Optional: max events per poll (default: 100)
});

3. (Optional) Set up wakeup signals to reduce polling:
import { createWakeupEmitter } from "txob/mongodb";
// Create a wakeup emitter using MongoDB Change Streams
// Note: Requires a replica set or sharded cluster
const wakeupEmitter = await createWakeupEmitter({
mongo: mongoClient,
db: "myapp",
collection: "events", // Optional: collection name (default: "events")
});
// Use with EventProcessor
const processor = new EventProcessor({
client: processorClient,
wakeupEmitter, // Reduces polling frequency when new events arrive
handlerMap: {
/* ... */
},
pollingIntervalMs: 5000, // Still used as fallback if wakeup signals are missed
wakeupTimeoutMs: 60000, // Fallback poll if no wakeup signal received in 60s
wakeupThrottleMs: 100, // Throttle wakeup signals to prevent excessive polling
});
// Handle wakeup emitter errors (e.g., if not configured for replica set)
wakeupEmitter.on("error", (err) => {
console.error("Wakeup emitter error:", err);
// Processor will automatically fall back to polling
});

Note:
- MongoDB transactions require a replica set or sharded cluster. See MongoDB docs.
- MongoDB Change Streams (used for wakeup signals) also require a replica set or sharded cluster. If your MongoDB instance is a standalone server, you must convert it to a single-node replica set by running rs.initiate() in the mongo shell.
Implement the TxOBProcessorClient interface:
interface TxOBProcessorClient<EventType extends string> {
getEventsToProcess(opts: {
signal?: AbortSignal;
maxErrors: number;
}): Promise<Pick<TxOBEvent<EventType>, "id" | "errors">[]>;
transaction(
fn: (txClient: TxOBTransactionProcessorClient<EventType>) => Promise<void>,
): Promise<void>;
}
interface TxOBTransactionProcessorClient<EventType extends string> {
getEventByIdForUpdateSkipLocked(
eventId: string,
opts: { signal?: AbortSignal; maxErrors: number },
): Promise<TxOBEvent<EventType> | null>;
updateEvent(event: TxOBEvent<EventType>): Promise<void>;
createEvent(
event: Omit<TxOBEvent<EventType>, "processed_at" | "backoff_until">,
): Promise<void>;
}

See src/pg/client.ts or src/mongodb/client.ts for reference implementations.
new EventProcessor({
client,
handlerMap,
// Polling interval in milliseconds, also used as fallback when wakeupEmitter is set (default: 5000)
pollingIntervalMs: 5000,
// Maximum errors before marking event as processed/failed (default: 5)
maxErrors: 5,
// Backoff calculation function (default: exponential backoff capped at 60s)
backoff: ({ attempt, error, event, errors, maxErrors }): Date => {
const baseDelayMs = 1000;
const maxDelayMs = 60000;
const backoffMs = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
return new Date(Date.now() + backoffMs);
},
// Maximum concurrent events being processed (default: 20)
maxEventConcurrency: 20,
// Maximum concurrent handlers per event (default: 10)
maxHandlerConcurrency: 10,
// Maximum events buffered in the in-memory queue before polling pauses (default: 500)
maxQueuedEvents: 500,
// Optional wakeup emitter for near-realtime processing (default: undefined)
wakeupEmitter,
// Fallback poll fires if no wakeup signal received in this window (default: 60000)
wakeupTimeoutMs: 60000,
// Throttle wakeup signals to coalesce bursts (default: 1000)
wakeupThrottleMs: 1000,
// Custom logger (default: undefined)
logger: {
debug: (msg, ...args) => console.debug(msg, ...args),
info: (msg, ...args) => console.info(msg, ...args),
warn: (msg, ...args) => console.warn(msg, ...args),
error: (msg, ...args) => console.error(msg, ...args),
},
// OpenTelemetry-compatible tracing and metrics (default: undefined)
telemetry: {
tracer: trace.getTracer("txob"),
meter: metrics.getMeter("txob"),
attributes: {
"service.name": "orders-worker",
"deployment.environment": "production",
},
},
// Hook called when max errors reached (default: undefined)
onEventMaxErrorsReached: async ({ event, txClient, signal }) => {
// Create a dead-letter event, send alerts, etc.
},
});

| Option | Type | Default | Description |
|---|---|---|---|
| pollingIntervalMs | number | 5000 | Milliseconds between polling cycles (used when no wakeup emitter or as fallback) |
| maxErrors | number | 5 | Max retry attempts before marking as failed |
| backoff | ({ attempt, error, event, errors, maxErrors }) => Date | Exponential | Calculate next retry time |
| maxEventConcurrency | number | 20 | Max events processed simultaneously |
| maxHandlerConcurrency | number | 10 | Max handlers per event running concurrently |
| maxQueuedEvents | number | 500 | Max events buffered in-memory before polling pauses |
| wakeupEmitter | WakeupEmitter | undefined | Optional wakeup signal emitter (Postgres NOTIFY or MongoDB Change Streams) |
| wakeupTimeoutMs | number | 60000 | Fallback poll if no wakeup signal received (only used with wakeupEmitter) |
| wakeupThrottleMs | number | 1000 | Throttle wakeup signals to prevent excessive polling (only used with wakeupEmitter) |
| logger | Logger | undefined | Custom logger interface |
| telemetry | TxOBTelemetry | undefined | OpenTelemetry-compatible tracer, meter, and shared attributes |
| onEventMaxErrorsReached | function | undefined | Hook for max errors |
This example shows a complete HTTP API that creates users and sends welcome emails transactionally:
import http from "node:http";
import { randomUUID } from "node:crypto";
import pg from "pg";
import gracefulShutdown from "http-graceful-shutdown";
import { EventProcessor, ErrorUnprocessableEventHandler } from "txob";
import { createProcessorClient } from "txob/pg";
// 1. Define your event types
const eventTypes = {
UserCreated: "UserCreated",
EventMaxErrorsReached: "EventMaxErrorsReached",
} as const;
type EventType = keyof typeof eventTypes;
// 2. Set up database connection
const client = new pg.Client({
user: process.env.POSTGRES_USER,
password: process.env.POSTGRES_PASSWORD,
database: process.env.POSTGRES_DB,
});
await client.connect();
// 3. Create and start the processor
const processor = new EventProcessor<EventType>({
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: {
UserCreated: {
sendEmail: async (event, { signal }) => {
// Check if email was already sent (idempotency)
const sent = await checkEmailSent(event.data.userId);
if (sent) return;
// Send email
await emailService.send({
to: event.data.email,
subject: "Welcome!",
template: "welcome",
});
// Use signal for cleanup on shutdown
signal?.addEventListener("abort", () => {
emailService.cancelPending();
});
},
publishToEventBus: async (event) => {
await eventBus.publish("user.created", event.data);
},
},
EventMaxErrorsReached: {
alertOps: async (event) => {
await slack.send({
channel: "#alerts",
text: `Event failed: ${event.data.eventType} (${event.data.eventId})`,
});
},
},
},
pollingIntervalMs: 5000,
maxErrors: 5,
logger: console,
onEventMaxErrorsReached: async ({ event, txClient }) => {
await txClient.createEvent({
id: randomUUID(),
timestamp: new Date(),
type: eventTypes.EventMaxErrorsReached,
data: {
eventId: event.id,
eventType: event.type,
},
correlation_id: event.correlation_id,
handler_results: {},
errors: 0,
});
},
});
processor.start();
// 4. Create HTTP server
const server = http.createServer(async (req, res) => {
if (req.url !== "/users" || req.method !== "POST") {
res.statusCode = 404;
return res.end();
}
const correlationId = req.headers["x-correlation-id"] || randomUUID();
try {
const body = await getBody(req);
const { email, name } = JSON.parse(body);
// Start transaction
await client.query("BEGIN");
// Save user
const userId = randomUUID();
await client.query(
"INSERT INTO users (id, email, name) VALUES ($1, $2, $3)",
[userId, email, name],
);
// Save event IN THE SAME TRANSACTION
await client.query(
"INSERT INTO events (id, type, data, correlation_id, handler_results, errors) VALUES ($1, $2, $3, $4, $5, $6)",
[
randomUUID(),
eventTypes.UserCreated,
{ userId, email, name },
correlationId,
{},
0,
],
);
// Commit transaction
await client.query("COMMIT");
res.statusCode = 201;
res.end(JSON.stringify({ userId }));
} catch (error) {
await client.query("ROLLBACK").catch(() => {});
res.statusCode = 500;
res.end(JSON.stringify({ error: "Internal server error" }));
}
});
const HTTP_PORT = process.env.PORT || 3000;
server.listen(HTTP_PORT, () => console.log(`Server listening on ${HTTP_PORT}`));
// 5. Graceful shutdown
gracefulShutdown(server, {
onShutdown: async () => {
await processor.stop(); // Wait for in-flight events to complete
await client.end();
},
});

const processor = new EventProcessor({
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: {
UserCreated: {
sendWelcomeEmail: async (event) => {
/* ... */
},
createStripeCustomer: async (event) => {
/* ... */
},
},
OrderPlaced: {
sendConfirmationEmail: async (event) => {
/* ... */
},
updateInventory: async (event) => {
/* ... */
},
notifyWarehouse: async (event) => {
/* ... */
},
},
PaymentFailed: {
sendRetryEmail: async (event) => {
/* ... */
},
logToAnalytics: async (event) => {
/* ... */
},
},
},
});

// Linear backoff: 5s, 10s, 15s, 20s, 25s
const linearBackoff = ({ attempt }): Date => {
const delayMs = 5000 * attempt;
return new Date(Date.now() + delayMs);
};
// Fixed delay: always 30s
const fixedBackoff = (): Date => {
return new Date(Date.now() + 30000);
};
// Fibonacci backoff: 1s, 1s, 2s, 3s, 5s, 8s, 13s...
const fibonacciBackoff = (() => {
const fib = (n: number): number => (n <= 1 ? 1 : fib(n - 1) + fib(n - 2));
return ({ attempt }): Date => {
const delayMs = fib(attempt) * 1000;
return new Date(Date.now() + delayMs);
};
})();
new EventProcessor({
client,
handlerMap: handlers,
backoff: linearBackoff, // or fixedBackoff, or fibonacciBackoff
});

UserCreated: {
sendEmail: async (event, { signal }) => {
// Long-running operation
const emailJob = emailService.sendLarge(event.data);
// Listen for shutdown signal
signal?.addEventListener("abort", () => {
console.log("Shutdown requested, canceling email...");
emailJob.cancel(); // Clean up quickly so event can be saved
});
await emailJob;
};
}

Use txob to guarantee consistency between your database and queue, then let the queue handle low-latency distribution:
import { EventProcessor } from "txob";
import { createProcessorClient } from "txob/pg";
import { Kafka } from "kafkajs";
const kafka = new Kafka({ brokers: ["localhost:9092"] });
const producer = kafka.producer();
await producer.connect();
const processor = new EventProcessor({
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: {
UserCreated: {
// Publish to Kafka with guaranteed consistency
publishToKafka: async (event) => {
// Kafka's idempotent producer handles deduplication
// Using event.id as the key ensures retries are safe
await producer.send({
topic: "user-events",
messages: [
{
key: event.id, // Use event.id for idempotency
value: JSON.stringify({
type: event.type,
data: event.data,
timestamp: event.timestamp,
}),
},
],
});
},
// Also handle other side effects
sendEmail: async (event) => {
await emailService.send(event.data.email);
},
},
},
});
processor.start();

Benefits of this approach:
- ✅ Database and Kafka are guaranteed consistent (via txob's transactional guarantees)
- ✅ If Kafka publish fails, txob will retry automatically
- ✅ Downstream consumers get low-latency events from Kafka
- ✅ You can still handle other side effects (email, webhooks) in parallel
- ✅ Best of both worlds: consistency from txob + speed from Kafka
You can run the processor as a separate service from your API:
// processor-service.ts
import { EventProcessor } from "txob";
import { createProcessorClient } from "txob/pg";
import pg from "pg";
const client = new pg.Client({
/* config */
});
await client.connect();
const processor = new EventProcessor({
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: {
// All your handlers...
},
});
processor.start();
console.log("Event processor started");
// Graceful shutdown
const shutdown = async () => {
console.log("Shutting down...");
await processor.stop();
await client.end();
process.exit(0);
};
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);

Run multiple instances for horizontal scaling:
# Terminal 1
node processor-service.js
# Terminal 2
node processor-service.js
# Terminal 3
node processor-service.js

All three will coordinate using database row locking (FOR UPDATE SKIP LOCKED).
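The skip-locked claiming semantics can be illustrated in-memory: each worker takes only events no other worker currently holds, skipping locked rows instead of waiting on them. `claimSkipLocked` and the shared `locked` set are illustrative stand-ins for the database's row locks, not txob's API.

```typescript
// In-memory sketch of FOR UPDATE SKIP LOCKED claiming semantics.
const locked = new Set<string>();

function claimSkipLocked(ids: string[], max: number): string[] {
  const claimed: string[] = [];
  for (const id of ids) {
    if (claimed.length >= max) break;
    if (locked.has(id)) continue; // held by another worker: skip, don't block
    locked.add(id);
    claimed.push(id);
  }
  return claimed;
}
```

Two workers scanning the same candidate set end up with disjoint claims, which is why multiple processors can poll the same table without conflicts.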
Constructs a processor instance with start() and stop() methods.
Options (opts):
- client: TxOBProcessorClient<EventType> - Database client (required)
- handlerMap: TxOBEventHandlerMap<EventType> - Map of event types to handlers (required)
- wakeupEmitter?: WakeupEmitter - Optional wakeup emitter for near-realtime processing
- pollingIntervalMs?, wakeupTimeoutMs?, wakeupThrottleMs?, maxErrors?, backoff?, maxEventConcurrency?, maxHandlerConcurrency?, maxQueuedEvents?, logger?, telemetry?, onEventMaxErrorsReached? - see Configuration Reference
Methods:
{
start: () => void;
stop: (opts?: { timeoutMs?: number }) => Promise<void>;
}

Example:
const processor = new EventProcessor({ client, handlerMap: handlers });
processor.start();
await processor.stop({ timeoutMs: 10000 }); // 10 second timeout

Creates a PostgreSQL processor client.
import { createProcessorClient } from "txob/pg";
createProcessorClient(opts: {
querier: pg.Client;
eventSchemas: Record<string, StandardSchemaV1<unknown, unknown>>;
table?: string; // Default: "events"
limit?: number; // Default: 100
}): TxOBProcessorClient<...inferred from eventSchemas...>

Creates a MongoDB processor client.
import { createProcessorClient } from "txob/mongodb";
createProcessorClient(opts: {
mongo: mongodb.MongoClient;
db: string; // Database name
eventSchemas: Record<string, StandardSchemaV1<unknown, unknown>>;
collection?: string; // Default: "events"
limit?: number; // Default: 100
}): TxOBProcessorClient<...inferred from eventSchemas...>

Error class to specify custom backoff times for retries.
import { TxOBError } from "txob";
// Throw with custom backoff time
throw new TxOBError("Rate limit exceeded", {
backoffUntil: new Date(Date.now() + 60000), // Retry after 1 minute
});
// Throw with cause and custom backoff
throw new TxOBError("Processing failed", {
cause: originalError,
backoffUntil: new Date(Date.now() + 30000), // Retry after 30 seconds
});

Note: If multiple handlers throw `TxOBError` with different `backoffUntil` dates, the processor will use the latest (maximum) backoff time.
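The "latest backoff wins" rule can be sketched as: collect each failing handler's `backoffUntil` and take the maximum. This is an illustration of the documented behavior, not txob's internal code.

```javascript
// Sketch of the "maximum backoff wins" rule: given the backoffUntil dates
// thrown by failing handlers, the event is parked until the latest one.
// Illustrative only — not txob's internal implementation.
const resolveBackoffUntil = (handlerErrors) => {
  const dates = handlerErrors
    .map((err) => err.backoffUntil)
    .filter((d) => d instanceof Date);
  if (dates.length === 0) return null; // fall back to the default backoff strategy
  return new Date(Math.max(...dates.map((d) => d.getTime())));
};

const now = Date.now();
const parkedUntil = resolveBackoffUntil([
  { backoffUntil: new Date(now + 30_000) }, // handler A: retry in 30 seconds
  { backoffUntil: new Date(now + 60_000) }, // handler B: retry in 1 minute
]);
// parkedUntil is the 1-minute date — the latest backoff wins
```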
Error class to mark a handler as unprocessable (stops retrying).
import { ErrorUnprocessableEventHandler } from "txob";
throw new ErrorUnprocessableEventHandler(new Error("Invalid data"));

Creates a Postgres NOTIFY-based wakeup emitter to reduce polling frequency.
import { createWakeupEmitter } from "txob/pg";
const wakeupEmitter = await createWakeupEmitter({
listenClientConfig: clientConfig,
createTrigger: true, // Automatically create database trigger
querier: client, // Required if createTrigger is true
table: "events", // Optional: table name (default: "events")
channel: "txob_events", // Optional: NOTIFY channel (default: "txob_events")
});

The trigger automatically sends NOTIFY when new events are inserted. The wakeup emitter emits wakeup events that trigger immediate polling, reducing the need for constant polling.
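If you prefer to manage the trigger in your own migrations (with `createTrigger: false`), a minimal sketch of equivalent SQL looks like this. The function and trigger names here are assumptions — txob's generated objects may be named differently.

```javascript
// Sketch of the trigger createTrigger: true sets up, expressed as a manual
// migration. Function/trigger names are assumptions — txob's generated
// objects may differ.
const createNotifyTriggerSQL = (table = "events", channel = "txob_events") => `
  CREATE OR REPLACE FUNCTION notify_txob_event() RETURNS trigger AS $$
  BEGIN
    PERFORM pg_notify('${channel}', NEW.id::text);
    RETURN NEW;
  END;
  $$ LANGUAGE plpgsql;

  CREATE TRIGGER txob_event_inserted
    AFTER INSERT ON ${table}
    FOR EACH ROW EXECUTE FUNCTION notify_txob_event();
`;

// Apply once during migrations, e.g.:
// await client.query(createNotifyTriggerSQL());
```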
Creates a MongoDB Change Stream-based wakeup emitter to reduce polling frequency.
import { createWakeupEmitter } from "txob/mongodb";
const wakeupEmitter = await createWakeupEmitter({
mongo: mongoClient,
db: "myapp",
collection: "events", // Optional: collection name (default: "events")
});

Important: MongoDB Change Streams require a replica set or sharded cluster. If your MongoDB instance is a standalone server, you must convert it to a single-node replica set by running `rs.initiate()` in the mongo shell.
If the database is not configured for Change Streams, an error will be emitted via the error event on the returned WakeupEmitter. The processor will automatically fall back to polling.
// Main event type
type TxOBEvent<EventType extends string, EventData = Record<string, unknown>> = {
id: string;
timestamp: Date;
type: EventType;
data: EventData;
correlation_id: string;
handler_results: Record<string, TxOBEventHandlerResult>;
errors: number;
backoff_until?: Date | null;
processed_at?: Date;
};
// Handler function signature
type TxOBEventHandler<
EventType extends string = string,
EventData = Record<string, unknown>,
> = (
event: TxOBEvent<EventType, EventData>,
opts: { signal?: AbortSignal },
) => Promise<void>;
// Handler map structure
type TxOBEventDataMap<EventType extends string> = Record<
EventType,
Record<string, unknown>
>;
type TxOBEventHandlerMap<
EventType extends string,
EventDataMap extends TxOBEventDataMap<EventType>,
> = {
[TType in EventType]: Record<string, TxOBEventHandler<TType, EventDataMap[TType]>>;
};
// Schema-first convenience API
createEventProcessor({
eventSchemas,
client: createProcessorClient({ querier, eventSchemas }),
handlerMap,
});
// Handler result tracking
type TxOBEventHandlerResult = {
processed_at?: Date;
unprocessable_at?: Date;
errors?: Array<{
error: unknown;
timestamp: Date;
}>;
};
// Logger interface
interface Logger {
debug(message?: unknown, ...optionalParams: unknown[]): void;
info(message?: unknown, ...optionalParams: unknown[]): void;
warn(message?: unknown, ...optionalParams: unknown[]): void;
error(message?: unknown, ...optionalParams: unknown[]): void;
}

- Keep handlers focused - Each handler should perform a single, independent side effect (send email, call webhook, etc.)
- Make handlers idempotent - Check if work was already done before doing it again
- Use correlation IDs - Essential for tracing and debugging distributed systems
- Set appropriate maxErrors - Balance between retry attempts and failure detection
- Monitor handler performance - Track execution time and error rates
- Use AbortSignal - Implement quick cleanup during graceful shutdown
- Create indexes - The partial index on `processed_at` is critical for performance
- Validate event data - Throw `ErrorUnprocessableEventHandler` for invalid data
- Use transactions - Always save events with business data in the same transaction
- Test handlers - Unit test handlers independently with mock events
- Log with context - Include `event.id` and `correlation_id` in all logs
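The "log with context" advice can be sketched as a small handler wrapper. This helper is hypothetical — it is not part of txob — but shows how `event.id` and `correlation_id` can be attached to every log line without repeating them in each handler.

```javascript
// Hypothetical wrapper (not part of txob) that adds event.id and
// correlation_id to every log line a handler emits.
const withLogContext = (name, handler, logger = console) =>
  async (event, opts) => {
    const ctx = {
      handler: name,
      eventId: event.id,
      correlationId: event.correlation_id,
    };
    logger.info("handler start", ctx);
    try {
      await handler(event, opts);
      logger.info("handler done", ctx);
    } catch (err) {
      logger.error("handler failed", { ...ctx, err });
      throw err; // let txob record the error and schedule a retry
    }
  };

// Usage inside a handler map:
// UserCreated: { sendWelcomeEmail: withLogContext("sendWelcomeEmail", sendWelcomeEmail) }
```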
- Don't assume exactly-once - Handlers may be called multiple times
- Don't require event ordering - Handlers should be independent; if you need ordering, reconsider your design
- Don't perform long operations without signal checks - Delays shutdown
- Don't ignore errors - Handle them appropriately or let them propagate
- Don't skip the indexes - Performance will degrade rapidly
- Don't save events outside transactions - Defeats the purpose of the outbox pattern
- Don't use for hard real-time processing - Without wakeup signals, polling introduces latency up to `pollingIntervalMs` (default 5s); with wakeup signals latency drops to tens of ms but is still bounded by handler execution and `wakeupThrottleMs`
- Don't modify events in handlers - Event object is read-only
- Don't share mutable state - Handlers may run concurrently
- Don't forget correlation IDs - Makes debugging distributed issues very difficult
- Enable wakeup signals - Pass a `wakeupEmitter` (Postgres NOTIFY or MongoDB Change Streams) to drop processing latency from seconds to milliseconds without increasing poll frequency
- Tune concurrency limits - Adjust `maxEventConcurrency` and `maxHandlerConcurrency` based on your workload
- Reduce polling interval - Lower `pollingIntervalMs` for lower latency without wakeup signals (at cost of more database queries)
- Batch operations - If handlers can batch work, collect multiple events
- Monitor query performance - Use `EXPLAIN ANALYZE` on the `getEventsToProcess` query
- Partition the events table - For very high volume, partition by `processed_at` or `timestamp`
- Archive processed events - Move old processed events to archive table to keep main table small
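The archiving advice can be sketched as a pair of statements run from a nightly job. The `events_archive` table name and retention window are assumptions — adapt them to your schema.

```javascript
// Sketch of an archive job for old processed events. Assumes an
// events_archive table with the same columns as events — the table name
// and retention window are assumptions, adjust to your schema.
const buildArchiveQueries = (olderThanDays = 30) => [
  // Copy processed events past the retention window into the archive table
  `INSERT INTO events_archive
     SELECT * FROM events
     WHERE processed_at IS NOT NULL
       AND processed_at < NOW() - INTERVAL '${olderThanDays} days'
     ON CONFLICT DO NOTHING`,
  // Then remove them from the hot table so the partial index stays small
  `DELETE FROM events
     WHERE processed_at IS NOT NULL
       AND processed_at < NOW() - INTERVAL '${olderThanDays} days'`,
];

// Run in order, e.g. from a cron job:
// for (const sql of buildArchiveQueries(30)) await client.query(sql);
```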
Check:
- Is the processor started? Was `processor.start()` called?
- Is the database connection working?
- Are events actually being saved? Query the events table
- Is `processed_at` NULL on pending events?
- Is `backoff_until` in the past (or NULL)?
- Is `errors` less than `maxErrors`?
Debug:
new EventProcessor({
client,
handlerMap: handlers,
logger: console, // Enable logging
});

Check:
- Are handlers throwing errors? Check logs
- Is an external service down? (email, API, etc.)
- Is event data invalid? Add validation
- Are handlers timing out? Increase timeouts
Solutions:
- Use `ErrorUnprocessableEventHandler` for invalid data
- Implement circuit breakers for external services
- Add retries within handlers for transient failures
- Increase `maxErrors` if failures are expected
This happens when:
- Processor crashed after locking event but before updating
- Transaction was rolled back
Solution: Events are never truly "stuck" - they're locked at the transaction level. Once the transaction ends (commit or rollback), the lock is released and another processor can pick it up.
If using FOR UPDATE SKIP LOCKED properly (which txob does), stuck events are not possible.
Check:
- Do you have the recommended indexes?
- How many unprocessed events are in the table?
- What's your `maxEventConcurrency` setting?
- Are handlers slow? Profile them
Solutions:
- Create the partial index on `processed_at`
- Archive or delete old processed events
- Increase `maxEventConcurrency`
- Optimize slow handlers
- Run multiple processor instances
Check:
- How many events are processed concurrently?
- Are handlers leaking memory?
- Is the events table huge?
Solutions:
- Lower `maxEventConcurrency`
- Profile handlers for memory leaks
- Archive old events
- Reduce `limit` in `createProcessorClient({ querier: client, eventSchemas, table, limit })`
This is expected behavior due to at-least-once delivery. It happens when:
- Event update fails after handler succeeds
- Processor crashes after handler succeeds but before updating event
Solution: Make handlers idempotent. Prefer pushing idempotency to the downstream system over check-then-act in the handler.
Why check-then-act is fragile:
// ❌ Race-prone: check + do + mark is not atomic across processes
const handler = async (event) => {
const alreadyDone = await checkWorkStatus(event.id); // T1: false
if (alreadyDone) return;
// Another retry/processor can read `false` here too before T1 marks done
await doWork(event.data); // executed twice
await markWorkDone(event.id);
};

This pattern only works if `checkWorkStatus` + `doWork` + `markWorkDone` runs under a strict distributed lock or the downstream system itself rejects duplicates. txob already serializes per-event execution via `FOR UPDATE SKIP LOCKED`, but if `doWork` mutates an external system the local "already done" record can be out of sync (handler succeeded, update failed, retry sees no record).
Better: rely on downstream idempotency. Pass event.id (or a derived key) as an idempotency key the external system enforces:
// ✅ Stripe rejects duplicate idempotency keys server-side
const createStripeCharge = async (event) => {
await stripe.charges.create(
{ amount: event.data.amount, customer: event.data.customerId },
{ idempotencyKey: event.id },
);
};
// ✅ Upsert by event-derived id — DB enforces uniqueness
const recordPayment = async (event) => {
await db.query(
`INSERT INTO payments (event_id, amount) VALUES ($1, $2)
ON CONFLICT (event_id) DO NOTHING`,
[event.id, event.data.amount],
);
};
// ✅ Kafka idempotent producer + event.id key dedupes within the producer session
await producer.send({
topic: "user-events",
messages: [{ key: event.id, value: JSON.stringify(event.data) }],
});

Avoid handler patterns that look like 2PC across the database and an external system — there is no way to make them atomic. Don't try to mirror external state into your database from inside the handler that triggered the mutation. The reliable pattern is:
- Handler issues the external mutation with an idempotency key (`event.id`) and returns.
- The external system signals completion via webhook.
- Your webhook endpoint applies the local mutation and, in the same transaction, raises any domain-specific txob events that downstream handlers care about.
// ✅ Handler kicks off external work and returns — no waiting, no mirror writes
UserSignedUp: {
startKycCheck: async (event) => {
await kyc.start({ userId: event.data.userId, idempotencyKey: event.id });
},
}
// ✅ Webhook endpoint applies the mutation and emits a domain event in the same transaction
app.post("/webhooks/kyc", async (req, res) => {
await db.transaction(async (tx) => {
await tx.users.update(req.body.userId, { kycStatus: req.body.status });
if (req.body.status === "approved") {
// Raise a domain event — not a "KycWebhookReceived" plumbing event
await tx.events.insert({
id: randomUUID(),
type: "UserKycApproved",
data: { userId: req.body.userId },
correlation_id: req.body.correlationId,
handler_results: {},
errors: 0,
});
}
});
res.sendStatus(200);
});

Why:
- Handlers stay fast and bounded — no waiting on remote state machines
- Retries are safe because the external system enforces idempotency
- Local state converges from authoritative external signals, not optimistic local writes
- txob events stay aligned with domain concepts (`UserKycApproved`), not transport plumbing (`KycWebhookReceived`) — downstream handlers depend on meaning, not delivery mechanism
The fundamental problem with message queues:
Message queues (RabbitMQ, SQS, Kafka) are separate systems from your database. You cannot make both operations atomic:
// ❌ This is NOT atomic - the queue and database are separate systems
await db.query("BEGIN");
await db.createUser(user);
await messageQueue.publish("user.created", user); // ✅ Succeeds
await db.query("COMMIT"); // 💥 Fails! Message is queued but user doesn't exist

Even if you publish after commit, you have the opposite problem:
// ❌ Also NOT atomic
await db.query("BEGIN");
await db.createUser(user);
await db.query("COMMIT"); // ✅ Succeeds
await messageQueue.publish("user.created", user); // 💥 Fails! User exists but no message

Message queues require:
- Additional infrastructure to run and monitor
- Network calls to publish messages (can fail independently of database)
- Handling connection failures
- No way to guarantee consistency between database and queue
- Complex error recovery (replay, reconciliation, etc.)
Transactional Outbox with txob:
- Uses your existing database (no additional infrastructure)
- Guaranteed consistency - events saved in same transaction as data (atomicity via ACID)
- No network calls during transaction (everything is in one database)
- Simpler operational model
- If transaction fails, neither data nor events are saved
- If transaction succeeds, both data and events are saved
Trade-offs:
- Message queues: Lower latency (~10ms), higher per-broker throughput (10k+/s) with dedicated buffering and fan-out primitives
- txob: Tens of ms with wakeup signals (or `pollingIntervalMs`-bounded without). Per-processor throughput is 10-100/s typical, but scales near-linearly with processor count until the database write ceiling — production deployments can reach 1k-10k+/s on a single Postgres/MongoDB. Ultimate ceiling is handler efficiency and DB write capacity, same as any consumer.
Can I use txob WITH message queues?
Yes! This is actually a great pattern. Use txob to guarantee consistency, then publish to your queue from a handler:
UserCreated: {
publishToKafka: async (event) => {
// Now this is guaranteed to only run if user was created
await kafka.publish("user.created", event.data);
// If this fails, txob will retry it
},
}

This gives you:
- ✅ Guaranteed consistency between database and queue (via txob)
- ✅ Low latency downstream (via message queue)
- ✅ Idempotent publishing (txob handles retries)
Yes! The transactional outbox pattern is useful in any application that needs reliable side-effects:
- Monoliths that send emails
- Single-service apps that call webhooks
- Any app that needs guaranteed event delivery
You don't need a microservices architecture to benefit from txob.
The processor is designed to handle crashes gracefully:
- During handler execution: The transaction hasn't committed yet, so the event remains unprocessed. Another processor (or restart) will pick it up.
- After handler but before update: Same as above - event remains unprocessed.
- During event update: Database transaction ensures atomicity. Either the update completes or it doesn't.
Result: Events are never lost. At worst, handlers are called again (which is why idempotency matters).
Make your handlers idempotent by checking if work was already done:
// Pattern 1: Check external system
const sendEmail = async (event) => {
const sent = await emailService.checkSent(event.id);
if (sent) return; // Already sent
await emailService.send(event.data.email);
};
// Pattern 2: Use unique constraints
const createStripeCustomer = async (event) => {
try {
await stripe.customers.create({
id: event.data.userId, // Stripe will reject if already exists
email: event.data.email,
});
} catch (err) {
if (err.code === "resource_already_exists") return; // Already created
throw err;
}
};
// Pattern 3: Track in database
const processPayment = async (event) => {
const processed = await db.query(
"SELECT 1 FROM payment_events WHERE event_id = $1",
[event.id],
);
if (processed.rowCount > 0) return;
await chargeCustomer(event.data); // hypothetical payment call — calling processPayment here would recurse into this handler
await db.query("INSERT INTO payment_events (event_id) VALUES ($1)", [
event.id,
]);
};

Short answer: You generally shouldn't need to.
Events are processed concurrently by default, and handlers should contain single, independent side effects. If you need ordering, it usually indicates a design issue.
Why ordering is usually a design smell:
- Each handler should represent one side effect (send email, call webhook, etc.)
- Side effects are typically independent and don't need ordering
- Ordering defeats the purpose of concurrent processing and reduces throughput
- If side effects must happen in sequence, they might belong in the same handler
Better approaches:
- Make side effects independent (recommended):
UserCreated: {
sendEmail: async (event) => { /* sends welcome email */ },
createStripeCustomer: async (event) => { /* creates customer */ },
// These can run in any order or concurrently ✅
}

- If truly dependent, combine into one handler:
UserCreated: {
completeOnboarding: async (event) => {
// These MUST happen in order
await createStripeCustomer(event.data);
await sendWelcomeEmail(event.data);
await enrollInTrial(event.data);
},
}

- Use separate event types for workflows:
// Event 1 creates the customer
UserCreated: {
createStripeCustomer: async (event) => {
await stripe.customers.create(event.data);
// Create next event when done
await createEvent({ type: "StripeCustomerCreated", ... });
}
}
// Event 2 sends the email
StripeCustomerCreated: {
sendWelcomeEmail: async (event) => {
await emailService.send(event.data);
}
}

If you absolutely must process events sequentially (not recommended):
new EventProcessor({
client,
handlerMap: handlers,
maxEventConcurrency: 1, // Forces sequential processing
// ⚠️ This sacrifices throughput and concurrency benefits
});

1. Enable OpenTelemetry-compatible telemetry:
txob can emit spans and metrics without depending on a specific telemetry SDK. Install and configure OpenTelemetry in your application, then pass a Tracer and/or Meter to opt in:
import { metrics, trace } from "@opentelemetry/api";
const processor = new EventProcessor({
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: handlers,
telemetry: {
tracer: trace.getTracer("txob"),
meter: metrics.getMeter("txob"),
attributes: {
"service.name": "orders-worker",
"deployment.environment": process.env.NODE_ENV ?? "development",
},
},
});

This records `txob.poll`, `txob.event.process`, and `txob.handler.process` spans plus `txob.poll.count`, `txob.poll.duration`, `txob.event.processing.count`, `txob.event.processing.duration`, `txob.handler.processing.count`, and `txob.handler.processing.duration` metrics. Metrics use low-cardinality attributes such as event type, handler name, and outcome; event IDs and correlation IDs are only attached to spans.
The full set of telemetry names is exported as constants from txob: TxOBTelemetrySpanName, TxOBTelemetryMetricName, TxOBTelemetryAttributeKey, TxOBTelemetryEventOutcome, TxOBTelemetryHandlerOutcome, and TxOBTelemetryPollOutcome.
txob surfaces failures while creating metric instruments during processor construction so misconfigured telemetry is visible at startup. Runtime telemetry operations, including span creation and metric recording, are best-effort and will not interrupt event processing if an exporter or SDK callback fails.
2. Use the logger option:
new EventProcessor({
client,
handlerMap: handlers,
logger: myLogger, // Logs all processing activity
});

3. Query the events table:
-- Pending events
SELECT COUNT(*) FROM events WHERE processed_at IS NULL;
-- Failed events (max errors reached)
SELECT * FROM events WHERE errors >= 5 AND processed_at IS NOT NULL;
-- Events by type
SELECT type, COUNT(*) FROM events GROUP BY type;
-- Average processing time (requires timestamp tracking)
SELECT type, AVG(processed_at - timestamp) as avg_duration
FROM events WHERE processed_at IS NOT NULL
GROUP BY type;

4. Create monitoring events:
onEventMaxErrorsReached: async ({ event, txClient }) => {
await txClient.createEvent({
id: randomUUID(),
type: "EventFailed",
data: { originalEvent: event },
correlation_id: event.correlation_id,
handler_results: {},
errors: 0,
});
// Send to monitoring service
await monitoring.recordFailure(event);
};

Yes! txob is written in TypeScript and provides full type safety, including typed event payloads:
import { z } from "zod";
import {
createEventHandlerMap,
createEventProcessor,
type TxOBEventSchemaMap,
} from "txob";
import { createProcessorClient } from "txob/pg";
const eventSchemas = {
UserCreated: z.object({ userId: z.string().uuid(), email: z.string().email() }),
OrderPlaced: z.object({ orderId: z.string().uuid(), amount: z.number() }),
} satisfies TxOBEventSchemaMap<"UserCreated" | "OrderPlaced">;
const handlers = createEventHandlerMap({
eventSchemas,
handlerMap: {
UserCreated: {
sendWelcomeEmail: async (event) => {
event.data.email; // string
},
},
OrderPlaced: {
sendReceipt: async (event) => {
event.data.amount; // number
},
},
// Missing an event type? TypeScript error
},
});
const processor = createEventProcessor({
eventSchemas,
client: createProcessorClient({ querier: client, eventSchemas }),
handlerMap: handlers,
});

Database Impact:
- Without wakeup signals: one SELECT query per polling interval (default: every 5 seconds)
- With wakeup signals: SELECTs are driven by inserts (throttled by `wakeupThrottleMs`); fallback poll only fires if no wakeup signal arrives within `wakeupTimeoutMs`
- One SELECT + UPDATE per event processed
- With proper indexes, queries are very fast (< 10ms typically)
Processing Latency:
- With wakeup signals (Postgres NOTIFY / MongoDB Change Streams): typically tens of milliseconds — bounded by `wakeupThrottleMs` (default 1s) during bursts
- Polling-only: average `pollingIntervalMs / 2` (~2.5s with the 5s default), worst case `pollingIntervalMs`
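The polling-only numbers follow from events arriving at a uniformly random point within a poll window; a quick sanity check:

```javascript
// Polling-only latency: an event inserted at a uniformly random point in a
// poll window waits on average half the interval, and at worst the full one.
const pollingLatency = (pollingIntervalMs) => ({
  averageMs: pollingIntervalMs / 2,
  worstCaseMs: pollingIntervalMs,
});

const { averageMs, worstCaseMs } = pollingLatency(5000);
// averageMs: 2500 (~2.5s), worstCaseMs: 5000 — matching the defaults above
```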
Throughput:
- Single processor: 10-100 events/second typical, primarily bounded by handler speed and `maxEventConcurrency`
- Aggregate: scales near-linearly with processor count via `FOR UPDATE SKIP LOCKED` (Postgres) / per-document locking (MongoDB), no coordination required
- Ultimate ceilings are the same as any message-queue consumer: handler efficiency, downstream-system capacity, and database write throughput
- Where message queues genuinely win: fan-out / pub-sub to many independent consumers, cross-region replication, and dedicated burst buffering — not raw single-stream throughput
Optimization:
- Use a `wakeupEmitter` for low-latency processing without aggressive polling
- Lower `pollingIntervalMs` for lower latency without wakeup signals (at cost of more queries)
- Increase `maxEventConcurrency` for higher throughput
- Run multiple processors for horizontal scaling
Run multiple processor instances (same code, different processes/machines):
# Machine 1
node processor.js
# Machine 2
node processor.js
# Machine 3
node processor.js

Each processor will:
- Query for unprocessed events
- Lock events using `FOR UPDATE SKIP LOCKED`
- Process locked events
- Release locks on commit/rollback
Key mechanism: FOR UPDATE SKIP LOCKED ensures each event is locked by only one processor. Other processors skip locked rows and process different events.
No coordination needed - processors don't need to know about each other. The database handles coordination.
Yes, modify the query in your custom client:
// Custom client with priority
const getEventsToProcess = async (opts) => {
const events = await client.query(
`SELECT id, errors FROM events
WHERE processed_at IS NULL
AND (backoff_until IS NULL OR backoff_until < NOW())
AND errors < $1
ORDER BY priority DESC, timestamp ASC -- High priority first
LIMIT 100`,
[opts.maxErrors],
);
return events.rows;
};

Add a priority column to your events table.
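A minimal migration sketch for that column, following the document's style of SQL strings passed to `client.query`. The default value and index name are assumptions — adapt them to your migration tooling.

```javascript
// Sketch of the schema change the priority ordering above relies on.
// Column default and index name are assumptions — adapt to your migrations.
const addPrioritySQL = `
  ALTER TABLE events ADD COLUMN IF NOT EXISTS priority integer NOT NULL DEFAULT 0;
  CREATE INDEX IF NOT EXISTS idx_events_unprocessed_priority
    ON events (priority DESC, timestamp ASC)
    WHERE processed_at IS NULL;
`;

// Apply once during migrations:
// await client.query(addPrioritySQL);
```

The partial index mirrors the custom query's `ORDER BY priority DESC, timestamp ASC` over unprocessed rows, so the priority scan stays cheap as the table grows.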
- You need guaranteed event delivery (can't lose events)
- You want to avoid distributed transactions (2PC, Saga)
- You're already using PostgreSQL or MongoDB
- You need at-least-once delivery semantics
- You can make handlers idempotent
- You're building reliable background processing
- You want simple infrastructure (no separate message queue)
- You need horizontal scalability without coordination
- You want near-realtime processing without standing up extra infra — wakeup signals (Postgres NOTIFY / MongoDB Change Streams) deliver events within tens of ms while keeping ACID guarantees
- You need exactly-once semantics (use Kafka with transactions)
- You need hard real-time processing (sub-10ms tail latency) - use message queue
- You need fan-out / pub-sub to many independent consumers - use message broker (txob handlers all run in the same processor process)
- You need sustained throughput beyond your database's write capacity - use a dedicated message system
- You already have message queue infrastructure you're happy with
- You can't make handlers idempotent
- You need complex routing or pub/sub patterns - use message broker
| Feature | txob | RabbitMQ | Kafka | AWS SQS |
|---|---|---|---|---|
| Infrastructure | Database only | Separate service | Separate cluster | Managed service |
| Consistency | Strong (ACID) | Eventual | Eventual | Eventual |
| Latency | ~10s of ms with wakeup signals, ~5s polling-only | ~10ms | ~10ms | ~1s |
| Throughput | 10-100/s per processor; 1k-10k+/s aggregate (DB-bound) | 10k+/s | 100k+/s | 3k/s |
| Horizontal scaling | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes |
| Exactly-once | ❌ No | ❌ No | ✅ Yes | ❌ No |
| Operational complexity | Low | Medium | High | Low |
| Cost | DB storage | Self-hosted | Self-hosted | Pay per request |
Contributions are welcome! To contribute:
- Fork the repository
- Create a feature branch: `git checkout -b feature/my-feature`
- Make your changes with tests
- Run tests: `npm test`
- Run linting: `npm run format`
- Commit your changes: `git commit -m "Add my feature"`
- Push to your fork: `git push origin feature/my-feature`
- Open a Pull Request
Guidelines:
- Add tests for new features
- Update documentation for API changes
- Follow existing code style
- Keep PRs focused on a single concern
See the examples directory for complete working examples:
- PostgreSQL example - HTTP API with user creation and email sending
- More examples coming soon!
- 📖 Documentation: You're reading it!
- 🐛 Bug Reports: GitHub Issues
- Transactional Outbox Pattern - Detailed explanation of the pattern
- Implementing the Outbox Pattern - Debezium blog post
- Event-Driven Architecture - Martin Fowler
MIT © Dillon Streator
Implements the Transactional Outbox pattern from the microservices patterns catalog.