Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/trace/context/extractor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,14 @@ export class TraceContextExtractor {

if (EventValidator.isSNSEvent(event)) return new SNSEventTraceExtractor(this.tracerWrapper, this.config);
if (EventValidator.isSNSSQSEvent(event)) return new SNSSQSEventTraceExtractor(this.tracerWrapper, this.config);
if (EventValidator.isEventBridgeSQSEvent(event)) return new EventBridgeSQSEventTraceExtractor(this.tracerWrapper);
if (EventValidator.isEventBridgeSQSEvent(event))
return new EventBridgeSQSEventTraceExtractor(this.tracerWrapper, this.config);
if (EventValidator.isAppSyncResolverEvent(event)) return new AppSyncEventTraceExtractor(this.tracerWrapper);
if (EventValidator.isSQSEvent(event)) return new SQSEventTraceExtractor(this.tracerWrapper, this.config);
if (EventValidator.isKinesisStreamEvent(event))
return new KinesisEventTraceExtractor(this.tracerWrapper, this.config);
if (EventValidator.isEventBridgeEvent(event)) return new EventBridgeEventTraceExtractor(this.tracerWrapper);
if (EventValidator.isEventBridgeEvent(event))
return new EventBridgeEventTraceExtractor(this.tracerWrapper, this.config);

return;
}
Expand Down
151 changes: 140 additions & 11 deletions src/trace/context/extractors/event-bridge-sqs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ import { EventBridgeSQSEventTraceExtractor } from "./event-bridge-sqs";
import { StepFunctionContextService } from "../../step-function-service";

let mockSpanContext: any = null;
let mockDataStreamsCheckpointer: any = {
setConsumeCheckpoint: jest.fn(),
};

jest.mock("dd-trace/packages/dd-trace/src/datastreams/checkpointer", () => {
return {
DataStreamsCheckpointer: jest.fn().mockImplementation(() => mockDataStreamsCheckpointer),
};
});

// Mocking extract is needed, due to dd-trace being a No-op
// if the detected environment is testing. This is expected, since
Expand All @@ -12,14 +21,32 @@ jest.mock("dd-trace", () => {
...jest.requireActual("dd-trace"),
_tracer: { _service: {} },
extract: (_carrier: any, _headers: any) => mockSpanContext,
dataStreamsCheckpointer: mockDataStreamsCheckpointer,
};
});
const spyTracerWrapper = jest.spyOn(TracerWrapper.prototype, "extract");

describe("EventBridgeSQSEventTraceExtractor", () => {
const mockConfig = {
autoPatchHTTP: true,
captureLambdaPayload: false,
captureLambdaPayloadMaxDepth: 10,
createInferredSpan: true,
encodeAuthorizerContext: true,
decodeAuthorizerContext: true,
mergeDatadogXrayTraces: false,
injectLogContext: false,
minColdStartTraceDuration: 3,
coldStartTraceSkipLib: "",
addSpanPointers: true,
dataStreamsEnabled: true,
};

describe("extract", () => {
beforeEach(() => {
mockSpanContext = null;
spyTracerWrapper.mockClear();
mockDataStreamsCheckpointer.setConsumeCheckpoint.mockClear();
});

afterEach(() => {
Expand Down Expand Up @@ -59,7 +86,7 @@ describe("EventBridgeSQSEventTraceExtractor", () => {
],
};

const extractor = new EventBridgeSQSEventTraceExtractor(tracerWrapper);
const extractor = new EventBridgeSQSEventTraceExtractor(tracerWrapper, mockConfig);

const traceContext = extractor.extract(payload);
expect(traceContext).not.toBeNull();
Expand All @@ -75,21 +102,46 @@ describe("EventBridgeSQSEventTraceExtractor", () => {
expect(traceContext?.toSpanId()).toBe("2644033662113726488");
expect(traceContext?.sampleMode()).toBe("1");
expect(traceContext?.source).toBe("event");

// EventBridge -> SQS follows the SQS DSM conventions (type:sqs, topic:queue ARN)
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith(
"sqs",
"arn:aws:sqs:us-east-1:425362996713:lambda-eb-sqs-lambda-dev-demo-queue",
{
"x-datadog-parent-id": "2644033662113726488",
"x-datadog-sampling-priority": "1",
"x-datadog-tags": "_dd.p.dm=-0",
"x-datadog-trace-id": "7379586022458917877",
},
false,
);
});

// prettier-ignore
it.each([
["Records", {}],
["Records first entry", { Records: [] }],
["Records first entry body", { Records: [{}] }],
["valid data in body", { Records: [{ body: "{" }] }], // JSON.parse should fail
["detail in body", { Records: [{ body: "{}" }] }],
["_datadog in detail", { Records: [{ body: '{"detail":{"text":"Hello, world!"}}' }] }],
])("returns null and skips extracting when payload is missing '%s'", (_, payload) => {
["Records", {}, 0],
["Records first entry", { Records: [] }, 0],
["Records first entry body", { Records: [{ eventSourceARN: "arn:aws:sqs:us-east-1:test" }] }, 1],
["valid data in body", { Records: [{ body: "{", eventSourceARN: "arn:aws:sqs:us-east-1:test" }] }, 1], // JSON.parse should fail but we still set checkpoint
["detail in body", { Records: [{ body: "{}", eventSourceARN: "arn:aws:sqs:us-east-1:test" }] }, 1],
["_datadog in detail", { Records: [{ body: '{"detail":{"text":"Hello, world!"}}', eventSourceARN: "arn:aws:sqs:us-east-1:test" }] }, 1],
])("returns null and skips extracting when payload is missing '%s'", (_, payload, dsmCalls) => {
const tracerWrapper = new TracerWrapper();
const extractor = new EventBridgeSQSEventTraceExtractor(tracerWrapper);
const extractor = new EventBridgeSQSEventTraceExtractor(tracerWrapper, mockConfig);

const traceContext = extractor.extract(payload as any);
expect(traceContext).toBeNull();

// DSM checkpoint is set per-record even when headers are absent
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledTimes(dsmCalls);
if (dsmCalls > 0) {
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith(
"sqs",
"arn:aws:sqs:us-east-1:test",
null,
false,
);
}
});

it("returns null when extracted span context by tracer is null", () => {
Expand Down Expand Up @@ -119,7 +171,7 @@ describe("EventBridgeSQSEventTraceExtractor", () => {
],
};

const extractor = new EventBridgeSQSEventTraceExtractor(tracerWrapper);
const extractor = new EventBridgeSQSEventTraceExtractor(tracerWrapper, mockConfig);

const traceContext = extractor.extract(payload);
expect(traceContext).toBeNull();
Expand Down Expand Up @@ -153,7 +205,7 @@ describe("EventBridgeSQSEventTraceExtractor", () => {
],
};

const extractor = new EventBridgeSQSEventTraceExtractor(tracerWrapper);
const extractor = new EventBridgeSQSEventTraceExtractor(tracerWrapper, mockConfig);

const traceContext = extractor.extract(payload);
expect(traceContext).not.toBeNull();
Expand All @@ -164,5 +216,82 @@ describe("EventBridgeSQSEventTraceExtractor", () => {
expect(traceContext?.sampleMode()).toBe("1");
expect(traceContext?.source).toBe("event");
});

it("sets a DSM checkpoint for every record in the event", () => {
mockSpanContext = {
toTraceId: () => "7379586022458917877",
toSpanId: () => "2644033662113726488",
_sampling: { priority: "1" },
};
const tracerWrapper = new TracerWrapper();

const makeRecord = (arn: string, traceId: string) => ({
body: JSON.stringify({
"detail-type": "my.Detail",
source: "my.Source",
detail: {
text: "Hello, world!",
_datadog: {
"x-datadog-trace-id": traceId,
"x-datadog-parent-id": "2644033662113726488",
"x-datadog-sampling-priority": "1",
"dd-pathway-ctx-base64": `ctx-${arn}`,
},
},
}),
eventSourceARN: arn,
});

const payload = {
Records: [
makeRecord("arn:aws:sqs:us-east-1:test:queue-1", "7379586022458917877"),
makeRecord("arn:aws:sqs:us-east-1:test:queue-2", "1111111111111111111"),
],
};

const extractor = new EventBridgeSQSEventTraceExtractor(tracerWrapper, mockConfig);
extractor.extract(payload as any);

expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledTimes(2);
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenNthCalledWith(
1,
"sqs",
"arn:aws:sqs:us-east-1:test:queue-1",
expect.objectContaining({ "dd-pathway-ctx-base64": "ctx-arn:aws:sqs:us-east-1:test:queue-1" }),
false,
);
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenNthCalledWith(
2,
"sqs",
"arn:aws:sqs:us-east-1:test:queue-2",
expect.objectContaining({ "dd-pathway-ctx-base64": "ctx-arn:aws:sqs:us-east-1:test:queue-2" }),
false,
);
});

it("does not set DSM checkpoints when DSM is disabled", () => {
mockSpanContext = {
toTraceId: () => "7379586022458917877",
toSpanId: () => "2644033662113726488",
_sampling: { priority: "1" },
};
const tracerWrapper = new TracerWrapper();

const payload = {
Records: [
{
body: '{"detail-type":"my.Detail","source":"my.Source","detail":{"text":"Hello, world!","_datadog":{"x-datadog-trace-id":"7379586022458917877","x-datadog-parent-id":"2644033662113726488","x-datadog-sampling-priority":"1"}}}',
eventSourceARN: "arn:aws:sqs:us-east-1:test:queue",
},
],
};

const disabledConfig = { ...mockConfig, dataStreamsEnabled: false };
const extractor = new EventBridgeSQSEventTraceExtractor(tracerWrapper, disabledConfig);

const traceContext = extractor.extract(payload as any);
expect(traceContext).not.toBeNull();
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledTimes(0);
});
});
});
58 changes: 49 additions & 9 deletions src/trace/context/extractors/event-bridge-sqs.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,66 @@
import { SQSEvent } from "aws-lambda";
import { SQSEvent, SQSRecord } from "aws-lambda";
import { TracerWrapper } from "../../tracer-wrapper";
import { EventTraceExtractor } from "../extractor";
import { SpanContextWrapper } from "../../span-context-wrapper";
import { extractTraceContext, handleExtractionError } from "../extractor-utils";
import { TraceConfig } from "../../listener";

export class EventBridgeSQSEventTraceExtractor implements EventTraceExtractor {
constructor(private tracerWrapper: TracerWrapper) {}
constructor(private tracerWrapper: TracerWrapper, private config: TraceConfig) {}

extract(event: SQSEvent): SpanContextWrapper | null {
try {
const body = event?.Records?.[0]?.body;
if (body) {
const parsedBody = JSON.parse(body);
const headers = parsedBody?.detail?._datadog;
if (headers) {
return extractTraceContext(headers, this.tracerWrapper);
// Set DSM consume checkpoints if enabled and capture first record's headers.
// EventBridge -> SQS follows the SQS DSM conventions (type:sqs, topic:queue ARN),
// since the event is delivered to and consumed from the SQS queue.
let firstRecordHeaders: Record<string, string> | null = null;
if (this.config.dataStreamsEnabled) {
for (let i = 0; i < (event?.Records || []).length; i++) {
const record = event.Records[i];
try {
const headers = this.getParsedRecordHeaders(record);

// Store first record's headers for trace context extraction
if (i === 0) {
firstRecordHeaders = headers;
}

// Set a checkpoint for the record, even if we don't have headers
this.tracerWrapper.setConsumeCheckpoint(headers, "sqs", record.eventSourceARN);
} catch (error) {
handleExtractionError(error, "EventBridge-SQS DSM checkpoint");
}
}
}

try {
// Use already parsed headers from DSM if available, otherwise parse now
if (!firstRecordHeaders) {
firstRecordHeaders = this.getParsedRecordHeaders(event?.Records?.[0]);
}

if (firstRecordHeaders) {
return extractTraceContext(firstRecordHeaders, this.tracerWrapper);
}
} catch (error) {
handleExtractionError(error, "EventBridge-SQS");
}

return null;
}

private getParsedRecordHeaders(record: SQSRecord | undefined): Record<string, string> | null {
if (!record) {
return null;
}
try {
const body = record.body;
if (body) {
const parsedBody = JSON.parse(body);
return parsedBody?.detail?._datadog ?? null;
}
return null;
} catch (error) {
return null;
}
}
}
Loading
Loading