Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public final class RatisAttributes {
public static final AttributeKey<String> OPERATION_NAME = AttributeKey.stringKey("raft.operation.name");
public static final AttributeKey<String> OPERATION_TYPE = AttributeKey.stringKey("raft.operation.type");

/** Number of log entries in a single {@code AppendEntries} RPC (0 for heartbeat). */
public static final AttributeKey<Long> APPEND_ENTRIES_COUNT = AttributeKey.longKey("raft.append.entries.count");

private RatisAttributes() {
}
Expand Down
35 changes: 35 additions & 0 deletions ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.RaftProtos.SpanContextProto;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.function.CheckedSupplier;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/** Server-side OpenTelemetry helpers. */
Expand Down Expand Up @@ -56,4 +61,34 @@ private static Span createServerSpanFromClientRequest(RaftClientRequest request,
span.setAttribute(RatisAttributes.MEMBER_ID, memberId);
return span;
}

/**
* Traces follower handling of {@link AppendEntriesRequestProto} when the leader attached trace
* context (client-originated) for replication.
*/
public static <T> CompletableFuture<T> traceAppendEntriesAsync(
CheckedSupplier<CompletableFuture<T>, IOException> action,
AppendEntriesRequestProto request, String memberId) throws IOException {
if (!TraceUtils.isEnabled()) {
return action.get();
}
final RaftRpcRequestProto rpc = request.getServerRequest();
final SpanContextProto spanContext = rpc.getSpanContext();
// If the leader sent no parent span context, still trace as a root span
// rather than skipping tracing entirely.
final Context remoteContext = (spanContext == null || spanContext.getContextMap().isEmpty())
? Context.root()
: TraceUtils.extractContextFromProto(spanContext);
return TraceUtils.traceAsyncMethod(action, () -> {
final Span span = TraceUtils.getGlobalTracer()
.spanBuilder("raft.server.appendEntriesAsync")
.setParent(remoteContext)
.setSpanKind(SpanKind.INTERNAL)
.startSpan();
span.setAttribute(RatisAttributes.MEMBER_ID, memberId);
span.setAttribute(RatisAttributes.PEER_ID, String.valueOf(RaftPeerId.valueOf(rpc.getRequestorId())));
span.setAttribute(RatisAttributes.APPEND_ENTRIES_COUNT, (long) request.getEntriesCount());
return span;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static void setTracerWhenEnabled(boolean enabled) {
}
}

static boolean isEnabled() {
public static boolean isEnabled() {
return TRACER.get() != null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.proto.RaftProtos.SpanContextProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
Expand All @@ -52,6 +53,7 @@
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.trace.TraceUtils;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.Daemon;
Expand Down Expand Up @@ -354,6 +356,13 @@ boolean isApplied() {
private final long followerMaxGapThreshold;
private final PendingStepDown pendingStepDown;

/**
* Client-originated trace context keyed by log index; attached to {@link AppendEntriesRequestProto}
* so follower {@code appendEntries} spans join the same trace as the client write.
*/
private final ConcurrentHashMap<Long, SpanContextProto> replicationTraceByLogIndex =
new ConcurrentHashMap<>();

private final ReadIndexHeartbeats readIndexHeartbeats;
private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType;
private final Supplier<Long> readIndexSupplier;
Expand Down Expand Up @@ -459,6 +468,7 @@ CompletableFuture<Void> stop() {
LOG.info("{} is already stopped", this);
return CompletableFuture.completedFuture(null);
}
replicationTraceByLogIndex.clear();
// do not interrupt event processor since it may be in the middle of logSync
final CompletableFuture<Void> f = senders.stopAll();
final NotLeaderException nle = server.generateNotLeaderException();
Expand Down Expand Up @@ -558,7 +568,28 @@ PendingRequest addPendingRequest(PendingRequests.Permit permit, RaftClientReques
LOG.debug("{}: addPendingRequest at {}, entry={}", this, request,
LogProtoUtils.toLogEntryString(entry.getLogEntry()));
}
return pendingRequests.add(permit, request, entry);
final PendingRequest pending = pendingRequests.add(permit, request, entry);
if (pending != null && TraceUtils.isEnabled()) {
final SpanContextProto spanContext = request.getSpanContext();
if (spanContext != null && !spanContext.getContextMap().isEmpty()) {
replicationTraceByLogIndex.put(pending.getTermIndex().getIndex(), spanContext);
}
}
return pending;
}

private static SpanContextProto tracingContextForReplication(List<LogEntryProto> entries,
ConcurrentHashMap<Long, SpanContextProto> traceByIndex) {
if (entries == null || entries.isEmpty()) {
return null;
}
for (LogEntryProto e : entries) {
final SpanContextProto sc = traceByIndex.get(e.getIndex());
if (sc != null && !sc.getContextMap().isEmpty()) {
return sc;
}
}
return null;
}

CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request) {
Expand Down Expand Up @@ -645,9 +676,10 @@ public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follo
List<LogEntryProto> entries, TermIndex previous, long callId) {
final boolean initializing = !isCaughtUp(follower);
final RaftPeerId targetId = follower.getId();
final SpanContextProto tracing = tracingContextForReplication(entries, replicationTraceByLogIndex);
return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, getCurrentTerm(), entries,
ServerImplUtils.effectiveCommitIndex(readIndexSupplier.get(), previous, entries.size()),
initializing, previous, server.getCommitInfos(), callId);
initializing, previous, server.getCommitInfos(), callId, tracing);
}

/**
Expand Down Expand Up @@ -733,6 +765,7 @@ void submitStepDownEvent(long term, StepDownReason reason) {
}

private void stepDown(long term, StepDownReason reason) {
replicationTraceByLogIndex.clear();
try {
lease.getAndSetEnabled(false);
server.changeToFollowerAndPersistMetadata(term, false, reason)
Expand Down Expand Up @@ -1242,6 +1275,7 @@ private boolean checkLeaderLease() {
}

void replyPendingRequest(TermIndex termIndex, RaftClientReply reply, RetryCacheImpl.CacheEntry cacheEntry) {
replicationTraceByLogIndex.remove(termIndex.getIndex());
final PendingRequest pending = pendingRequests.remove(termIndex);

final LongSupplier replyMethod = () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1563,7 +1563,9 @@ public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntri
assertGroup(getMemberId(), leaderId, leaderGroupId);
assertEntries(r, previous, state);

return appendEntriesAsync(leaderId, request.getCallId(), previous, r);
return TraceServer.traceAppendEntriesAsync(
() -> appendEntriesAsync(leaderId, request.getCallId(), previous, r),
r, getMemberId().toString());
} catch(Exception t) {
LOG.error("{}: Failed appendEntries* {}", getMemberId(),
toAppendEntriesRequestString(r, stateMachine::toStateMachineLogEntryString), t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,13 @@ static AppendEntriesReplyProto toAppendEntriesReplyProto(
static AppendEntriesRequestProto toAppendEntriesRequestProto(
RaftGroupMemberId requestorId, RaftPeerId replyId, long leaderTerm,
List<LogEntryProto> entries, long leaderCommit, boolean initializing,
TermIndex previous, Collection<CommitInfoProto> commitInfos, long callId) {
TermIndex previous, Collection<CommitInfoProto> commitInfos, long callId,
SpanContextProto tracingContext) {
final RaftRpcRequestProto.Builder rpcRequest = ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId)
.setCallId(callId);
if (tracingContext != null && !tracingContext.getContextMap().isEmpty()) {
rpcRequest.setSpanContext(tracingContext);
}
final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto
.newBuilder()
.setServerRequest(rpcRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
import io.opentelemetry.sdk.trace.data.SpanData;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.RaftProtos.SpanContextProto;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
Expand All @@ -32,13 +38,20 @@
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.trace.RatisAttributes;
import org.apache.ratis.trace.TraceConfigKeys;
import org.apache.ratis.trace.TraceServer;
import org.apache.ratis.trace.TraceUtils;
import org.apache.ratis.util.JavaUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -84,6 +97,114 @@ public void testSubmitClientRequestAsyncTracingDisabled() throws Exception {
);
}

@Test
public void testTraceAppendEntriesAsyncCreatesInternalSpan() throws Exception {
long callId = randomCallId();
int entriesCount = 3;
final List<SpanData> spans = traceAppendEntriesAndCollectNewSpans(true, newAppendEntriesRequest(
RaftPeerId.valueOf("leader1"), callId, entriesCount, injectedSpanContext()));
final SpanData appendSpan = spans.stream()
.filter(s -> s.getKind() == SpanKind.INTERNAL && s.getName().equals("raft.server.appendEntriesAsync"))
.findFirst()
.orElseThrow(() -> new IllegalStateException("Expected INTERNAL raft.server.appendEntriesAsync span"));
assertEquals("n1", appendSpan.getAttributes().get(RatisAttributes.MEMBER_ID));
assertEquals("leader1", appendSpan.getAttributes().get(RatisAttributes.PEER_ID));
assertEquals(entriesCount, appendSpan.getAttributes().get(RatisAttributes.APPEND_ENTRIES_COUNT));
}

@Test
public void testTraceAppendEntriesAsyncTracingDisabled() throws Exception {
int entriesCount = 1;
final List<SpanData> spans = traceAppendEntriesAndCollectNewSpans(false, newAppendEntriesRequest(
RaftPeerId.valueOf("leader1"), randomCallId(), entriesCount, injectedSpanContext()));
assertTrue(
spans.stream().noneMatch(s -> s.getName().equals("raft.server.appendEntriesAsync")),
"Expected no appendEntries span when tracing disabled, got: " + spans);
}

@Test
public void testTraceAppendEntriesAsyncSkipsWhenSpanContextEmpty() throws Exception {
int entriesCount = 1;
final AppendEntriesRequestProto request = newAppendEntriesRequest(
RaftPeerId.valueOf("leader1"), randomCallId(), entriesCount, SpanContextProto.getDefaultInstance());
final List<SpanData> spans = traceAppendEntriesAndCollectNewSpans(true, request);
assertEquals(1,
spans.stream().filter(s -> s.getName().equals("raft.server.appendEntriesAsync")).count());
}

@Test
public void testTraceAppendEntriesAsyncSpanRecordsErrorOnFailure() throws Exception {
int entriesCount = 0;
final List<SpanData> spans = traceAppendEntriesAndCollectNewSpans(true, newAppendEntriesRequest(
RaftPeerId.valueOf("leader1"), randomCallId(), entriesCount, injectedSpanContext()),
() -> JavaUtils.completeExceptionally(new IOException("Planned record error")));
assertEquals(1,
spans.stream().filter(s -> s.getName().equals("raft.server.appendEntriesAsync")).count());
final SpanData appendSpan = spans.stream()
.filter(s -> s.getKind() == SpanKind.INTERNAL && s.getName().equals("raft.server.appendEntriesAsync"))
.findFirst()
.orElseThrow(() -> new IllegalStateException("Expected INTERNAL raft.server.appendEntriesAsync span"));
assertEquals(StatusCode.ERROR, appendSpan.getStatus().getStatusCode());
}

private static List<SpanData> traceAppendEntriesAndCollectNewSpans(
boolean enableTracing, AppendEntriesRequestProto request) throws Exception {
return traceAppendEntriesAndCollectNewSpans(enableTracing, request,
() -> CompletableFuture.completedFuture(AppendEntriesReplyProto.getDefaultInstance()));
}

private static List<SpanData> traceAppendEntriesAndCollectNewSpans(
boolean enableTracing,
AppendEntriesRequestProto request,
Supplier<CompletableFuture<AppendEntriesReplyProto>> action)
throws Exception {
final int before = openTelemetryExtension.getSpans().size();
try {
TraceUtils.setTracerWhenEnabled(enableTracing);
final CompletableFuture<AppendEntriesReplyProto> traced =
TraceServer.traceAppendEntriesAsync(action::get, request, "n1");
try {
traced.join();
} catch (CompletionException e) {
// allowed for failure-path test
}
} finally {
TraceUtils.setTracerWhenEnabled(false);
}
final List<SpanData> after = openTelemetryExtension.getSpans();
return new ArrayList<>(after.subList(before, after.size()));
}

private static SpanContextProto injectedSpanContext() {
final Span remoteParent = openTelemetryExtension.getOpenTelemetry().getTracer("test")
.spanBuilder("remote-parent")
.setSpanKind(SpanKind.CLIENT)
.startSpan();
try {
return TraceUtils.injectContextToProto(Context.current().with(remoteParent));
} finally {
remoteParent.end();
}
}

private static AppendEntriesRequestProto newAppendEntriesRequest(
RaftPeerId leaderId, long callId, int entriesCount, SpanContextProto spanContext) {
final RaftRpcRequestProto.Builder rpc = RaftRpcRequestProto.newBuilder()
.setRequestorId(leaderId.toByteString())
.setCallId(callId);
if (spanContext != null && !spanContext.getContextMap().isEmpty()) {
rpc.setSpanContext(spanContext);
}
final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto.newBuilder()
.setServerRequest(rpc.build())
.setLeaderTerm(1L)
.setLeaderCommit(0L);
for (int i = 0; i < entriesCount; i++) {
b.addEntries(LogEntryProto.newBuilder().setTerm(1L).setIndex(i + 1L).build());
}
return b.build();
}

private static List<SpanData> submitClientRequestAndCollectNewSpans(boolean enableTracing)
throws Exception {
final int before = openTelemetryExtension.getSpans().size();
Expand Down Expand Up @@ -137,5 +258,10 @@ private static RaftClientRequest newRaftClientRequest(RaftClientRequest.Type typ
clientSpan.end();
}
}

private long randomCallId() {
return (long) (Math.random() * 100);
}

}

Loading