diff --git a/components/context/src/main/java/datadog/context/Context.java b/components/context/src/main/java/datadog/context/Context.java index 967feb86fe9..34a07524de8 100644 --- a/components/context/src/main/java/datadog/context/Context.java +++ b/components/context/src/main/java/datadog/context/Context.java @@ -101,6 +101,15 @@ static Context detachFrom(Object carrier) { return binder().detachFrom(carrier); } + /** + * Captures this (attached) context so it can be resumed in another execution unit. + * + * @return continuation capturing this context. + */ + default ContextContinuation capture() { + return manager().capture(this); + } + /** * Gets the value stored in this context under the given key. * @@ -159,4 +168,13 @@ default Context with(@Nullable ImplicitContextKeyed value) { } return value.storeInto(this); } + + /** + * Wraps context as a scope without attaching it to the current execution unit. + * + * @return a scope that has no effect on execution units. + */ + default ContextScope asScope() { + return new NoopContextScope(this); + } } diff --git a/components/context/src/main/java/datadog/context/ContextContinuation.java b/components/context/src/main/java/datadog/context/ContextContinuation.java new file mode 100644 index 00000000000..c516a7b35bf --- /dev/null +++ b/components/context/src/main/java/datadog/context/ContextContinuation.java @@ -0,0 +1,79 @@ +package datadog.context; + +/** + * Captures a context attached to one execution unit so it can be resumed in another. + * + *

To propagate context to a single background task: + * + *

{@code
+ * ContextContinuation continuation = Context.current().capture();
+ * executor.execute(() -> {
+ *   try (ContextScope scope = continuation.resume()) {
+ *     // ... Context.current() here returns the captured context
+ *   }
+ *   // context implicitly released from continuation when resumed scope closes
+ * });
+ * }
+ * + *

If a continuation is never resumed (e.g. a task is cancelled before it runs), you must release + * it explicitly to avoid resource leaks: + * + *

{@code
+ * ContextContinuation continuation = Context.current().capture();
+ * Future future = executor.submit(() -> {
+ *   try (ContextScope scope = continuation.resume()) {
+ *     // ...
+ *   }
+ * });
+ * // ...
+ * if (future.cancel(false)) {
+ *   continuation.release(); // task will never resume, so release manually
+ * }
+ * }
+ * + *

When the same context is resumed concurrently across multiple threads, call {@link #hold()} + * immediately after capture to prevent the first {@link #resume()} from releasing the context: + * + *

{@code
+ * ContextContinuation continuation = Context.current().capture().hold();
+ * for (int i = 0; i < N; i++) {
+ *   executor.execute(() -> {
+ *     try (ContextScope scope = continuation.resume()) {
+ *       // ...
+ *     }
+ *   });
+ * }
+ * // ...
+ * continuation.release(); // remember to release the hold once all tasks are resumed/done
+ * }
+ */ +public interface ContextContinuation { + + /** + * Optional builder method to stop {@link #resume()} from implicitly releasing the captured + * context. This is useful when multiple threads may concurrently resume the context. You must + * then explicitly {@link #release() release} the context once all threads are resumed/done. + * + * @return this continuation, but with implicit release-after-resume turned off. + */ + ContextContinuation hold(); + + /** + * Returns the context captured by this continuation. + * + * @return the captured context. + */ + Context context(); + + /** + * Resumes the context captured by this continuation by attaching it to the current execution + * unit. Implicitly {@link #release() releases} the captured context at the end of the resumed + * scope, unless {@link #hold()} was called when creating the continuation. + * + * @return a scope to be closed when the resumed context is invalid. + */ + ContextScope resume(); + + /** Explicitly releases the context captured by this continuation. */ + void release(); +} diff --git a/components/context/src/main/java/datadog/context/ContextListener.java b/components/context/src/main/java/datadog/context/ContextListener.java new file mode 100644 index 00000000000..1bf06a7e974 --- /dev/null +++ b/components/context/src/main/java/datadog/context/ContextListener.java @@ -0,0 +1,33 @@ +package datadog.context; + +/** Listener of context events. */ +public interface ContextListener { + + /** + * Notifies that the given context has been attached to the current execution unit. + * + * @param context the attached context. + */ + default void onAttach(Context context) {} + + /** + * Notifies that the given context has been detached from the current execution unit. + * + * @param context the detached context. + */ + default void onDetach(Context context) {} + + /** + * Notifies that the given context has been captured by a continuation. + * + * @param context the captured context. + */ + default void onCapture(Context context) {} + + /** + * Notifies that the given context has been released from a continuation. + * + * @param context the released context. + */ + default void onRelease(Context context) {} +} diff --git a/components/context/src/main/java/datadog/context/ContextManager.java b/components/context/src/main/java/datadog/context/ContextManager.java index af0a2b9289a..6828352d1d4 100644 --- a/components/context/src/main/java/datadog/context/ContextManager.java +++ b/components/context/src/main/java/datadog/context/ContextManager.java @@ -1,5 +1,7 @@ package datadog.context; +import static datadog.context.ContextProviders.manager; + /** Manages context across execution units. */ public interface ContextManager { /** @@ -25,6 +27,29 @@ public interface ContextManager { */ Context swap(Context context); + /** + * Captures the given (attached) context so it can be resumed in another execution unit. + * + * @return continuation capturing the context. + */ + ContextContinuation capture(Context context); + + /** + * Registers the given listener to receive context events. + * + * @param listener the listener to register + */ + void addListener(ContextListener listener); + + /** + * Registers the given listener to receive context events. + * + * @param listener the listener to register. + */ + static void register(ContextListener listener) { + manager().addListener(listener); + } + /** * Requests use of a custom {@link ContextManager}. * diff --git a/components/context/src/main/java/datadog/context/EmptyContext.java b/components/context/src/main/java/datadog/context/EmptyContext.java index 20023482647..e9fee391373 100644 --- a/components/context/src/main/java/datadog/context/EmptyContext.java +++ b/components/context/src/main/java/datadog/context/EmptyContext.java @@ -3,13 +3,13 @@ import static java.util.Objects.requireNonNull; import javax.annotation.Nullable; -import javax.annotation.ParametersAreNonnullByDefault; /** {@link Context} containing no values. */ -@ParametersAreNonnullByDefault -final class EmptyContext implements Context { +final class EmptyContext implements SelfScopedContext { static final Context INSTANCE = new EmptyContext(); + private EmptyContext() {} + @Override @Nullable public T get(ContextKey key) { diff --git a/components/context/src/main/java/datadog/context/ImplicitContextKeyed.java b/components/context/src/main/java/datadog/context/ImplicitContextKeyed.java index a852a19c01b..cdc4107d01a 100644 --- a/components/context/src/main/java/datadog/context/ImplicitContextKeyed.java +++ b/components/context/src/main/java/datadog/context/ImplicitContextKeyed.java @@ -1,9 +1,6 @@ package datadog.context; -import javax.annotation.ParametersAreNonnullByDefault; - /** {@link Context} value that has its own implicit {@link ContextKey}. */ -@ParametersAreNonnullByDefault public interface ImplicitContextKeyed { /** * Creates a new context with this value under its chosen key. diff --git a/components/context/src/main/java/datadog/context/IndexedContext.java b/components/context/src/main/java/datadog/context/IndexedContext.java index e2c520fffdb..ff9acb33e57 100644 --- a/components/context/src/main/java/datadog/context/IndexedContext.java +++ b/components/context/src/main/java/datadog/context/IndexedContext.java @@ -6,11 +6,9 @@ import java.util.Arrays; import javax.annotation.Nullable; -import javax.annotation.ParametersAreNonnullByDefault; /** {@link Context} containing many values. */ -@ParametersAreNonnullByDefault -final class IndexedContext implements Context { +final class IndexedContext implements SelfScopedContext { final Object[] store; IndexedContext(Object[] store) { diff --git a/components/context/src/main/java/datadog/context/NoopContextContinuation.java b/components/context/src/main/java/datadog/context/NoopContextContinuation.java new file mode 100644 index 00000000000..02ede9b862d --- /dev/null +++ b/components/context/src/main/java/datadog/context/NoopContextContinuation.java @@ -0,0 +1,31 @@ +package datadog.context; + +/** {@link ContextContinuation} that has no effect on execution units. */ +final class NoopContextContinuation implements ContextContinuation, ContextScope { + private final Context context; + + NoopContextContinuation(Context context) { + this.context = context; + } + + @Override + public ContextContinuation hold() { + return this; + } + + @Override + public Context context() { + return context; + } + + @Override + public ContextScope resume() { + return this; // acts as no-op scope, avoiding allocation + } + + @Override + public void release() {} + + @Override + public void close() {} +} diff --git a/components/context/src/main/java/datadog/context/NoopContextScope.java b/components/context/src/main/java/datadog/context/NoopContextScope.java new file mode 100644 index 00000000000..648221036c5 --- /dev/null +++ b/components/context/src/main/java/datadog/context/NoopContextScope.java @@ -0,0 +1,18 @@ +package datadog.context; + +/** {@link ContextScope} that has no effect on execution units. */ +final class NoopContextScope implements ContextScope { + private final Context context; + + NoopContextScope(Context context) { + this.context = context; + } + + @Override + public Context context() { + return context; + } + + @Override + public void close() {} +} diff --git a/components/context/src/main/java/datadog/context/SelfScopedContext.java b/components/context/src/main/java/datadog/context/SelfScopedContext.java new file mode 100644 index 00000000000..d8b201d2823 --- /dev/null +++ b/components/context/src/main/java/datadog/context/SelfScopedContext.java @@ -0,0 +1,17 @@ +package datadog.context; + +/** Context that acts as its own unattached scope. */ +interface SelfScopedContext extends Context, ContextScope { + @Override + default ContextScope asScope() { + return this; // acts as no-op scope, avoiding allocation + } + + @Override + default Context context() { + return this; + } + + @Override + default void close() {} +} diff --git a/components/context/src/main/java/datadog/context/SingletonContext.java b/components/context/src/main/java/datadog/context/SingletonContext.java index 94c9ff091fc..773c26b1fa3 100644 --- a/components/context/src/main/java/datadog/context/SingletonContext.java +++ b/components/context/src/main/java/datadog/context/SingletonContext.java @@ -5,11 +5,9 @@ import java.util.Objects; import javax.annotation.Nullable; -import javax.annotation.ParametersAreNonnullByDefault; /** {@link Context} containing a single value. */ -@ParametersAreNonnullByDefault -final class SingletonContext implements Context { +final class SingletonContext implements SelfScopedContext { final int index; final Object value; diff --git a/components/context/src/main/java/datadog/context/TestContextManager.java b/components/context/src/main/java/datadog/context/TestContextManager.java index 9c60f4dc2e0..a0b1302653c 100644 --- a/components/context/src/main/java/datadog/context/TestContextManager.java +++ b/components/context/src/main/java/datadog/context/TestContextManager.java @@ -27,6 +27,20 @@ public Context swap(Context context) { return delegate().swap(context); } + @Override + public ContextContinuation capture(Context context) { + return delegate().capture(context); + } + + @Override + public void addListener(ContextListener listener) { + delegate().addListener(listener); + } + + static void clearListeners() { + ThreadLocalContextManager.INSTANCE.clearListeners(); + } + private static ContextManager delegate() { ContextManager delegate = ContextProviders.customManager; if (delegate == TEST_INSTANCE) { diff --git a/components/context/src/main/java/datadog/context/ThreadLocalContextManager.java b/components/context/src/main/java/datadog/context/ThreadLocalContextManager.java index 0d652868e52..77347408a02 100644 --- a/components/context/src/main/java/datadog/context/ThreadLocalContextManager.java +++ b/components/context/src/main/java/datadog/context/ThreadLocalContextManager.java @@ -1,45 +1,297 @@ package datadog.context; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import javax.annotation.Nullable; + /** {@link ContextManager} that uses a {@link ThreadLocal} to track context per thread. */ final class ThreadLocalContextManager implements ContextManager { - static final ContextManager INSTANCE = new ThreadLocalContextManager(); + static final ThreadLocalContextManager INSTANCE = new ThreadLocalContextManager(); + + private static final NoopContextContinuation ROOT_CONTINUATION = + new NoopContextContinuation(Context.root()); + + private static final ThreadLocal CONTEXT_HOLDER = + ThreadLocal.withInitial(ContextHolder::new); - private static final ThreadLocal CURRENT_HOLDER = - ThreadLocal.withInitial(() -> new Context[] {EmptyContext.INSTANCE}); + private final Object listenersWriteLock = new Object(); + volatile ContextListener[] listeners = {}; @Override public Context current() { - return CURRENT_HOLDER.get()[0]; + return CONTEXT_HOLDER.get().current; } @Override public ContextScope attach(Context context) { - Context[] holder = CURRENT_HOLDER.get(); - Context previous = holder[0]; - holder[0] = context; - return new ContextScope() { - private boolean closed; - - @Override - public Context context() { - return context; - } - - @Override - public void close() { - if (!closed && context == holder[0]) { - holder[0] = previous; - closed = true; - } + return doAttach(context, null); + } + + ContextScope doAttach(Context context, @Nullable ContextContinuationImpl continuation) { + ContextHolder holder = CONTEXT_HOLDER.get(); + + Context previous = holder.current; + if (context == previous) { + if (continuation != null) { + // already attached, safe to release early to avoid resource leak + continuation.releaseOnScopeClose(); + return continuation; // acts as no-op scope, avoiding allocation } - }; + return context.asScope(); // convert to scope without attaching + } + + ContextListener[] ls = listeners; + notifyDetach(previous, ls); + holder.current = context; + notifyAttach(context, ls); + + if (continuation == null) { + return new ContextScopeImpl(context, holder, previous); + } else { + return new ResumedScopeImpl(context, holder, previous, continuation); + } } @Override public Context swap(Context context) { - Context[] holder = CURRENT_HOLDER.get(); - Context previous = holder[0]; - holder[0] = context; + ContextHolder holder = CONTEXT_HOLDER.get(); + + Context previous = holder.current; + if (context == previous) { + return previous; + } + + ContextListener[] ls = listeners; + notifyDetach(previous, ls); + holder.current = context; + notifyAttach(context, ls); + return previous; } + + @Override + public ContextContinuation capture(Context context) { + if (context == Context.root()) { + return ROOT_CONTINUATION; + } else { + return new ContextContinuationImpl(context); + } + } + + @Override + public void addListener(ContextListener listener) { + synchronized (listenersWriteLock) { + for (ContextListener l : listeners) { + if (l == listener) { + return; + } + } + int oldLength = listeners.length; + ContextListener[] update = Arrays.copyOf(listeners, oldLength + 1); + update[oldLength] = listener; + listeners = update; + } + } + + void clearListeners() { + synchronized (listenersWriteLock) { + listeners = new ContextListener[] {}; + } + } + + static void notifyAttach(Context context, ContextListener[] listeners) { + if (context == Context.root()) { + return; // don't emit attach events for the default "no context" case + } + for (ContextListener l : listeners) { + try { + l.onAttach(context); + } catch (Throwable ignore) { + } + } + } + + static void notifyDetach(Context context, ContextListener[] listeners) { + if (context == Context.root()) { + return; // don't emit detach events for the default "no context" case + } + for (ContextListener l : listeners) { + try { + l.onDetach(context); + } catch (Throwable ignore) { + } + } + } + + static void notifyCapture(Context context, ContextListener[] listeners) { + // only called for non-empty continuations + for (ContextListener l : listeners) { + try { + l.onCapture(context); + } catch (Throwable ignore) { + } + } + } + + static void notifyRelease(Context context, ContextListener[] listeners) { + // only called for non-empty continuations + for (ContextListener l : listeners) { + try { + l.onRelease(context); + } catch (Throwable ignore) { + } + } + } + + private static class ContextScopeImpl implements ContextScope { + + private final Context context; + private final ContextHolder holder; + private final Context previous; + + private boolean closed; + + ContextScopeImpl(Context context, ContextHolder holder, Context previous) { + this.context = context; + this.holder = holder; + this.previous = previous; + } + + @Override + public final Context context() { + return context; + } + + @Override + public void close() { + // check for out-of-order close to avoid corrupting the current state + if (!closed && context == holder.current) { + ContextListener[] ls = INSTANCE.listeners; + notifyDetach(context, ls); + holder.current = previous; + notifyAttach(previous, ls); + closed = true; + } + } + } + + private static final class ResumedScopeImpl extends ContextScopeImpl { + @Nullable private ContextContinuationImpl continuation; + + ResumedScopeImpl( + Context context, + ContextHolder holder, + Context previous, + @Nullable ContextContinuationImpl continuation) { + super(context, holder, previous); + this.continuation = continuation; + } + + @Override + public void close() { + if (continuation != null) { + // release first to avoid resource leak, even on out-of-order close + continuation.releaseOnScopeClose(); + continuation = null; + } + super.close(); // proceed to try and update the current execution unit + } + } + + private static final class ContextContinuationImpl implements ContextContinuation, ContextScope { + + private static final AtomicIntegerFieldUpdater COUNT = + AtomicIntegerFieldUpdater.newUpdater(ContextContinuationImpl.class, "count"); + + // these boundaries were selected to allow for speculative counting and fuzzy checks + private static final int RELEASED = Integer.MIN_VALUE >> 1; + private static final int HELD = (Integer.MAX_VALUE >> 1) + 1; + + private final Context context; + + /** + * When positive this reflects the number of outstanding resumed scopes as well as whether there + * is an active hold on the continuation: + * + * + * + * + * + * + * + *
Value Meaning
0Not held or resumed
1..HELD-1Resumed, not held
HELDHeld, not yet resumed
HELD..MAX_INTResumed and held
+ * + * where HELD is at the mid-point between 1 and MAX_INT. + * + *

A negative value of RELEASED reflects that the continuation has either been resumed and + * all associated scopes are now closed, or it has been explicitly released. This value was + * chosen to be half the size of MIN_INT to avoid speculative additions in {@link #resume()} + * from overflowing to a positive count. + */ + private volatile int count = 0; + + ContextContinuationImpl(Context context) { + this.context = context; + notifyCapture(context, INSTANCE.listeners); + } + + @Override + public ContextContinuation hold() { + // update initial count to record that this continuation has a hold + COUNT.compareAndSet(this, 0, HELD); + return this; + } + + @Override + public Context context() { + return context; + } + + @Override + public ContextScope resume() { + if (COUNT.incrementAndGet(this) > 0) { + // speculative update succeeded, continuation can be resumed + return INSTANCE.doAttach(context, this); + } else { + // continuation released or too many resumes; rollback count + COUNT.decrementAndGet(this); + return this; // acts as no-op scope, avoiding allocation + } + } + + @Override + public void release() { + int current = count; + while (current >= HELD) { + // remove the hold on this continuation by removing the offset + COUNT.compareAndSet(this, current, current - HELD); + current = count; + } + while (current == 0) { + // no outstanding resumes and hold has been removed + if (COUNT.compareAndSet(this, current, RELEASED)) { + notifyRelease(context, INSTANCE.listeners); + return; + } + current = count; + } + } + + void releaseOnScopeClose() { + if (COUNT.compareAndSet(this, 1, RELEASED)) { + // fast path: only one resume of the continuation (no hold) + notifyRelease(context, INSTANCE.listeners); + } else if (COUNT.decrementAndGet(this) == 0) { + // slow path: multiple resumes, all scopes now closed (no hold) + release(); + } /* else there are outstanding resumes or hold is in place */ + } + + @Override + public void close() {} + } + + private static final class ContextHolder { + Context current = Context.root(); + } } diff --git a/components/context/src/main/java/datadog/context/WeakMapContextBinder.java b/components/context/src/main/java/datadog/context/WeakMapContextBinder.java index 9b5fd24299e..15e0154f25a 100644 --- a/components/context/src/main/java/datadog/context/WeakMapContextBinder.java +++ b/components/context/src/main/java/datadog/context/WeakMapContextBinder.java @@ -6,10 +6,8 @@ import java.util.Map; import java.util.WeakHashMap; -import javax.annotation.ParametersAreNonnullByDefault; /** {@link ContextBinder} that uses a global weak map of carriers to contexts. */ -@ParametersAreNonnullByDefault final class WeakMapContextBinder implements ContextBinder { static final ContextBinder INSTANCE = new WeakMapContextBinder(); diff --git a/components/context/src/test/java/datadog/context/ContextBinderTest.java b/components/context/src/test/java/datadog/context/ContextBinderTest.java index 9af265fea50..eb2bb35563e 100644 --- a/components/context/src/test/java/datadog/context/ContextBinderTest.java +++ b/components/context/src/test/java/datadog/context/ContextBinderTest.java @@ -8,15 +8,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -class ContextBinderTest { - @BeforeEach - void setUp() { - assertEquals(root(), current(), "No context is expected to be set"); - } - +class ContextBinderTest extends ContextTestBase { @Test void testAttachAndDetach() { // Setting up test diff --git a/components/context/src/test/java/datadog/context/ContextContinuationTest.java b/components/context/src/test/java/datadog/context/ContextContinuationTest.java new file mode 100644 index 00000000000..0b6508220bf --- /dev/null +++ b/components/context/src/test/java/datadog/context/ContextContinuationTest.java @@ -0,0 +1,368 @@ +package datadog.context; + +import static datadog.context.Context.current; +import static datadog.context.Context.root; +import static java.util.Arrays.asList; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import javax.annotation.ParametersAreNonnullByDefault; +import org.junit.jupiter.api.Test; + +@ParametersAreNonnullByDefault +class ContextContinuationTest extends ContextTestBase { + private static final ContextKey CONTINUATION_KEY = ContextKey.named("continuation-key"); + + @Test + void testCaptureRootContextIsNoop() { + ContextContinuation continuation = root().capture(); + assertEquals(root(), continuation.context()); + assertSame(continuation, continuation.hold()); // hold is a no-op, returns self + try (ContextScope scope = continuation.resume()) { + assertEquals(root(), current()); // nothing changes for root + } + assertEquals(root(), current()); + continuation.release(); // no-op + } + + @Test + void testCaptureStoresContext() { + Context context = root().with(CONTINUATION_KEY, "captured"); + try (ContextScope scope = context.attach()) { + ContextContinuation continuation = context.capture(); + assertEquals(context, continuation.context()); + continuation.release(); + } + } + + @Test + void testCaptureFiresOnCaptureEvent() { + List events = new ArrayList<>(); + ContextManager.register(trackingListener(events)); + Context context = root().with(CONTINUATION_KEY, "value"); + try (ContextScope scope = context.attach()) { + ContextContinuation continuation = + context.capture(); // capture while active (recommended pattern) + assertEquals(asList("attach", "capture"), events); + continuation.release(); + } + assertEquals(asList("attach", "capture", "release", "detach"), events); + } + + @Test + void testResumeAttachesContextAndRestoresPreviousOnClose() { + Context context = root().with(CONTINUATION_KEY, "value"); + ContextContinuation continuation; + try (ContextScope scope = context.attach()) { + continuation = context.capture(); // capture while active (recommended pattern) + } + // original scope is closed; resume the continuation here (same or different thread) + try (ContextScope scope = continuation.resume()) { + assertEquals(context, current()); + assertEquals(context, scope.context()); + } + assertEquals(root(), current()); + } + + @Test + void testResumeAndScopeCloseFiresLifecycleEvents() { + List events = new ArrayList<>(); + ContextManager.register(trackingListener(events)); + Context context = root().with(CONTINUATION_KEY, "value"); + ContextContinuation continuation; + try (ContextScope scope = context.attach()) { + continuation = context.capture(); // capture while active + } + assertEquals(asList("attach", "capture", "detach"), events); + try (ContextScope scope = continuation.resume()) { + assertEquals(asList("attach", "capture", "detach", "attach"), events); + } + // release fires before detach (continuation is released first inside ContextScopeImpl.close) + assertEquals(asList("attach", "capture", "detach", "attach", "release", "detach"), events); + } + + @Test + void testHoldPreventsAutoReleaseOnScopeClose() { + List events = new ArrayList<>(); + ContextManager.register(trackingListener(events)); + Context context = root().with(CONTINUATION_KEY, "value"); + ContextContinuation continuation; + try (ContextScope scope = context.attach()) { + continuation = context.capture(); // capture while active + continuation.hold(); + } + try (ContextScope scope = continuation.resume()) { + assertEquals(context, current()); + } + assertEquals(root(), current()); + assertEquals( + asList("attach", "capture", "detach", "attach", "detach"), + events, + "release should not fire while hold is active"); + continuation.release(); + assertEquals(asList("attach", "capture", "detach", "attach", "detach", "release"), events); + } + + @Test + void testExplicitReleaseWithoutResumeFiresReleaseEvent() { + List events = new ArrayList<>(); + ContextManager.register(trackingListener(events)); + Context context = root().with(CONTINUATION_KEY, "value"); + ContextContinuation continuation; + try (ContextScope scope = context.attach()) { + continuation = context.capture(); // capture while active + } + assertEquals(asList("attach", "capture", "detach"), events); + continuation.release(); + assertEquals(asList("attach", "capture", "detach", "release"), events); + } + + @Test + void testResumeAfterReleaseIsNoop() { + Context context = root().with(CONTINUATION_KEY, "value"); + ContextContinuation continuation; + try (ContextScope scope = context.attach()) { + continuation = context.capture(); // capture while active + } + continuation.release(); + // Resuming a released continuation should not attach the context + try (ContextScope scope = continuation.resume()) { + assertEquals(root(), current()); + } + assertEquals(root(), current()); + } + + @Test + void testResumeOnDifferentThread() { + Context context = root().with(CONTINUATION_KEY, "value"); + ContextContinuation continuation; + try (ContextScope scope = context.attach()) { + continuation = context.capture(); // capture while active (recommended pattern) + } + // original scope is closed; resume the context on another thread + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + Future future = + executor.submit( + () -> { + assertEquals(root(), current()); // thread starts with root context + try (ContextScope scope = continuation.resume()) { + assertEquals(context, current()); + } + assertEquals(root(), current()); // restored after scope close + }); + assertDoesNotThrow(() -> future.get()); + } finally { + executor.shutdown(); + } + } + + @Test + void testMultipleResumesReleaseAfterLastScopeCloses() throws InterruptedException { + List events = Collections.synchronizedList(new ArrayList<>()); + ContextManager.register( + new ContextListener() { + @Override + public void onRelease(Context c) { + events.add("release"); + } + }); + Context context = root().with(CONTINUATION_KEY, "value"); + ContextContinuation continuation; + try (ContextScope scope = context.attach()) { + continuation = context.capture(); // capture while active + } + CountDownLatch bothResumed = new CountDownLatch(2); + CountDownLatch closeFirst = new CountDownLatch(1); + CountDownLatch firstClosed = new CountDownLatch(1); + CountDownLatch closeSecond = new CountDownLatch(1); + ExecutorService executor = Executors.newFixedThreadPool(2); + try { + Future f1 = + executor.submit( + () -> { + try (ContextScope scope = continuation.resume()) { + bothResumed.countDown(); + closeFirst.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + firstClosed.countDown(); + } + }); + Future f2 = + executor.submit( + () -> { + try (ContextScope scope = continuation.resume()) { + bothResumed.countDown(); + closeSecond.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + bothResumed.await(); + assertTrue(events.isEmpty(), "release should not fire while scopes are open"); + closeFirst.countDown(); + firstClosed.await(); // wait for f1's scope to fully close + assertTrue(events.isEmpty(), "release should not fire after first scope closes"); + closeSecond.countDown(); + assertDoesNotThrow(() -> f1.get()); + assertDoesNotThrow(() -> f2.get()); + assertEquals(asList("release"), events); + } finally { + executor.shutdown(); + } + } + + @Test + void testSameContextResumeReleasesImmediately() { + List events = new ArrayList<>(); + ContextManager.register(trackingListener(events)); + Context context = root().with(CONTINUATION_KEY, "value"); + try (ContextScope outer = context.attach()) { + // Context is already current; resume is a noop and continuation is released immediately + ContextContinuation continuation = context.capture(); + try (ContextScope noop = continuation.resume()) { + assertEquals(context, current()); + assertEquals(asList("attach", "capture", "release"), events); // released synchronously + } + assertEquals(context, current()); // outer scope still holds context + } + assertEquals(asList("attach", "capture", "release", "detach"), events); + } + + @Test + void testOutOfOrderScopeCloseReleasesImmediately() { + // Recommended pattern: attach C, capture, close original scope + Context contextC = root().with(CONTINUATION_KEY, "C"); + ContextContinuation continuation; + try (ContextScope scope = contextC.attach()) { + continuation = contextC.capture(); + } + + List events = new ArrayList<>(); + ContextManager.register(keyedTrackingListener(events, CONTINUATION_KEY)); + + Context contextD = root().with(CONTINUATION_KEY, "D"); + try (ContextScope scopeR = continuation.resume()) { + assertEquals(contextC, current()); + try (ContextScope scopeD = contextD.attach()) { // attaching D fires detach:C, attach:D + assertEquals(contextD, current()); + + // close the resume scope out-of-order while D is still nested on top; + // release fires immediately, but detach:C does not (C is not current) + scopeR.close(); + assertEquals(asList("attach:C", "detach:C", "attach:D", "release:C"), events); + assertEquals(contextD, current()); // D is still current + } // scopeD closes here: unwind D normally, restores C + assertEquals( + asList("attach:C", "detach:C", "attach:D", "release:C", "detach:D", "attach:C"), events); + } // try-with-resources closes scopeR again; no second release, C unwinds to root + + assertEquals(root(), current()); + assertEquals( + asList("attach:C", "detach:C", "attach:D", "release:C", "detach:D", "attach:C", "detach:C"), + events); + } + + @Test + void testHoldWithOutOfOrderScopeCloseFiresReleaseOnExplicitRelease() { + // Regression test: hold() + out-of-order close must not corrupt the count, + // which would cause release() to silently no-op and lose the release event. + Context contextC = root().with(CONTINUATION_KEY, "C"); + ContextContinuation continuation; + try (ContextScope scope = contextC.attach()) { + continuation = contextC.capture(); + continuation.hold(); + } + + List events = new ArrayList<>(); + ContextManager.register(keyedTrackingListener(events, CONTINUATION_KEY)); + + Context contextD = root().with(CONTINUATION_KEY, "D"); + try (ContextScope scopeR = continuation.resume()) { + assertEquals(contextC, current()); + try (ContextScope scopeD = contextD.attach()) { // detach:C, attach:D + assertEquals(contextD, current()); + + scopeR.close(); // out-of-order close while D is still on top; hold prevents auto-release + assertEquals(asList("attach:C", "detach:C", "attach:D"), events); + assertEquals(contextD, current()); + } // scopeD closes here: unwind D, restores C + } // TWR closes scopeR again (now in-order); detach:C, no release yet (hold is active) + + assertEquals(root(), current()); + assertEquals( + asList("attach:C", "detach:C", "attach:D", "detach:D", "attach:C", "detach:C"), events); + + continuation.release(); // explicit release must fire release:C + assertEquals( + asList("attach:C", "detach:C", "attach:D", "detach:D", "attach:C", "detach:C", "release:C"), + events); + } + + @Test + void testMultipleHoldCallsAreIdempotent() { + // Calling hold() more than once should not require more than one explicit release(). + List events = new ArrayList<>(); + ContextManager.register(trackingListener(events)); + Context context = root().with(CONTINUATION_KEY, "value"); + ContextContinuation continuation; + try (ContextScope scope = context.attach()) { + continuation = context.capture(); + continuation.hold(); + continuation.hold(); // second hold must be a no-op + } + // One explicit release() is enough — no extra releases needed for the second hold(). + continuation.release(); + assertEquals(asList("attach", "capture", "detach", "release"), events); + continuation.release(); // still idempotent after the final release + assertEquals(asList("attach", "capture", "detach", "release"), events); + } + + @Test + void testHoldAfterReleaseIsIgnored() { + // hold() on an already-released continuation must not resurrect it. + List events = new ArrayList<>(); + ContextManager.register(trackingListener(events)); + Context context = root().with(CONTINUATION_KEY, "value"); + ContextContinuation continuation; + try (ContextScope scope = context.attach()) { + continuation = context.capture(); + } + continuation.release(); + assertEquals(asList("attach", "capture", "detach", "release"), events); + continuation.hold(); // must be silently ignored + // resume() after release is already a noop, even with the spurious hold() + try (ContextScope scope = continuation.resume()) { + assertEquals(root(), current()); + } + continuation.release(); // must not fire a second release event + assertEquals(asList("attach", "capture", "detach", "release"), events); + } + + @Test + void testHoldAllowsMultipleReleaseCalls() { + List events = new ArrayList<>(); + ContextManager.register(trackingListener(events)); + Context context = root().with(CONTINUATION_KEY, "value"); + ContextContinuation continuation; + try (ContextScope scope = context.attach()) { + continuation = context.capture(); // capture while active + continuation.hold(); + } + continuation.release(); + assertEquals(asList("attach", "capture", "detach", "release"), events); + continuation.release(); // second release is a no-op + assertEquals(asList("attach", "capture", "detach", "release"), events); + } +} diff --git a/components/context/src/test/java/datadog/context/ContextListenerEventTest.java b/components/context/src/test/java/datadog/context/ContextListenerEventTest.java new file mode 100644 index 00000000000..cce5d10f167 --- /dev/null +++ b/components/context/src/test/java/datadog/context/ContextListenerEventTest.java @@ -0,0 +1,104 @@ +package datadog.context; + +import static datadog.context.Context.current; +import static datadog.context.Context.root; +import static datadog.context.ContextTest.STRING_KEY; +import static java.util.Arrays.asList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +class ContextListenerEventTest extends ContextTestBase { + @Test + void testListenersNotifiedOnAttachAndDetach() { + List events = new ArrayList<>(); + ContextManager.register(keyedTrackingListener(events, STRING_KEY)); + Context context = root().with(STRING_KEY, "value"); + try (ContextScope scope = context.attach()) { + assertEquals(asList("attach:value"), events); + } + assertEquals(asList("attach:value", "detach:value"), events); + } + + @Test + void testListenersNotNotifiedForRootContext() { + List events = new ArrayList<>(); + ContextManager.register(trackingListener(events)); + root().attach(); // current is already root, no events + assertTrue(events.isEmpty(), "root attach should not trigger listeners"); + root().swap(); // current is already root, no events + assertTrue(events.isEmpty(), "root swap should not trigger listeners"); + Context context = root().with(STRING_KEY, "value"); + try (ContextScope scope = context.attach()) { + assertEquals(1, events.size()); // attach:non-root only + } + assertEquals(2, events.size()); // detach:non-root but not attach:root + } + + @Test + void testListenersNotNotifiedOnSameContextAttach() { + List events = new ArrayList<>(); + ContextManager.register(trackingListener(events)); + Context context = root().with(STRING_KEY, "same"); + try (ContextScope outer = context.attach()) { + assertEquals(asList("attach"), events); + try (ContextScope noop = context.attach()) { + assertEquals(context, current()); + assertEquals(asList("attach"), events); // no new events on same-context attach + } + assertEquals(asList("attach"), events); // noop close fires no events either + } + assertEquals(asList("attach", "detach"), events); + } + + @Test + void testListenersNotNotifiedOnSameContextSwap() { + List events = new ArrayList<>(); + ContextManager.register(trackingListener(events)); + Context context = root().with(STRING_KEY, "same"); + context.swap(); + assertEquals(asList("attach"), events); + context.swap(); // same context again, no events + assertEquals(asList("attach"), events); + root().swap(); + assertEquals(asList("attach", "detach"), events); + } + + @Test + void testDuplicateListenerIgnored() { + List events = new ArrayList<>(); + ContextListener listener = trackingListener(events); + ContextManager.register(listener); + ContextManager.register(listener); // should be ignored + try (ContextScope scope = root().with(STRING_KEY, "value").attach()) {} + assertEquals(asList("attach", "detach"), events); + } + + @Test + void testMultipleListenersAllNotified() { + List events1 = new ArrayList<>(); + List events2 = new ArrayList<>(); + ContextManager.register(trackingListener(events1)); + ContextManager.register(trackingListener(events2)); + try (ContextScope scope = root().with(STRING_KEY, "value").attach()) {} + assertEquals(asList("attach", "detach"), events1); + assertEquals(asList("attach", "detach"), events2); + } + + @Test + void testSwapNotifiesListeners() { + List events = new ArrayList<>(); + ContextManager.register(keyedTrackingListener(events, STRING_KEY)); + Context context = root().with(STRING_KEY, "value"); + Context previous = context.swap(); + assertSame(root(), previous); + assertEquals(asList("attach:value"), events); + previous = root().swap(); + assertSame(context, previous); + assertEquals(asList("attach:value", "detach:value"), events); + } +} diff --git a/components/context/src/test/java/datadog/context/ContextListenerExceptionTest.java b/components/context/src/test/java/datadog/context/ContextListenerExceptionTest.java new file mode 100644 index 00000000000..972f3733ddc --- /dev/null +++ b/components/context/src/test/java/datadog/context/ContextListenerExceptionTest.java @@ -0,0 +1,74 @@ +package datadog.context; + +import static datadog.context.Context.current; +import static datadog.context.Context.root; +import static datadog.context.ContextTest.STRING_KEY; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import javax.annotation.ParametersAreNonnullByDefault; +import org.junit.jupiter.api.Test; + +@ParametersAreNonnullByDefault +class ContextListenerExceptionTest extends ContextTestBase { + @Test + void testListenerExceptionSwallowed() { + ContextManager.register( + new ContextListener() { + @Override + public void onAttach(Context c) { + throw new RuntimeException("listener failure"); + } + + @Override + public void onDetach(Context c) { + throw new RuntimeException("listener failure"); + } + }); + Context context = root().with(STRING_KEY, "value"); + assertDoesNotThrow( + () -> { + try (ContextScope scope = context.attach()) { + assertEquals(context, current()); + } + }); + } + + @Test + void testListenerExceptionSwallowedOnCapture() { + ContextManager.register( + new ContextListener() { + @Override + public void onCapture(Context c) { + throw new RuntimeException("listener failure on capture"); + } + }); + Context context = root().with(STRING_KEY, "value"); + try (ContextScope scope = context.attach()) { + assertDoesNotThrow( + () -> { + ContextContinuation continuation = context.capture(); + assertNotNull(continuation); + assertEquals(context, continuation.context()); + continuation.release(); + }); + } + } + + @Test + void testListenerExceptionSwallowedOnRelease() { + ContextManager.register( + new ContextListener() { + @Override + public void onRelease(Context c) { + throw new RuntimeException("listener failure on release"); + } + }); + Context context = root().with(STRING_KEY, "value"); + try (ContextScope scope = context.attach()) { + ContextContinuation continuation = context.capture(); + assertDoesNotThrow(continuation::release); + } + } +} diff --git a/components/context/src/test/java/datadog/context/ContextManagerTest.java b/components/context/src/test/java/datadog/context/ContextManagerTest.java index d8927abf76d..789f6fc6eaf 100644 --- a/components/context/src/test/java/datadog/context/ContextManagerTest.java +++ b/components/context/src/test/java/datadog/context/ContextManagerTest.java @@ -10,17 +10,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.Phaser; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -class ContextManagerTest { - @BeforeEach - void init() { - // Ensure no current context prior starting test - assertEquals(root(), current()); - } - +class ContextManagerTest extends ContextTestBase { @Test void testContextAttachment() { Context context1 = root().with(STRING_KEY, "value1"); @@ -57,22 +49,57 @@ void testContextSwapping() { assertEquals(root(), current()); } + @Test + void testNoopScopeContextReturnsAttachedContext() { + Context context = root().with(STRING_KEY, "value"); + try (ContextScope outer = context.attach()) { + // second attach returns a noop scope; verify context() reflects the attached context + try (ContextScope noop = context.attach()) { + assertEquals(context, noop.context()); + } + } + } + + @Test + void testNoopScopeReturnsCorrectContext() { + Context context = root().with(STRING_KEY, "value"); + try (ContextScope outer = context.attach()) { + try (ContextScope noop1 = context.attach(); + ContextScope noop2 = context.attach()) { + assertEquals(context, noop1.context()); + assertEquals(context, noop2.context()); + } + } + } + + @Test + void testNoopScopeCorrectContextAcrossManyContexts() { + for (int i = 0; i < 200; i++) { + Context ctx = root().with(STRING_KEY, "ctx-" + i); + try (ContextScope outer = ctx.attach()) { + try (ContextScope noop = ctx.attach()) { + assertEquals(ctx, noop.context()); + } + } + } + } + @Test void testAttachSameContextMultipleTimes() { Context context = root().with(STRING_KEY, "value1"); - try (ContextScope ignored1 = context.attach()) { + try (ContextScope scope1 = context.attach()) { assertEquals(context, current()); - try (ContextScope ignored2 = context.attach()) { - try (ContextScope ignored3 = context.attach()) { - assertEquals(context, current()); + // re-attaching an already-active context returns a noop scope + try (ContextScope noop2 = context.attach()) { + assertEquals(context, noop2.context()); + try (ContextScope noop3 = context.attach()) { + assertEquals(context, noop3.context()); } - // Test closing a scope on the current context should not deactivate it if activated - // multiple times - assertEquals(context, current()); + assertEquals(context, current()); // noop close: context remains active } + assertEquals(context, current()); // still active after all noop closes } - // Test closing the same number of scope as activation should deactivate the context - assertEquals(root(), current()); + assertEquals(root(), current()); // only the original scope deactivates on close } @Test @@ -96,15 +123,16 @@ void testClosingMultipleTimes() { Context context1 = root().with(STRING_KEY, "value1"); try (ContextScope ignored = context1.attach()) { Context context2 = context1.with(STRING_KEY, "value2"); - ContextScope scope = context2.attach(); - // Test current context - assertEquals(context2, current()); - // Test current context deactivation - scope.close(); - assertEquals(context1, current()); - // Test multiple context deactivations don’t change current context - scope.close(); - assertEquals(context1, current()); + try (ContextScope scope = context2.attach()) { + // Test current context + assertEquals(context2, current()); + // Test current context deactivation + scope.close(); + assertEquals(context1, current()); + // Test multiple context deactivations don’t change current context + scope.close(); + assertEquals(context1, current()); + } } } @@ -208,10 +236,4 @@ void testNonThreadInheritance() { assertDoesNotThrow(() -> future.get()); } } - - @AfterEach - void tearDown() { - // Ensure no current context after ending test - assertEquals(root(), current()); - } } diff --git a/components/context/src/test/java/datadog/context/ContextProvidersForkedTest.java b/components/context/src/test/java/datadog/context/ContextProvidersForkedTest.java index 915186554a6..b8e61ddc4a4 100644 --- a/components/context/src/test/java/datadog/context/ContextProvidersForkedTest.java +++ b/components/context/src/test/java/datadog/context/ContextProvidersForkedTest.java @@ -73,23 +73,21 @@ public Context current() { @Override public ContextScope attach(Context context) { - return new ContextScope() { - @Override - public Context context() { - return root(); - } - - @Override - public void close() { - // no-op - } - }; + return new NoopContextScope(root()); } @Override public Context swap(Context context) { return root(); } + + @Override + public ContextContinuation capture(Context context) { + return new NoopContextContinuation(root()); + } + + @Override + public void addListener(ContextListener listener) {} }); // NOOP manager, context will always be root diff --git a/components/context/src/test/java/datadog/context/ContextTestBase.java b/components/context/src/test/java/datadog/context/ContextTestBase.java new file mode 100644 index 00000000000..ecb0183e635 --- /dev/null +++ b/components/context/src/test/java/datadog/context/ContextTestBase.java @@ -0,0 +1,72 @@ +package datadog.context; + +import static datadog.context.Context.current; +import static datadog.context.Context.root; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.List; +import javax.annotation.ParametersAreNonnullByDefault; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; + +@ParametersAreNonnullByDefault +abstract class ContextTestBase { + @BeforeEach + void verifyNoContextBefore() { + assertEquals(root(), current()); + } + + @AfterEach + void verifyNoContextAfter() { + TestContextManager.clearListeners(); + assertEquals(root(), current()); + } + + static ContextListener trackingListener(List events) { + return new ContextListener() { + @Override + public void onAttach(Context c) { + events.add("attach"); + } + + @Override + public void onDetach(Context c) { + events.add("detach"); + } + + @Override + public void onCapture(Context c) { + events.add("capture"); + } + + @Override + public void onRelease(Context c) { + events.add("release"); + } + }; + } + + static ContextListener keyedTrackingListener(List events, ContextKey key) { + return new ContextListener() { + @Override + public void onAttach(Context c) { + events.add("attach:" + c.get(key)); + } + + @Override + public void onDetach(Context c) { + events.add("detach:" + c.get(key)); + } + + @Override + public void onCapture(Context c) { + events.add("capture:" + c.get(key)); + } + + @Override + public void onRelease(Context c) { + events.add("release:" + c.get(key)); + } + }; + } +} diff --git a/dd-trace-core/build.gradle b/dd-trace-core/build.gradle index 3df31b0388f..2a3a56e3b91 100644 --- a/dd-trace-core/build.gradle +++ b/dd-trace-core/build.gradle @@ -114,4 +114,15 @@ dependencies { jmh { jmhVersion = libs.versions.jmh.get() duplicateClassesStrategy = DuplicatesStrategy.EXCLUDE + if (project.hasProperty('jmhIncludes')) { + includes = [project.jmhIncludes] + } + if (project.hasProperty('jmhProfilers')) { + profilers = [project.jmhProfilers] + } + if (project.hasProperty('jmhJvm')) { + jvm = project.javaToolchains.launcherFor { + languageVersion = JavaLanguageVersion.of(project.jmhJvm as int) + }.map { it.executablePath.asFile.absolutePath } + } } diff --git a/dd-trace-core/src/jmh/java/datadog/context/ContextManagerBenchmark.java b/dd-trace-core/src/jmh/java/datadog/context/ContextManagerBenchmark.java new file mode 100644 index 00000000000..26875db31c6 --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/context/ContextManagerBenchmark.java @@ -0,0 +1,352 @@ +package datadog.context; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.trace.core.scopemanager.ContinuableScopeManager; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadFactory; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Compares {@link ThreadLocalContextManager} vs {@link ContinuableScopeManager} across context + * attach, swap, and cross-thread continuation scenarios — including virtual threads (requires JDK + * 21+). + * + *

For the same-non-root-context stack-depth scenario see {@link ContextManagerDepthBenchmark}. + * + *

Run with: + * + *

+ * {@code ./gradlew :dd-trace-core:jmh -PjmhIncludes=ContextManagerBenchmark -PjmhJvm=25 -PjmhProfilers=gc}
+ * 
+ */ +@State(Scope.Benchmark) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@BenchmarkMode(Mode.Throughput) +@Threads(8) +@OutputTimeUnit(MICROSECONDS) +@Fork(value = 1) +public class ContextManagerBenchmark { + + // ── Constants ────────────────────────────────────────────────────────────── + + // Reflective access to Thread.ofVirtual().factory() (Java 21+). + // Used to create fixed-size pools of virtual threads so no new VT is spawned per task. + // Falls back to platform threads on older JVMs — the benchmark still runs, but + // captureAndResumeOnVirtualThread and captureAndFanOutToVirtualThreads will measure + // platform-thread overhead instead. + static final boolean VIRTUAL_THREADS_AVAILABLE; + static final ThreadFactory VIRTUAL_OR_PLATFORM_FACTORY; + + static { + ThreadFactory factory = null; + try { + Object builder = Thread.class.getMethod("ofVirtual").invoke(null); + factory = (ThreadFactory) builder.getClass().getMethod("factory").invoke(builder); + } catch (Exception ignored) { + } + VIRTUAL_THREADS_AVAILABLE = factory != null; + VIRTUAL_OR_PLATFORM_FACTORY = factory != null ? factory : Thread::new; + } + + // Creates a fixed pool whose threads are virtual (Java 21+) or platform (older JVMs). + // Using a fixed pool rather than newVirtualThreadPerTaskExecutor avoids spawning a + // fresh virtual thread on every task submission, keeping thread-creation cost out of + // the measured critical path. + static ExecutorService newFixedVirtualPool(int nThreads) { + return Executors.newFixedThreadPool(nThreads, VIRTUAL_OR_PLATFORM_FACTORY); + } + + static final ContextKey KEY = ContextKey.named("benchmark-key"); + // power of 2 so cycling wraps cheaply with bit-mask + static final int CONTEXT_COUNT = 16; + // virtual threads spawned per continuation fan-out + static final int FAN_OUT = 8; + + // ── Parameters ───────────────────────────────────────────────────────────── + + /** + * Which {@link ContextManager} implementation to benchmark. + * + *

{@code ThreadLocal} — {@link ThreadLocalContextManager} (the lightweight default). + * + *

{@code Continuable} — {@link ContinuableScopeManager} (the full scope/span manager). + */ + @Param({"ThreadLocal", "Continuable"}) + public String managerType; + + // ── Benchmark-scoped shared state ───────────────────────────────────────── + + ContextManager manager; + // CONTEXT_COUNT distinct non-root contexts; threads cycle through them to + // avoid artificial same-context hits in benchmarks that don't want them + Context[] contexts; + + @Setup + public void setup() { + manager = createManager(managerType); + contexts = createContexts(); + } + + static ContextManager createManager(String type) { + return "Continuable".equals(type) + ? new ContinuableScopeManager(0, false) + : ThreadLocalContextManager.INSTANCE; + } + + static Context[] createContexts() { + Context[] contexts = new Context[CONTEXT_COUNT]; + for (int i = 0; i < CONTEXT_COUNT; i++) { + contexts[i] = Context.root().with(KEY, "value-" + i); + } + return contexts; + } + + // ── Per-thread state ─────────────────────────────────────────────────────── + + @State(Scope.Thread) + public static class ThreadState { + int index; + // Pre-allocated barrier reused across fan-out invocations. + // Avoids a new CountDownLatch allocation per invocation that would inflate gc.alloc.rate.norm. + final Semaphore fanOutBarrier = new Semaphore(0); + ExecutorService platformExecutor; + ExecutorService virtualExecutor; + + @Setup(Level.Trial) + public void setup() { + // Both pools are fixed-size so no new thread is created per submitted task. + // The virtual pool uses virtual threads (Java 21+) or falls back to platform threads. + // Pool size is intentionally larger than the JMH thread count to avoid executor starvation + // when benchmark threads all submit tasks concurrently. + platformExecutor = Executors.newFixedThreadPool(16); + virtualExecutor = newFixedVirtualPool(16); + } + + @TearDown(Level.Trial) + public void tearDown() throws InterruptedException { + platformExecutor.shutdown(); + virtualExecutor.shutdown(); + platformExecutor.awaitTermination(10, SECONDS); + virtualExecutor.awaitTermination(10, SECONDS); + } + + Context nextContext(Context[] contexts) { + return contexts[(index++) & (CONTEXT_COUNT - 1)]; + } + } + + // ── Thread state with a pre-attached context (for read benchmarks) ───────── + + /** + * Attaches a context once per trial so that {@link #current} and {@link #currentAndGet} measure + * only the read path, not the attach overhead. + */ + @State(Scope.Thread) + public static class ActiveContextState { + ContextScope scope; + + @Setup(Level.Trial) + public void setup(ContextManagerBenchmark benchmark) { + scope = benchmark.manager.attach(benchmark.contexts[0]); + } + + @TearDown(Level.Trial) + public void tearDown() { + scope.close(); + } + } + + // ── Scenario 1: attach a different context, close scope ─────────────────── + + /** Attach one distinct context then close its scope. The hot path for most instrumentations. */ + @Benchmark + public void attachAndClose(ThreadState thread) { + Context ctx = thread.nextContext(contexts); + try (ContextScope scope = manager.attach(ctx)) { + // scope is active + } + } + + // ── Scenario 2: nested attach of two different contexts ─────────────────── + + /** + * Attach two distinct contexts in sequence and close both. Exercises the stack push/pop cycle + * that occurs at every instrumented method boundary. + */ + @Benchmark + public void nestedAttachAndClose(ThreadState thread) { + Context outer = thread.nextContext(contexts); + Context inner = thread.nextContext(contexts); + try (ContextScope outerScope = manager.attach(outer)) { + try (ContextScope innerScope = manager.attach(inner)) { + // inner is active + } + } + } + + // ── Scenario 3: swap different contexts ─────────────────────────────────── + + /** + * Swap in a new context then swap back the previous one. {@link + * ContinuableScopeManager#swap(Context)} replaces the entire scope stack, making this a heavier + * operation than in {@link ThreadLocalContextManager}. + * + *

Note: GCProfiler will show allocation asymmetry here by design. {@link + * ContinuableScopeManager} swap allocates a {@code ScopeStack}, a {@code ContinuableScope}, and a + * {@code ScopeContext} per invocation; {@link ThreadLocalContextManager} swap is a plain field + * write. That asymmetry is the real cost of each manager's swap operation, not scaffolding. + */ + @Benchmark + public void swapContexts(ThreadState thread) { + Context ctx = thread.nextContext(contexts); + Context previous = manager.swap(ctx); + manager.swap(previous); + } + + // ── Scenario 4: capture + same-thread resume (continuation baseline) ─────── + + /** + * Capture the current context as a continuation and immediately resume it on the same thread. + * Establishes the allocation and atomic-counter cost of the continuation mechanism without any + * cross-thread scheduling overhead. + */ + @Benchmark + public void captureThenResumeSameThread(ThreadState thread) { + Context ctx = thread.nextContext(contexts); + try (ContextScope scope = manager.attach(ctx)) { + ContextContinuation cont = manager.capture(ctx); + try (ContextScope resumed = cont.resume()) { + // context restored on the same thread + } + } + } + + // ── Scenario 5: capture, resume on a platform thread ───────────────────── + + /** + * Capture the current context as a continuation and resume it on a pooled platform thread. + * Measures cross-thread handoff latency (submit + schedule + execute) for each manager. + * + *

Fewer JMH threads than the default so the platform executor is never saturated. + */ + @Benchmark + @Threads(4) + public void captureAndResumeOnPlatformThread(ThreadState thread) throws Exception { + captureAndResumeOnExecutor(thread, thread.platformExecutor); + } + + // ── Scenario 6: capture, resume on a virtual thread ────────────────────── + + /** + * Capture the current context as a continuation and resume it on a fixed-pool virtual thread. + * Shows how well each manager scales when continuations are used for structured concurrency or + * reactive pipelines on virtual threads. + */ + @Benchmark + @Threads(4) + public void captureAndResumeOnVirtualThread(ThreadState thread) throws Exception { + captureAndResumeOnExecutor(thread, thread.virtualExecutor); + } + + private void captureAndResumeOnExecutor(ThreadState thread, ExecutorService executor) + throws Exception { + Context ctx = thread.nextContext(contexts); + try (ContextScope scope = manager.attach(ctx)) { + ContextContinuation cont = manager.capture(ctx); + CompletableFuture.runAsync( + () -> { + try (ContextScope resumed = cont.resume()) { + // context propagated to executor thread + } + }, + executor) + .get(10, SECONDS); + } + } + + // ── Scenario 7: fan-out — one held continuation resumed on N virtual threads + + /** + * Capture one context, hold the continuation, then fan it out to {@value #FAN_OUT} virtual + * threads concurrently. Each virtual thread resumes the same continuation and closes its scope; + * only the explicit {@link ContextContinuation#release()} after the barrier completes the + * lifecycle. + * + *

This reflects async frameworks that dispatch a single request context to a pool of worker + * coroutines / virtual threads. + * + *

Uses {@link Mode#SampleTime} to capture percentile tail latency in addition to the mean. + * Warmup and measurement windows are extended because each invocation waits for {@value #FAN_OUT} + * round-trips before returning. + */ + @Benchmark + @Threads(2) + @BenchmarkMode(Mode.SampleTime) + @Warmup(iterations = 3, time = 3) + @Measurement(iterations = 5, time = 5) + public void captureAndFanOutToVirtualThreads(ThreadState thread) throws Exception { + Context ctx = thread.nextContext(contexts); + try (ContextScope scope = manager.attach(ctx)) { + ContextContinuation cont = manager.capture(ctx).hold(); + Semaphore barrier = thread.fanOutBarrier; + for (int i = 0; i < FAN_OUT; i++) { + thread.virtualExecutor.execute( + () -> { + try (ContextScope resumed = cont.resume()) { + // each virtual thread sees the same captured context + } finally { + barrier.release(); + } + }); + } + try { + if (!barrier.tryAcquire(FAN_OUT, 10, SECONDS)) { + throw new IllegalStateException("fan-out timed out"); + } + } finally { + cont.release(); + } + } + } + + // ── Scenario 8: read the current context ───────────────────────────────── + + /** + * Returns the currently active context. The most frequent operation in any traced application — + * called at every instrumented method boundary before reading a span or key. + */ + @Benchmark + public Context current(ActiveContextState active) { + return manager.current(); + } + + // ── Scenario 9: read a value from the current context ──────────────────── + + /** + * Returns a value from the currently active context. The full "read active span" path that + * instrumentation executes at every traced method boundary. + */ + @Benchmark + public Object currentAndGet(ActiveContextState active) { + return manager.current().get(KEY); + } +} diff --git a/dd-trace-core/src/jmh/java/datadog/context/ContextManagerDepthBenchmark.java b/dd-trace-core/src/jmh/java/datadog/context/ContextManagerDepthBenchmark.java new file mode 100644 index 00000000000..81c378d54d8 --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/context/ContextManagerDepthBenchmark.java @@ -0,0 +1,88 @@ +package datadog.context; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Benchmarks attaching the same non-root context {@code depth} times then closing all scopes in + * LIFO order, isolating the same-context fast-path cost from the general attach/close benchmarks in + * {@link ContextManagerBenchmark}. + * + *

In {@link ThreadLocalContextManager} each re-attach after the first returns a no-op scope. In + * {@link datadog.trace.core.scopemanager.ContinuableScopeManager} the first attach creates the + * scope and subsequent re-attaches increment its reference count; each close decrements it, with + * the final close doing the real work. + * + *

Run with: + * + *

+ * {@code ./gradlew :dd-trace-core:jmh -PjmhIncludes=ContextManagerDepthBenchmark -PjmhJvm=25 -PjmhProfilers=gc}
+ * 
+ */ +@State(Scope.Benchmark) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 5, time = 1) +@BenchmarkMode(Mode.Throughput) +@Threads(8) +@OutputTimeUnit(MICROSECONDS) +@Fork(value = 1) +public class ContextManagerDepthBenchmark { + + /** + * Which {@link ContextManager} implementation to benchmark. + * + * @see ContextManagerBenchmark#managerType + */ + @Param({"ThreadLocal", "Continuable"}) + public String managerType; + + @Param({"1", "4", "8", "100"}) + public int depth; + + ContextManager manager; + Context[] contexts; + + @Setup + public void setup() { + manager = ContextManagerBenchmark.createManager(managerType); + contexts = ContextManagerBenchmark.createContexts(); + } + + @State(Scope.Thread) + public static class ThreadState { + final ContextScope[] scopes = new ContextScope[100]; + + int nextContextIndex; + + Context nextContext(Context[] contexts) { + return contexts[(nextContextIndex++) & (ContextManagerBenchmark.CONTEXT_COUNT - 1)]; + } + } + + // ── Benchmark ───────────────────────────────────────────────────────────── + + /** Attach the same context {@code depth} times then close all scopes in LIFO order. */ + @Benchmark + public void attachSameContextDepth(ThreadState thread) { + Context ctx = thread.nextContext(contexts); + ContextScope[] scopes = thread.scopes; + for (int i = 0; i < depth; i++) { + scopes[i] = manager.attach(ctx); + } + for (int i = depth - 1; i >= 0; i--) { + scopes[i].close(); + } + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java b/dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java index 53e4d5b7da6..b62b4ee39f6 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java @@ -13,6 +13,8 @@ import static java.util.concurrent.TimeUnit.SECONDS; import datadog.context.Context; +import datadog.context.ContextContinuation; +import datadog.context.ContextListener; import datadog.context.ContextManager; import datadog.context.ContextScope; import datadog.logging.RatelimitedLogger; @@ -409,6 +411,31 @@ public Context swap(Context context) { return new ScopeContext(oldStack); } + @Override + public ContextContinuation capture(Context context) { + // respect async propagation flag for Context.current().capture() + ContinuableScope activeScope = scopeStack().active(); + if (activeScope != null + && activeScope.context == context + && !activeScope.isAsyncPropagating()) { + return AgentTracer.noopContinuation(); + } + AgentSpan span = AgentSpan.fromContext(context); + AgentTraceCollector traceCollector; + if (span != null) { + traceCollector = span.context().getTraceCollector(); + } else { + traceCollector = AgentTracer.NoopAgentTraceCollector.INSTANCE; + } + return new ScopeContinuation(this, context, CONTEXT, traceCollector).register(); + } + + @Override + public void addListener(ContextListener unused) { + // this new API is not expected to be used in legacy mode... + log.warn("Unexpected call to ContextManager.addListener(...)"); + } + static final class ScopeStackThreadLocal extends ThreadLocal { private final ProfilingContextIntegration profilingContextIntegration; diff --git a/dd-trace-core/src/test/java/datadog/trace/core/scopemanager/ScopeManagerTest.java b/dd-trace-core/src/test/java/datadog/trace/core/scopemanager/ScopeManagerTest.java index 122ddd560c7..cf1748cece0 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/scopemanager/ScopeManagerTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/scopemanager/ScopeManagerTest.java @@ -5,6 +5,7 @@ import static datadog.trace.core.scopemanager.ScopeManagerTest.EVENT.ACTIVATE; import static datadog.trace.core.scopemanager.ScopeManagerTest.EVENT.CLOSE; import static datadog.trace.test.util.GCUtils.awaitGC; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -24,6 +25,7 @@ import static org.mockito.Mockito.when; import datadog.context.Context; +import datadog.context.ContextContinuation; import datadog.context.ContextKey; import datadog.context.ContextScope; import datadog.trace.api.DDTraceId; @@ -1075,6 +1077,72 @@ void contextsCanBeSwappedOutAndBack() { assertEquals(Context.root(), scopeManager.current()); } + @Test + void captureViaContextContinuationAPIHoldsTrace() throws Exception { + AgentSpan span = tracer.buildSpan("test", "test").start(); + AgentScope scope = tracer.activateSpan(span); + + // Context.current().capture() routes through ContinuableScopeManager.capture(Context) + ContextContinuation continuation = Context.current().capture(); + + scope.close(); + span.finish(); + assertTrue(writer.isEmpty()); // trace held pending continuation + + continuation.release(); // delegates to cancel(), unblocks trace reporting + writer.waitForTraces(1); + assertFalse(writer.isEmpty()); + } + + @Test + void continuationResumeActivatesSpan() throws Exception { + AgentSpan span = tracer.buildSpan("test", "test").start(); + AgentScope scope = tracer.activateSpan(span); + AgentScope.Continuation continuation = tracer.captureActiveSpan(); + scope.close(); + span.finish(); + + assertNull(scopeManager.active()); + assertTrue(writer.isEmpty()); // trace held by continuation + + // resume() delegates to activate() + ContextScope resumedScope = continuation.resume(); + assertSame(span, scopeManager.active().span()); + + resumedScope.close(); + assertNull(scopeManager.active()); + writer.waitForTraces(1); + assertFalse(writer.isEmpty()); + } + + @Test + void continuationReleaseIsSameAsCancel() throws Exception { + AgentSpan span = tracer.buildSpan("test", "test").start(); + AgentScope scope = tracer.activateSpan(span); + AgentScope.Continuation continuation = tracer.captureActiveSpan(); + scope.close(); + span.finish(); + + assertTrue(writer.isEmpty()); // trace held by continuation + + continuation.release(); // delegates to cancel() + writer.waitForTraces(1); + assertFalse(writer.isEmpty()); + } + + @Test + void captureContextWithoutSpanUsesNoopTraceCollector() { + ContextKey key = ContextKey.named("test-key"); + Context ctx = Context.root().with(key, "value"); + assertDoesNotThrow( + () -> { + // NoopAgentTraceCollector handles capture/release without throwing + try (ContextScope scope = ctx.attach()) { + Context.current().capture().release(); + } + }); + } + private boolean spanFinished(AgentSpan span) { return span instanceof DDSpan && ((DDSpan) span).isFinished(); } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentScope.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentScope.java index ba94d72c218..3f37f88db63 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentScope.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentScope.java @@ -1,6 +1,7 @@ package datadog.trace.bootstrap.instrumentation.api; import datadog.context.Context; +import datadog.context.ContextContinuation; import datadog.context.ContextScope; import datadog.trace.context.TraceScope; import java.io.Closeable; @@ -16,7 +17,7 @@ default Context context() { @Override void close(); - interface Continuation extends TraceScope.Continuation { + interface Continuation extends TraceScope.Continuation, ContextContinuation { @Override Continuation hold(); @@ -26,9 +27,19 @@ interface Continuation extends TraceScope.Continuation { /** Provide access to the captured span */ AgentSpan span(); - /** Provide access to the captured context */ + @Override default Context context() { return span(); } + + @Override + default ContextScope resume() { + return activate(); + } + + @Override + default void release() { + cancel(); + } } }