Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,11 @@ private void doDiscovery(State newState) {

if (log.isDebugEnabled()) {
log.debug(
"discovered traceEndpoint={}, metricsEndpoint={}, supportsDropping={}, supportsLongRunning={}, dataStreamsEndpoint={}, configEndpoint={}, logEndpoint={}, snapshotEndpoint={}, diagnosticsEndpoint={}, evpProxyEndpoint={}, telemetryProxyEndpoint={}",
"discovered traceEndpoint={}, metricsEndpoint={}, supportsDropping={}, supportsClientSideStats={}, supportsLongRunning={}, dataStreamsEndpoint={}, configEndpoint={}, logEndpoint={}, snapshotEndpoint={}, diagnosticsEndpoint={}, evpProxyEndpoint={}, telemetryProxyEndpoint={}",
newState.traceEndpoint,
newState.metricsEndpoint,
newState.supportsDropping,
newState.supportsClientSideStats,
newState.supportsLongRunning,
newState.dataStreamsEndpoint,
newState.configEndpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR;
import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import datadog.common.queue.Queues;
Expand Down Expand Up @@ -71,6 +72,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
private final DDAgentFeaturesDiscovery features;
private final HealthMetrics healthMetrics;
private final boolean includeEndpointInMetrics;
private final boolean otlpStatsExportEnabled;

/**
* Cached peer-aggregation schema. Producers read this reference once per trace and pass it
Expand Down Expand Up @@ -112,6 +114,30 @@ public ConflatingMetricsAggregator(
config.isTraceResourceRenamingEnabled());
}

/**
* OTLP span-metrics export variant. Reuses the same span selection + DDSketch aggregation as the
* native path, but emits via the injected {@link OtlpStatsMetricWriter} instead of msgpack to. No
* agent {@link Sink} is needed, so a {@link NoOpSink} satisfies the register()/backpressure
* contract, and the reporting interval comes from {@code trace.stats.interval} (milliseconds).
*/
public ConflatingMetricsAggregator(
Config config,
SharedCommunicationObjects sharedCommunicationObjects,
HealthMetrics healthMetrics,
OtlpStatsMetricWriter metricWriter) {
this(
config.getMetricsIgnoredResources(),
sharedCommunicationObjects.featuresDiscovery(config),
healthMetrics,
NoOpSink.INSTANCE,
metricWriter,
config.getTracerMetricsMaxAggregates(),
config.getTracerMetricsMaxPending(),
config.getTraceStatsInterval(),
MILLISECONDS,
true);
}

ConflatingMetricsAggregator(
WellKnownTags wellKnownTags,
Set<String> ignoredResources,
Expand Down Expand Up @@ -171,6 +197,7 @@ public ConflatingMetricsAggregator(
boolean includeEndpointInMetrics) {
this.ignoredResources = ignoredResources;
this.includeEndpointInMetrics = includeEndpointInMetrics;
this.otlpStatsExportEnabled = metricWriter instanceof OtlpStatsMetricWriter;
this.inbox = Queues.mpscArrayQueue(queueSize);
this.features = features;
this.healthMetrics = healthMetric;
Expand Down Expand Up @@ -204,11 +231,19 @@ public void start() {
log.debug("started metrics aggregator");
}

private boolean statsExportEnabled() {
return otlpStatsExportEnabled || features.supportsMetrics();
}

private boolean isMetricsEnabled() {
if (features.getMetricsEndpoint() == null) {
// The discovery refresh only helps the native path, which is gated on the agent advertising
// v0.6/stats. The OTLP path uses its own sender and never depends on that capability -- even
// when a Datadog Agent is present (the OTLP endpoint may itself be the agent's OTLP receiver),
// so refreshing agent features here would be pointless for it.
if (!otlpStatsExportEnabled && features.getMetricsEndpoint() == null) {
features.discoverIfOutdated();
}
return features.supportsMetrics();
return statsExportEnabled();
}

@Override
Expand Down Expand Up @@ -264,7 +299,7 @@ public Future<Boolean> forceReport() {
public boolean publish(List<? extends CoreSpan<?>> trace) {
boolean forceKeep = false;
int counted = 0;
if (features.supportsMetrics()) {
if (statsExportEnabled()) {
for (CoreSpan<?> span : trace) {
boolean isTopLevel = span.isTopLevel();
if (shouldComputeMetric(span, isTopLevel)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ public static MetricsAggregator createMetricsAggregator(
Config config,
SharedCommunicationObjects sharedCommunicationObjects,
HealthMetrics healthMetrics) {
// OTLP span-metrics export and native msgpack stats are mutually exclusive (XOR): both hang off
// the same ConflatingMetricsAggregator span selection + DDSketch aggregation, differing only in
// the injected MetricWriter.
if (config.isTracesSpanMetricsEnabled()) {
if (config.isTracerMetricsEnabled()) {
log.warn(
"Both OTLP trace span metrics and native tracer metrics are enabled; "
+ "using OTLP export and ignoring native tracer metrics (the two are mutually "
+ "exclusive).");
}
log.debug("OTLP trace span metrics enabled");
return new ConflatingMetricsAggregator(
config, sharedCommunicationObjects, healthMetrics, new OtlpStatsMetricWriter(config));
}
if (config.isTracerMetricsEnabled()) {
log.debug("tracer metrics enabled");
return new ConflatingMetricsAggregator(config, sharedCommunicationObjects, healthMetrics);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package datadog.trace.common.metrics;

import java.nio.ByteBuffer;

/**
* A {@link Sink} that discards everything. Used by the OTLP trace-metrics export path, where {@link
* OtlpStatsMetricWriter} sends payloads directly via its own OTLP sender in {@code finishBucket()}
* rather than through the aggregator's {@code Sink}. {@link ConflatingMetricsAggregator} still
* requires a {@code Sink} for {@code register()}/backpressure wiring, so this satisfies that
* contract without performing any I/O.
*/
public final class NoOpSink implements Sink {

public static final NoOpSink INSTANCE = new NoOpSink();

private NoOpSink() {}

@Override
public void accept(int messageCount, ByteBuffer buffer) {}

@Override
public void register(EventListener listener) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static datadog.trace.common.writer.ddagent.Prioritization.FAST_LANE;

import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
import datadog.communication.ddagent.DroppingPolicy;
import datadog.metrics.api.Monitoring;
import datadog.trace.api.Config;
import datadog.trace.api.ProtocolVersion;
Expand Down Expand Up @@ -48,6 +49,7 @@ public static class DDAgentWriterBuilder {
private DDAgentApi agentApi;
private Prioritization prioritization;
private DDAgentFeaturesDiscovery featureDiscovery;
private DroppingPolicy droppingPolicy;
private SingleSpanSampler singleSpanSampler;

public DDAgentWriterBuilder agentApi(DDAgentApi agentApi) {
Expand Down Expand Up @@ -125,6 +127,11 @@ public DDAgentWriterBuilder featureDiscovery(DDAgentFeaturesDiscovery featureDis
return this;
}

public DDAgentWriterBuilder droppingPolicy(DroppingPolicy droppingPolicy) {
this.droppingPolicy = droppingPolicy;
return this;
}

public DDAgentWriterBuilder flushTimeout(int flushTimeout, TimeUnit flushTimeoutUnit) {
this.flushTimeout = flushTimeout;
this.flushTimeoutUnit = flushTimeoutUnit;
Expand Down Expand Up @@ -170,7 +177,10 @@ public DDAgentWriter build() {
traceBufferSize,
healthMetrics,
dispatcher,
featureDiscovery,
droppingPolicy != null
? droppingPolicy
: featureDiscovery, // custom dropping policy for OTLP but backup to feature
// discovery
null == prioritization ? FAST_LANE : prioritization,
flushIntervalMilliseconds,
TimeUnit.MILLISECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,24 @@ public static Writer createWriter(
commObjects.agentUrl,
featuresDiscovery,
commObjects.monitoring,
config.isTracerMetricsEnabled());
config.isTracerMetricsEnabled() || config.isTracesSpanMetricsEnabled());

if (sampler instanceof RemoteResponseListener) {
ddAgentApi.addResponseListener((RemoteResponseListener) sampler);
}

// Drop p0 (sampled-out) traces when client-side stats are being computed -- either via the
// native agent-stats path (featuresDiscovery) or the OTLP trace metrics path -- so the
// now-redundant traces aren't shipped.
final boolean otlpSpanMetricsEnabled = config.isTracesSpanMetricsEnabled();
final DroppingPolicy droppingPolicy =
() -> otlpSpanMetricsEnabled || featuresDiscovery.active();

DDAgentWriter.DDAgentWriterBuilder builder =
DDAgentWriter.builder()
.agentApi(ddAgentApi)
.featureDiscovery(featuresDiscovery)
.droppingPolicy(droppingPolicy)
.prioritization(prioritization)
.healthMetrics(healthMetrics)
.monitoring(commObjects.monitoring)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public Response sendSerializedTraces(final Payload payload) {
.addHeader(DATADOG_DROPPED_SPAN_COUNT, Long.toString(payload.droppedSpans()))
.addHeader(
DATADOG_CLIENT_COMPUTED_STATS,
(metricsEnabled && featuresDiscovery.supportsMetrics())
Config.get().isTracesSpanMetricsEnabled()
|| (metricsEnabled && featuresDiscovery.supportsMetrics())
// Disabling the computation agent-side of the APM trace metrics by
// pretending it was already done by the library
|| !Config.get().isApmTracingEnabled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,33 @@ private OtlpResourceProto() {}
/** Prefix applied to {@code datadog.runtime_id} and process-tag resource attributes. */
private static final String DATADOG_PREFIX = "datadog.";

/** Vendor-neutral resource (no {@code datadog.*}). Used by the OTLP trace/metric export. */
public static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get());
/**
* Resource attribute added to OTLP trace metrics to ensure calculations are not re-computed in
* the Agent
*/
private static final String STATS_COMPUTED_KEY = "_dd.stats_computed";

/** Vendor-neutral resource (no {@code datadog.*}). Used by the OTLP metric export. */
public static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get(), false, false);

/**
* Resource that additionally carries {@code datadog.runtime_id} and process tags (each prefixed
* {@code datadog.}). Used by the default-mode SDK trace-metrics export; omitted in OTel-semantics
* mode.
*/
public static final byte[] RESOURCE_MESSAGE_WITH_DATADOG_ATTRS =
buildResourceMessage(Config.get(), true);
buildResourceMessage(Config.get(), true, false);

static byte[] buildResourceMessage(Config config) {
return buildResourceMessage(config, false);
}
/**
* Resource used by the OTLP trace export. Identical to {@link #RESOURCE_MESSAGE} but adds the
* {@code _dd.stats_computed} marker when the SDK is computing OTLP span metrics, so a downstream
* Agent does not recompute them from the exported spans.
*/
public static final byte[] TRACE_RESOURCE_MESSAGE =
buildResourceMessage(Config.get(), false, Config.get().isTracesSpanMetricsEnabled());

static byte[] buildResourceMessage(Config config, boolean includeDatadogResourceAttributes) {
static byte[] buildResourceMessage(
Config config, boolean includeDatadogResourceAttributes, boolean includeStatsComputed) {
GrowableBuffer buf = new GrowableBuffer(512);

String serviceName = config.getServiceName();
Expand Down Expand Up @@ -89,6 +100,10 @@ static byte[] buildResourceMessage(Config config, boolean includeDatadogResource
writeDatadogResourceAttributes(buf, config);
}

if (includeStatsComputed) {
writeResourceAttribute(buf, STATS_COMPUTED_KEY, "true");
}

OtlpProtoBuffer protobuf = new OtlpProtoBuffer(buf.capacity());
int numBytes = protobuf.recordMessage(buf, 1);
byte[] resourceMessage = new byte[numBytes];
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package datadog.trace.core.otlp.trace;

import static datadog.trace.core.otlp.common.OtlpResourceProto.RESOURCE_MESSAGE;
import static datadog.trace.core.otlp.common.OtlpResourceProto.TRACE_RESOURCE_MESSAGE;
import static datadog.trace.core.otlp.trace.OtlpTraceProto.recordScopedSpansMessage;
import static datadog.trace.core.otlp.trace.OtlpTraceProto.recordSpanLinkMessage;
import static datadog.trace.core.otlp.trace.OtlpTraceProto.recordSpanMessage;
Expand Down Expand Up @@ -138,7 +138,7 @@ private OtlpPayload completePayload() {
}

// prepend the canned resource chunk
payloadBytes += protobuf.recordMessage(RESOURCE_MESSAGE);
payloadBytes += protobuf.recordMessage(TRACE_RESOURCE_MESSAGE);

// finally prepend the total length of all collected chunks
protobuf.recordMessage(buf, 1, payloadBytes);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,51 @@ class WriterFactoryTest extends DDSpecification {
OtlpConfig.Protocol.GRPC | OtlpConfig.Compression.NONE | "http://otel-collector:4317" | OtlpGrpcSender | "http://otel-collector:4317/opentelemetry.proto.collector.trace.v1.TraceService/Export" | false
}

def "DDAgentApi marks client-computed stats when either stats pipeline is enabled (spanMetrics=#spanMetrics, nativeStats=#nativeStats)"() {
setup:
// metricsEnabled gates the Datadog-Client-Computed-Stats header on the native trace transport.
// It must be true whenever the SDK computes stats by EITHER pipeline: native msgpack stats
// (isTracerMetricsEnabled) or OTLP span metrics (isTracesSpanMetricsEnabled). The OTLP-stats
// case arises when OTEL_TRACES_SPAN_METRICS_ENABLED is set while traces still export natively;
// without the header the Agent would recompute stats from spans already exported via OTLP.
def config = Mock(Config)
config.apiKey >> "my-api-key"
config.agentUrl >> "http://my-agent.url"
config.getEnumValue(PRIORITIZATION_TYPE, _, _) >> Prioritization.FAST_LANE
config.tracerMetricsEnabled >> nativeStats
config.tracesSpanMetricsEnabled >> spanMetrics

def mockCall = Mock(Call)
def mockHttpClient = Mock(OkHttpClient)
// advertise only v0.4 (no EVP proxy) so "DDAgentWriter" resolves to a real DDAgentWriter
mockCall.execute() >> { buildHttpResponse(false, false, HttpUrl.parse(config.agentUrl + "/info")) }
mockHttpClient.newCall(_ as Request) >> mockCall

def sharedComm = new SharedCommunicationObjects()
sharedComm.agentHttpClient = mockHttpClient
sharedComm.agentUrl = HttpUrl.parse(config.agentUrl)
sharedComm.createRemaining(config)

def sampler = Mock(Sampler)

when:
def writer = WriterFactory.createWriter(config, sharedComm, sampler, null, HealthMetrics.NO_OP, "DDAgentWriter")
def api = ((RemoteWriter) writer).apis.find { it instanceof DDAgentApi }
def metricsEnabledField = DDAgentApi.getDeclaredField("metricsEnabled")
metricsEnabledField.setAccessible(true)

then:
writer instanceof DDAgentWriter
metricsEnabledField.getBoolean(api) == expectedComputesStats

where:
spanMetrics | nativeStats | expectedComputesStats
true | false | true // gap case: OTLP span metrics on, native stats off
false | false | false // neither pipeline computes stats
false | true | true // native stats on (regression guard)
true | true | true // both on
}

Response buildHttpResponse(boolean hasEvpProxy, boolean evpProxySupportsCompression, HttpUrl agentUrl) {
def endpoints = []
if (hasEvpProxy && evpProxySupportsCompression) {
Expand Down
Loading
Loading