feat(dsm): Add support for automatic DSM context extraction inside the extension #1265
jeastham1993 wants to merge 10 commits into
Pull request overview
This PR extends Bottlecap’s universal instrumentation trace context propagation to automatically extract and emit Data Streams Monitoring (DSM) consume-side checkpoints within the extension for SQS, SNS, Kinesis, and EventBridge. It adds a DSM aggregation/serialization pipeline that drains into the existing proxy flush path and introduces configuration knobs to enable DSM consume extraction and provide an EventBridge exchange fallback.
Changes:
- Add a new `traces::data_streams` module implementing DSM context decode, pathway hashing, checkpoint computation, aggregation, and msgpack+gzip payload generation.
- Wire an optional `DsmProcessor` into invocation start processing (universal instrumentation) and into the flushing pipeline so pipeline-stats are shipped via the proxy flusher.
- Add trigger-specific DSM edge tags for SQS/SNS/Kinesis/EventBridge and introduce `DD_DSM_CONSUME_ENABLED`/`DD_DSM_EXCHANGE_NAME` config support.
Reviewed changes
Copilot reviewed 25 out of 26 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| bottlecap/src/traces/mod.rs | Exposes the new data_streams module from traces. |
| bottlecap/src/traces/data_streams/mod.rs | DSM module entrypoint; re-exports key DSM types/functions. |
| bottlecap/src/traces/data_streams/context.rs | Implements inbound DSM pathway context decoding (base64 + zigzag varints). |
| bottlecap/src/traces/data_streams/pathway.rs | Implements dd-trace-js-compatible pathway hash computation. |
| bottlecap/src/traces/data_streams/propagation_hash.rs | Implements optional FNV-1 propagation hash. |
| bottlecap/src/traces/data_streams/checkpoint.rs | Computes consume-side checkpoints from extracted context + tags. |
| bottlecap/src/traces/data_streams/sketch.rs | Implements tracer-compatible DDSketch + protobuf bytes serialization. |
| bottlecap/src/traces/data_streams/aggregator.rs | Aggregates checkpoints into 10s buckets and serializes msgpack payloads. |
| bottlecap/src/traces/data_streams/processor.rs | Bridges checkpoint aggregation to proxy flush by gzipping/enqueueing proxy requests. |
| bottlecap/src/traces/data_streams/fixtures/sketch_golden.json | Golden vectors to validate DDSketch compatibility. |
| bottlecap/src/tags/lambda/tags.rs | Updates hardcoded extension version string used in tags/logging. |
| bottlecap/src/proxy/interceptor.rs | Enables universal instrumentation processing when experimental proxy env var is set. |
| bottlecap/src/lifecycle/invocation/triggers/mod.rs | Adds Trigger::get_dsm_edge_tags() default hook. |
| bottlecap/src/lifecycle/invocation/triggers/sqs_event.rs | Adds SQS DSM consume edge tag derivation. |
| bottlecap/src/lifecycle/invocation/triggers/sns_event.rs | Adds SNS DSM consume edge tag derivation. |
| bottlecap/src/lifecycle/invocation/triggers/kinesis_event.rs | Adds Kinesis DSM consume edge tag derivation. |
| bottlecap/src/lifecycle/invocation/triggers/event_bridge_event.rs | Adds EventBridge DSM consume tags + best-effort bus name extraction + tests. |
| bottlecap/src/lifecycle/invocation/processor.rs | Hooks DSM consume recording into universal instrumentation start; adds EventBridge exchange fallback. |
| bottlecap/src/lifecycle/invocation/processor_service.rs | Threads optional DsmProcessor into Processor initialization. |
| bottlecap/src/flushing/service.rs | Drains DSM pipeline-stats into proxy aggregator immediately before proxy flush. |
| bottlecap/src/config/yaml.rs | Adds YAML config fields for DSM consume enabling and exchange fallback. |
| bottlecap/src/config/mod.rs | Adds Config fields: dsm_consume_enabled, dsm_exchange_name. |
| bottlecap/src/config/env.rs | Adds env config parsing/merging for DD_DSM_CONSUME_ENABLED and DD_DSM_EXCHANGE_NAME. |
| bottlecap/src/bin/bottlecap/main.rs | Constructs shared proxy aggregator, conditionally instantiates DSM processor, wires it into services. |
| bottlecap/Cargo.toml | Promotes msgpack/gzip dependencies to main deps and adds serde_bytes. |
| bottlecap/Cargo.lock | Adds serde_bytes to resolved dependency set. |
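The table describes `context.rs` as decoding the inbound pathway context from base64 and zigzag varints. As a rough illustration only, the sketch below decodes an already base64-decoded byte slice. The layout it assumes (an 8-byte little-endian pathway hash followed by two zigzag varints for the pathway and edge start times in milliseconds) and every name in it are assumptions for illustration, not code taken from this PR.

```rust
/// Hypothetical decoded pathway context; field names are illustrative.
#[derive(Debug, PartialEq)]
struct PathwayContext {
    hash: u64,
    pathway_start_ms: i64,
    edge_start_ms: i64,
}

/// Reads one unsigned base-128 varint from the front of `buf`.
/// Returns the value and the number of bytes consumed, or `None`
/// when the input is truncated or does not fit in 64 bits.
fn read_uvarint(buf: &[u8]) -> Option<(u64, usize)> {
    let mut value: u64 = 0;
    for (i, &byte) in buf.iter().enumerate().take(10) {
        // The tenth byte may only contribute the single remaining bit.
        if i == 9 && byte > 1 {
            return None;
        }
        value |= u64::from(byte & 0x7f) << (7 * i as u32);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}

/// Maps a zigzag-encoded unsigned value back to a signed one
/// (0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...).
fn zigzag_decode(n: u64) -> i64 {
    ((n >> 1) as i64) ^ -((n & 1) as i64)
}

/// Decodes the assumed layout: 8-byte little-endian hash, then two
/// zigzag varints. The base64 step is deliberately left out here.
fn decode_pathway_context(raw: &[u8]) -> Option<PathwayContext> {
    let mut hash_bytes = [0u8; 8];
    hash_bytes.copy_from_slice(raw.get(..8)?);
    let rest = &raw[8..];
    let (pathway, used) = read_uvarint(rest)?;
    let (edge, _) = read_uvarint(&rest[used..])?;
    Some(PathwayContext {
        hash: u64::from_le_bytes(hash_bytes),
        pathway_start_ms: zigzag_decode(pathway),
        edge_start_ms: zigzag_decode(edge),
    })
}
```

Returning `Option` rather than panicking keeps malformed or truncated headers from affecting the invocation; a caller would simply skip the checkpoint in that case.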
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 579f6e6d9b
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 947380af7b

    let first = segments.next().unwrap_or_default();
    // `rule/<bus>/<rule>` => bus is the first segment.
    // `rule/<rule>` (default bus) => no second segment, skip.
    if segments.next().is_some() && !first.is_empty() {

Emit default EventBridge bus exchange
For EventBridge rules on the default bus, AWS rule ARNs are encoded as `...:rule/<rule>` with no bus segment; this branch treats that as unknown and omits the exchange tag unless `DD_DSM_EXCHANGE_NAME` is configured. Default-bus EventBridge Lambda invocations are common, and dropping the bus identity makes the consume checkpoint hash/aggregate differently from the same edge tagged with its bus, breaking DSM continuity for those events. Please treat the one-segment rule ARN as `exchange:default`.
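One way to read this suggestion is sketched below. It is not the PR's implementation: the function name is hypothetical, and it assumes the rule ARN is available as a plain string with the resource path following `:rule/`.

```rust
/// Hypothetical helper: derives the bus name from an EventBridge rule ARN.
/// `...:rule/<bus>/<rule>` yields the bus; `...:rule/<rule>` yields "default",
/// as proposed in the review comment. Anything else yields `None`.
fn bus_name_from_rule_arn(arn: &str) -> Option<&str> {
    // Everything after the ":rule/" marker is the resource path.
    let (_, path) = arn.split_once(":rule/")?;
    let mut segments = path.split('/');
    let first = segments.next().filter(|s| !s.is_empty())?;
    match segments.next() {
        // Two non-empty segments: the first one is the custom bus name.
        Some(rule) if !rule.is_empty() => Some(first),
        // Trailing slash with no rule name: treat as malformed.
        Some(_) => None,
        // Single segment: the rule lives on the default bus.
        None => Some("default"),
    }
}
```

With this shape, an explicitly configured `DD_DSM_EXCHANGE_NAME` would only be needed when the ARN is missing or malformed, rather than for every default-bus rule.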
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ca5b7da286
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 9b923a55cf

    // functions whose tracer does not drive the invocation lifecycle.
    let experimental_proxy_enabled = std::env::var("DD_EXPERIMENTAL_ENABLE_PROXY")
        .is_ok_and(|v| v.eq_ignore_ascii_case("true"));
    if aws_config.aws_lwa_proxy_lambda_runtime_api.is_some() || experimental_proxy_enabled {

Drive DSM extraction when the proxy already has the payload
When the runtime API proxy is running for the Datadog wrapper because AppSec is enabled, and `DD_DSM_CONSUME_ENABLED=true` but no tracer calls `/lambda/start-invocation`, this condition stays false unless LWA or `DD_EXPERIMENTAL_ENABLE_PROXY` is also set. In that no-tracer/AppSec setup the event body is available here but `lwa::process_invocation_next` is skipped, so `process_on_universal_instrumentation_start` never records any DSM consume checkpoints. Include the DSM flag, or all proxy-active wrapper cases, in this gate.
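A minimal sketch of the widened gate follows, under the assumption that the three inputs are passed in as plain values. The function names are hypothetical, and the PR itself may read the DSM flag from its parsed config rather than from a raw string.

```rust
/// Returns true only for a case-insensitive "true", mirroring the
/// `eq_ignore_ascii_case("true")` check in the snippet under review.
fn is_truthy(value: Option<&str>) -> bool {
    value.is_some_and(|v| v.eq_ignore_ascii_case("true"))
}

/// Hypothetical gate: process the invocation payload in the proxy when
/// LWA is configured, the experimental proxy is enabled, or DSM consume
/// extraction is enabled.
fn should_process_invocation_next(
    lwa_proxy_configured: bool,
    experimental_proxy: Option<&str>,
    dsm_consume_enabled: Option<&str>,
) -> bool {
    lwa_proxy_configured || is_truthy(experimental_proxy) || is_truthy(dsm_consume_enabled)
}
```

Keeping the decision in a pure function like this makes each flag combination easy to unit test without touching process environment variables.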
Please include Jira ticket in title.
Overview
Update the trace context propagation support in universal instrumentation to include the automatic extraction of Data Streams Monitoring context for SQS, SNS, Kinesis and EventBridge.
Testing
Added unit tests. Manually tested the functionality for Java, Go and .NET. Java and Go work correctly. The .NET tracer needs updating to support the 2.x version of `Amazon.Lambda.RuntimeSupport`, so it can't be tested manually yet.