diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 06d8552..da208ad 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -5,4 +5,4 @@ updates: schedule: interval: "weekly" open-pull-requests-limit: 0 - security-updates-only: true \ No newline at end of file + security-updates-only: true diff --git a/README.md b/README.md index 57b2e58..befafdf 100644 --- a/README.md +++ b/README.md @@ -704,6 +704,16 @@ EventProcessor(client, handlerMap, { error: (msg, ...args) => console.error(msg, ...args), }, + // OpenTelemetry-compatible tracing and metrics (default: undefined) + telemetry: { + tracer: trace.getTracer("txob"), + meter: metrics.getMeter("txob"), + attributes: { + "service.name": "orders-worker", + "deployment.environment": "production", + }, + }, + // Hook called when max errors reached (default: undefined) onEventMaxErrorsReached: async ({ event, txClient, signal }) => { // Create a dead-letter event, send alerts, etc. @@ -724,6 +734,7 @@ EventProcessor(client, handlerMap, { | `wakeupTimeoutMs` | `number` | `30000` | Fallback poll if no wakeup signal received (only used with wakeupEmitter) | | `wakeupThrottleMs` | `number` | `1000` | Throttle wakeup signals to prevent excessive polling (only used with wakeupEmitter) | | `logger` | `Logger` | `undefined` | Custom logger interface | +| `telemetry` | `TxOBTelemetry` | `undefined` | OpenTelemetry-compatible tracer, meter, and shared attributes | | `onEventMaxErrorsReached` | `function` | `undefined` | Hook for max errors | ## Usage Examples @@ -1568,7 +1579,34 @@ EventProcessor(client, handlers, { ### How do I monitor event processing? -**1. Use the logger option:** +**1. Enable OpenTelemetry-compatible telemetry:** + +txob can emit spans and metrics without depending on a specific telemetry SDK. 
Install and configure OpenTelemetry in your application, then pass a `Tracer` and/or `Meter` to opt in: + +```typescript +import { metrics, trace } from "@opentelemetry/api"; + +const processor = new EventProcessor({ + client: createProcessorClient({ querier: client }), + handlerMap: handlers, + telemetry: { + tracer: trace.getTracer("txob"), + meter: metrics.getMeter("txob"), + attributes: { + "service.name": "orders-worker", + "deployment.environment": process.env.NODE_ENV ?? "development", + }, + }, +}); +``` + +This records `txob.poll`, `txob.event.process`, and `txob.handler.process` spans plus `txob.poll.count`, `txob.poll.duration`, `txob.event.processing.count`, `txob.event.processing.duration`, `txob.handler.processing.count`, and `txob.handler.processing.duration` metrics. Metrics use low-cardinality attributes such as event type, handler name, and outcome; event IDs and correlation IDs are only attached to spans. + +The full set of telemetry names is exported as constants from `txob`: `TxOBTelemetrySpanName`, `TxOBTelemetryMetricName`, `TxOBTelemetryAttributeKey`, `TxOBTelemetryEventOutcome`, `TxOBTelemetryHandlerOutcome`, and `TxOBTelemetryPollOutcome`. + +txob surfaces failures while creating metric instruments during processor construction so misconfigured telemetry is visible at startup. Runtime telemetry operations, including span creation and metric recording, are best-effort and will not interrupt event processing if an exporter or SDK callback fails. + +**2. Use the logger option:** ```typescript EventProcessor(client, handlers, { @@ -1576,7 +1614,7 @@ EventProcessor(client, handlers, { }); ``` -**2. Query the events table:** +**3. Query the events table:** ```sql -- Pending events @@ -1594,7 +1632,7 @@ FROM events WHERE processed_at IS NOT NULL GROUP BY type; ``` -**3. Create monitoring events:** +**4. 
Create monitoring events:** ```typescript onEventMaxErrorsReached: async ({ event, txClient }) => { diff --git a/src/index.ts b/src/index.ts index ee60a7e..06c67cc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,2 +1,3 @@ export * from "./processor.js"; export * from "./error.js"; +export * from "./telemetry.js"; diff --git a/src/processor.test.ts b/src/processor.test.ts index a203cb3..420e97b 100644 --- a/src/processor.test.ts +++ b/src/processor.test.ts @@ -2,6 +2,13 @@ import { describe, it, expect, vi, afterEach } from "vitest"; import { EventProcessor, TxOBEvent, defaultBackoff } from "./processor.js"; import { TxOBError, ErrorUnprocessableEventHandler } from "./error.js"; import { sleep } from "./sleep.js"; +import { + TxOBTelemetryAttributeKey, + TxOBTelemetryEventOutcome, + TxOBTelemetryHandlerOutcome, + TxOBTelemetryMetricName, + TxOBTelemetrySpanName, +} from "./telemetry.js"; const mockTxClient = { getEventByIdForUpdateSkipLocked: vi.fn(), @@ -1253,3 +1260,223 @@ describe("EventProcessor - basic", () => { expect(mockTxClient.updateEvent).not.toHaveBeenCalled(); }); }); + +describe("EventProcessor - telemetry", () => { + it("records OpenTelemetry-compatible spans and metrics when enabled", async () => { + const counters = new Map }>(); + const histograms = new Map }>(); + const spans: { + name: string; + setAttributes: ReturnType; + recordException: ReturnType; + setStatus: ReturnType; + end: ReturnType; + }[] = []; + const meter = { + createCounter: vi.fn((name: string) => { + const counter = { add: vi.fn() }; + counters.set(name, counter); + return counter; + }), + createHistogram: vi.fn((name: string) => { + const histogram = { record: vi.fn() }; + histograms.set(name, histogram); + return histogram; + }), + }; + const tracer = { + startSpan: vi.fn((name: string) => { + const span = { + name, + setAttributes: vi.fn(), + recordException: vi.fn(), + setStatus: vi.fn(), + end: vi.fn(), + }; + spans.push(span); + return span; + }), + }; + const 
handlerMap = { + evtType1: { + handler1: vi.fn(() => Promise.resolve()), + }, + }; + const evt1: TxOBEvent = { + type: "evtType1", + id: "1", + timestamp: now, + data: {}, + correlation_id: "abc123", + handler_results: {}, + errors: 0, + }; + let callCount = 0; + mockClient.getEventsToProcess.mockImplementation(() => { + callCount++; + return Promise.resolve(callCount === 1 ? [evt1] : []); + }); + mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation(() => + Promise.resolve(evt1), + ); + mockTxClient.updateEvent.mockImplementation(() => Promise.resolve()); + + const processor = new EventProcessor({ + client: mockClient, + handlerMap, + pollingIntervalMs: 10, + telemetry: { + meter, + tracer, + attributes: { + "service.name": "txob-test", + }, + }, + }); + processor.start(); + await sleep(50); + await processor.stop(); + + expect(meter.createCounter).toHaveBeenCalledWith( + TxOBTelemetryMetricName.EventProcessingCount, + expect.any(Object), + ); + expect(meter.createHistogram).toHaveBeenCalledWith( + TxOBTelemetryMetricName.HandlerProcessingDuration, + expect.any(Object), + ); + expect(tracer.startSpan).toHaveBeenCalledWith( + TxOBTelemetrySpanName.EventProcess, + expect.objectContaining({ + attributes: expect.objectContaining({ + "service.name": "txob-test", + [TxOBTelemetryAttributeKey.EventId]: "1", + [TxOBTelemetryAttributeKey.EventType]: "evtType1", + [TxOBTelemetryAttributeKey.EventCorrelationId]: "abc123", + }), + }), + ); + expect(spans.map((span) => span.name)).toEqual( + expect.arrayContaining([ + TxOBTelemetrySpanName.Poll, + TxOBTelemetrySpanName.EventProcess, + TxOBTelemetrySpanName.HandlerProcess, + ]), + ); + expect( + counters.get(TxOBTelemetryMetricName.EventProcessingCount)?.add, + ).toHaveBeenCalledWith( + 1, + expect.objectContaining({ + "service.name": "txob-test", + [TxOBTelemetryAttributeKey.EventType]: "evtType1", + [TxOBTelemetryAttributeKey.EventOutcome]: + TxOBTelemetryEventOutcome.Success, + }), + ); + expect( + 
counters.get(TxOBTelemetryMetricName.HandlerProcessingCount)?.add, + ).toHaveBeenCalledWith( + 1, + expect.objectContaining({ + "service.name": "txob-test", + [TxOBTelemetryAttributeKey.EventType]: "evtType1", + [TxOBTelemetryAttributeKey.HandlerName]: "handler1", + [TxOBTelemetryAttributeKey.HandlerOutcome]: + TxOBTelemetryHandlerOutcome.Success, + }), + ); + expect( + histograms.get(TxOBTelemetryMetricName.EventProcessingDuration)?.record, + ).toHaveBeenCalledWith( + expect.any(Number), + expect.objectContaining({ + [TxOBTelemetryAttributeKey.EventType]: "evtType1", + [TxOBTelemetryAttributeKey.EventOutcome]: + TxOBTelemetryEventOutcome.Success, + }), + ); + expect( + spans.find((span) => span.name === TxOBTelemetrySpanName.EventProcess) + ?.setStatus, + ).toHaveBeenCalledWith({ code: 1 }); + }); + + it("surfaces failures when creating metric instruments", () => { + const metricError = new Error("counter failed"); + + expect( + () => + new EventProcessor({ + client: mockClient, + handlerMap: {}, + pollingIntervalMs: 10, + telemetry: { + meter: { + createCounter: vi.fn(() => { + throw metricError; + }), + createHistogram: vi.fn(), + }, + }, + }), + ).toThrow(metricError); + }); + + it("does not let runtime telemetry failures interrupt processing", async () => { + const handlerMap = { + evtType1: { + handler1: vi.fn(() => Promise.resolve()), + }, + }; + const evt1: TxOBEvent = { + type: "evtType1", + id: "1", + timestamp: now, + data: {}, + correlation_id: "abc123", + handler_results: {}, + errors: 0, + }; + let callCount = 0; + mockClient.getEventsToProcess.mockImplementation(() => { + callCount++; + return Promise.resolve(callCount === 1 ? 
[evt1] : []); + }); + mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation(() => + Promise.resolve(evt1), + ); + mockTxClient.updateEvent.mockImplementation(() => Promise.resolve()); + + const processor = new EventProcessor({ + client: mockClient, + handlerMap, + pollingIntervalMs: 10, + telemetry: { + meter: { + createCounter: vi.fn(() => ({ + add: vi.fn(() => { + throw new Error("counter add failed"); + }), + })), + createHistogram: vi.fn(() => ({ + record: vi.fn(() => { + throw new Error("histogram record failed"); + }), + })), + }, + tracer: { + startSpan: vi.fn(() => { + throw new Error("span failed"); + }), + }, + }, + }); + processor.start(); + await sleep(50); + await processor.stop(); + + expect(handlerMap.evtType1.handler1).toHaveBeenCalledOnce(); + expect(mockTxClient.updateEvent).toHaveBeenCalledOnce(); + }); +}); diff --git a/src/processor.ts b/src/processor.ts index f94d78e..1ac4f36 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -5,6 +5,23 @@ import { deepClone } from "./clone.js"; import PQueue from "p-queue"; import { ErrorUnprocessableEventHandler, TxOBError } from "./error.js"; import { throttle } from "throttle-debounce"; +import { + createTelemetryInstruments, + endTelemetrySpan, + recordTelemetryCounter, + recordTelemetryDuration, + setTelemetrySpanAttributes, + startTelemetrySpan, + TxOBTelemetryAttributeKey, + TxOBTelemetryEventOutcome, + TxOBTelemetryHandlerOutcome, + TxOBTelemetryPollOutcome, + TxOBTelemetrySpanName, + type TxOBTelemetry, + type TxOBTelemetryAttributes, + type TxOBTelemetryInstruments, + type TxOBTelemetrySpan, +} from "./telemetry.js"; type TxOBEventHandlerResult = { processed_at?: Date; @@ -102,6 +119,7 @@ type TxOBProcessEventsOpts = { txClient: TxOBTransactionProcessorClient; signal?: AbortSignal; }) => Promise; + telemetry?: TxOBTelemetryInstruments; }; const processEvent = async ({ @@ -122,7 +140,9 @@ const processEvent = async ({ backoff = defaultBackoff, maxHandlerConcurrency = 
defaultMaxHandlerConcurrency, onEventMaxErrorsReached, + telemetry, } = opts ?? {}; + const eventStartedAt = Date.now(); if (signal?.aborted) { return {}; @@ -138,223 +158,343 @@ const processEvent = async ({ }, "unexpected event with max errors returned from `getEventsToProcess`", ); + recordTelemetryCounter(telemetry?.eventCounter, telemetry, { + [TxOBTelemetryAttributeKey.EventOutcome]: + TxOBTelemetryEventOutcome.SkippedMaxErrors, + }); + recordTelemetryDuration( + telemetry?.eventDuration, + telemetry, + eventStartedAt, + { + [TxOBTelemetryAttributeKey.EventOutcome]: + TxOBTelemetryEventOutcome.SkippedMaxErrors, + }, + ); return {}; } let backoffUntil: Date | undefined; - - await client.transaction(async (txClient) => { - const lockedEvent = await txClient.getEventByIdForUpdateSkipLocked( - unlockedEvent.id, - { signal, maxErrors }, - ); - if (!lockedEvent) { - logger?.debug( - { - eventId: unlockedEvent.id, - }, - "skipping locked or already processed event", + let eventSpan: TxOBTelemetrySpan | undefined; + let eventError: unknown; + let eventOutcome: TxOBTelemetryEventOutcome | undefined; + let eventMetricAttributes: TxOBTelemetryAttributes = {}; + + try { + await client.transaction(async (txClient) => { + const lockedEvent = await txClient.getEventByIdForUpdateSkipLocked( + unlockedEvent.id, + { signal, maxErrors }, ); - return; - } + if (!lockedEvent) { + eventOutcome = TxOBTelemetryEventOutcome.SkippedLocked; + logger?.debug( + { + eventId: unlockedEvent.id, + }, + "skipping locked or already processed event", + ); + return; + } - // While unlikely, the following two conditions are possible if a concurrent processor finished processing this event or reaching maximum errors between the time - // that this processor found the event with `getEventsToProcess` and called `getEventByIdForUpdateSkipLocked` - // `getEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources - if (lockedEvent.processed_at) { - 
logger?.debug( - { - eventId: lockedEvent.id, - correlationId: lockedEvent.correlation_id, - }, - "skipping already processed event", - ); - return; - } - if (lockedEvent.errors >= maxErrors) { - logger?.debug( + eventMetricAttributes = { + [TxOBTelemetryAttributeKey.EventType]: lockedEvent.type, + }; + eventSpan = startTelemetrySpan( + telemetry, + TxOBTelemetrySpanName.EventProcess, { - eventId: lockedEvent.id, - correlationId: lockedEvent.correlation_id, + [TxOBTelemetryAttributeKey.EventId]: lockedEvent.id, + [TxOBTelemetryAttributeKey.EventType]: lockedEvent.type, + [TxOBTelemetryAttributeKey.EventCorrelationId]: + lockedEvent.correlation_id, + [TxOBTelemetryAttributeKey.EventErrors]: lockedEvent.errors, }, - "skipping event with maximum errors", ); - return; - } - let errored = false; + // While unlikely, the following two conditions are possible if a concurrent processor finished processing this event or reaching maximum errors between the time + // that this processor found the event with `getEventsToProcess` and called `getEventByIdForUpdateSkipLocked` + // `getEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources + if (lockedEvent.processed_at) { + eventOutcome = TxOBTelemetryEventOutcome.SkippedProcessed; + logger?.debug( + { + eventId: lockedEvent.id, + correlationId: lockedEvent.correlation_id, + }, + "skipping already processed event", + ); + return; + } + if (lockedEvent.errors >= maxErrors) { + eventOutcome = TxOBTelemetryEventOutcome.SkippedMaxErrors; + logger?.debug( + { + eventId: lockedEvent.id, + correlationId: lockedEvent.correlation_id, + }, + "skipping event with maximum errors", + ); + return; + } - const eventHandlerMap = handlerMap[lockedEvent.type] ?? {}; + let errored = false; + + const eventHandlerMap = handlerMap[lockedEvent.type] ?? 
{}; + + // Typescript should prevent the caller from passing a handler map that doesn't specify all event types but we'll check for it anyway + // This is distinct from an empty handler map for an event type which is valid + // We just want the caller to be explicit about the event types they are interested in handling and not accidentally skip events + if (!(lockedEvent.type in handlerMap)) { + logger?.warn( + { + eventId: lockedEvent.id, + type: lockedEvent.type, + correlationId: lockedEvent.correlation_id, + }, + "missing event handler map", + ); + errored = true; + lockedEvent.errors = maxErrors; + } - // Typescript should prevent the caller from passing a handler map that doesn't specify all event types but we'll check for it anyway - // This is distinct from an empty handler map for an event type which is valid - // We just want the caller to be explicit about the event types they are interested in handling and not accidentally skip events - if (!(lockedEvent.type in handlerMap)) { - logger?.warn( + logger?.debug( { eventId: lockedEvent.id, type: lockedEvent.type, correlationId: lockedEvent.correlation_id, }, - "missing event handler map", + `processing event`, ); - errored = true; - lockedEvent.errors = maxErrors; - } - logger?.debug( - { - eventId: lockedEvent.id, - type: lockedEvent.type, - correlationId: lockedEvent.correlation_id, - }, - `processing event`, - ); - - const backoffs: Date[] = []; + const backoffs: Date[] = []; + + const handlerLimit = pLimit(maxHandlerConcurrency); + await Promise.allSettled( + Object.entries(eventHandlerMap).map(([handlerName, handler]) => + handlerLimit(async (): Promise => { + const handlerMetricAttributes = { + [TxOBTelemetryAttributeKey.EventType]: lockedEvent.type, + [TxOBTelemetryAttributeKey.HandlerName]: handlerName, + }; + const handlerResults = + lockedEvent.handler_results[handlerName] ?? 
{}; + if (handlerResults.processed_at) { + logger?.debug( + { + eventId: lockedEvent.id, + type: lockedEvent.type, + handlerName, + correlationId: lockedEvent.correlation_id, + }, + "handler already processed", + ); + recordTelemetryCounter(telemetry?.handlerCounter, telemetry, { + ...handlerMetricAttributes, + [TxOBTelemetryAttributeKey.HandlerOutcome]: + TxOBTelemetryHandlerOutcome.SkippedProcessed, + }); + return; + } + if (handlerResults.unprocessable_at) { + logger?.debug( + { + eventId: lockedEvent.id, + type: lockedEvent.type, + handlerName, + correlationId: lockedEvent.correlation_id, + }, + "handler unprocessable", + ); + recordTelemetryCounter(telemetry?.handlerCounter, telemetry, { + ...handlerMetricAttributes, + [TxOBTelemetryAttributeKey.HandlerOutcome]: + TxOBTelemetryHandlerOutcome.SkippedUnprocessable, + }); + return; + } - const handlerLimit = pLimit(maxHandlerConcurrency); - await Promise.allSettled( - Object.entries(eventHandlerMap).map(([handlerName, handler]) => - handlerLimit(async (): Promise => { - const handlerResults = lockedEvent.handler_results[handlerName] ?? 
{}; - if (handlerResults.processed_at) { - logger?.debug( + handlerResults.errors ??= []; + const handlerStartedAt = Date.now(); + const handlerSpan = startTelemetrySpan( + telemetry, + TxOBTelemetrySpanName.HandlerProcess, { - eventId: lockedEvent.id, - type: lockedEvent.type, - handlerName, - correlationId: lockedEvent.correlation_id, + [TxOBTelemetryAttributeKey.EventId]: lockedEvent.id, + [TxOBTelemetryAttributeKey.EventType]: lockedEvent.type, + [TxOBTelemetryAttributeKey.EventCorrelationId]: + lockedEvent.correlation_id, + [TxOBTelemetryAttributeKey.HandlerName]: handlerName, }, - "handler already processed", ); - return; - } - if (handlerResults.unprocessable_at) { - logger?.debug( - { - eventId: lockedEvent.id, - type: lockedEvent.type, - handlerName, - correlationId: lockedEvent.correlation_id, - }, - "handler unprocessable", - ); - return; - } - - handlerResults.errors ??= []; + let handlerOutcome: TxOBTelemetryHandlerOutcome = + TxOBTelemetryHandlerOutcome.Success; + let handlerError: unknown; + + try { + await handler(lockedEvent, { signal }); + handlerResults.processed_at = getDate(); + logger?.debug( + { + eventId: lockedEvent.id, + type: lockedEvent.type, + handlerName, + correlationId: lockedEvent.correlation_id, + }, + "handler succeeded", + ); + } catch (error) { + handlerError = error; + logger?.error( + { + eventId: lockedEvent.id, + type: lockedEvent.type, + handlerName, + error, + correlationId: lockedEvent.correlation_id, + }, + "handler errored", + ); - try { - await handler(lockedEvent, { signal }); - handlerResults.processed_at = getDate(); - logger?.debug( - { - eventId: lockedEvent.id, - type: lockedEvent.type, - handlerName, - correlationId: lockedEvent.correlation_id, - }, - "handler succeeded", - ); - } catch (error) { - logger?.error( - { - eventId: lockedEvent.id, - type: lockedEvent.type, - handlerName, - error, - correlationId: lockedEvent.correlation_id, - }, - "handler errored", - ); + if (error instanceof 
ErrorUnprocessableEventHandler) { + handlerOutcome = TxOBTelemetryHandlerOutcome.Unprocessable; + handlerResults.unprocessable_at = getDate(); + handlerResults.errors?.push({ + error: error.message ?? error, + timestamp: getDate(), + }); + errored = true; + } else { + handlerOutcome = TxOBTelemetryHandlerOutcome.Error; + if (error instanceof TxOBError && error.backoffUntil) { + backoffs.push(error.backoffUntil); + } - if (error instanceof ErrorUnprocessableEventHandler) { - handlerResults.unprocessable_at = getDate(); - handlerResults.errors?.push({ - error: error.message ?? error, - timestamp: getDate(), - }); - errored = true; - } else { - if (error instanceof TxOBError && error.backoffUntil) { - backoffs.push(error.backoffUntil); + errored = true; + handlerResults.errors?.push({ + error: (error as Error)?.message ?? error, + timestamp: getDate(), + }); } - - errored = true; - handlerResults.errors?.push({ - error: (error as Error)?.message ?? error, - timestamp: getDate(), - }); + } finally { + const handlerAttributes = { + ...handlerMetricAttributes, + [TxOBTelemetryAttributeKey.HandlerOutcome]: handlerOutcome, + }; + recordTelemetryCounter( + telemetry?.handlerCounter, + telemetry, + handlerAttributes, + ); + recordTelemetryDuration( + telemetry?.handlerDuration, + telemetry, + handlerStartedAt, + handlerAttributes, + ); + endTelemetrySpan(handlerSpan, handlerError); } - } - - lockedEvent.handler_results[handlerName] = handlerResults; - }), - ), - ); - // Check if all remaining handlers (those that haven't succeeded) are unprocessable - // If so, there's nothing left to retry, so set errors to maxErrors to stop processing - const remainingHandlers = Object.entries(eventHandlerMap).filter( - ([handlerName, _]) => { - const result = lockedEvent.handler_results[handlerName]; - return !result?.processed_at; - }, - ); + lockedEvent.handler_results[handlerName] = handlerResults; + }), + ), + ); - const allRemainingHandlersUnprocessable = - remainingHandlers.length > 
0 && - remainingHandlers.every(([handlerName, _]) => { - const result = lockedEvent.handler_results[handlerName]; - return result?.unprocessable_at; - }); + // Check if all remaining handlers (those that haven't succeeded) are unprocessable + // If so, there's nothing left to retry, so set errors to maxErrors to stop processing + const remainingHandlers = Object.entries(eventHandlerMap).filter( + ([handlerName, _]) => { + const result = lockedEvent.handler_results[handlerName]; + return !result?.processed_at; + }, + ); - if (allRemainingHandlersUnprocessable) { - lockedEvent.errors = maxErrors; - errored = true; - } + const allRemainingHandlersUnprocessable = + remainingHandlers.length > 0 && + remainingHandlers.every(([handlerName, _]) => { + const result = lockedEvent.handler_results[handlerName]; + return result?.unprocessable_at; + }); - if (errored) { - lockedEvent.errors = Math.min(lockedEvent.errors + 1, maxErrors); - backoffs.push(backoff(lockedEvent.errors)); - const latestBackoff = backoffs.sort( - (a, b) => b.getTime() - a.getTime(), - )[0]; - lockedEvent.backoff_until = latestBackoff; - if (lockedEvent.errors === maxErrors) { - lockedEvent.backoff_until = null; - lockedEvent.processed_at = getDate(); + if (allRemainingHandlersUnprocessable) { + lockedEvent.errors = maxErrors; + errored = true; + } - if (onEventMaxErrorsReached) { - try { - await onEventMaxErrorsReached({ - event: deepClone(lockedEvent), - txClient, - signal, - }); - } catch (hookError) { - logger?.error( - { - eventId: lockedEvent.id, - error: hookError, - }, - "error in onEventMaxErrorsReached hook", - ); + if (errored) { + lockedEvent.errors = Math.min(lockedEvent.errors + 1, maxErrors); + backoffs.push(backoff(lockedEvent.errors)); + const latestBackoff = backoffs.sort( + (a, b) => b.getTime() - a.getTime(), + )[0]; + lockedEvent.backoff_until = latestBackoff; + if (lockedEvent.errors === maxErrors) { + lockedEvent.backoff_until = null; + lockedEvent.processed_at = getDate(); + + if 
(onEventMaxErrorsReached) { + try { + await onEventMaxErrorsReached({ + event: deepClone(lockedEvent), + txClient, + signal, + }); + } catch (hookError) { + logger?.error( + { + eventId: lockedEvent.id, + error: hookError, + }, + "error in onEventMaxErrorsReached hook", + ); - throw hookError; + throw hookError; + } } } + } else { + lockedEvent.backoff_until = null; + lockedEvent.processed_at = getDate(); } - } else { - lockedEvent.backoff_until = null; - lockedEvent.processed_at = getDate(); - } - backoffUntil = lockedEvent.backoff_until ?? undefined; + eventOutcome = errored + ? lockedEvent.errors === maxErrors + ? TxOBTelemetryEventOutcome.MaxErrors + : TxOBTelemetryEventOutcome.Error + : TxOBTelemetryEventOutcome.Success; + setTelemetrySpanAttributes(eventSpan, { + [TxOBTelemetryAttributeKey.EventOutcome]: eventOutcome, + [TxOBTelemetryAttributeKey.EventErrors]: lockedEvent.errors, + }); + + backoffUntil = lockedEvent.backoff_until ?? undefined; - await txClient.updateEvent(lockedEvent); - }); + await txClient.updateEvent(lockedEvent); + }); + } catch (error) { + eventError = error; + eventOutcome ??= TxOBTelemetryEventOutcome.Error; + throw error; + } finally { + if (eventOutcome) { + const eventAttributes = { + ...eventMetricAttributes, + [TxOBTelemetryAttributeKey.EventOutcome]: eventOutcome, + }; + recordTelemetryCounter( + telemetry?.eventCounter, + telemetry, + eventAttributes, + ); + recordTelemetryDuration( + telemetry?.eventDuration, + telemetry, + eventStartedAt, + eventAttributes, + ); + } + endTelemetrySpan(eventSpan, eventError); + } return { backoffUntil }; }; @@ -388,12 +528,17 @@ export class EventProcessor { client, handlerMap, wakeupEmitter, + telemetry, ...opts - }: Omit>, "signal"> & { + }: Omit< + Partial>, + "signal" | "telemetry" + > & { pollingIntervalMs?: number; wakeupTimeoutMs?: number; wakeupThrottleMs?: number; wakeupEmitter?: WakeupEmitter; + telemetry?: TxOBTelemetry; } & { client: TxOBProcessorClient; handlerMap: 
TxOBEventHandlerMap; @@ -407,6 +552,7 @@ export class EventProcessor { maxQueuedEvents: defaultMaxQueuedEvents, wakeupTimeoutMs: defaultWakeupTimeoutMs, wakeupThrottleMs: defaultWakeupThrottleMs, + telemetry: createTelemetryInstruments(telemetry), ...opts, }; this.client = client; @@ -438,13 +584,34 @@ export class EventProcessor { // Prevent concurrent polls if (this.isPolling) { this.opts.logger?.debug("skipping poll - already polling"); + recordTelemetryCounter( + this.opts.telemetry?.pollCounter, + this.opts.telemetry, + { + [TxOBTelemetryAttributeKey.PollOutcome]: + TxOBTelemetryPollOutcome.SkippedAlreadyPolling, + }, + ); return; } this.isPolling = true; + const pollStartedAt = Date.now(); + const pollSpan = startTelemetrySpan( + this.opts.telemetry, + TxOBTelemetrySpanName.Poll, + { + [TxOBTelemetryAttributeKey.QueueSize]: queuedEventIds.size, + [TxOBTelemetryAttributeKey.QueueMaxSize]: this.opts.maxQueuedEvents, + }, + ); + let pollOutcome: TxOBTelemetryPollOutcome = + TxOBTelemetryPollOutcome.Success; + let pollError: unknown; try { // Skip polling if we're at capacity to prevent memory leaks if (queuedEventIds.size >= this.opts.maxQueuedEvents) { + pollOutcome = TxOBTelemetryPollOutcome.SkippedQueueFull; this.opts.logger?.debug( { queuedCount: queuedEventIds.size, @@ -463,6 +630,11 @@ export class EventProcessor { const unqueuedEvents = events.filter( (event) => !queuedEventIds.has(event.id), ); + setTelemetrySpanAttributes(pollSpan, { + [TxOBTelemetryAttributeKey.EventsFound]: events.length, + [TxOBTelemetryAttributeKey.EventsQueued]: unqueuedEvents.length, + [TxOBTelemetryAttributeKey.QueueSize]: queuedEventIds.size, + }); this.opts.logger?.debug( `found ${unqueuedEvents.length} events to process`, ); @@ -516,12 +688,30 @@ export class EventProcessor { }); } } catch (error) { + pollOutcome = TxOBTelemetryPollOutcome.Error; + pollError = error; this.opts.logger?.error( { error }, "error polling for events, will retry", ); // Continue polling even on 
// NOTE(review): the next line is the tail of the preceding diff hunk (a `finally`
// block whose definition starts before this chunk) followed by the diff header for
// src/telemetry.ts. It is carried over verbatim and was not modified.
// error } finally { + recordTelemetryCounter( + this.opts.telemetry?.pollCounter, + this.opts.telemetry, + { + [TxOBTelemetryAttributeKey.PollOutcome]: pollOutcome, + }, + ); + recordTelemetryDuration( + this.opts.telemetry?.pollDuration, + this.opts.telemetry, + pollStartedAt, + { + [TxOBTelemetryAttributeKey.PollOutcome]: pollOutcome, + }, + ); + endTelemetrySpan(pollSpan, pollError); this.isPolling = false; } }; diff --git a/src/telemetry.ts b/src/telemetry.ts new file mode 100644 index 0000000..19752d3 --- /dev/null +++ b/src/telemetry.ts @@ -0,0 +1,263 @@

/** A single attribute value accepted by spans and metric instruments. */
export type TxOBTelemetryAttributeValue =
  | string
  | number
  | boolean
  | string[]
  | number[]
  | boolean[];

/** Attribute bag keyed by attribute name. */
export type TxOBTelemetryAttributes = Record<
  string,
  TxOBTelemetryAttributeValue
>;

/**
 * Minimal span surface used by this module. Only `end` is required; every
 * other method is optional and is invoked with optional-call syntax.
 */
export interface TxOBTelemetrySpan {
  setAttribute?(key: string, value: TxOBTelemetryAttributeValue): void;
  setAttributes?(attributes: TxOBTelemetryAttributes): void;
  recordException?(exception: unknown): void;
  setStatus?(status: { code: 1 | 2; message?: string }): void;
  end(): void;
}

/** Minimal tracer surface: something that can start a named span. */
export interface TxOBTelemetryTracer {
  startSpan(
    name: string,
    options?: { attributes?: TxOBTelemetryAttributes },
  ): TxOBTelemetrySpan;
}

/** Minimal counter surface: adds a value with optional attributes. */
export interface TxOBTelemetryCounter {
  add(value: number, attributes?: TxOBTelemetryAttributes): void;
}

/** Minimal histogram surface: records a value with optional attributes. */
export interface TxOBTelemetryHistogram {
  record(value: number, attributes?: TxOBTelemetryAttributes): void;
}

/** Minimal meter surface: a factory for counters and histograms. */
export interface TxOBTelemetryMeter {
  createCounter(
    name: string,
    options?: { description?: string; unit?: string },
  ): TxOBTelemetryCounter;
  createHistogram(
    name: string,
    options?: { description?: string; unit?: string },
  ): TxOBTelemetryHistogram;
}

/**
 * User-facing telemetry configuration. Every field is optional: a tracer
 * enables spans, a meter enables metrics, and `attributes` are merged into
 * every span and metric data point emitted through the helpers below.
 */
export type TxOBTelemetry = {
  tracer?: TxOBTelemetryTracer;
  meter?: TxOBTelemetryMeter;
  attributes?: TxOBTelemetryAttributes;
};

/**
 * Resolved telemetry state produced by {@link createTelemetryInstruments}:
 * the configured tracer and shared attributes plus the metric instruments
 * created from the meter (each `undefined` when no meter was supplied).
 */
export type TxOBTelemetryInstruments = {
  tracer?: TxOBTelemetryTracer;
  attributes?: TxOBTelemetryAttributes;
  eventCounter?: TxOBTelemetryCounter;
  eventDuration?: TxOBTelemetryHistogram;
  handlerCounter?: TxOBTelemetryCounter;
  handlerDuration?: TxOBTelemetryHistogram;
  pollCounter?: TxOBTelemetryCounter;
  pollDuration?: TxOBTelemetryHistogram;
};

/** Span names. */
export const TxOBTelemetrySpanName = {
  Poll: "txob.poll",
  EventProcess: "txob.event.process",
  HandlerProcess: "txob.handler.process",
} as const;

/** Metric instrument names passed to the meter. */
export const TxOBTelemetryMetricName = {
  PollCount: "txob.poll.count",
  PollDuration: "txob.poll.duration",
  EventProcessingCount: "txob.event.processing.count",
  EventProcessingDuration: "txob.event.processing.duration",
  HandlerProcessingCount: "txob.handler.processing.count",
  HandlerProcessingDuration: "txob.handler.processing.duration",
} as const;

/** Attribute keys used on spans and metric data points. */
export const TxOBTelemetryAttributeKey = {
  EventId: "txob.event.id",
  EventType: "txob.event.type",
  EventCorrelationId: "txob.event.correlation_id",
  EventErrors: "txob.event.errors",
  EventOutcome: "txob.event.outcome",
  HandlerName: "txob.handler.name",
  HandlerOutcome: "txob.handler.outcome",
  PollOutcome: "txob.poll.outcome",
  QueueSize: "txob.queue.size",
  QueueMaxSize: "txob.queue.max_size",
  EventsFound: "txob.events.found",
  EventsQueued: "txob.events.queued",
} as const;

/** Values for the event outcome attribute. */
export const TxOBTelemetryEventOutcome = {
  Success: "success",
  Error: "error",
  MaxErrors: "max_errors",
  SkippedLocked: "skipped.locked",
  SkippedProcessed: "skipped.processed",
  SkippedMaxErrors: "skipped.max_errors",
} as const;

/** Values for the handler outcome attribute. */
export const TxOBTelemetryHandlerOutcome = {
  Success: "success",
  Error: "error",
  Unprocessable: "unprocessable",
  SkippedProcessed: "skipped.processed",
  SkippedUnprocessable: "skipped.unprocessable",
} as const;

/** Values for the poll outcome attribute. */
export const TxOBTelemetryPollOutcome = {
  Success: "success",
  Error: "error",
  SkippedAlreadyPolling: "skipped.already_polling",
  SkippedQueueFull: "skipped.queue_full",
} as const;

/** Union of the string values in {@link TxOBTelemetryEventOutcome}. */
export type TxOBTelemetryEventOutcome =
  (typeof TxOBTelemetryEventOutcome)[keyof typeof TxOBTelemetryEventOutcome];
/** Union of the string values in {@link TxOBTelemetryHandlerOutcome}. */
export type TxOBTelemetryHandlerOutcome =
  (typeof TxOBTelemetryHandlerOutcome)[keyof typeof TxOBTelemetryHandlerOutcome];
/** Union of the string values in {@link TxOBTelemetryPollOutcome}. */
export type TxOBTelemetryPollOutcome =
  (typeof TxOBTelemetryPollOutcome)[keyof typeof TxOBTelemetryPollOutcome];

// Numeric span status codes accepted by `TxOBTelemetrySpan.setStatus`
// (the same numbering as OpenTelemetry's SpanStatusCode: OK = 1, ERROR = 2).
const telemetryStatusCode = {
  OK: 1,
  ERROR: 2,
} as const;

/**
 * Runs `fn` and returns its result, or `fallback` if `fn` throws.
 *
 * Runtime telemetry is best-effort: once processing is underway, a broken
 * tracer/exporter must not change event processing behavior, so the thrown
 * value is intentionally discarded.
 *
 * @param fn - Telemetry operation to attempt.
 * @param fallback - Value returned when `fn` throws.
 * @returns The result of `fn`, or `fallback` on failure.
 */
const suppressTelemetryRuntimeError = <T>(fn: () => T, fallback: T): T => {
  try {
    return fn();
  } catch {
    return fallback;
  }
};

/**
 * Builds the metric instruments for the supplied telemetry configuration.
 *
 * Metric instrument creation happens during processor construction, so
 * failures are deliberately NOT suppressed here: a throwing meter surfaces to
 * the caller, letting configuration or SDK setup issues be caught early.
 *
 * @param telemetry - Optional user configuration.
 * @returns The tracer and shared attributes passed through unchanged, plus one
 *   counter and one histogram each for events, handlers, and polls. All
 *   instruments are `undefined` when no meter is configured.
 * @throws Whatever the meter's `createCounter` / `createHistogram` throws.
 */
export const createTelemetryInstruments = (
  telemetry?: TxOBTelemetry,
): TxOBTelemetryInstruments => {
  const meter = telemetry?.meter;

  return {
    tracer: telemetry?.tracer,
    attributes: telemetry?.attributes,
    eventCounter: meter?.createCounter(
      TxOBTelemetryMetricName.EventProcessingCount,
      {
        description: "Number of outbox event processing attempts by outcome.",
        unit: "{event}",
      },
    ),
    eventDuration: meter?.createHistogram(
      TxOBTelemetryMetricName.EventProcessingDuration,
      {
        description: "Duration of outbox event processing.",
        unit: "ms",
      },
    ),
    handlerCounter: meter?.createCounter(
      TxOBTelemetryMetricName.HandlerProcessingCount,
      {
        description: "Number of outbox handler processing attempts by outcome.",
        unit: "{handler}",
      },
    ),
    handlerDuration: meter?.createHistogram(
      TxOBTelemetryMetricName.HandlerProcessingDuration,
      {
        description: "Duration of outbox event handler execution.",
        unit: "ms",
      },
    ),
    pollCounter: meter?.createCounter(TxOBTelemetryMetricName.PollCount, {
      description: "Number of outbox polling attempts by outcome.",
      unit: "{poll}",
    }),
    pollDuration: meter?.createHistogram(TxOBTelemetryMetricName.PollDuration, {
      description: "Duration of outbox polling attempts.",
      unit: "ms",
    }),
  };
};

// Combines the shared attributes with call-specific ones into a new object.
// Call-specific attributes are spread last, so they win on key collisions.
const mergeTelemetryAttributes = (
  telemetry: TxOBTelemetryInstruments | undefined,
  attributes: TxOBTelemetryAttributes = {},
): TxOBTelemetryAttributes => ({
  ...(telemetry?.attributes ?? {}),
  ...attributes,
});

/**
 * Starts a span on the configured tracer with shared + call-specific
 * attributes.
 *
 * @param telemetry - Resolved instruments; may be `undefined`.
 * @param name - Span name.
 * @param attributes - Call-specific attributes (override shared ones).
 * @returns The started span, or `undefined` when no tracer is configured or
 *   the tracer throws.
 */
export const startTelemetrySpan = (
  telemetry: TxOBTelemetryInstruments | undefined,
  name: string,
  attributes?: TxOBTelemetryAttributes,
): TxOBTelemetrySpan | undefined =>
  suppressTelemetryRuntimeError<TxOBTelemetrySpan | undefined>(
    () =>
      telemetry?.tracer?.startSpan(name, {
        attributes: mergeTelemetryAttributes(telemetry, attributes),
      }),
    undefined,
  );

/**
 * Sets attributes on a span if the span exists and implements
 * `setAttributes`; any error thrown by the span is suppressed.
 *
 * @param span - Span to update; may be `undefined`.
 * @param attributes - Attributes to set.
 */
export const setTelemetrySpanAttributes = (
  span: TxOBTelemetrySpan | undefined,
  attributes: TxOBTelemetryAttributes,
): void => {
  suppressTelemetryRuntimeError<void>(() => {
    span?.setAttributes?.(attributes);
  }, undefined);
};

/**
 * Finalizes a span: records the error (if any), sets the status, and ends it.
 *
 * Each step is guarded independently so that a throwing `recordException` or
 * `setStatus` can no longer prevent `end()` from being called, which would
 * leave the span open.
 *
 * @param span - Span to finish; a no-op when `undefined`.
 * @param error - When truthy, the span is marked ERROR and the value is
 *   recorded as an exception; otherwise the span is marked OK.
 */
export const endTelemetrySpan = (
  span: TxOBTelemetrySpan | undefined,
  error?: unknown,
): void => {
  if (!span) {
    return;
  }

  if (error) {
    suppressTelemetryRuntimeError<void>(() => {
      span.recordException?.(error);
    }, undefined);
    suppressTelemetryRuntimeError<void>(() => {
      span.setStatus?.({
        code: telemetryStatusCode.ERROR,
        message: error instanceof Error ? error.message : undefined,
      });
    }, undefined);
  } else {
    suppressTelemetryRuntimeError<void>(() => {
      span.setStatus?.({ code: telemetryStatusCode.OK });
    }, undefined);
  }

  // Always attempt to end the span, regardless of what happened above.
  suppressTelemetryRuntimeError<void>(() => {
    span.end();
  }, undefined);
};

/**
 * Increments a counter by one with shared + call-specific attributes.
 * A missing counter is a no-op and errors from the counter are suppressed.
 *
 * @param counter - Counter to increment; may be `undefined`.
 * @param telemetry - Resolved instruments supplying the shared attributes.
 * @param attributes - Call-specific attributes (override shared ones).
 */
export const recordTelemetryCounter = (
  counter: TxOBTelemetryCounter | undefined,
  telemetry: TxOBTelemetryInstruments | undefined,
  attributes?: TxOBTelemetryAttributes,
): void => {
  suppressTelemetryRuntimeError<void>(() => {
    counter?.add(1, mergeTelemetryAttributes(telemetry, attributes));
  }, undefined);
};

/**
 * Records the time elapsed since `startedAt` on a histogram, in milliseconds.
 * A missing histogram is a no-op and errors from the histogram are suppressed.
 *
 * @param histogram - Histogram to record on; may be `undefined`.
 * @param telemetry - Resolved instruments supplying the shared attributes.
 * @param startedAt - Start timestamp in the `Date.now()` epoch-millisecond scale.
 * @param attributes - Call-specific attributes (override shared ones).
 */
export const recordTelemetryDuration = (
  histogram: TxOBTelemetryHistogram | undefined,
  telemetry: TxOBTelemetryInstruments | undefined,
  startedAt: number,
  attributes?: TxOBTelemetryAttributes,
): void => {
  suppressTelemetryRuntimeError<void>(() => {
    // Date.now() is wall-clock time and can step backwards (e.g. NTP
    // adjustment); clamp so a negative duration is never recorded.
    const elapsedMs = Math.max(0, Date.now() - startedAt);
    histogram?.record(
      elapsedMs,
      mergeTelemetryAttributes(telemetry, attributes),
    );
  }, undefined);
};