diff --git a/components/context/src/main/java/datadog/context/Context.java b/components/context/src/main/java/datadog/context/Context.java index 967feb86fe9..34a07524de8 100644 --- a/components/context/src/main/java/datadog/context/Context.java +++ b/components/context/src/main/java/datadog/context/Context.java @@ -101,6 +101,15 @@ static Context detachFrom(Object carrier) { return binder().detachFrom(carrier); } + /** + * Captures this (attached) context so it can be resumed in another execution unit. + * + * @return continuation capturing this context. + */ + default ContextContinuation capture() { + return manager().capture(this); + } + /** * Gets the value stored in this context under the given key. * @@ -159,4 +168,13 @@ default Context with(@Nullable ImplicitContextKeyed value) { } return value.storeInto(this); } + + /** + * Wraps context as a scope without attaching it to the current execution unit. + * + * @return a scope that has no effect on execution units. + */ + default ContextScope asScope() { + return new NoopContextScope(this); + } } diff --git a/components/context/src/main/java/datadog/context/ContextContinuation.java b/components/context/src/main/java/datadog/context/ContextContinuation.java new file mode 100644 index 00000000000..c516a7b35bf --- /dev/null +++ b/components/context/src/main/java/datadog/context/ContextContinuation.java @@ -0,0 +1,79 @@ +package datadog.context; + +/** + * Captures a context attached to one execution unit so it can be resumed in another. + * + *
To propagate context to a single background task: + * + *
{@code
+ * ContextContinuation continuation = Context.current().capture();
+ * executor.execute(() -> {
+ * try (ContextScope scope = continuation.resume()) {
+ * // ... Context.current() here returns the captured context
+ * }
+ * // context implicitly released from continuation when resumed scope closes
+ * });
+ * }
+ *
+ * If a continuation is never resumed (e.g. a task is cancelled before it runs), you must release + * it explicitly to avoid resource leaks: + * + *
{@code
+ * ContextContinuation continuation = Context.current().capture();
+ * Future> future = executor.submit(() -> {
+ * try (ContextScope scope = continuation.resume()) {
+ * // ...
+ * }
+ * });
+ * // ...
+ * if (future.cancel(false)) {
+ * continuation.release(); // task will never resume, so release manually
+ * }
+ * }
+ *
+ * When the same context is resumed concurrently across multiple threads, call {@link #hold()} + * immediately after capture to prevent the first {@link #resume()} from releasing the context: + * + *
{@code
+ * ContextContinuation continuation = Context.current().capture().hold();
+ * for (int i = 0; i < N; i++) {
+ * executor.execute(() -> {
+ * try (ContextScope scope = continuation.resume()) {
+ * // ...
+ * }
+ * });
+ * }
+ * // ...
+ * continuation.release(); // remember to release the hold once all tasks are resumed/done
+ * }
+ */
+public interface ContextContinuation {
+
+ /**
+ * Optional builder method to stop {@link #resume()} from implicitly releasing the captured
+ * context. This is useful when multiple threads may concurrently resume the context. You must
+ * then explicitly {@link #release() release} the context once all threads are resumed/done.
+ *
+ * @return this continuation, but with implicit release-after-resume turned off.
+ */
+ ContextContinuation hold();
+
+ /**
+ * Returns the context captured by this continuation.
+ *
+ * @return the captured context.
+ */
+ Context context();
+
+ /**
+ * Resumes the context captured by this continuation by attaching it to the current execution
+ * unit. Implicitly {@link #release() releases} the captured context at the end of the resumed
+ * scope, unless {@link #hold()} was called when creating the continuation.
+ *
+ * @return a scope to be closed when the resumed context is invalid.
+ */
+ ContextScope resume();
+
+ /** Explicitly releases the context captured by this continuation. */
+ void release();
+}
diff --git a/components/context/src/main/java/datadog/context/ContextListener.java b/components/context/src/main/java/datadog/context/ContextListener.java
new file mode 100644
index 00000000000..1bf06a7e974
--- /dev/null
+++ b/components/context/src/main/java/datadog/context/ContextListener.java
@@ -0,0 +1,33 @@
+package datadog.context;
+
+/** Listener of context events. */
+public interface ContextListener {
+
+ /**
+ * Notifies that the given context has been attached to the current execution unit.
+ *
+ * @param context the attached context.
+ */
+ default void onAttach(Context context) {}
+
+ /**
+ * Notifies that the given context has been detached from the current execution unit.
+ *
+ * @param context the detached context.
+ */
+ default void onDetach(Context context) {}
+
+ /**
+ * Notifies that the given context has been captured by a continuation.
+ *
+ * @param context the captured context.
+ */
+ default void onCapture(Context context) {}
+
+ /**
+ * Notifies that the given context has been released from a continuation.
+ *
+ * @param context the released context.
+ */
+ default void onRelease(Context context) {}
+}
diff --git a/components/context/src/main/java/datadog/context/ContextManager.java b/components/context/src/main/java/datadog/context/ContextManager.java
index af0a2b9289a..6828352d1d4 100644
--- a/components/context/src/main/java/datadog/context/ContextManager.java
+++ b/components/context/src/main/java/datadog/context/ContextManager.java
@@ -1,5 +1,7 @@
package datadog.context;
+import static datadog.context.ContextProviders.manager;
+
/** Manages context across execution units. */
public interface ContextManager {
/**
@@ -25,6 +27,29 @@ public interface ContextManager {
*/
Context swap(Context context);
+ /**
+ * Captures the given (attached) context so it can be resumed in another execution unit.
+ *
+ * @return continuation capturing the context.
+ */
+ ContextContinuation capture(Context context);
+
+ /**
+ * Registers the given listener to receive context events.
+ *
+ * @param listener the listener to register
+ */
+ void addListener(ContextListener listener);
+
+ /**
+ * Registers the given listener to receive context events.
+ *
+ * @param listener the listener to register.
+ */
+ static void register(ContextListener listener) {
+ manager().addListener(listener);
+ }
+
/**
* Requests use of a custom {@link ContextManager}.
*
diff --git a/components/context/src/main/java/datadog/context/EmptyContext.java b/components/context/src/main/java/datadog/context/EmptyContext.java
index 20023482647..e9fee391373 100644
--- a/components/context/src/main/java/datadog/context/EmptyContext.java
+++ b/components/context/src/main/java/datadog/context/EmptyContext.java
@@ -3,13 +3,13 @@
import static java.util.Objects.requireNonNull;
import javax.annotation.Nullable;
-import javax.annotation.ParametersAreNonnullByDefault;
/** {@link Context} containing no values. */
-@ParametersAreNonnullByDefault
-final class EmptyContext implements Context {
+final class EmptyContext implements SelfScopedContext {
static final Context INSTANCE = new EmptyContext();
+ private EmptyContext() {}
+
@Override
@Nullable
public | Value | Meaning |
|---|---|
| 0 | Not held or resumed |
| 1..HELD-1 | Resumed, not held |
| HELD | Held, not yet resumed |
| HELD..MAX_INT | Resumed and held |
A negative value of RELEASED reflects that the continuation has either been resumed and
+ * all associated scopes are now closed, or it has been explicitly released. This value was
+ * chosen to be half the size of MIN_INT to avoid speculative additions in {@link #resume()}
+ * from overflowing to a positive count.
+ */
+ private volatile int count = 0;
+
+ ContextContinuationImpl(Context context) {
+ this.context = context;
+ notifyCapture(context, INSTANCE.listeners);
+ }
+
+ @Override
+ public ContextContinuation hold() {
+ // update initial count to record that this continuation has a hold
+ COUNT.compareAndSet(this, 0, HELD);
+ return this;
+ }
+
+ @Override
+ public Context context() {
+ return context;
+ }
+
+ @Override
+ public ContextScope resume() {
+ if (COUNT.incrementAndGet(this) > 0) {
+ // speculative update succeeded, continuation can be resumed
+ return INSTANCE.doAttach(context, this);
+ } else {
+ // continuation released or too many resumes; rollback count
+ COUNT.decrementAndGet(this);
+ return this; // acts as no-op scope, avoiding allocation
+ }
+ }
+
+ @Override
+ public void release() {
+ int current = count;
+ while (current >= HELD) {
+ // remove the hold on this continuation by removing the offset
+ COUNT.compareAndSet(this, current, current - HELD);
+ current = count;
+ }
+ while (current == 0) {
+ // no outstanding resumes and hold has been removed
+ if (COUNT.compareAndSet(this, current, RELEASED)) {
+ notifyRelease(context, INSTANCE.listeners);
+ return;
+ }
+ current = count;
+ }
+ }
+
+ void releaseOnScopeClose() {
+ if (COUNT.compareAndSet(this, 1, RELEASED)) {
+ // fast path: only one resume of the continuation (no hold)
+ notifyRelease(context, INSTANCE.listeners);
+ } else if (COUNT.decrementAndGet(this) == 0) {
+ // slow path: multiple resumes, all scopes now closed (no hold)
+ release();
+ } /* else there are outstanding resumes or hold is in place */
+ }
+
+ @Override
+ public void close() {}
+ }
+
+ private static final class ContextHolder {
+ Context current = Context.root();
+ }
}
diff --git a/components/context/src/main/java/datadog/context/WeakMapContextBinder.java b/components/context/src/main/java/datadog/context/WeakMapContextBinder.java
index 9b5fd24299e..15e0154f25a 100644
--- a/components/context/src/main/java/datadog/context/WeakMapContextBinder.java
+++ b/components/context/src/main/java/datadog/context/WeakMapContextBinder.java
@@ -6,10 +6,8 @@
import java.util.Map;
import java.util.WeakHashMap;
-import javax.annotation.ParametersAreNonnullByDefault;
/** {@link ContextBinder} that uses a global weak map of carriers to contexts. */
-@ParametersAreNonnullByDefault
final class WeakMapContextBinder implements ContextBinder {
static final ContextBinder INSTANCE = new WeakMapContextBinder();
diff --git a/components/context/src/test/java/datadog/context/ContextBinderTest.java b/components/context/src/test/java/datadog/context/ContextBinderTest.java
index 9af265fea50..eb2bb35563e 100644
--- a/components/context/src/test/java/datadog/context/ContextBinderTest.java
+++ b/components/context/src/test/java/datadog/context/ContextBinderTest.java
@@ -8,15 +8,9 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-class ContextBinderTest {
- @BeforeEach
- void setUp() {
- assertEquals(root(), current(), "No context is expected to be set");
- }
-
+class ContextBinderTest extends ContextTestBase {
@Test
void testAttachAndDetach() {
// Setting up test
diff --git a/components/context/src/test/java/datadog/context/ContextContinuationTest.java b/components/context/src/test/java/datadog/context/ContextContinuationTest.java
new file mode 100644
index 00000000000..0b6508220bf
--- /dev/null
+++ b/components/context/src/test/java/datadog/context/ContextContinuationTest.java
@@ -0,0 +1,368 @@
+package datadog.context;
+
+import static datadog.context.Context.current;
+import static datadog.context.Context.root;
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import javax.annotation.ParametersAreNonnullByDefault;
+import org.junit.jupiter.api.Test;
+
+@ParametersAreNonnullByDefault
+class ContextContinuationTest extends ContextTestBase {
+ private static final ContextKey For the same-non-root-context stack-depth scenario see {@link ContextManagerDepthBenchmark}.
+ *
+ * Run with:
+ *
+ * {@code ThreadLocal} — {@link ThreadLocalContextManager} (the lightweight default).
+ *
+ * {@code Continuable} — {@link ContinuableScopeManager} (the full scope/span manager).
+ */
+ @Param({"ThreadLocal", "Continuable"})
+ public String managerType;
+
+ // ── Benchmark-scoped shared state ─────────────────────────────────────────
+
+ ContextManager manager;
+ // CONTEXT_COUNT distinct non-root contexts; threads cycle through them to
+ // avoid artificial same-context hits in benchmarks that don't want them
+ Context[] contexts;
+
+ @Setup
+ public void setup() {
+ manager = createManager(managerType);
+ contexts = createContexts();
+ }
+
+ static ContextManager createManager(String type) {
+ return "Continuable".equals(type)
+ ? new ContinuableScopeManager(0, false)
+ : ThreadLocalContextManager.INSTANCE;
+ }
+
+ static Context[] createContexts() {
+ Context[] contexts = new Context[CONTEXT_COUNT];
+ for (int i = 0; i < CONTEXT_COUNT; i++) {
+ contexts[i] = Context.root().with(KEY, "value-" + i);
+ }
+ return contexts;
+ }
+
+ // ── Per-thread state ───────────────────────────────────────────────────────
+
+ @State(Scope.Thread)
+ public static class ThreadState {
+ int index;
+ // Pre-allocated barrier reused across fan-out invocations.
+ // Avoids a new CountDownLatch allocation per invocation that would inflate gc.alloc.rate.norm.
+ final Semaphore fanOutBarrier = new Semaphore(0);
+ ExecutorService platformExecutor;
+ ExecutorService virtualExecutor;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ // Both pools are fixed-size so no new thread is created per submitted task.
+ // The virtual pool uses virtual threads (Java 21+) or falls back to platform threads.
+ // Pool size is intentionally larger than the JMH thread count to avoid executor starvation
+ // when benchmark threads all submit tasks concurrently.
+ platformExecutor = Executors.newFixedThreadPool(16);
+ virtualExecutor = newFixedVirtualPool(16);
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDown() throws InterruptedException {
+ platformExecutor.shutdown();
+ virtualExecutor.shutdown();
+ platformExecutor.awaitTermination(10, SECONDS);
+ virtualExecutor.awaitTermination(10, SECONDS);
+ }
+
+ Context nextContext(Context[] contexts) {
+ return contexts[(index++) & (CONTEXT_COUNT - 1)];
+ }
+ }
+
+ // ── Thread state with a pre-attached context (for read benchmarks) ─────────
+
+ /**
+ * Attaches a context once per trial so that {@link #current} and {@link #currentAndGet} measure
+ * only the read path, not the attach overhead.
+ */
+ @State(Scope.Thread)
+ public static class ActiveContextState {
+ ContextScope scope;
+
+ @Setup(Level.Trial)
+ public void setup(ContextManagerBenchmark benchmark) {
+ scope = benchmark.manager.attach(benchmark.contexts[0]);
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDown() {
+ scope.close();
+ }
+ }
+
+ // ── Scenario 1: attach a different context, close scope ───────────────────
+
+ /** Attach one distinct context then close its scope. The hot path for most instrumentations. */
+ @Benchmark
+ public void attachAndClose(ThreadState thread) {
+ Context ctx = thread.nextContext(contexts);
+ try (ContextScope scope = manager.attach(ctx)) {
+ // scope is active
+ }
+ }
+
+ // ── Scenario 2: nested attach of two different contexts ───────────────────
+
+ /**
+ * Attach two distinct contexts in sequence and close both. Exercises the stack push/pop cycle
+ * that occurs at every instrumented method boundary.
+ */
+ @Benchmark
+ public void nestedAttachAndClose(ThreadState thread) {
+ Context outer = thread.nextContext(contexts);
+ Context inner = thread.nextContext(contexts);
+ try (ContextScope outerScope = manager.attach(outer)) {
+ try (ContextScope innerScope = manager.attach(inner)) {
+ // inner is active
+ }
+ }
+ }
+
+ // ── Scenario 3: swap different contexts ───────────────────────────────────
+
+ /**
+ * Swap in a new context then swap back the previous one. {@link
+ * ContinuableScopeManager#swap(Context)} replaces the entire scope stack, making this a heavier
+ * operation than in {@link ThreadLocalContextManager}.
+ *
+ * Note: GCProfiler will show allocation asymmetry here by design. {@link
+ * ContinuableScopeManager} swap allocates a {@code ScopeStack}, a {@code ContinuableScope}, and a
+ * {@code ScopeContext} per invocation; {@link ThreadLocalContextManager} swap is a plain field
+ * write. That asymmetry is the real cost of each manager's swap operation, not scaffolding.
+ */
+ @Benchmark
+ public void swapContexts(ThreadState thread) {
+ Context ctx = thread.nextContext(contexts);
+ Context previous = manager.swap(ctx);
+ manager.swap(previous);
+ }
+
+ // ── Scenario 4: capture + same-thread resume (continuation baseline) ───────
+
+ /**
+ * Capture the current context as a continuation and immediately resume it on the same thread.
+ * Establishes the allocation and atomic-counter cost of the continuation mechanism without any
+ * cross-thread scheduling overhead.
+ */
+ @Benchmark
+ public void captureThenResumeSameThread(ThreadState thread) {
+ Context ctx = thread.nextContext(contexts);
+ try (ContextScope scope = manager.attach(ctx)) {
+ ContextContinuation cont = manager.capture(ctx);
+ try (ContextScope resumed = cont.resume()) {
+ // context restored on the same thread
+ }
+ }
+ }
+
+ // ── Scenario 5: capture, resume on a platform thread ─────────────────────
+
+ /**
+ * Capture the current context as a continuation and resume it on a pooled platform thread.
+ * Measures cross-thread handoff latency (submit + schedule + execute) for each manager.
+ *
+ * Fewer JMH threads than the default so the platform executor is never saturated.
+ */
+ @Benchmark
+ @Threads(4)
+ public void captureAndResumeOnPlatformThread(ThreadState thread) throws Exception {
+ captureAndResumeOnExecutor(thread, thread.platformExecutor);
+ }
+
+ // ── Scenario 6: capture, resume on a virtual thread ──────────────────────
+
+ /**
+ * Capture the current context as a continuation and resume it on a fixed-pool virtual thread.
+ * Shows how well each manager scales when continuations are used for structured concurrency or
+ * reactive pipelines on virtual threads.
+ */
+ @Benchmark
+ @Threads(4)
+ public void captureAndResumeOnVirtualThread(ThreadState thread) throws Exception {
+ captureAndResumeOnExecutor(thread, thread.virtualExecutor);
+ }
+
+ private void captureAndResumeOnExecutor(ThreadState thread, ExecutorService executor)
+ throws Exception {
+ Context ctx = thread.nextContext(contexts);
+ try (ContextScope scope = manager.attach(ctx)) {
+ ContextContinuation cont = manager.capture(ctx);
+ CompletableFuture.runAsync(
+ () -> {
+ try (ContextScope resumed = cont.resume()) {
+ // context propagated to executor thread
+ }
+ },
+ executor)
+ .get(10, SECONDS);
+ }
+ }
+
+ // ── Scenario 7: fan-out — one held continuation resumed on N virtual threads
+
+ /**
+ * Capture one context, hold the continuation, then fan it out to {@value #FAN_OUT} virtual
+ * threads concurrently. Each virtual thread resumes the same continuation and closes its scope;
+ * only the explicit {@link ContextContinuation#release()} after the barrier completes the
+ * lifecycle.
+ *
+ * This reflects async frameworks that dispatch a single request context to a pool of worker
+ * coroutines / virtual threads.
+ *
+ * Uses {@link Mode#SampleTime} to capture percentile tail latency in addition to the mean.
+ * Warmup and measurement windows are extended because each invocation waits for {@value #FAN_OUT}
+ * round-trips before returning.
+ */
+ @Benchmark
+ @Threads(2)
+ @BenchmarkMode(Mode.SampleTime)
+ @Warmup(iterations = 3, time = 3)
+ @Measurement(iterations = 5, time = 5)
+ public void captureAndFanOutToVirtualThreads(ThreadState thread) throws Exception {
+ Context ctx = thread.nextContext(contexts);
+ try (ContextScope scope = manager.attach(ctx)) {
+ ContextContinuation cont = manager.capture(ctx).hold();
+ Semaphore barrier = thread.fanOutBarrier;
+ for (int i = 0; i < FAN_OUT; i++) {
+ thread.virtualExecutor.execute(
+ () -> {
+ try (ContextScope resumed = cont.resume()) {
+ // each virtual thread sees the same captured context
+ } finally {
+ barrier.release();
+ }
+ });
+ }
+ try {
+ if (!barrier.tryAcquire(FAN_OUT, 10, SECONDS)) {
+ throw new IllegalStateException("fan-out timed out");
+ }
+ } finally {
+ cont.release();
+ }
+ }
+ }
+
+ // ── Scenario 8: read the current context ─────────────────────────────────
+
+ /**
+ * Returns the currently active context. The most frequent operation in any traced application —
+ * called at every instrumented method boundary before reading a span or key.
+ */
+ @Benchmark
+ public Context current(ActiveContextState active) {
+ return manager.current();
+ }
+
+ // ── Scenario 9: read a value from the current context ────────────────────
+
+ /**
+ * Returns a value from the currently active context. The full "read active span" path that
+ * instrumentation executes at every traced method boundary.
+ */
+ @Benchmark
+ public Object currentAndGet(ActiveContextState active) {
+ return manager.current().get(KEY);
+ }
+}
diff --git a/dd-trace-core/src/jmh/java/datadog/context/ContextManagerDepthBenchmark.java b/dd-trace-core/src/jmh/java/datadog/context/ContextManagerDepthBenchmark.java
new file mode 100644
index 00000000000..81c378d54d8
--- /dev/null
+++ b/dd-trace-core/src/jmh/java/datadog/context/ContextManagerDepthBenchmark.java
@@ -0,0 +1,88 @@
+package datadog.context;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Benchmarks attaching the same non-root context {@code depth} times then closing all scopes in
+ * LIFO order, isolating the same-context fast-path cost from the general attach/close benchmarks in
+ * {@link ContextManagerBenchmark}.
+ *
+ * In {@link ThreadLocalContextManager} each re-attach after the first returns a no-op scope. In
+ * {@link datadog.trace.core.scopemanager.ContinuableScopeManager} the first attach creates the
+ * scope and subsequent re-attaches increment its reference count; each close decrements it, with
+ * the final close doing the real work.
+ *
+ * Run with:
+ *
+ *
+ * {@code ./gradlew :dd-trace-core:jmh -PjmhIncludes=ContextManagerBenchmark -PjmhJvm=25 -PjmhProfilers=gc}
+ *
+ */
+@State(Scope.Benchmark)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+@BenchmarkMode(Mode.Throughput)
+@Threads(8)
+@OutputTimeUnit(MICROSECONDS)
+@Fork(value = 1)
+public class ContextManagerBenchmark {
+
+ // ── Constants ──────────────────────────────────────────────────────────────
+
+ // Reflective access to Thread.ofVirtual().factory() (Java 21+).
+ // Used to create fixed-size pools of virtual threads so no new VT is spawned per task.
+ // Falls back to platform threads on older JVMs — the benchmark still runs, but
+ // captureAndResumeOnVirtualThread and captureAndFanOutToVirtualThreads will measure
+ // platform-thread overhead instead.
+ static final boolean VIRTUAL_THREADS_AVAILABLE;
+ static final ThreadFactory VIRTUAL_OR_PLATFORM_FACTORY;
+
+ static {
+ ThreadFactory factory = null;
+ try {
+ Object builder = Thread.class.getMethod("ofVirtual").invoke(null);
+ factory = (ThreadFactory) builder.getClass().getMethod("factory").invoke(builder);
+ } catch (Exception ignored) {
+ }
+ VIRTUAL_THREADS_AVAILABLE = factory != null;
+ VIRTUAL_OR_PLATFORM_FACTORY = factory != null ? factory : Thread::new;
+ }
+
+ // Creates a fixed pool whose threads are virtual (Java 21+) or platform (older JVMs).
+ // Using a fixed pool rather than newVirtualThreadPerTaskExecutor avoids spawning a
+ // fresh virtual thread on every task submission, keeping thread-creation cost out of
+ // the measured critical path.
+ static ExecutorService newFixedVirtualPool(int nThreads) {
+ return Executors.newFixedThreadPool(nThreads, VIRTUAL_OR_PLATFORM_FACTORY);
+ }
+
+ static final ContextKey
+ * {@code ./gradlew :dd-trace-core:jmh -PjmhIncludes=ContextManagerDepthBenchmark -PjmhJvm=25 -PjmhProfilers=gc}
+ *
+ */
+@State(Scope.Benchmark)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+@BenchmarkMode(Mode.Throughput)
+@Threads(8)
+@OutputTimeUnit(MICROSECONDS)
+@Fork(value = 1)
+public class ContextManagerDepthBenchmark {
+
+ /**
+ * Which {@link ContextManager} implementation to benchmark.
+ *
+ * @see ContextManagerBenchmark#managerType
+ */
+ @Param({"ThreadLocal", "Continuable"})
+ public String managerType;
+
+ @Param({"1", "4", "8", "100"})
+ public int depth;
+
+ ContextManager manager;
+ Context[] contexts;
+
+ @Setup
+ public void setup() {
+ manager = ContextManagerBenchmark.createManager(managerType);
+ contexts = ContextManagerBenchmark.createContexts();
+ }
+
+ @State(Scope.Thread)
+ public static class ThreadState {
+ final ContextScope[] scopes = new ContextScope[100];
+
+ int nextContextIndex;
+
+ Context nextContext(Context[] contexts) {
+ return contexts[(nextContextIndex++) & (ContextManagerBenchmark.CONTEXT_COUNT - 1)];
+ }
+ }
+
+ // ── Benchmark ─────────────────────────────────────────────────────────────
+
+ /** Attach the same context {@code depth} times then close all scopes in LIFO order. */
+ @Benchmark
+ public void attachSameContextDepth(ThreadState thread) {
+ Context ctx = thread.nextContext(contexts);
+ ContextScope[] scopes = thread.scopes;
+ for (int i = 0; i < depth; i++) {
+ scopes[i] = manager.attach(ctx);
+ }
+ for (int i = depth - 1; i >= 0; i--) {
+ scopes[i].close();
+ }
+ }
+}
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java b/dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java
index 53e4d5b7da6..b62b4ee39f6 100644
--- a/dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java
+++ b/dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java
@@ -13,6 +13,8 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import datadog.context.Context;
+import datadog.context.ContextContinuation;
+import datadog.context.ContextListener;
import datadog.context.ContextManager;
import datadog.context.ContextScope;
import datadog.logging.RatelimitedLogger;
@@ -409,6 +411,31 @@ public Context swap(Context context) {
return new ScopeContext(oldStack);
}
+ @Override
+ public ContextContinuation capture(Context context) {
+ // respect async propagation flag for Context.current().capture()
+ ContinuableScope activeScope = scopeStack().active();
+ if (activeScope != null
+ && activeScope.context == context
+ && !activeScope.isAsyncPropagating()) {
+ return AgentTracer.noopContinuation();
+ }
+ AgentSpan span = AgentSpan.fromContext(context);
+ AgentTraceCollector traceCollector;
+ if (span != null) {
+ traceCollector = span.context().getTraceCollector();
+ } else {
+ traceCollector = AgentTracer.NoopAgentTraceCollector.INSTANCE;
+ }
+ return new ScopeContinuation(this, context, CONTEXT, traceCollector).register();
+ }
+
+ @Override
+ public void addListener(ContextListener unused) {
+ // this new API is not expected to be used in legacy mode...
+ log.warn("Unexpected call to ContextManager.addListener(...)");
+ }
+
static final class ScopeStackThreadLocal extends ThreadLocal