From 33a9e677dec3b822c7205871f66339931f5282ce Mon Sep 17 00:00:00 2001 From: Douglas Q Hawkins Date: Tue, 14 Apr 2026 20:37:00 -0400 Subject: [PATCH] Move client-side stats computation off the span-finish thread ConflatingMetricsAggregator.publish() was consuming ~17% of foreground CPU (ConcurrentHashMap 12%, TraceHealthMetrics 3%, LongAdder 2%) by running MetricKey construction, ConcurrentHashMap lookups, and Batch management synchronously on the span-finish thread. This change extracts lightweight SpanStatsData DTOs on the foreground thread and defers all expensive work (MetricKey construction, map lookups, health metrics) to the existing background Aggregator thread via the MPSC inbox queue. The pending/keys maps are downgraded from ConcurrentHashMap to plain HashMap since they are now single-threaded. Benchmark shows 64-span trace foreground cost reduced from 2.86us to 2.04us (~29% reduction). tag: no release note tag: ai generated Co-Authored-By: Claude Opus 4.6 (1M context) --- .../metrics/SpanFinishWithStatsBenchmark.java | 155 +++++++++++++ .../trace/common/metrics/AggregateMetric.java | 22 ++ .../trace/common/metrics/Aggregator.java | 160 +++++++++++--- .../metrics/ConflatingMetricsAggregator.java | 207 +++++++----------- .../trace/common/metrics/SpanStatsData.java | 61 ++++++ .../trace/common/metrics/TraceStatsData.java | 18 ++ 6 files changed, 462 insertions(+), 161 deletions(-) create mode 100644 dd-trace-core/src/jmh/java/datadog/trace/common/metrics/SpanFinishWithStatsBenchmark.java create mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanStatsData.java create mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/TraceStatsData.java diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/SpanFinishWithStatsBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/SpanFinishWithStatsBenchmark.java new file mode 100644 index 00000000000..a44c9e3fcaa --- /dev/null +++ 
b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/SpanFinishWithStatsBenchmark.java @@ -0,0 +1,155 @@ +package datadog.trace.common.metrics; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.trace.api.WellKnownTags; +import datadog.trace.core.CoreSpan; +import datadog.trace.core.monitor.HealthMetrics; +import datadog.trace.util.Strings; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Measures the foreground thread cost of publishing span stats. With the background-stats + * optimization, the foreground thread should only extract lightweight SpanStatsData and offer to + * the inbox queue, while the expensive MetricKey construction and HashMap operations happen on the + * background aggregator thread. 
+ */ +@State(Scope.Benchmark) +@Warmup(iterations = 3, time = 5, timeUnit = SECONDS) +@Measurement(iterations = 5, time = 5, timeUnit = SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(MICROSECONDS) +@Fork(value = 1) +public class SpanFinishWithStatsBenchmark { + + private static final Set PEER_TAGS = Collections.singleton("peer.hostname"); + + private final DDAgentFeaturesDiscovery featuresDiscovery = + new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + PEER_TAGS, Collections.emptySet()); + + private ConflatingMetricsAggregator aggregator; + + private final List> smallTrace = generateTrace(4); + private final List> mediumTrace = generateTrace(16); + private final List> largeTrace = generateTrace(64); + + @Setup(Level.Trial) + public void setup() { + aggregator = + new ConflatingMetricsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + featuresDiscovery, + HealthMetrics.NO_OP, + new NullSink(), + 2048, + 2048, + false); + aggregator.start(); + } + + @TearDown(Level.Trial) + public void teardown() { + if (aggregator != null) { + aggregator.close(); + } + } + + static List> generateTrace(int len) { + final List> trace = new ArrayList<>(); + for (int i = 0; i < len; i++) { + SimpleSpan span = new SimpleSpan("", "", "", "", true, true, false, 0, 10, -1); + span.setTag("peer.hostname", Strings.random(10)); + trace.add(span); + } + return trace; + } + + static class NullSink implements Sink { + @Override + public void register(EventListener listener) {} + + @Override + public void accept(int messageCount, ByteBuffer buffer) {} + } + + @Benchmark + public void publishSmallTrace(Blackhole blackhole) { + blackhole.consume(aggregator.publish(smallTrace)); + } + + @Benchmark + public void publishMediumTrace(Blackhole blackhole) { + blackhole.consume(aggregator.publish(mediumTrace)); + } + + @Benchmark + public void publishLargeTrace(Blackhole blackhole) { + blackhole.consume(aggregator.publish(largeTrace)); + } 
+ + /** Multi-threaded benchmark to measure contention under concurrent publishing. */ + @State(Scope.Benchmark) + @Warmup(iterations = 3, time = 5, timeUnit = SECONDS) + @Measurement(iterations = 5, time = 5, timeUnit = SECONDS) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(MICROSECONDS) + @Threads(8) + @Fork(value = 1) + public static class ConcurrentPublish { + + private ConflatingMetricsAggregator aggregator; + private final List> trace = generateTrace(16); + + @Setup(Level.Trial) + public void setup() { + DDAgentFeaturesDiscovery features = + new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + PEER_TAGS, Collections.emptySet()); + aggregator = + new ConflatingMetricsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + features, + HealthMetrics.NO_OP, + new NullSink(), + 2048, + 2048, + false); + aggregator.start(); + } + + @TearDown(Level.Trial) + public void teardown() { + if (aggregator != null) { + aggregator.close(); + } + } + + @Benchmark + public void publishConcurrent(Blackhole blackhole) { + blackhole.consume(aggregator.publish(trace)); + } + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java index 478ff520a37..8a8b7225e8f 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java @@ -46,6 +46,28 @@ public AggregateMetric recordDurations(int count, AtomicLongArray durations) { return this; } + /** + * Record a single duration value with embedded tags. Called from the background aggregator thread + * when processing SpanStatsData (no Batch intermediary needed since the aggregation is + * single-threaded). 
+ */ + public void recordDuration(long taggedDuration) { + this.hitCount++; + long duration = taggedDuration; + if ((duration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { + duration ^= TOP_LEVEL_TAG; + ++topLevelCount; + } + if ((duration & ERROR_TAG) == ERROR_TAG) { + duration ^= ERROR_TAG; + errorLatencies.accept(duration); + ++errorCount; + } else { + okLatencies.accept(duration); + } + this.duration += duration; + } + public int getErrorCount() { return errorCount; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index dd87406ce7f..714f67faf8b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -1,15 +1,32 @@ package datadog.trace.common.metrics; +import static datadog.trace.api.Functions.UTF8_ENCODE; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER; +import static java.util.Collections.unmodifiableSet; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.trace.api.Pair; +import datadog.trace.api.cache.DDCache; +import datadog.trace.api.cache.DDCaches; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.metrics.SignalItem.StopSignal; +import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.core.util.LRUCache; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; -import 
java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.jctools.queues.MessagePassingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,12 +37,32 @@ final class Aggregator implements Runnable { private static final Logger log = LoggerFactory.getLogger(Aggregator.class); - private final MessagePassingQueue batchPool; + static final Set ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION = + unmodifiableSet( + new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER, SPAN_KIND_CONSUMER))); + + private static final DDCache SERVICE_NAMES = + DDCaches.newFixedSizeCache(32); + + private static final DDCache SPAN_KINDS = + DDCaches.newFixedSizeCache(16); + private static final DDCache< + String, Pair, Function>> + PEER_TAGS_CACHE = DDCaches.newFixedSizeCache(64); + private static final Function< + String, Pair, Function>> + PEER_TAGS_CACHE_ADDER = + key -> + Pair.of( + DDCaches.newFixedSizeCache(512), + value -> UTF8BytesString.create(key + ":" + value)); + private final MessagePassingQueue inbox; private final LRUCache aggregates; - private final ConcurrentMap pending; - private final Set commonKeys; + // Downgraded from ConcurrentHashMap: only accessed on the aggregator thread + private final HashMap keys; private final MetricWriter writer; + private final HealthMetrics healthMetrics; // the reporting interval controls how much history will be buffered // when the agent is unresponsive (only 10 pending requests will be // buffered by OkHttpSink) @@ -40,45 +77,43 @@ final class Aggregator implements Runnable { Aggregator( MetricWriter writer, - MessagePassingQueue batchPool, MessagePassingQueue inbox, - ConcurrentMap pending, - final Set commonKeys, int maxAggregates, long reportingInterval, - TimeUnit reportingIntervalTimeUnit) { + TimeUnit reportingIntervalTimeUnit, + HealthMetrics healthMetrics, + DDAgentFeaturesDiscovery features, + boolean includeEndpointInMetrics) { this( writer, - 
batchPool, inbox, - pending, - commonKeys, maxAggregates, reportingInterval, reportingIntervalTimeUnit, - DEFAULT_SLEEP_MILLIS); + DEFAULT_SLEEP_MILLIS, + healthMetrics, + features, + includeEndpointInMetrics); } Aggregator( MetricWriter writer, - MessagePassingQueue batchPool, MessagePassingQueue inbox, - ConcurrentMap pending, - final Set commonKeys, int maxAggregates, long reportingInterval, TimeUnit reportingIntervalTimeUnit, - long sleepMillis) { + long sleepMillis, + HealthMetrics healthMetrics, + DDAgentFeaturesDiscovery features, + boolean includeEndpointInMetrics) { this.writer = writer; - this.batchPool = batchPool; this.inbox = inbox; - this.commonKeys = commonKeys; + this.keys = new HashMap<>(); this.aggregates = - new LRUCache<>( - new CommonKeyCleaner(commonKeys), maxAggregates * 4 / 3, 0.75f, maxAggregates); - this.pending = pending; + new LRUCache<>(new CommonKeyCleaner(keys), maxAggregates * 4 / 3, 0.75f, maxAggregates); this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval); this.sleepMillis = sleepMillis; + this.healthMetrics = healthMetrics; } public void clearAggregates() { @@ -122,20 +157,73 @@ public void accept(InboxItem item) { } else { signal.ignore(); } - } else if (item instanceof Batch && !stopped) { - Batch batch = (Batch) item; - MetricKey key = batch.getKey(); - // important that it is still *this* batch pending, must not remove otherwise - pending.remove(key, batch); - AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric()); - batch.contributeTo(aggregate); - dirty = true; - // return the batch for reuse - batchPool.offer(batch); + } else if (item instanceof TraceStatsData && !stopped) { + processTraceStats((TraceStatsData) item); } } } + /** Process all span stats from a trace on the background thread. 
*/ + private void processTraceStats(TraceStatsData traceData) { + int counted = traceData.spans.length; + for (SpanStatsData spanData : traceData.spans) { + publishSpan(spanData); + } + healthMetrics.onClientStatTraceComputed(counted, traceData.totalSpanCount, !traceData.hasError); + } + + /** + * Construct MetricKey from SpanStatsData and aggregate -- all on the background thread. This is + * the expensive work that was previously done on the foreground span-finish thread. + */ + private void publishSpan(SpanStatsData span) { + List peerTags = buildPeerTags(span.peerTagValues); + + MetricKey newKey = + new MetricKey( + span.resourceName, + SERVICE_NAMES.computeIfAbsent(span.serviceName, UTF8_ENCODE), + span.operationName, + span.serviceNameSource, + span.spanType, + span.httpStatusCode, + span.synthetic, + span.traceRoot, + SPAN_KINDS.computeIfAbsent(span.spanKind, UTF8BytesString::create), + peerTags, + span.httpMethod, + span.httpEndpoint, + span.grpcStatusCode); + MetricKey key = keys.putIfAbsent(newKey, newKey); + if (null == key) { + key = newKey; + } + long tag = + (span.error > 0 ? AggregateMetric.ERROR_TAG : 0L) + | (span.topLevel ? AggregateMetric.TOP_LEVEL_TAG : 0L); + long durationNanos = span.durationNano; + + AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric()); + aggregate.recordDuration(tag | durationNanos); + dirty = true; + } + + /** Build UTF8BytesString peer tags from the flat [name, value, name, value, ...] array. 
*/ + private static List buildPeerTags(Object[] peerTagValues) { + if (peerTagValues == null || peerTagValues.length == 0) { + return Collections.emptyList(); + } + List peerTags = new ArrayList<>(peerTagValues.length / 2); + for (int i = 0; i < peerTagValues.length; i += 2) { + String tagName = (String) peerTagValues[i]; + String tagValue = (String) peerTagValues[i + 1]; + final Pair, Function> + cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(tagName, PEER_TAGS_CACHE_ADDER); + peerTags.add(cacheAndCreator.getLeft().computeIfAbsent(tagValue, cacheAndCreator.getRight())); + } + return peerTags; + } + private void report(long when, SignalItem signal) { boolean skipped = true; if (dirty) { @@ -170,7 +258,7 @@ private void expungeStaleAggregates() { AggregateMetric metric = pair.getValue(); if (metric.getHitCount() == 0) { it.remove(); - commonKeys.remove(pair.getKey()); + keys.remove(pair.getKey()); } } } @@ -182,15 +270,15 @@ private long wallClockTime() { private static final class CommonKeyCleaner implements LRUCache.ExpiryListener { - private final Set commonKeys; + private final Map keys; - private CommonKeyCleaner(Set commonKeys) { - this.commonKeys = commonKeys; + private CommonKeyCleaner(Map keys) { + this.keys = keys; } @Override public void accept(Map.Entry expired) { - commonKeys.remove(expired.getKey()); + keys.remove(expired.getKey()); } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 010993efe50..45ab5b649ff 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -2,18 +2,13 @@ import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V06_METRICS_ENDPOINT; import static datadog.trace.api.DDSpanTypes.RPC; -import static 
datadog.trace.api.DDTags.BASE_SERVICE; -import static datadog.trace.api.Functions.UTF8_ENCODE; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER; -import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_INTERNAL; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER; -import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; -import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT; import static datadog.trace.common.metrics.SignalItem.StopSignal.STOP; import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR; @@ -26,19 +21,14 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; -import datadog.trace.api.Pair; import datadog.trace.api.WellKnownTags; -import datadog.trace.api.cache.DDCache; -import datadog.trace.api.cache.DDCaches; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; -import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.metrics.SignalItem.ReportSignal; import datadog.trace.common.writer.ddagent.DDAgentApi; import datadog.trace.core.CoreSpan; import datadog.trace.core.DDTraceCoreInfo; import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.util.AgentTaskScheduler; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -46,10 
+36,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import javax.annotation.Nonnull; import org.jctools.queues.MessagePassingQueue; import org.slf4j.Logger; @@ -62,24 +50,6 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private static final Map DEFAULT_HEADERS = Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); - private static final DDCache SERVICE_NAMES = - DDCaches.newFixedSizeCache(32); - - private static final DDCache SPAN_KINDS = - DDCaches.newFixedSizeCache(16); - private static final DDCache< - String, Pair, Function>> - PEER_TAGS_CACHE = - DDCaches.newFixedSizeCache( - 64); // it can be unbounded since those values are returned by the agent and should be - // under control. 64 entries is enough in this case to contain all the peer tags. 
- private static final Function< - String, Pair, Function>> - PEER_TAGS_CACHE_ADDER = - key -> - Pair.of( - DDCaches.newFixedSizeCache(512), - value -> UTF8BytesString.create(key + ":" + value)); private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; private static final Set ELIGIBLE_SPAN_KINDS_FOR_METRICS = @@ -88,14 +58,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve Arrays.asList( SPAN_KIND_SERVER, SPAN_KIND_CLIENT, SPAN_KIND_CONSUMER, SPAN_KIND_PRODUCER))); - private static final Set ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION = - unmodifiableSet( - new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER, SPAN_KIND_CONSUMER))); - private final Set ignoredResources; - private final MessagePassingQueue batchPool; - private final ConcurrentHashMap pending; - private final ConcurrentHashMap keys; private final Thread thread; private final MessagePassingQueue inbox; private final Sink sink; @@ -189,22 +152,19 @@ public ConflatingMetricsAggregator( this.ignoredResources = ignoredResources; this.includeEndpointInMetrics = includeEndpointInMetrics; this.inbox = Queues.mpscArrayQueue(queueSize); - this.batchPool = Queues.spmcArrayQueue(maxAggregates); - this.pending = new ConcurrentHashMap<>(maxAggregates * 4 / 3); - this.keys = new ConcurrentHashMap<>(); this.features = features; this.healthMetrics = healthMetric; this.sink = sink; this.aggregator = new Aggregator( metricWriter, - batchPool, inbox, - pending, - keys.keySet(), maxAggregates, reportingInterval, - timeUnit); + timeUnit, + healthMetric, + features, + includeEndpointInMetrics); this.thread = newAgentThread(METRICS_AGGREGATOR, aggregator); this.reportingInterval = reportingInterval; this.reportingIntervalTimeUnit = timeUnit; @@ -284,8 +244,11 @@ public Future forceReport() { @Override public boolean publish(List> trace) { boolean forceKeep = false; - int counted = 0; if (features.supportsMetrics()) { + // Pre-size to trace size; most spans will be 
eligible + SpanStatsData[] buffer = new SpanStatsData[trace.size()]; + int counted = 0; + boolean hasError = false; for (CoreSpan span : trace) { boolean isTopLevel = span.isTopLevel(); final CharSequence spanKind = span.unsafeGetTag(SPAN_KIND, ""); @@ -294,13 +257,33 @@ public boolean publish(List> trace) { if (resourceName != null && ignoredResources.contains(resourceName.toString())) { // skip publishing all children forceKeep = false; + counted = 0; break; } - counted++; - forceKeep |= publish(span, isTopLevel, spanKind); + int error = span.getError(); + if (error > 0) { + forceKeep = true; + hasError = true; + } + buffer[counted++] = extractSpanData(span, isTopLevel, spanKind); } } - healthMetrics.onClientStatTraceComputed(counted, trace.size(), !forceKeep); + if (counted > 0) { + SpanStatsData[] spans; + if (counted == buffer.length) { + spans = buffer; + } else { + spans = new SpanStatsData[counted]; + System.arraycopy(buffer, 0, spans, 0, counted); + } + TraceStatsData traceData = new TraceStatsData(spans, trace.size(), hasError); + inbox.offer(traceData); + } else { + // Nothing counted -- still report to health metrics on background thread, + // but avoid allocating spans array + TraceStatsData traceData = new TraceStatsData(new SpanStatsData[0], trace.size(), false); + inbox.offer(traceData); + } } return forceKeep; } @@ -317,8 +300,12 @@ private boolean spanKindEligible(@Nonnull CharSequence spanKind) { return ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString()); } - private boolean publish(CoreSpan span, boolean isTopLevel, CharSequence spanKind) { - // Extract HTTP method and endpoint only if the feature is enabled + /** + * Extract lightweight data from a span on the foreground thread. Only reads cheap volatile/final + * fields and tag lookups. The expensive MetricKey construction happens on the background thread. 
+ */ + private SpanStatsData extractSpanData( + CoreSpan span, boolean isTopLevel, CharSequence spanKind) { String httpMethod = null; String httpEndpoint = null; if (includeEndpointInMetrics) { @@ -334,96 +321,68 @@ private boolean publish(CoreSpan span, boolean isTopLevel, CharSequence spanK Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE); grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null; } - MetricKey newKey = - new MetricKey( - span.getResourceName(), - SERVICE_NAMES.computeIfAbsent(span.getServiceName(), UTF8_ENCODE), - span.getOperationName(), - span.getServiceNameSource(), - spanType, - span.getHttpStatusCode(), - isSynthetic(span), - span.getParentId() == 0, - SPAN_KINDS.computeIfAbsent( - spanKind, UTF8BytesString::create), // save repeated utf8 conversions - getPeerTags(span, spanKind.toString()), - httpMethod, - httpEndpoint, - grpcStatusCode); - MetricKey key = keys.putIfAbsent(newKey, newKey); - if (null == key) { - key = newKey; - } - long tag = (span.getError() > 0 ? ERROR_TAG : 0L) | (isTopLevel ? 
TOP_LEVEL_TAG : 0L); - long durationNanos = span.getDurationNano(); - Batch batch = pending.get(key); - if (null != batch) { - // there is a pending batch, try to win the race to add to it - // returning false means that either the batch can't take any - // more data, or it has already been consumed - if (batch.add(tag, durationNanos)) { - // added to a pending batch prior to consumption, - // so skip publishing to the queue (we also know - // the key isn't rare enough to override the sampler) - return false; - } - // recycle the older key - key = batch.getKey(); - } - batch = newBatch(key); - batch.add(tag, durationNanos); - // overwrite the last one if present, it was already full - // or had been consumed by the time we tried to add to it - pending.put(key, batch); - // must offer to the queue after adding to pending - inbox.offer(batch); - // force keep keys if there are errors - return span.getError() > 0; + + // Extract peer tag values as raw objects -- the background thread will resolve them + Object[] peerTagValues = extractPeerTagValues(span, spanKind.toString()); + + return new SpanStatsData( + span.getResourceName(), + span.getServiceName(), + span.getOperationName(), + span.getServiceNameSource(), + spanType, + spanKind, + span.getHttpStatusCode(), + isSynthetic(span), + span.getParentId() == 0, + span.getError(), + isTopLevel, + span.getDurationNano(), + peerTagValues, + httpMethod, + httpEndpoint, + grpcStatusCode); } - private List getPeerTags(CoreSpan span, String spanKind) { - if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { + /** + * Extract peer tag values as a flat array of [tagName, tagValue, tagName, tagValue, ...]. This + * avoids building UTF8BytesString or doing cache lookups on the foreground thread. 
+ */ + private Object[] extractPeerTagValues(CoreSpan span, String spanKind) { + if (Aggregator.ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { final Set eligiblePeerTags = features.peerTags(); - List peerTags = new ArrayList<>(eligiblePeerTags.size()); + // Worst case: 2 entries per peer tag (name + value) + Object[] buffer = new Object[eligiblePeerTags.size() * 2]; + int idx = 0; for (String peerTag : eligiblePeerTags) { Object value = span.unsafeGetTag(peerTag); if (value != null) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); - peerTags.add( - cacheAndCreator - .getLeft() - .computeIfAbsent(value.toString(), cacheAndCreator.getRight())); + buffer[idx++] = peerTag; + buffer[idx++] = value.toString(); } } - return peerTags; - } else if (SPAN_KIND_INTERNAL.equals(spanKind)) { - // in this case only the base service should be aggregated if present - final Object baseService = span.unsafeGetTag(BASE_SERVICE); + if (idx == 0) { + return null; + } + if (idx < buffer.length) { + Object[] result = new Object[idx]; + System.arraycopy(buffer, 0, result, 0, idx); + return result; + } + return buffer; + } else if ("internal".equals(spanKind)) { + final Object baseService = span.unsafeGetTag("_dd.base_service"); if (baseService != null) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER); - return Collections.singletonList( - cacheAndCreator - .getLeft() - .computeIfAbsent(baseService.toString(), cacheAndCreator.getRight())); + return new Object[] {"_dd.base_service", baseService.toString()}; } } - return Collections.emptyList(); + return null; } private static boolean isSynthetic(CoreSpan span) { return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString()); } - private Batch newBatch(MetricKey key) { - Batch batch = batchPool.poll(); - if (null == batch) { - return new Batch(key); - } - return 
batch.reset(key); - } - public void stop() { if (null != cancellation) { cancellation.cancel(); @@ -466,8 +425,6 @@ private void disable() { features.discover(); if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); - this.pending.clear(); - this.batchPool.clear(); this.inbox.clear(); this.aggregator.clearAggregates(); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanStatsData.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanStatsData.java new file mode 100644 index 00000000000..843683fb677 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanStatsData.java @@ -0,0 +1,61 @@ +package datadog.trace.common.metrics; + +/** + * Immutable DTO carrying the minimal data needed from a CoreSpan for MetricKey construction on the + * background aggregator thread. All fields are extracted from simple span getters (cheap + * volatile/final field reads) on the foreground thread. 
+ */ +final class SpanStatsData { + final CharSequence resourceName; + final String serviceName; + final CharSequence operationName; + final CharSequence serviceNameSource; + final CharSequence spanType; + final CharSequence spanKind; + final short httpStatusCode; + final boolean synthetic; + final boolean traceRoot; + final int error; + final boolean topLevel; + final long durationNano; + // Flat [tagName, tagValue, ...] array of raw Strings; resolved to UTF8BytesString on the + // background aggregator thread (see Aggregator.buildPeerTags) + final Object[] peerTagValues; + final String httpMethod; + final String httpEndpoint; + final String grpcStatusCode; + + SpanStatsData( + CharSequence resourceName, + String serviceName, + CharSequence operationName, + CharSequence serviceNameSource, + CharSequence spanType, + CharSequence spanKind, + short httpStatusCode, + boolean synthetic, + boolean traceRoot, + int error, + boolean topLevel, + long durationNano, + Object[] peerTagValues, + String httpMethod, + String httpEndpoint, + String grpcStatusCode) { + this.resourceName = resourceName; + this.serviceName = serviceName; + this.operationName = operationName; + this.serviceNameSource = serviceNameSource; + this.spanType = spanType; + this.spanKind = spanKind; + this.httpStatusCode = httpStatusCode; + this.synthetic = synthetic; + this.traceRoot = traceRoot; + this.error = error; + this.topLevel = topLevel; + this.durationNano = durationNano; + this.peerTagValues = peerTagValues; + this.httpMethod = httpMethod; + this.httpEndpoint = httpEndpoint; + this.grpcStatusCode = grpcStatusCode; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/TraceStatsData.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TraceStatsData.java new file mode 100644 index 00000000000..74da363d4cb --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TraceStatsData.java @@ -0,0 +1,18 @@ +package datadog.trace.common.metrics; + +/** + * InboxItem that carries extracted span data from the foreground thread 
to the background + * aggregator thread. The expensive MetricKey construction, HashMap operations, and Batch management + * happen on the background thread after receiving this item. + */ +final class TraceStatsData implements InboxItem { + final SpanStatsData[] spans; + final int totalSpanCount; + final boolean hasError; + + TraceStatsData(SpanStatsData[] spans, int totalSpanCount, boolean hasError) { + this.spans = spans; + this.totalSpanCount = totalSpanCount; + this.hasError = hasError; + } +}