Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import datadog.trace.api.InstrumenterConfig;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.ProfilerContext;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;

Expand All @@ -25,6 +28,8 @@ public final class TPEHelper {
private static final Set<String> excludedClasses;
// A ThreadLocal to store the Scope between beforeExecute and afterExecute if wrapping is not used
private static final ThreadLocal<AgentScope> threadLocalScope;
// Stores System.nanoTime() at task activation so onTaskDeactivation can compute duration
private static final ThreadLocal<Long> threadLocalActivationNano;

private static final ClassValue<Boolean> WRAP =
GenericClassValue.of(
Expand All @@ -42,8 +47,10 @@ public final class TPEHelper {
excludedClasses = config.getTraceThreadPoolExecutorsExclude();
if (useWrapping) {
threadLocalScope = null;
threadLocalActivationNano = null;
} else {
threadLocalScope = new ThreadLocal<>();
threadLocalActivationNano = new ThreadLocal<>();
}
}

Expand Down Expand Up @@ -82,7 +89,18 @@ public static AgentScope startScope(ContextStore<Runnable, State> contextStore,
if (task == null || exclude(RUNNABLE, task)) {
return null;
}
return AdviceUtils.startTaskScope(contextStore, task);
AgentScope scope = AdviceUtils.startTaskScope(contextStore, task);
if (scope != null && threadLocalActivationNano != null) {
long startNano = System.nanoTime();
threadLocalActivationNano.set(startNano);
AgentSpan span = scope.span();
if (span != null && span.context() instanceof ProfilerContext) {
AgentTracer.get()
.getProfilingContext()
.onTaskActivation((ProfilerContext) span.context(), startNano);
}
}
return scope;
}

public static void setThreadLocalScope(AgentScope scope, Runnable task) {
Expand Down Expand Up @@ -112,7 +130,23 @@ public static void endScope(AgentScope scope, Runnable task) {
if (task == null || exclude(RUNNABLE, task)) {
return;
}
AdviceUtils.endTaskScope(scope);
try {
if (scope != null && threadLocalActivationNano != null) {
Long startNano = threadLocalActivationNano.get();
// noinspection ThreadLocalSetWithNull
threadLocalActivationNano.set(null);
if (startNano != null) {
AgentSpan span = scope.span();
if (span != null && span.context() instanceof ProfilerContext) {
AgentTracer.get()
.getProfilingContext()
.onTaskDeactivation((ProfilerContext) span.context(), startNano);
}
}
}
} finally {
AdviceUtils.endTaskScope(scope);
}
}

public static void cancelTask(ContextStore<Runnable, State> contextStore, Runnable task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.exclude;

import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.ProfilerContext;
import java.util.concurrent.RunnableFuture;

public class Wrapper<T extends Runnable> implements Runnable, AutoCloseable {
Expand Down Expand Up @@ -44,7 +47,23 @@ public Wrapper(T delegate, AgentScope.Continuation continuation) {
@Override
public void run() {
try (AgentScope scope = activate()) {
delegate.run();
long startNano = 0L;
ProfilerContext profilerCtx = null;
if (scope != null) {
AgentSpan span = scope.span();
if (span != null && span.context() instanceof ProfilerContext) {
profilerCtx = (ProfilerContext) span.context();
startNano = System.nanoTime();
AgentTracer.get().getProfilingContext().onTaskActivation(profilerCtx, startNano);
}
}
try {
delegate.run();
} finally {
if (profilerCtx != null) {
AgentTracer.get().getProfilingContext().onTaskDeactivation(profilerCtx, startNano);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies {

testImplementation libs.bundles.jmc
testImplementation libs.bundles.junit5
testImplementation libs.bundles.mockito
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,15 +332,21 @@ String cmdStartProfiling(Path file) throws IllegalStateException {
return cmdString;
}

public void recordTraceRoot(long rootSpanId, String endpoint, String operation) {
if (!profiler.recordTraceRoot(rootSpanId, endpoint, operation, MAX_NUM_ENDPOINTS)) {
public void recordTraceRoot(
long rootSpanId, long parentSpanId, long startTicks, String endpoint, String operation) {
if (!profiler.recordTraceRoot(
rootSpanId, parentSpanId, startTicks, endpoint, operation, MAX_NUM_ENDPOINTS)) {
log.debug(
"Endpoint event not written because more than {} distinct endpoints have been encountered."
+ " This avoids excessive memory overhead.",
MAX_NUM_ENDPOINTS);
}
}

/**
 * Returns the profiler's current tick counter by delegating to {@code profiler.getCurrentTicks()}.
 *
 * <p>NOTE(review): unlike {@code recordTaskBlockEvent} and {@code recordSpanNodeEvent}, this
 * accessor does not guard against a null {@code profiler} — confirm the field can never be null
 * when this is called.
 *
 * @return the tick value reported by the underlying profiler
 */
public long getCurrentTicks() {
  final long nowTicks = profiler.getCurrentTicks();
  return nowTicks;
}

/**
 * Returns {@code offsetOf(OPERATION)}.
 *
 * <p>Presumably the context-attribute slot used for the operation name — TODO confirm against
 * {@code offsetOf}.
 */
public int operationNameOffset() {
  final int operationOffset = offsetOf(OPERATION);
  return operationOffset;
}
Expand Down Expand Up @@ -447,29 +453,65 @@ public void recordSetting(String name, String value, String unit) {
profiler.recordSetting(name, value, unit);
}

public QueueTimeTracker newQueueTimeTracker() {
return new QueueTimeTracker(this, profiler.getCurrentTicks());
public QueueTimeTracker newQueueTimeTracker(long submittingSpanId) {
return new QueueTimeTracker(this, profiler.getCurrentTicks(), submittingSpanId);
}

/**
 * Decides whether a queue-time event is worth recording.
 *
 * @param startMillis wall-clock time (as returned by {@link System#currentTimeMillis()}) at which
 *     queueing started
 * @return {@code true} when the elapsed wall-clock time has reached {@code
 *     queueTimeThresholdMillis}
 */
boolean shouldRecordQueueTimeEvent(long startMillis) {
  final long elapsedMillis = System.currentTimeMillis() - startMillis;
  return elapsedMillis >= queueTimeThresholdMillis;
}

/**
 * Records a task-block event that starts at {@code startTicks} and ends at the profiler's current
 * tick. Does nothing when no profiler is available.
 *
 * @param startTicks tick value captured when the block began
 * @param spanId id of the span that was blocked
 * @param rootSpanId id of the root span of the blocked span's trace
 * @param blocker opaque identifier of the blocker, forwarded unchanged to the profiler
 * @param unblockingSpanId id of the span that released the block, forwarded unchanged
 */
void recordTaskBlockEvent(
    long startTicks, long spanId, long rootSpanId, long blocker, long unblockingSpanId) {
  if (profiler == null) {
    return;
  }
  final long nowTicks = profiler.getCurrentTicks();
  profiler.recordTaskBlock(startTicks, nowTicks, spanId, rootSpanId, blocker, unblockingSpanId);
}

/**
 * Forwards a span-node record to the underlying profiler. Does nothing when no profiler is
 * available.
 *
 * <p>All arguments are passed through to {@code profiler.recordSpanNode} unchanged and in the same
 * order.
 *
 * @param spanId id of the span being described
 * @param parentSpanId id of the span's parent
 * @param rootSpanId id of the root span of the trace
 * @param startNanos span start time in nanoseconds
 * @param durationNanos span duration in nanoseconds
 * @param encodedOperation encoded operation name
 * @param encodedResource encoded resource name
 */
public void recordSpanNodeEvent(
    long spanId,
    long parentSpanId,
    long rootSpanId,
    long startNanos,
    long durationNanos,
    int encodedOperation,
    int encodedResource) {
  if (profiler == null) {
    return;
  }
  profiler.recordSpanNode(
      spanId, parentSpanId, rootSpanId, startNanos, durationNanos, encodedOperation, encodedResource);
}

void recordQueueTimeEvent(
long startTicks,
Object task,
Class<?> scheduler,
Class<?> queueType,
int queueLength,
Thread origin) {
Thread origin,
long submittingSpanId) {
if (profiler != null) {
// note: because this type traversal can update secondary_super_cache (see JDK-8180450)
// we avoid doing this unless we are absolutely certain we will record the event
Class<?> taskType = TaskWrapper.getUnwrappedType(task);
if (taskType != null) {
long endTicks = profiler.getCurrentTicks();
profiler.recordQueueTime(
startTicks, endTicks, taskType, scheduler, queueType, queueLength, origin);
startTicks,
endTicks,
taskType,
scheduler,
queueType,
queueLength,
origin,
submittingSpanId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import datadog.trace.api.profiling.ProfilingScope;
import datadog.trace.api.profiling.Timing;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.ProfilerContext;
import datadog.trace.bootstrap.instrumentation.api.ProfilingContextIntegration;

Expand Down Expand Up @@ -93,6 +94,76 @@ public String name() {
return "ddprof";
}

/** Returns the current tick value as reported by {@code DDPROF}. */
@Override
public long getCurrentTicks() {
  final long nowTicks = DDPROF.getCurrentTicks();
  return nowTicks;
}

/**
 * Delegates to {@code DDPROF.recordTaskBlockEvent}, passing every argument through unchanged.
 *
 * @param startTicks tick value captured when the block began
 * @param spanId id of the blocked span
 * @param rootSpanId id of the root span of the trace
 * @param blocker opaque blocker identifier
 * @param unblockingSpanId id of the span that released the block
 */
@Override
public void recordTaskBlock(
    long startTicks,
    long spanId,
    long rootSpanId,
    long blocker,
    long unblockingSpanId) {
  DDPROF.recordTaskBlockEvent(
      startTicks,
      spanId,
      rootSpanId,
      blocker,
      unblockingSpanId);
}

/**
 * Emits a span-node record for a finished span and, when the span's context carries a usable
 * execution thread, a companion {@code SpanExecutionThreadEvent}.
 *
 * <p>Spans that are {@code null}, or whose context is not a {@link ProfilerContext}, are ignored.
 *
 * @param span the span that has just finished; may be {@code null}
 */
@Override
public void onSpanFinished(AgentSpan span) {
  if (span == null) {
    return;
  }
  if (!(span.context() instanceof ProfilerContext)) {
    return;
  }
  final ProfilerContext profilerContext = (ProfilerContext) span.context();
  DDPROF.recordSpanNodeEvent(
      profilerContext.getSpanId(),
      profilerContext.getParentSpanId(),
      profilerContext.getRootSpanId(),
      span.getStartTime(),
      span.getDurationNano(),
      profilerContext.getEncodedOperationName(),
      profilerContext.getEncodedResourceName());

  // The context records which thread actually executed the span. Publishing it separately lets
  // consumers attribute the span to that thread instead of the thread that happens to commit the
  // span-node record above.
  final long threadId = profilerContext.getExecutionThreadId();
  final String threadName = profilerContext.getExecutionThreadName();
  final boolean hasExecutionThread =
      threadId > 0 && threadName != null && !threadName.isEmpty();
  if (!hasExecutionThread) {
    return;
  }
  final SpanExecutionThreadEvent threadEvent = new SpanExecutionThreadEvent();
  threadEvent.spanId = profilerContext.getSpanId();
  threadEvent.executionThreadId = threadId;
  threadEvent.executionThreadName = threadName;
  threadEvent.commit();
}

/**
 * Intentionally a no-op for this integration.
 *
 * <p>The caller keeps the activation timestamp itself and hands it back to {@link
 * #onTaskDeactivation(ProfilerContext, long)}, so there is no state to capture here.
 *
 * @param profilerContext context of the span whose task is being activated (unused)
 * @param startTicks activation timestamp supplied by the caller (unused)
 */
@Override
public void onTaskActivation(ProfilerContext profilerContext, long startTicks) {
  // Nothing to record: the caller-supplied start value is the authoritative one.
}

/**
 * Records a synthetic child span-node covering one task execution, from activation to now.
 *
 * <p>Nothing is recorded when {@code profilerContext} is {@code null} or when the measured
 * duration is not strictly positive.
 *
 * @param profilerContext context of the span under which the task ran; becomes the parent of the
 *     synthetic node
 * @param startTicks despite the name, this is treated as a {@link System#nanoTime()} reading taken
 *     at activation
 */
@Override
public void onTaskDeactivation(ProfilerContext profilerContext, long startTicks) {
  if (profilerContext == null) {
    return;
  }
  final long deactivationNano = System.nanoTime();
  final long elapsedNanos = deactivationNano - startTicks;
  if (elapsedNanos <= 0) {
    return;
  }
  // Re-derive the nanoTime -> epoch offset on every call rather than caching it: nanoTime is
  // monotonic and ignores wall-clock adjustments, so a cached offset would drift over the JVM
  // lifetime. The remaining error is limited by currentTimeMillis()'s 1 ms resolution.
  final long nanoToEpochOffset = System.currentTimeMillis() * 1_000_000L - deactivationNano;
  final long epochStartNanos = startTicks + nanoToEpochOffset;
  // Mix the parent span id, the executing thread id (shifted into the high half) and the start
  // reading to derive an id for the synthetic node.
  final long parentBits = profilerContext.getSpanId();
  final long threadBits = (long) Thread.currentThread().getId() << 32;
  final long syntheticSpanId = parentBits ^ threadBits ^ startTicks;
  DDPROF.recordSpanNodeEvent(
      syntheticSpanId,
      profilerContext.getSpanId(),
      profilerContext.getRootSpanId(),
      epochStartNanos,
      elapsedNanos,
      profilerContext.getEncodedOperationName(),
      profilerContext.getEncodedResourceName());
}

public void clearContext() {
DDPROF.clearSpanContext();
DDPROF.clearContextValue(SPAN_NAME_INDEX);
Expand All @@ -115,32 +186,49 @@ public void onRootSpanFinished(AgentSpan rootSpan, EndpointTracker tracker) {
CharSequence resourceName = rootSpan.getResourceName();
CharSequence operationName = rootSpan.getOperationName();
if (resourceName != null && operationName != null) {
long startTicks =
(tracker instanceof RootSpanTracker) ? ((RootSpanTracker) tracker).startTicks : 0L;
long parentSpanId = 0L;
if (rootSpan.context() instanceof ProfilerContext) {
parentSpanId = ((ProfilerContext) rootSpan.context()).getParentSpanId();
}
DDPROF.recordTraceRoot(
rootSpan.getSpanId(), resourceName.toString(), operationName.toString());
rootSpan.getSpanId(),
parentSpanId,
startTicks,
resourceName.toString(),
operationName.toString());
}
}
}

@Override
public EndpointTracker onRootSpanStarted(AgentSpan rootSpan) {
return NoOpEndpointTracker.INSTANCE;
return new RootSpanTracker(DDPROF.getCurrentTicks());
}

@Override
public Timing start(TimerType type) {
if (IS_PROFILING_QUEUEING_TIME_ENABLED && type == TimerType.QUEUEING) {
return DDPROF.newQueueTimeTracker();
AgentSpan span = AgentTracer.activeSpan();
long submittingSpanId = 0L;
if (span != null && span.context() instanceof ProfilerContext) {
submittingSpanId = ((ProfilerContext) span.context()).getSpanId();
}
return DDPROF.newQueueTimeTracker(submittingSpanId);
}
return Timing.NoOp.INSTANCE;
}

/**
* This implementation is actually stateless, so we don't actually need a tracker object, but
* we'll create a singleton to avoid returning null and risking NPEs elsewhere.
* Captures the TSC tick at root span start so we can emit real duration in the Endpoint event.
*/
private static final class NoOpEndpointTracker implements EndpointTracker {
private static final class RootSpanTracker implements EndpointTracker {
final long startTicks;

public static final NoOpEndpointTracker INSTANCE = new NoOpEndpointTracker();
RootSpanTracker(long startTicks) {
this.startTicks = startTicks;
}

@Override
public void endpointWritten(AgentSpan span) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@ public class QueueTimeTracker implements QueueTiming {
private final Thread origin;
private final long startTicks;
private final long startMillis;
private final long submittingSpanId;
private WeakReference<Object> weakTask;
// FIXME this can be eliminated by altering the instrumentation
// since it is known when the item is polled from the queue
private Class<?> scheduler;
private Class<?> queue;
private int queueLength;

public QueueTimeTracker(DatadogProfiler profiler, long startTicks) {
public QueueTimeTracker(DatadogProfiler profiler, long startTicks, long submittingSpanId) {
this.profiler = profiler;
this.origin = Thread.currentThread();
this.startTicks = startTicks;
this.startMillis = System.currentTimeMillis();
this.submittingSpanId = submittingSpanId;
}

@Override
Expand Down Expand Up @@ -49,7 +51,8 @@ public void report() {
Object task = this.weakTask.get();
if (task != null) {
// indirection reduces shallow size of the tracker instance
profiler.recordQueueTimeEvent(startTicks, task, scheduler, queue, queueLength, origin);
profiler.recordQueueTimeEvent(
startTicks, task, scheduler, queue, queueLength, origin, submittingSpanId);
}
}

Expand Down
Loading
Loading