diff --git a/src/trace/context/extractor.ts b/src/trace/context/extractor.ts index 31fafa8f3..cc03521bc 100644 --- a/src/trace/context/extractor.ts +++ b/src/trace/context/extractor.ts @@ -5,9 +5,11 @@ import { XrayService } from "../xray-service"; import { AppSyncEventTraceExtractor, CustomTraceExtractor, + DurableExecutionEventTraceExtractor, EventBridgeEventTraceExtractor, EventBridgeSQSEventTraceExtractor, HTTPEventTraceExtractor, + isDurableExecutionEvent, KinesisEventTraceExtractor, LambdaContextTraceExtractor, SNSEventTraceExtractor, @@ -56,6 +58,9 @@ export class TraceContextExtractor { } } + // No stripping needed — trace context is stored in dedicated + // `_datadog_{N}` checkpoint operations. + if (spanContext === null) { this.stepFunctionContextService = StepFunctionContextService.instance(event); if (this.stepFunctionContextService?.context) { @@ -81,6 +86,9 @@ export class TraceContextExtractor { private getTraceEventExtractor(event: any): EventTraceExtractor | undefined { if (!event || typeof event !== "object") return; + // Check for durable execution event first (has DurableExecutionArn + CheckpointToken) + if (isDurableExecutionEvent(event)) return new DurableExecutionEventTraceExtractor(this.tracerWrapper); + const headers = event.headers ?? event.multiValueHeaders; if (headers !== null && typeof headers === "object") { return new HTTPEventTraceExtractor(this.tracerWrapper, this.config.decodeAuthorizerContext); diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts new file mode 100644 index 000000000..fd8e46fc2 --- /dev/null +++ b/src/trace/context/extractors/durable-execution.spec.ts @@ -0,0 +1,148 @@ +import { createDurableExecutionRootSpan, DurableExecutionEventTraceExtractor } from "./durable-execution"; +import { TracerWrapper } from "../../tracer-wrapper"; + +jest.mock("dd-trace", () => ({ + startSpan: jest.fn(), +})); + +function makeTracerWrapper(extractReturn: any = null): TracerWrapper { + return { extract: jest.fn().mockReturnValue(extractReturn) } as unknown as TracerWrapper; +} + +describe("DurableExecutionEventTraceExtractor", () => { + const tracer = require("dd-trace"); + const startSpanMock = tracer.startSpan as jest.Mock; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it("delegates checkpoint headers to the standard propagator", () => { + const executionArn = + "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/abc"; + + const checkpointHeaders = { + "x-datadog-trace-id": "149750110124521191", + "x-datadog-parent-id": "987654321012345678", + "x-datadog-sampling-priority": "1", + }; + + const event = { + DurableExecutionArn: executionArn, + CheckpointToken: "t-1", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "_datadog_0", + Status: "SUCCEEDED", + StepDetails: { + Result: JSON.stringify(checkpointHeaders), + }, + }, + ], + }, + }; + + const sentinelContext = { sentinel: true }; + const tracerWrapper = makeTracerWrapper(sentinelContext); + const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); + const context = extractor.extract(event); + + expect(tracerWrapper.extract).toHaveBeenCalledWith(checkpointHeaders); + expect(context).toBe(sentinelContext); + }); + + it("returns null when no checkpoint or upstream context exists", () => { + const tracerWrapper = makeTracerWrapper(); + const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); + + const context = extractor.extract({ + DurableExecutionArn: "arn:aws:lambda:us-east-2:123:function:demo", + CheckpointToken: "t-empty", + InitialExecutionState: { Operations: [] }, + }); + + expect(context).toBeNull(); + expect(tracerWrapper.extract).not.toHaveBeenCalled(); + }); + + it("creates durable root span only for first invocation", () => { + const executionArn = + "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/first"; + + const spanContext: any = { + _spanId: null, + _parentId: null, + toTraceId: () => "1111111111111111111", + toSpanId: () => "2222222222222222222", + }; + const span = { + context: () => spanContext, + finish: jest.fn(), + }; + startSpanMock.mockReturnValue(span); + + const firstInvocationEvent = { + DurableExecutionArn: executionArn, + CheckpointToken: "t-first", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "input", + Status: "RUNNING", + StartTimestamp: 1710000000000, + ExecutionDetails: { + InputPayload: JSON.stringify({ hello: "world" }), + }, + }, + ], + }, + }; + + const root = createDurableExecutionRootSpan(firstInvocationEvent, null); + + expect(root).not.toBeNull(); + expect(startSpanMock).toHaveBeenCalledTimes(1); + }); + + it("skips durable root span creation on replay invocations", () => { + const executionArn = + "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/replay"; + + const replayEvent = { + DurableExecutionArn: executionArn, + CheckpointToken: "t-replay", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "_datadog_0", + Status: "SUCCEEDED", + StepDetails: { + Result: JSON.stringify({ + "x-datadog-trace-id": "149750110124521191", + "x-datadog-parent-id": "538591322263933970", + "x-datadog-sampling-priority": "1", + }), + }, + }, + { + Id: "op-2", + Name: "callback_step_prepare", + Status: "SUCCEEDED", + }, + ], + }, + }; + + const tracerWrapper = makeTracerWrapper({ source: "Event" }); + const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); + const extracted = extractor.extract(replayEvent); + const root = createDurableExecutionRootSpan(replayEvent, extracted); + + expect(root).toBeNull(); + expect(startSpanMock).not.toHaveBeenCalled(); + }); +}); diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts new file mode 100644 index 000000000..5fb6c2a55 --- /dev/null +++ b/src/trace/context/extractors/durable-execution.ts @@ -0,0 +1,396 @@ +/** + * Durable Execution Trace Extractor — Checkpoint/Upstream Approach + * + * Strategy: + * 1. Prefer trace context from the latest `_datadog_{N}` checkpoint. + * 2. If no trace checkpoint exists (first invocation), try upstream trace context + * from the original customer event stored in `Operations[0].ExecutionDetails.InputPayload`. + * 3. If neither exists, return null and let the default extraction path create the context. + * + * The dd-trace-js durable-execution plugin writes checkpoint headers via the + * standard HTTP propagator (`tracer.inject(span, 'http_headers', headers)`), + * so we just hand the resulting header dict back to `tracer.extract` here. + */ + +import { logDebug } from "../../../utils"; +import { SpanContextWrapper } from "../../span-context-wrapper"; +import { TracerWrapper } from "../../tracer-wrapper"; +import { EventTraceExtractor } from "../extractor"; + +/** + * Interface for operation data in durable execution state + */ +export interface DurableExecutionOperation { + Id: string; + Status: string; + Type?: string; + Name?: string; + ExecutionDetails?: { + InputPayload?: string; + }; + StepDetails?: { + Result?: string; + Error?: unknown; + NextAttemptTimestamp?: string; + }; + Payload?: string; + CallbackDetails?: { + Result?: string; + CallbackId?: string; + Error?: unknown; + }; + StartedAt?: string; + StartTimestamp?: number; + CompletedAt?: string; +} + +/** + * Interface for initial execution state in durable execution events + */ +export interface InitialExecutionState { + Operations?: DurableExecutionOperation[]; + Status?: string; +} + +/** + * Interface for durable execution event + */ +export interface DurableExecutionEvent { + DurableExecutionArn?: string; + CheckpointToken?: string; + InitialExecutionState?: InitialExecutionState; + Input?: unknown; +} + +/** + * Check if event is a durable execution event + */ +export function isDurableExecutionEvent(event: unknown): event is DurableExecutionEvent { + if (!event || typeof event !== "object") { + return false; + } + + const maybeEvent = event as Record; + return Boolean(maybeEvent.DurableExecutionArn && maybeEvent.CheckpointToken); +} + +/** + * Check if this is a replay invocation (has previous operations) + */ +export function isDurableExecutionReplay(event: unknown): boolean { + if (!isDurableExecutionEvent(event)) { + return false; + } + + const operations = event.InitialExecutionState?.Operations; + return Array.isArray(operations) && operations.length > 0; +} + +/** + * Get durable execution ARN from event + */ +export function getDurableExecutionArn(event: unknown): string | undefined { + if (!isDurableExecutionEvent(event)) { + return undefined; + } + return event.DurableExecutionArn; +} + +/** + * Get checkpoint token from event + */ +export function getCheckpointToken(event: unknown): string | undefined { + if (!isDurableExecutionEvent(event)) { + return undefined; + } + return event.CheckpointToken; +} + +// Terminal operation statuses that indicate an operation has completed +const TERMINAL_STATUSES = new Set(["SUCCEEDED", "FAILED", "CANCELLED", "STOPPED", "TIMED_OUT"]); + +const TRACE_CHECKPOINT_NAME_PREFIX = "_datadog_"; + +function parseTraceCheckpointNumber(name: unknown): number | null { + if (typeof name !== "string") return null; + + if (!name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) return null; + const suffix = name.slice(TRACE_CHECKPOINT_NAME_PREFIX.length); + const n = Number.parseInt(suffix, 10); + if (Number.isNaN(n) || String(n) !== suffix) return null; + return n; +} + +function isTraceCheckpointName(name: unknown): boolean { + return parseTraceCheckpointNumber(name) !== null; +} + +/** + * Find the highest-numbered `_datadog_{N}` checkpoint in the event and return + * its parsed header dict. + * + * Each invocation that changes trace context saves a new checkpoint with N+1; + * the one with the highest N is the most recent. Headers are written by the + * dd-trace-js plugin via `tracer.inject(span, 'http_headers', headers)` so the + * payload is a standard HTTP-style header dict. + * + */ +function findLatestCheckpointHeaders(event: DurableExecutionEvent): Record | null { + const operations = event.InitialExecutionState?.Operations; + if (!operations || operations.length === 0) return null; + + let best: { number: number; op: DurableExecutionOperation } | null = null; + for (const op of operations) { + const n = parseTraceCheckpointNumber(op?.Name); + if (n === null) continue; + if (best === null || n > best.number) { + best = { number: n, op }; + } + } + if (best === null) return null; + + const raw = best.op.Payload ?? best.op.StepDetails?.Result; + if (!raw || typeof raw !== "string") return null; + try { + const parsed = JSON.parse(raw); + if (parsed && typeof parsed === "object") { + return parsed as Record; + } + } catch (e) { + logDebug(`Failed to parse trace checkpoint payload: ${e}`); + } + return null; +} + +/** + * Find upstream HTTP headers carried by the original customer event stored in + * `Operations[0].ExecutionDetails.InputPayload`. Returns the standard header + * dict (keys like `x-datadog-trace-id`, `traceparent`, etc.) or null. + */ +function findUpstreamHeaders(event: DurableExecutionEvent): Record | null { + try { + const operations = event.InitialExecutionState?.Operations; + if (!operations || operations.length === 0) return null; + + const inputPayloadStr = operations[0].ExecutionDetails?.InputPayload; + if (!inputPayloadStr) return null; + + const customerEvent = JSON.parse(inputPayloadStr); + if (!customerEvent || typeof customerEvent !== "object") return null; + + const headers = customerEvent.headers; + if (headers && typeof headers === "object") { + return headers as Record; + } + + const ddData = customerEvent._datadog; + if (ddData && typeof ddData === "object") { + return ddData as Record; + } + } catch (e) { + logDebug(`Failed to read upstream headers from durable input payload: ${e}`); + } + + return null; +} + +/** + * Durable Execution Trace Extractor + * + * Locates trace headers carried inside the durable execution envelope and hands + * them to the standard dd-trace propagator via `TracerWrapper.extract`. Order: + * 1. Latest `_datadog_{N}` checkpoint payload. + * 2. Upstream customer event headers from `InputPayload`. + * 3. Otherwise return null and let the default extraction path take over. + */ +export class DurableExecutionEventTraceExtractor implements EventTraceExtractor { + constructor(private tracerWrapper: TracerWrapper) {} + + extract(event: unknown): SpanContextWrapper | null { + if (!isDurableExecutionEvent(event)) { + logDebug("Event is not a durable execution event"); + return null; + } + if (!event.DurableExecutionArn) { + logDebug("No DurableExecutionArn in event"); + return null; + } + + const checkpointHeaders = findLatestCheckpointHeaders(event); + if (checkpointHeaders) { + logDebug("Extracting trace context from durable checkpoint"); + return this.tracerWrapper.extract(checkpointHeaders); + } + + const upstreamHeaders = findUpstreamHeaders(event); + if (upstreamHeaders) { + logDebug("Extracting trace context from upstream durable input payload"); + return this.tracerWrapper.extract(upstreamHeaders); + } + + logDebug("No durable trace context found; deferring to default extraction"); + return null; + } +} + +/** + * Utility to check if a durable operation is a replay + * + * An operation is a replay if it exists in the initial execution state + * with a terminal status (SUCCEEDED, FAILED, CANCELLED, STOPPED, TIMED_OUT) + * + * @param event - Lambda event + * @param stepId - The step ID to check (may be hashed) + * @returns true if the operation is a replay + */ +export function isOperationReplay(event: unknown, stepId: string): boolean { + if (!isDurableExecutionEvent(event)) { + return false; + } + + const operations = event.InitialExecutionState?.Operations; + if (!operations || operations.length === 0) { + return false; + } + + const operation = operations.find((op) => op.Id === stepId); + if (!operation) { + return false; + } + + return TERMINAL_STATUSES.has(operation.Status); +} + +/** + * Get the replay status of an operation + * + * @param event - Lambda event + * @param stepId - The step ID to check + * @returns Operation status if found, undefined otherwise + */ +export function getOperationStatus(event: unknown, stepId: string): string | undefined { + if (!isDurableExecutionEvent(event)) { + return undefined; + } + + const operations = event.InitialExecutionState?.Operations; + if (!operations) { + return undefined; + } + + const operation = operations.find((op) => op.Id === stepId); + return operation?.Status; +} + +/** + * Count the number of completed operations in the event + * + * @param event - Lambda event + * @returns Number of completed operations + */ +export function getCompletedOperationCount(event: unknown): number { + if (!isDurableExecutionEvent(event)) { + return 0; + } + + const operations = event.InitialExecutionState?.Operations; + if (!operations) { + return 0; + } + + return operations.filter((op) => + op.Status === "SUCCEEDED" || op.Status === "FAILED" + ).length; +} + +/** + * Create the durable execution root span for likely first invocations only. + * + * Replay invocations return null. The current first-invocation heuristic is: + * - no trace checkpoint operation exists + * - no operation has terminal status + * - operation count is <= 1 + * + * The created span is parented to the current aws.lambda span context. + * + * Returns an object with { span, finish() } or null if not a durable execution. + * Caller must call finish() when the invocation ends. + */ +export function createDurableExecutionRootSpan( + event: unknown, + parentSpanContext?: unknown, +): { span: any; finish: () => void } | null { + if (!isDurableExecutionEvent(event)) { + return null; + } + + const executionArn = event.DurableExecutionArn; + if (!executionArn) { + return null; + } + + const operations = event.InitialExecutionState?.Operations; + const hasCheckpoint = Boolean( + operations?.some((op) => isTraceCheckpointName(op?.Name)), + ); + const hasCompletedOperation = Boolean(operations?.some((op) => TERMINAL_STATUSES.has(op.Status))); + const isLikelyFirstInvocation = !hasCheckpoint && !hasCompletedOperation && (operations?.length ?? 0) <= 1; + + if (!isLikelyFirstInvocation) { + return null; + } + + // Use the first operation's StartTimestamp (unix milliseconds) so the root + // span's start time matches the actual start of the durable execution. + let startTime: number | undefined; + if (operations && operations.length > 0) { + const firstStartTs = operations[0].StartTimestamp; + if (firstStartTs != null) { + const parsed = Number(firstStartTs); + if (!isNaN(parsed)) { + startTime = parsed; + } + } + } + + try { + const tracer = require("dd-trace"); + + const serviceName = process.env.DD_DURABLE_EXECUTION_SERVICE || "aws.durable-execution"; + const resourceName = executionArn.includes(":") ? executionArn.split(":").pop() : executionArn; + + const spanOptions: Record = { + type: "serverless", + tags: { + "service.name": serviceName, + "resource.name": resourceName, + "durable.execution_arn": executionArn, + "durable.is_root_span": true, + "durable.invocation_count": operations?.length ?? 0, + }, + }; + + if (startTime !== undefined) { + spanOptions.startTime = startTime; + } + if (parentSpanContext) { + // Root span is modeled as a child of aws.lambda. + spanOptions.childOf = parentSpanContext; + } + + const span = tracer.startSpan("aws.durable-execution", spanOptions); + + logDebug(`Created root execution span: start_time=${startTime}`); + + return { + span, + finish: () => { + span.finish(); + logDebug("Finished root execution span"); + }, + }; + } catch (e) { + logDebug(`Failed to create durable execution root span: ${e}`); + return null; + } +} diff --git a/src/trace/context/extractors/index.ts b/src/trace/context/extractors/index.ts index 6bd690713..0fbbf081b 100644 --- a/src/trace/context/extractors/index.ts +++ b/src/trace/context/extractors/index.ts @@ -9,3 +9,4 @@ export { SNSSQSEventTraceExtractor } from "./sns-sqs"; export { StepFunctionEventTraceExtractor } from "./step-function"; export { LambdaContextTraceExtractor } from "./lambda-context"; export { CustomTraceExtractor } from "./custom"; +export { DurableExecutionEventTraceExtractor, isDurableExecutionEvent, isDurableExecutionReplay, createDurableExecutionRootSpan } from "./durable-execution"; diff --git a/src/trace/listener.spec.ts b/src/trace/listener.spec.ts index 246a907f8..6e970eeec 100644 --- a/src/trace/listener.spec.ts +++ b/src/trace/listener.spec.ts @@ -569,6 +569,62 @@ describe("TraceListener", () => { } }); + it("parents aws.lambda to the durable execution init inferred span on first invocation", async () => { + const startSpanSpy = TracerWrapper.prototype.startSpan as jest.Mock; + startSpanSpy.mockClear(); + + const listener = new TraceListener(defaultConfig); + mockController.mockTraceSource = TraceSource.Event; + mockController.mockSpanContext = { + toTraceId: () => "4110911582297405551", + toSpanId: () => "797643193680388251", + _sampling: { + priority: "2", + }, + }; + mockController.mockSpanContextWrapper = { + spanContext: mockController.mockSpanContext, + }; + + const durableEvent = { + DurableExecutionArn: + "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004", + CheckpointToken: "checkpoint-token", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "input", + Status: "RUNNING", + StartTimestamp: 1710000000000, + }, + ], + }, + }; + + await listener.onStartInvocation(durableEvent, context as any); + const unwrappedFunc = () => {}; + listener.onWrap(unwrappedFunc); + + expect(startSpanSpy).toHaveBeenCalledWith( + "aws.durable.execution_init", + expect.objectContaining({ + childOf: mockController.mockSpanContext, + startTime: 1710000000000, + }), + ); + expect(wrapSpy).toHaveBeenCalledWith( + "aws.lambda", + expect.objectContaining({ + childOf: expect.objectContaining({ + toSpanId: expect.any(Function), + toTraceId: expect.any(Function), + }), + }), + unwrappedFunc, + ); + }); + it("sets execution_status tag on the aws.lambda span when result.Status is valid", async () => { const mockSetTag = jest.fn(); const mockSpan = { setTag: mockSetTag }; diff --git a/src/trace/listener.ts b/src/trace/listener.ts index f068c5352..7917aa01d 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -143,12 +143,14 @@ export class TraceListener { traceSource: this.contextService.traceSource, }); } + this.durableFunctionContext = extractDurableFunctionContext(event); if (this.config.createInferredSpan) { this.inferredSpan = this.inferrer.createInferredSpan( event, context, parentSpanContext, this.config.encodeAuthorizerContext, + this.durableFunctionContext, ); } @@ -157,7 +159,6 @@ export class TraceListener { const eventSource = parseEventSource(event); this.triggerTags = extractTriggerTags(event, context, eventSource); this.stepFunctionContext = StepFunctionContextService.instance().context; - this.durableFunctionContext = extractDurableFunctionContext(event); if (this.config.addSpanPointers) { this.spanPointerAttributesList = getSpanPointerAttributes(eventSource, event); diff --git a/src/trace/span-inferrer.spec.ts b/src/trace/span-inferrer.spec.ts index 17332044b..d4b4975d2 100644 --- a/src/trace/span-inferrer.spec.ts +++ b/src/trace/span-inferrer.spec.ts @@ -24,6 +24,53 @@ const apiGatewayWSSRequestAuthorizerConnect = require("../../event_samples/api-g const apiGatewayWSSRequestAuthorizerMessage = require("../../event_samples/api-gateway-traced-authorizer-request-websocket-message.json"); const s3Event = require("../../event_samples/s3.json"); const functionUrlEvent = require("../../event_samples/lambda-function-urls.json"); + +const durableFirstInvocationEvent = { + DurableExecutionArn: + "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004", + CheckpointToken: "checkpoint-token", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "input", + Status: "RUNNING", + StartTimestamp: 1710000000000, + }, + ], + }, +}; + +const durableReplayInvocationEvent = { + ...durableFirstInvocationEvent, + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "input", + Status: "SUCCEEDED", + StartTimestamp: 1710000000000, + }, + { + Id: "op-2", + Name: "step_1", + Status: "RUNNING", + }, + ], + }, +}; + +const durableFirstInvocationContext = { + "aws_lambda.durable_function.execution_name": "my-execution", + "aws_lambda.durable_function.execution_id": "550e8400-e29b-41d4-a716-446655440004", + "aws_lambda.durable_function.first_invocation": "true", +}; + +const durableReplayInvocationContext = { + ...durableFirstInvocationContext, + "aws_lambda.durable_function.first_invocation": "false", +}; + const mockWrapper = { startSpan: jest.fn(), }; @@ -876,6 +923,51 @@ describe("SpanInferrer", () => { ]); }); + it("creates an inferred execution init span for first durable execution invocation", () => { + const inferrer = new SpanInferrer(mockWrapper as unknown as TracerWrapper); + inferrer.createInferredSpan( + durableFirstInvocationEvent, + {} as any, + {} as SpanContext, + true, + durableFirstInvocationContext as any, + ); + + expect(mockWrapper.startSpan).toBeCalledWith("aws.durable.execution_init", { + childOf: {}, + startTime: 1710000000000, + tags: { + _inferred_span: { synchronicity: "async", tag_source: "self" }, + "durable.execution_arn": + "arn:aws:lambda:us-east-1:123456789012:function:my-func:1/durable-execution/my-execution/550e8400-e29b-41d4-a716-446655440004", + "durable.execution_id": "550e8400-e29b-41d4-a716-446655440004", + "durable.execution_name": "my-execution", + operation_name: "aws.durable.execution_init", + "peer.service": "mock-lambda-service", + request_id: undefined, + "resource.name": "my-execution", + resource_names: "my-execution", + service: "aws.durable-execution", + "service.name": "aws.durable-execution", + "span.kind": "server", + "span.type": "serverless", + }, + }); + }); + + it("does not create an execution init span for replay durable invocations", () => { + const inferrer = new SpanInferrer(mockWrapper as unknown as TracerWrapper); + inferrer.createInferredSpan( + durableReplayInvocationEvent, + {} as any, + {} as SpanContext, + true, + durableReplayInvocationContext as any, + ); + + expect(mockWrapper.startSpan).not.toBeCalled(); + }); + it("creates an inferred span for websocket events", () => { const inferrer = new SpanInferrer(mockWrapper as unknown as TracerWrapper); inferrer.createInferredSpan(webSocketEvent, {} as any, {} as SpanContext); diff --git a/src/trace/span-inferrer.ts b/src/trace/span-inferrer.ts index d6c4ae8f0..7844b88d9 100644 --- a/src/trace/span-inferrer.ts +++ b/src/trace/span-inferrer.ts @@ -16,6 +16,7 @@ import { logDebug } from "../utils"; import { parseLambdaARN } from "../utils/arn"; import { HTTPEventTraceExtractor } from "./context/extractors"; import { HTTPEventSubType } from "./context/extractors/http"; +import { DurableFunctionContext } from "./durable-function-context"; export class SpanInferrer { private static serviceMapping: Record = {}; @@ -47,7 +48,16 @@ export class SpanInferrer { context: Context | undefined, parentSpanContext: SpanContext | undefined, decodeAuthorizerContext: boolean = true, + durableFunctionContext?: DurableFunctionContext, ): any { + if (durableFunctionContext) { + return this.createInferredSpanForDurableExecutionInit( + event, + context, + parentSpanContext, + durableFunctionContext, + ); + } const eventSource = parseEventSource(event); if (eventSource === eventTypes.lambdaUrl) { return this.createInferredSpanForLambdaUrl(event, context, parentSpanContext); @@ -75,6 +85,54 @@ export class SpanInferrer { } } + createInferredSpanForDurableExecutionInit( + event: any, + context: Context | undefined, + parentSpanContext: SpanContext | undefined, + durableFunctionContext: DurableFunctionContext, + ): SpanWrapper | undefined { + + if (durableFunctionContext["aws_lambda.durable_function.first_invocation"] !== "true") return; + + const durableExecutionArn = event?.DurableExecutionArn; + const parsedStartTime = Number(event?.InitialExecutionState?.Operations?.[0]?.StartTimestamp); + if (!Number.isFinite(parsedStartTime)) { + return; + } + + const serviceName = process.env.DD_DURABLE_EXECUTION_SERVICE || "aws.durable-execution"; + const executionName = durableFunctionContext["aws_lambda.durable_function.execution_name"]; + const executionId = durableFunctionContext["aws_lambda.durable_function.execution_id"]; + const resourceName = executionName; + + const options: SpanOptions = { + startTime: parsedStartTime, + tags: { + operation_name: "aws.durable.execution_init", + resource_names: resourceName, + request_id: context?.awsRequestId, + service: serviceName, + "service.name": serviceName, + "span.type": "serverless", + "resource.name": resourceName, + "peer.service": this.service, + "span.kind": "server", + "durable.execution_arn": durableExecutionArn, + "durable.execution_name": executionName, + "durable.execution_id": executionId, + _inferred_span: { + tag_source: "self", + synchronicity: "async", + }, + }, + }; + if (parentSpanContext) { + options.childOf = parentSpanContext; + } + + return new SpanWrapper(this.traceWrapper.startSpan("aws.durable.execution_init", options), { isAsync: true }); + } + isApiGatewayAsync(event: any): string { if (event.headers && event.headers["X-Amz-Invocation-Type"] && event.headers["X-Amz-Invocation-Type"] === "Event") { return "async";