From 33c3917c2ede239885e40bd183ab98659e2a83fa Mon Sep 17 00:00:00 2001 From: becomeStar Date: Sat, 25 Apr 2026 17:24:06 +0900 Subject: [PATCH 1/2] core,opentelemetry: Fix early-close metric labels Resolve generated-method classification before serverCallStarted() so close metrics do not fall back to "other" when streamClosed() happens first. Keep fallback registry lookup on the existing async path and avoid tracer-side HandlerRegistry access to match the maintainer constraints from issue #12117. Add regressions for primary generated, primary non-generated, and fallback-generated server paths, plus the tracer-level early-resolution contract. --- .../java/io/grpc/internal/ServerImpl.java | 25 +- .../io/grpc/internal/StatsTraceContext.java | 23 ++ .../java/io/grpc/internal/ServerImplTest.java | 224 ++++++++++++++++++ .../OpenTelemetryMetricsModule.java | 35 ++- .../OpenTelemetryMetricsModuleTest.java | 34 +++ 5 files changed, 323 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index d469fdb33dc..610acfe2da2 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -99,7 +99,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume private final ObjectPool executorPool; /** Executor for application processing. Safe to read after {@link #start()}. */ private Executor executor; - private final HandlerRegistry registry; + private final InternalHandlerRegistry registry; private final HandlerRegistry fallbackRegistry; private final List transportFilters; // This is iterated on a per-call basis. Use an array instead of a Collection to avoid iterator @@ -498,8 +498,12 @@ private void streamCreatedInternal( final StatsTraceContext statsTraceCtx = Preconditions.checkNotNull( stream.statsTraceContext(), "statsTraceCtx not present from stream"); + final ServerMethodDefinition primaryMethod = registry.lookupMethod(methodName, null); final Context.CancellableContext context = createContext(headers, statsTraceCtx); + if (primaryMethod != null) { + statsTraceCtx.serverCallMethodResolved(primaryMethod.getMethodDescriptor()); + } final Link link = PerfMark.linkOut(); @@ -536,7 +540,7 @@ private void runInternal() { ServerMethodDefinition wrapMethod; ServerCallParameters callParams; try { - ServerMethodDefinition method = registry.lookupMethod(methodName); + ServerMethodDefinition method = primaryMethod; if (method == null) { method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority()); } @@ -554,7 +558,12 @@ private void runInternal() { future.cancel(false); return; } - wrapMethod = wrapMethod(stream, method, statsTraceCtx); + statsTraceCtx.serverCallStarted( + new ServerCallInfoImpl<>( + method.getMethodDescriptor(), // notify with original method descriptor + stream.getAttributes(), + stream.getAuthority())); + wrapMethod = wrapMethod(method); callParams = maySwitchExecutor(wrapMethod, stream, headers, context, tag); future.set(callParams); } catch (Throwable t) { @@ -653,14 +662,8 @@ private Context.CancellableContext createContext( } /** Never returns {@code null}. */ - private ServerMethodDefinition wrapMethod(ServerStream stream, - ServerMethodDefinition methodDef, StatsTraceContext statsTraceCtx) { - // TODO(ejona86): should we update fullMethodName to have the canonical path of the method? - statsTraceCtx.serverCallStarted( - new ServerCallInfoImpl<>( - methodDef.getMethodDescriptor(), // notify with original method descriptor - stream.getAttributes(), - stream.getAuthority())); + private ServerMethodDefinition wrapMethod( + ServerMethodDefinition methodDef) { ServerCallHandler handler = methodDef.getServerCallHandler(); for (ServerInterceptor interceptor : interceptors) { handler = InternalServerInterceptors.interceptCallHandlerCreate(interceptor, handler); diff --git a/core/src/main/java/io/grpc/internal/StatsTraceContext.java b/core/src/main/java/io/grpc/internal/StatsTraceContext.java index 650f0b979ae..007aefc0fb8 100644 --- a/core/src/main/java/io/grpc/internal/StatsTraceContext.java +++ b/core/src/main/java/io/grpc/internal/StatsTraceContext.java @@ -23,6 +23,7 @@ import io.grpc.ClientStreamTracer; import io.grpc.Context; import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import io.grpc.ServerStreamTracer; import io.grpc.ServerStreamTracer.ServerCallInfo; import io.grpc.Status; @@ -38,6 +39,14 @@ */ @ThreadSafe public final class StatsTraceContext { + /** + * Internal hook for server tracers that can use the resolved method descriptor before + * {@link ServerStreamTracer#serverCallStarted(ServerCallInfo)} runs. + */ + public interface ServerCallMethodListener { + void serverCallMethodResolved(MethodDescriptor method); + } + public static final StatsTraceContext NOOP = new StatsTraceContext(new StreamTracer[0]); private final StreamTracer[] tracers; @@ -144,6 +153,20 @@ public void serverCallStarted(ServerCallInfo callInfo) { } } + /** + * Notifies server tracers that a primary-registry method descriptor was resolved before + * {@link ServerStreamTracer#serverCallStarted(ServerCallInfo)}. + * + *

Called from {@link io.grpc.internal.ServerImpl}. + */ + public void serverCallMethodResolved(MethodDescriptor method) { + for (StreamTracer tracer : tracers) { + if (tracer instanceof ServerCallMethodListener) { + ((ServerCallMethodListener) tracer).serverCallMethodResolved(method); + } + } + } + /** * See {@link StreamTracer#streamClosed}. This may be called multiple times, and only the first * value will be taken. diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 3405cb9bb0c..91969dd6910 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -129,6 +129,10 @@ public class ServerImplTest { .setRequestMarshaller(STRING_MARSHALLER) .setResponseMarshaller(INTEGER_MARSHALLER) .build(); + private static final MethodDescriptor GENERATED_METHOD = + METHOD.toBuilder() + .setSampledToLocalTracing(true) + .build(); private static final Context.Key SERVER_ONLY = Context.key("serverOnly"); private static final Context.Key SERVER_TRACER_ADDED_KEY = Context.key("tracer-added"); private static final Context.CancellableContext SERVER_CONTEXT = @@ -142,6 +146,60 @@ public boolean shouldAccept(Runnable runnable) { }; private static final String AUTHORITY = "some_authority"; + private static final class MethodNameCapturingTracer extends ServerStreamTracer + implements StatsTraceContext.ServerCallMethodListener { + @Nullable private ServerCallInfo serverCallInfo; + @Nullable private String recordedMethodName; + @Nullable private String resolvedMethodName; + private boolean streamClosed; + + @Override + public synchronized void serverCallMethodResolved(MethodDescriptor method) { + resolvedMethodName = + recordMethodName(method.isSampledToLocalTracing(), method.getFullMethodName()); + } + + @Override + public synchronized void streamClosed(Status status) { + streamClosed = true; + if (serverCallInfo != null) { + recordedMethodName = + recordMethodName( + serverCallInfo.getMethodDescriptor().isSampledToLocalTracing(), + serverCallInfo.getMethodDescriptor().getFullMethodName()); + } else if (resolvedMethodName != null) { + recordedMethodName = resolvedMethodName; + } else { + recordedMethodName = "other"; + } + } + + @Override + public synchronized void serverCallStarted(ServerCallInfo callInfo) { + serverCallInfo = callInfo; + if (streamClosed) { + recordedMethodName = + recordMethodName( + callInfo.getMethodDescriptor().isSampledToLocalTracing(), + callInfo.getMethodDescriptor().getFullMethodName()); + } + } + + @Nullable + synchronized ServerCallInfo getServerCallInfo() { + return serverCallInfo; + } + + @Nullable + synchronized String getRecordedMethodName() { + return recordedMethodName; + } + + private static String recordMethodName(boolean generatedMethod, String fullMethodName) { + return generatedMethod ? fullMethodName : "other"; + } + } + @Rule public final MockitoRule mocks = MockitoJUnit.rule(); @BeforeClass @@ -462,6 +520,172 @@ public void methodNotFound() throws Exception { assertEquals(Status.Code.UNIMPLEMENTED, statusCaptor.getValue().getCode()); } + @Test + public void primaryRegistryGeneratedMethod_streamClosedBeforeStart_preservesMethodName() + throws Exception { + MethodNameCapturingTracer methodNameTracer = new MethodNameCapturingTracer(); + streamTracerFactories = + Collections.singletonList( + new ServerStreamTracer.Factory() { + @Override + public ServerStreamTracer newServerStreamTracer( + String fullMethodName, Metadata headers) { + return methodNameTracer; + } + }); + builder.addService( + ServerServiceDefinition.builder(new ServiceDescriptor("Waiter", GENERATED_METHOD)) + .addMethod( + GENERATED_METHOD, + new ServerCallHandler() { + @Override + public ServerCall.Listener startCall( + ServerCall call, Metadata headers) { + return callListener; + } + }) + .build()); + + createAndStartServer(); + ServerTransportListener transportListener + = transportServer.registerNewServerTransport(new SimpleServerTransport()); + transportListener.transportReady(Attributes.EMPTY); + Metadata requestHeaders = new Metadata(); + StatsTraceContext statsTraceCtx = + StatsTraceContext.newServerContext( + streamTracerFactories, GENERATED_METHOD.getFullMethodName(), requestHeaders); + when(stream.getAttributes()).thenReturn(Attributes.EMPTY); + when(stream.statsTraceContext()).thenReturn(statsTraceCtx); + + transportListener.streamCreated(stream, GENERATED_METHOD.getFullMethodName(), requestHeaders); + verify(stream).setListener(isA(ServerStreamListener.class)); + verify(stream, atLeast(1)).statsTraceContext(); + + statsTraceCtx.streamClosed(Status.CANCELLED); + assertNull(methodNameTracer.getServerCallInfo()); + assertEquals( + GENERATED_METHOD.getFullMethodName(), + methodNameTracer.getRecordedMethodName()); + + assertEquals(1, executor.runDueTasks()); + + assertNotNull(methodNameTracer.getServerCallInfo()); + assertSame(GENERATED_METHOD, methodNameTracer.getServerCallInfo().getMethodDescriptor()); + assertEquals( + GENERATED_METHOD.getFullMethodName(), + methodNameTracer.getRecordedMethodName()); + verify(fallbackRegistry, never()).lookupMethod(anyString(), any()); + } + + @Test + public void primaryRegistryNonGeneratedMethod_streamClosedBeforeStart_recordsOther() + throws Exception { + MethodNameCapturingTracer methodNameTracer = new MethodNameCapturingTracer(); + streamTracerFactories = + Collections.singletonList( + new ServerStreamTracer.Factory() { + @Override + public ServerStreamTracer newServerStreamTracer( + String fullMethodName, Metadata headers) { + return methodNameTracer; + } + }); + builder.addService( + ServerServiceDefinition.builder(new ServiceDescriptor("Waiter", METHOD)) + .addMethod( + METHOD, + new ServerCallHandler() { + @Override + public ServerCall.Listener startCall( + ServerCall call, Metadata headers) { + return callListener; + } + }) + .build()); + + createAndStartServer(); + ServerTransportListener transportListener + = transportServer.registerNewServerTransport(new SimpleServerTransport()); + transportListener.transportReady(Attributes.EMPTY); + Metadata requestHeaders = new Metadata(); + StatsTraceContext statsTraceCtx = + StatsTraceContext.newServerContext( + streamTracerFactories, METHOD.getFullMethodName(), requestHeaders); + when(stream.getAttributes()).thenReturn(Attributes.EMPTY); + when(stream.statsTraceContext()).thenReturn(statsTraceCtx); + + transportListener.streamCreated(stream, METHOD.getFullMethodName(), requestHeaders); + verify(stream).setListener(isA(ServerStreamListener.class)); + verify(stream, atLeast(1)).statsTraceContext(); + + statsTraceCtx.streamClosed(Status.CANCELLED); + assertNull(methodNameTracer.getServerCallInfo()); + assertEquals("other", methodNameTracer.getRecordedMethodName()); + + assertEquals(1, executor.runDueTasks()); + + assertNotNull(methodNameTracer.getServerCallInfo()); + assertSame(METHOD, methodNameTracer.getServerCallInfo().getMethodDescriptor()); + assertEquals("other", methodNameTracer.getRecordedMethodName()); + verify(fallbackRegistry, never()).lookupMethod(anyString(), any()); + } + + @Test + public void fallbackRegistryGeneratedMethod_streamClosedBeforeStart_resolvesOnAsyncLookup() + throws Exception { + MethodNameCapturingTracer methodNameTracer = new MethodNameCapturingTracer(); + streamTracerFactories = + Collections.singletonList( + new ServerStreamTracer.Factory() { + @Override + public ServerStreamTracer newServerStreamTracer( + String fullMethodName, Metadata headers) { + return methodNameTracer; + } + }); + mutableFallbackRegistry.addService( + ServerServiceDefinition.builder(new ServiceDescriptor("Waiter", GENERATED_METHOD)) + .addMethod( + GENERATED_METHOD, + new ServerCallHandler() { + @Override + public ServerCall.Listener startCall( + ServerCall call, Metadata headers) { + return callListener; + } + }) + .build()); + + createAndStartServer(); + ServerTransportListener transportListener + = transportServer.registerNewServerTransport(new SimpleServerTransport()); + transportListener.transportReady(Attributes.EMPTY); + Metadata requestHeaders = new Metadata(); + StatsTraceContext statsTraceCtx = + StatsTraceContext.newServerContext( + streamTracerFactories, GENERATED_METHOD.getFullMethodName(), requestHeaders); + when(stream.getAttributes()).thenReturn(Attributes.EMPTY); + when(stream.statsTraceContext()).thenReturn(statsTraceCtx); + + transportListener.streamCreated(stream, GENERATED_METHOD.getFullMethodName(), requestHeaders); + verify(stream).setListener(isA(ServerStreamListener.class)); + verify(stream, atLeast(1)).statsTraceContext(); + + statsTraceCtx.streamClosed(Status.CANCELLED); + assertNull(methodNameTracer.getServerCallInfo()); + assertEquals("other", methodNameTracer.getRecordedMethodName()); + verify(fallbackRegistry, never()).lookupMethod(anyString(), any()); + + assertEquals(1, executor.runDueTasks()); + + assertNotNull(methodNameTracer.getServerCallInfo()); + assertSame(GENERATED_METHOD, methodNameTracer.getServerCallInfo().getMethodDescriptor()); + assertEquals( + GENERATED_METHOD.getFullMethodName(), + methodNameTracer.getRecordedMethodName()); + verify(fallbackRegistry).lookupMethod(GENERATED_METHOD.getFullMethodName(), AUTHORITY); + } + @Test public void executorSupplierSameExecutorBasic() throws Exception { diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index c9e623b4415..f783b9495dd 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -47,6 +47,7 @@ import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StreamTracer; +import io.grpc.internal.StatsTraceContext.ServerCallMethodListener; import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter; import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.common.AttributesBuilder; @@ -526,7 +527,8 @@ void recordFinishedCall(CallOptions callOptions) { } } - private static final class ServerTracer extends ServerStreamTracer { + private static final class ServerTracer extends ServerStreamTracer + implements ServerCallMethodListener { @Nullable private static final AtomicIntegerFieldUpdater streamClosedUpdater; @Nullable private static final AtomicLongFieldUpdater outboundWireSizeUpdater; @Nullable private static final AtomicLongFieldUpdater inboundWireSizeUpdater; @@ -587,6 +589,11 @@ public io.grpc.Context filterContext(io.grpc.Context context) { return context; } + @Override + public void serverCallMethodResolved(MethodDescriptor method) { + isGeneratedMethod = method.isSampledToLocalTracing(); + } + @Override public void serverCallStarted(ServerCallInfo callInfo) { // Only record method name as an attribute if isSampledToLocalTracing is set to true, @@ -644,9 +651,24 @@ public void streamClosed(Status status) { } stopwatch.stop(); long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); - AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() - .put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod)) - .put(STATUS_KEY, status.getCode().toString()); + recordClosedStream( + status, + elapsedTimeNanos, + outboundWireSize, + inboundWireSize, + isGeneratedMethod); + } + + private void recordClosedStream( + Status status, + long elapsedTimeNanos, + long closedOutboundWireSize, + long closedInboundWireSize, + boolean generatedMethod) { + AttributesBuilder builder = + io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, recordMethodName(fullMethodName, generatedMethod)) + .put(STATUS_KEY, status.getCode().toString()); for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) { plugin.addLabels(builder); } @@ -658,11 +680,11 @@ public void streamClosed(Status status) { } if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) { module.resource.serverTotalSentCompressedMessageSizeCounter() - .record(outboundWireSize, attributes, otelContext); + .record(closedOutboundWireSize, attributes, otelContext); } if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) { module.resource.serverTotalReceivedCompressedMessageSizeCounter() - .record(inboundWireSize, attributes, otelContext); + .record(closedInboundWireSize, attributes, otelContext); } } } @@ -744,4 +766,3 @@ public void onClose(Status status, Metadata trailers) { } } } - diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 14139b8e439..0d74bc836e3 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -57,6 +57,7 @@ import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.FakeClock; +import io.grpc.internal.StatsTraceContext.ServerCallMethodListener; import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter; import io.grpc.opentelemetry.OpenTelemetryMetricsModule.CallAttemptsTracerFactory; import io.grpc.opentelemetry.internal.OpenTelemetryConstants; @@ -1734,6 +1735,39 @@ public void serverBasicMetrics() { } + @Test + public void serverMetrics_methodResolvedBeforeStreamClosed_generatedMethodRecordsName() { + OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, + enabledMetricsMap, disableDefaultMetrics); + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); + ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); + ServerStreamTracer tracer = + tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); + + ((ServerCallMethodListener) tracer).serverCallMethodResolved(method); + fakeClock.forwardTime(10, MILLISECONDS); + tracer.streamClosed(Status.CANCELLED); + + io.opentelemetry.api.common.Attributes serverAttributes = + io.opentelemetry.api.common.Attributes.of( + METHOD_KEY, method.getFullMethodName(), + STATUS_KEY, Code.CANCELLED.toString()); + + assertThat(openTelemetryTesting.getMetrics()) + .anySatisfy( + metric -> + assertThat(metric) + .hasName(SERVER_CALL_DURATION) + .hasUnit("s") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasCount(1) + .hasSum(0.01) + .hasAttributes(serverAttributes)))); + } @Test public void targetAttributeFilter_notSet_usesOriginalTarget() { From 36680b1b24286ea397526099ccebb347ff06bb6a Mon Sep 17 00:00:00 2001 From: becomeStar Date: Sat, 25 Apr 2026 18:23:31 +0900 Subject: [PATCH 2/2] core: Move serverCallStarted back into wrapMethod --- .../main/java/io/grpc/internal/ServerImpl.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 610acfe2da2..d9f64c2d473 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -558,12 +558,7 @@ private void runInternal() { future.cancel(false); return; } - statsTraceCtx.serverCallStarted( - new ServerCallInfoImpl<>( - method.getMethodDescriptor(), // notify with original method descriptor - stream.getAttributes(), - stream.getAuthority())); - wrapMethod = wrapMethod(method); + wrapMethod = wrapMethod(stream, method, statsTraceCtx); callParams = maySwitchExecutor(wrapMethod, stream, headers, context, tag); future.set(callParams); } catch (Throwable t) { @@ -662,8 +657,14 @@ private Context.CancellableContext createContext( } /** Never returns {@code null}. */ - private ServerMethodDefinition wrapMethod( - ServerMethodDefinition methodDef) { + private ServerMethodDefinition wrapMethod(ServerStream stream, + ServerMethodDefinition methodDef, StatsTraceContext statsTraceCtx) { + // TODO(ejona86): should we update fullMethodName to have the canonical path of the method? + statsTraceCtx.serverCallStarted( + new ServerCallInfoImpl<>( + methodDef.getMethodDescriptor(), // notify with original method descriptor + stream.getAttributes(), + stream.getAuthority())); ServerCallHandler handler = methodDef.getServerCallHandler(); for (ServerInterceptor interceptor : interceptors) { handler = InternalServerInterceptors.interceptCallHandlerCreate(interceptor, handler);