From 396ad79127005e2788c28d51f0b7292ce1090853 Mon Sep 17 00:00:00 2001 From: "Tak Lon (Stephen) Wu" Date: Wed, 29 Apr 2026 13:08:28 -0700 Subject: [PATCH] RATIS-2397. Add trace support for Log Appender --- .../apache/ratis/trace/RatisAttributes.java | 2 + .../org/apache/ratis/trace/TraceServer.java | 35 +++++ .../org/apache/ratis/trace/TraceUtils.java | 2 +- .../ratis/server/impl/LeaderStateImpl.java | 38 +++++- .../ratis/server/impl/RaftServerImpl.java | 4 +- .../ratis/server/impl/ServerProtoUtils.java | 6 +- .../impl/RaftServerImplTracingTests.java | 126 ++++++++++++++++++ 7 files changed, 208 insertions(+), 5 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java b/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java index 3c3be83e79..ea0c3a3428 100644 --- a/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java +++ b/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java @@ -33,6 +33,8 @@ public final class RatisAttributes { public static final AttributeKey OPERATION_NAME = AttributeKey.stringKey("raft.operation.name"); public static final AttributeKey OPERATION_TYPE = AttributeKey.stringKey("raft.operation.type"); + /** Number of log entries in a single {@code AppendEntries} RPC (0 for heartbeat). */ + public static final AttributeKey APPEND_ENTRIES_COUNT = AttributeKey.longKey("raft.append.entries.count"); private RatisAttributes() { } diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java index 9670f0d763..f0737d9379 100644 --- a/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java +++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java @@ -20,9 +20,14 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; +import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto; +import org.apache.ratis.proto.RaftProtos.SpanContextProto; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.util.function.CheckedSupplier; +import java.io.IOException; import java.util.concurrent.CompletableFuture; /** Server-side OpenTelemetry helpers. */ @@ -56,4 +61,34 @@ private static Span createServerSpanFromClientRequest(RaftClientRequest request, span.setAttribute(RatisAttributes.MEMBER_ID, memberId); return span; } + + /** + * Traces follower handling of {@link AppendEntriesRequestProto} when the leader attached trace + * context (client-originated) for replication. + */ + public static CompletableFuture traceAppendEntriesAsync( + CheckedSupplier, IOException> action, + AppendEntriesRequestProto request, String memberId) throws IOException { + if (!TraceUtils.isEnabled()) { + return action.get(); + } + final RaftRpcRequestProto rpc = request.getServerRequest(); + final SpanContextProto spanContext = rpc.getSpanContext(); + // If the leader sent no parent span context, still trace as a root span + // rather than skipping tracing entirely. + final Context remoteContext = (spanContext == null || spanContext.getContextMap().isEmpty()) + ? Context.root() + : TraceUtils.extractContextFromProto(spanContext); + return TraceUtils.traceAsyncMethod(action, () -> { + final Span span = TraceUtils.getGlobalTracer() + .spanBuilder("raft.server.appendEntriesAsync") + .setParent(remoteContext) + .setSpanKind(SpanKind.INTERNAL) + .startSpan(); + span.setAttribute(RatisAttributes.MEMBER_ID, memberId); + span.setAttribute(RatisAttributes.PEER_ID, String.valueOf(RaftPeerId.valueOf(rpc.getRequestorId()))); + span.setAttribute(RatisAttributes.APPEND_ENTRIES_COUNT, (long) request.getEntriesCount()); + return span; + }); + } } diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java index f350ca8884..3dc3e228f1 100644 --- a/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java @@ -81,7 +81,7 @@ public static void setTracerWhenEnabled(boolean enabled) { } } - static boolean isEnabled() { + public static boolean isEnabled() { return TRACER.get() != null; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 1c986ca638..3ba318b8e9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -26,6 +26,7 @@ import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; +import org.apache.ratis.proto.RaftProtos.SpanContextProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; @@ -52,6 +53,7 @@ import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.server.util.ServerStringUtils; import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.trace.TraceUtils; import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.Daemon; @@ -354,6 +356,13 @@ boolean isApplied() { private final long followerMaxGapThreshold; private final PendingStepDown pendingStepDown; + /** + * Client-originated trace context keyed by log index; attached to {@link AppendEntriesRequestProto} + * so follower {@code appendEntries} spans join the same trace as the client write. + */ + private final ConcurrentHashMap replicationTraceByLogIndex = + new ConcurrentHashMap<>(); + private final ReadIndexHeartbeats readIndexHeartbeats; private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType; private final Supplier readIndexSupplier; @@ -459,6 +468,7 @@ CompletableFuture stop() { LOG.info("{} is already stopped", this); return CompletableFuture.completedFuture(null); } + replicationTraceByLogIndex.clear(); // do not interrupt event processor since it may be in the middle of logSync final CompletableFuture f = senders.stopAll(); final NotLeaderException nle = server.generateNotLeaderException(); @@ -558,7 +568,28 @@ PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientReques LOG.debug("{}: addPendingRequest at {}, entry={}", this, request, LogProtoUtils.toLogEntryString(entry.getLogEntry())); } - return pendingRequests.add(permit, request, entry); + final PendingRequest pending = pendingRequests.add(permit, request, entry); + if (pending != null && TraceUtils.isEnabled()) { + final SpanContextProto spanContext = request.getSpanContext(); + if (spanContext != null && !spanContext.getContextMap().isEmpty()) { + replicationTraceByLogIndex.put(pending.getTermIndex().getIndex(), spanContext); + } + } + return pending; + } + + private static SpanContextProto tracingContextForReplication(List entries, + ConcurrentHashMap traceByIndex) { + if (entries == null || entries.isEmpty()) { + return null; + } + for (LogEntryProto e : entries) { + final SpanContextProto sc = traceByIndex.get(e.getIndex()); + if (sc != null && !sc.getContextMap().isEmpty()) { + return sc; + } + } + return null; } CompletableFuture streamAsync(RaftClientRequest request) { @@ -645,9 +676,10 @@ public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follo List entries, TermIndex previous, long callId) { final boolean initializing = !isCaughtUp(follower); final RaftPeerId targetId = follower.getId(); + final SpanContextProto tracing = tracingContextForReplication(entries, replicationTraceByLogIndex); return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, getCurrentTerm(), entries, ServerImplUtils.effectiveCommitIndex(readIndexSupplier.get(), previous, entries.size()), - initializing, previous, server.getCommitInfos(), callId); + initializing, previous, server.getCommitInfos(), callId, tracing); } /** @@ -733,6 +765,7 @@ void submitStepDownEvent(long term, StepDownReason reason) { } private void stepDown(long term, StepDownReason reason) { + replicationTraceByLogIndex.clear(); try { lease.getAndSetEnabled(false); server.changeToFollowerAndPersistMetadata(term, false, reason) @@ -1242,6 +1275,7 @@ private boolean checkLeaderLease() { } void replyPendingRequest(TermIndex termIndex, RaftClientReply reply, RetryCacheImpl.CacheEntry cacheEntry) { + replicationTraceByLogIndex.remove(termIndex.getIndex()); final PendingRequest pending = pendingRequests.remove(termIndex); final LongSupplier replyMethod = () -> { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index c0e93338a6..1712c58a72 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1563,7 +1563,9 @@ public CompletableFuture appendEntriesAsync(AppendEntri assertGroup(getMemberId(), leaderId, leaderGroupId); assertEntries(r, previous, state); - return appendEntriesAsync(leaderId, request.getCallId(), previous, r); + return TraceServer.traceAppendEntriesAsync( + () -> appendEntriesAsync(leaderId, request.getCallId(), previous, r), + r, getMemberId().toString()); } catch(Exception t) { LOG.error("{}: Failed appendEntries* {}", getMemberId(), toAppendEntriesRequestString(r, stateMachine::toStateMachineLogEntryString), t); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 19d4ce6a75..bbe438c0c0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -157,9 +157,13 @@ static AppendEntriesReplyProto toAppendEntriesReplyProto( static AppendEntriesRequestProto toAppendEntriesRequestProto( RaftGroupMemberId requestorId, RaftPeerId replyId, long leaderTerm, List entries, long leaderCommit, boolean initializing, - TermIndex previous, Collection commitInfos, long callId) { + TermIndex previous, Collection commitInfos, long callId, + SpanContextProto tracingContext) { final RaftRpcRequestProto.Builder rpcRequest = ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId) .setCallId(callId); + if (tracingContext != null && !tracingContext.getContextMap().isEmpty()) { + rpcRequest.setSpanContext(tracingContext); + } final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto .newBuilder() .setServerRequest(rpcRequest) diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java index 300cf51cde..dc03a1a3d3 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java @@ -19,10 +19,16 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; import io.opentelemetry.sdk.trace.data.SpanData; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto; +import org.apache.ratis.proto.RaftProtos.SpanContextProto; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftGroup; @@ -32,13 +38,20 @@ import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing; +import org.apache.ratis.trace.RatisAttributes; import org.apache.ratis.trace.TraceConfigKeys; +import org.apache.ratis.trace.TraceServer; import org.apache.ratis.trace.TraceUtils; +import org.apache.ratis.util.JavaUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Supplier; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -84,6 +97,114 @@ public void testSubmitClientRequestAsyncTracingDisabled() throws Exception { ); } + @Test + public void testTraceAppendEntriesAsyncCreatesInternalSpan() throws Exception { + long callId = randomCallId(); + int entriesCount = 3; + final List spans = traceAppendEntriesAndCollectNewSpans(true, newAppendEntriesRequest( + RaftPeerId.valueOf("leader1"), callId, entriesCount, injectedSpanContext())); + final SpanData appendSpan = spans.stream() + .filter(s -> s.getKind() == SpanKind.INTERNAL && s.getName().equals("raft.server.appendEntriesAsync")) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Expected INTERNAL raft.server.appendEntriesAsync span")); + assertEquals("n1", appendSpan.getAttributes().get(RatisAttributes.MEMBER_ID)); + assertEquals("leader1", appendSpan.getAttributes().get(RatisAttributes.PEER_ID)); + assertEquals(entriesCount, appendSpan.getAttributes().get(RatisAttributes.APPEND_ENTRIES_COUNT)); + } + + @Test + public void testTraceAppendEntriesAsyncTracingDisabled() throws Exception { + int entriesCount = 1; + final List spans = traceAppendEntriesAndCollectNewSpans(false, newAppendEntriesRequest( + RaftPeerId.valueOf("leader1"), randomCallId(), entriesCount, injectedSpanContext())); + assertTrue( + spans.stream().noneMatch(s -> s.getName().equals("raft.server.appendEntriesAsync")), + "Expected no appendEntries span when tracing disabled, got: " + spans); + } + + @Test + public void testTraceAppendEntriesAsyncSkipsWhenSpanContextEmpty() throws Exception { + int entriesCount = 1; + final AppendEntriesRequestProto request = newAppendEntriesRequest( + RaftPeerId.valueOf("leader1"), randomCallId(), entriesCount, SpanContextProto.getDefaultInstance()); + final List spans = traceAppendEntriesAndCollectNewSpans(true, request); + assertEquals(1, + spans.stream().filter(s -> s.getName().equals("raft.server.appendEntriesAsync")).count()); + } + + @Test + public void testTraceAppendEntriesAsyncSpanRecordsErrorOnFailure() throws Exception { + int entriesCount = 0; + final List spans = traceAppendEntriesAndCollectNewSpans(true, newAppendEntriesRequest( + RaftPeerId.valueOf("leader1"), randomCallId(), entriesCount, injectedSpanContext()), + () -> JavaUtils.completeExceptionally(new IOException("Planned record error"))); + assertEquals(1, + spans.stream().filter(s -> s.getName().equals("raft.server.appendEntriesAsync")).count()); + final SpanData appendSpan = spans.stream() + .filter(s -> s.getKind() == SpanKind.INTERNAL && s.getName().equals("raft.server.appendEntriesAsync")) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Expected INTERNAL raft.server.appendEntriesAsync span")); + assertEquals(StatusCode.ERROR, appendSpan.getStatus().getStatusCode()); + } + + private static List traceAppendEntriesAndCollectNewSpans( + boolean enableTracing, AppendEntriesRequestProto request) throws Exception { + return traceAppendEntriesAndCollectNewSpans(enableTracing, request, + () -> CompletableFuture.completedFuture(AppendEntriesReplyProto.getDefaultInstance())); + } + + private static List traceAppendEntriesAndCollectNewSpans( + boolean enableTracing, + AppendEntriesRequestProto request, + Supplier> action) + throws Exception { + final int before = openTelemetryExtension.getSpans().size(); + try { + TraceUtils.setTracerWhenEnabled(enableTracing); + final CompletableFuture traced = + TraceServer.traceAppendEntriesAsync(action::get, request, "n1"); + try { + traced.join(); + } catch (CompletionException e) { + // allowed for failure-path test + } + } finally { + TraceUtils.setTracerWhenEnabled(false); + } + final List after = openTelemetryExtension.getSpans(); + return new ArrayList<>(after.subList(before, after.size())); + } + + private static SpanContextProto injectedSpanContext() { + final Span remoteParent = openTelemetryExtension.getOpenTelemetry().getTracer("test") + .spanBuilder("remote-parent") + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + try { + return TraceUtils.injectContextToProto(Context.current().with(remoteParent)); + } finally { + remoteParent.end(); + } + } + + private static AppendEntriesRequestProto newAppendEntriesRequest( + RaftPeerId leaderId, long callId, int entriesCount, SpanContextProto spanContext) { + final RaftRpcRequestProto.Builder rpc = RaftRpcRequestProto.newBuilder() + .setRequestorId(leaderId.toByteString()) + .setCallId(callId); + if (spanContext != null && !spanContext.getContextMap().isEmpty()) { + rpc.setSpanContext(spanContext); + } + final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto.newBuilder() + .setServerRequest(rpc.build()) + .setLeaderTerm(1L) + .setLeaderCommit(0L); + for (int i = 0; i < entriesCount; i++) { + b.addEntries(LogEntryProto.newBuilder().setTerm(1L).setIndex(i + 1L).build()); + } + return b.build(); + } + private static List submitClientRequestAndCollectNewSpans(boolean enableTracing) throws Exception { final int before = openTelemetryExtension.getSpans().size(); @@ -137,5 +258,10 @@ private static RaftClientRequest newRaftClientRequest(RaftClientRequest.Type typ clientSpan.end(); } } + + private long randomCallId() { + return (long) (Math.random() * 100); + } + }