From aba97eff48dc6662a94e883e766d467cce1b2db9 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 9 Mar 2026 11:02:05 +0800 Subject: [PATCH 01/14] Call onError in GrpcLogAppender#stop to prevent StreamObserver leak in follower --- .../ratis/grpc/server/GrpcLogAppender.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index b4d78c207a..601a8fbf8f 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -33,6 +33,7 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.server.util.ServerStringUtils; +import org.apache.ratis.thirdparty.io.grpc.Status; import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; @@ -366,11 +367,28 @@ void onNext(AppendEntriesRequestProto proto) while (!stream.isReady() && running) { sleep(waitForReady, isHeartBeat); } + if (!running) { + return; + } stream.onNext(proto); } void stop() { running = false; + try { + appendLog.onError(new StatusRuntimeException(Status.CANCELLED + .withDescription("Stream stopped by resetClient"))); + } catch (Exception e) { + LOG.debug("Failed to close appendLog stream", e); + } + if (heartbeat != null) { + try { + heartbeat.onError(new StatusRuntimeException(Status.CANCELLED + .withDescription("Stream stopped by resetClient"))); + } catch (Exception e) { + LOG.debug("Failed to close heartbeat stream", e); + } + } } void onCompleted() { From d5c3b2ea23dd5f51cc7463eba75f27680e6b07f0 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 9 Mar 2026 11:20:32 +0800 Subject: [PATCH 02/14] Set previousOnNext and 
requestFuture to null to prevent leak --- .../apache/ratis/grpc/server/GrpcServerProtocolService.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java index a13e74b89d..df90b3b672 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -172,18 +172,22 @@ public void onCompleted() { BatchLogger.print(BatchLogKey.COMPLETED_REQUEST, getName(), suffix -> LOG.info("{}: Completed {}, lastRequest: {} {}", getId(), op, getPreviousRequestString(), suffix)); + previousOnNext.set(null); requestFuture.get().thenAccept(reply -> { BatchLogger.print(BatchLogKey.COMPLETED_REPLY, getName(), suffix -> LOG.info("{}: Completed {}, lastReply: {} {}", getId(), op, ProtoUtils.shortDebugString(reply), suffix)); responseObserver.onCompleted(); }); + requestFuture.set(null); } } @Override public void onError(Throwable t) { GrpcUtil.warn(LOG, () -> getId() + ": "+ op + " onError, lastRequest: " + getPreviousRequestString(), t); if (isClosed.compareAndSet(false, true)) { + previousOnNext.set(null); + requestFuture.set(null); Status status = Status.fromThrowable(t); if (status != null && status.getCode() != Status.Code.CANCELLED) { responseObserver.onCompleted(); From b0db797fb34fe2147e39f01932e33f24061c2f9b Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 9 Mar 2026 11:22:30 +0800 Subject: [PATCH 03/14] Handle complete or cancel based on the Event --- .../ratis/grpc/server/GrpcLogAppender.java | 42 ++++++++++++------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 601a8fbf8f..88570c41cc 100644 --- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -217,6 +217,11 @@ private void resetClient(AppendEntriesRequest request, Event event) { getClient().resetConnectBackoff(); if (appendLogRequestObserver != null) { appendLogRequestObserver.stop(); + if (event == Event.COMPLETE) { + appendLogRequestObserver.onCompleted(); + } else { + appendLogRequestObserver.cancelStream("resetClient due to " + event); + } appendLogRequestObserver = null; } final int errorCount = replyState.process(event); @@ -375,25 +380,34 @@ void onNext(AppendEntriesRequestProto proto) void stop() { running = false; + } + + void onCompleted() { try { - appendLog.onError(new StatusRuntimeException(Status.CANCELLED - .withDescription("Stream stopped by resetClient"))); + appendLog.onCompleted(); } catch (Exception e) { - LOG.debug("Failed to close appendLog stream", e); - } - if (heartbeat != null) { - try { - heartbeat.onError(new StatusRuntimeException(Status.CANCELLED - .withDescription("Stream stopped by resetClient"))); - } catch (Exception e) { - LOG.debug("Failed to close heartbeat stream", e); - } + LOG.debug("Failed to complete appendLog stream", e); + } + try { + Optional.ofNullable(heartbeat).ifPresent(StreamObserver::onCompleted); + } catch (Exception e) { + LOG.debug("Failed to complete heartbeat stream", e); } } - void onCompleted() { - appendLog.onCompleted(); - Optional.ofNullable(heartbeat).ifPresent(StreamObserver::onCompleted); + void cancelStream(String reason) { + try { + appendLog.onError(new StatusRuntimeException( + Status.CANCELLED.withDescription(reason))); + } catch (Exception e) { + LOG.debug("Failed to cancel appendLog stream", e); + } + try { + Optional.ofNullable(heartbeat).ifPresent((heartbeat) -> + heartbeat.onError(new StatusRuntimeException(Status.CANCELLED.withDescription(reason)))); + } catch (Exception e) { + LOG.debug("Failed to cancel heartbeat 
stream", e); + } } } From e36fe7cf0b50f4793838a4cbe170be854ed8db6d Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 9 Mar 2026 11:23:42 +0800 Subject: [PATCH 04/14] Fix checkstyle --- .../java/org/apache/ratis/grpc/server/GrpcLogAppender.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 88570c41cc..c01868f94f 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -403,8 +403,8 @@ void cancelStream(String reason) { LOG.debug("Failed to cancel appendLog stream", e); } try { - Optional.ofNullable(heartbeat).ifPresent((heartbeat) -> - heartbeat.onError(new StatusRuntimeException(Status.CANCELLED.withDescription(reason)))); + Optional.ofNullable(heartbeat).ifPresent((hb) -> + hb.onError(new StatusRuntimeException(Status.CANCELLED.withDescription(reason)))); } catch (Exception e) { LOG.debug("Failed to cancel heartbeat stream", e); } From 2a371c986f8f2622576293a98a0d30062a139805 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 9 Mar 2026 18:10:12 +0800 Subject: [PATCH 05/14] Push logic inside StreamObservers to prevent NPE --- .../apache/ratis/grpc/server/GrpcLogAppender.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index c01868f94f..e8580d2a88 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -216,12 +216,7 @@ private void resetClient(AppendEntriesRequest request, Event event) { try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { 
getClient().resetConnectBackoff(); if (appendLogRequestObserver != null) { - appendLogRequestObserver.stop(); - if (event == Event.COMPLETE) { - appendLogRequestObserver.onCompleted(); - } else { - appendLogRequestObserver.cancelStream("resetClient due to " + event); - } + appendLogRequestObserver.stop(event); appendLogRequestObserver = null; } final int errorCount = replyState.process(event); @@ -378,8 +373,13 @@ void onNext(AppendEntriesRequestProto proto) stream.onNext(proto); } - void stop() { + void stop(Event event) { running = false; + if (event == Event.COMPLETE) { + onCompleted(); + } else { + cancelStream("resetClient due to " + event); + } } void onCompleted() { From 53e346b42e57ffdc46ae14c21eea61653fbc83e4 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 9 Mar 2026 18:12:09 +0800 Subject: [PATCH 06/14] Tidy up --- .../java/org/apache/ratis/grpc/server/GrpcLogAppender.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index e8580d2a88..14ea609179 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -378,7 +378,7 @@ void stop(Event event) { if (event == Event.COMPLETE) { onCompleted(); } else { - cancelStream("resetClient due to " + event); + cancelStream("stop due to " + event); } } @@ -397,8 +397,7 @@ void onCompleted() { void cancelStream(String reason) { try { - appendLog.onError(new StatusRuntimeException( - Status.CANCELLED.withDescription(reason))); + appendLog.onError(new StatusRuntimeException(Status.CANCELLED.withDescription(reason))); } catch (Exception e) { LOG.debug("Failed to cancel appendLog stream", e); } From 653984bce29cff5de89a599daf0fc80e80845b2d Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Tue, 10 Mar 2026 10:39:20 +0800 Subject: 
[PATCH 07/14] Call onCompleted in the finally block --- .../ratis/grpc/server/GrpcLogAppender.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 14ea609179..87c982629a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -267,16 +267,18 @@ private boolean installSnapshot() { @Override public void run() throws IOException { - for(; isRunning(); mayWait()) { - //HB period is expired OR we have messages OR follower is behind with commit index - if (shouldSendAppendEntries() || isFollowerCommitBehindLastCommitIndex()) { - final boolean installingSnapshot = installSnapshot(); - appendLog(installingSnapshot || haveTooManyPendingRequests()); + try { + for (; isRunning(); mayWait()) { + //HB period is expired OR we have messages OR follower is behind with commit index + if (shouldSendAppendEntries() || isFollowerCommitBehindLastCommitIndex()) { + final boolean installingSnapshot = installSnapshot(); + appendLog(installingSnapshot || haveTooManyPendingRequests()); + } + getLeaderState().checkHealth(getFollower()); } - getLeaderState().checkHealth(getFollower()); + } finally { + Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObservers::onCompleted); } - - Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObservers::onCompleted); } public long getWaitTimeMs() { From 1275af63c4a83d0c8c171f769d2d488fbd1830c2 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 11 Mar 2026 17:44:03 +0800 Subject: [PATCH 08/14] Lock before completing --- .../java/org/apache/ratis/grpc/server/GrpcLogAppender.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 87c982629a..0cf07ee04b 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -277,7 +277,12 @@ public void run() throws IOException { getLeaderState().checkHealth(getFollower()); } } finally { - Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObservers::onCompleted); + try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { + if (appendLogRequestObserver != null) { + appendLogRequestObserver.onCompleted(); + appendLogRequestObserver = null; + } + } } } From 1678938b53730828262cda1644a2655cd195104d Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 11 Mar 2026 18:26:16 +0800 Subject: [PATCH 09/14] Null previousOnNext in handleError --- .../server/GrpcServerProtocolService.java | 1 + .../ratis/grpc/TestLogAppenderWithGrpc.java | 109 ++++++++++++++++++ 2 files changed, 110 insertions(+) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java index df90b3b672..88c1b6d03e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -114,6 +114,7 @@ StatusRuntimeException wrapException(Throwable e, REQUEST request) { private void handleError(Throwable e, REQUEST request) { GrpcUtil.warn(LOG, () -> getId() + ": Failed " + op + " request " + requestToString(request), e); if (isClosed.compareAndSet(false, true)) { + previousOnNext.set(null); responseObserver.onError(wrapException(e, request)); } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java index 107cd7ba9a..92a85e230c
100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java @@ -25,9 +25,11 @@ import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.metrics.GrpcServerMetrics; import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.leader.FollowerInfo; +import org.apache.ratis.server.leader.LogAppender; import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; @@ -39,10 +41,13 @@ import org.slf4j.event.Level; import java.io.IOException; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import static org.apache.ratis.RaftTestUtil.waitForLeader; @@ -148,4 +153,108 @@ private void runTestRestartLogAppender(MiniRaftClusterWithGrpc cluster) throws E Assertions.assertTrue(newleaderMetrics.getRegistry().counter(counter).getCount() >= 1L); } } + + /** + * Verify that old LogAppender instances are properly cleaned up (gRPC streams terminated) + * after restartLogAppenders. Without the fix, gRPC holds references to unterminated + * stream response handlers, preventing old LogAppender instances from being collected. 
+ */ + @ParameterizedTest + @MethodSource("data") + public void testLogAppenderStreamCleanupOnRestart(Boolean separateHeartbeat) throws Exception { + GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat); + runWithNewCluster(3, this::runTestLogAppenderStreamCleanupOnRestart); + } + + private void runTestLogAppenderStreamCleanupOnRestart(MiniRaftClusterWithGrpc cluster) throws Exception { + final RaftServer.Division leader = waitForLeader(cluster); + final RaftPeerId leaderId = leader.getId(); + + try (RaftClient client = cluster.createClient(leaderId)) { + for (int i = 0; i < 10; i++) { + final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i)); + Assertions.assertTrue(reply.isSuccess()); + } + } + + final List<WeakReference<LogAppender>> weakRefs = + RaftServerTestUtil.getLogAppenders(leader) + .map(WeakReference::new) + .collect(Collectors.toList()); + Assertions.assertFalse(weakRefs.isEmpty()); + + RaftServerTestUtil.restartLogAppenders(leader); + + try (RaftClient client = cluster.createClient(leaderId)) { + for (int i = 0; i < 10; i++) { + client.io().send(new RaftTestUtil.SimpleMessage("after-" + i)); + } + } + + // Old appenders should be GC-able once their gRPC streams are terminated. + // If streams leaked, gRPC retains references to the response handlers + // (inner classes of GrpcLogAppender), preventing collection. + JavaUtils.attempt(() -> { + System.gc(); + for (WeakReference<LogAppender> ref : weakRefs) { + Assertions.assertNull(ref.get(), + "Old LogAppender should be garbage collected after stream cleanup"); + } + }, 20, ONE_SECOND, "old-appender-gc", LOG); + } + + /** + * Reproduce the StreamObserver leak by repeatedly killing and restarting a follower. + * Each kill triggers onError on the leader's response handler, calling + * resetClient -> stop(ERROR) -> cancelStream. Without the fix, the old gRPC streams + * are never terminated and accumulate on both leader and follower sides.
+ */ + @ParameterizedTest + @MethodSource("data") + public void testStreamObserverCleanupOnFollowerKillRestart(Boolean separateHeartbeat) throws Exception { + GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat); + runWithNewCluster(3, this::runTestStreamObserverCleanupOnFollowerKillRestart); + } + + private void runTestStreamObserverCleanupOnFollowerKillRestart(MiniRaftClusterWithGrpc cluster) throws Exception { + final RaftServer.Division leader = waitForLeader(cluster); + final RaftPeerId leaderId = leader.getId(); + final int numCycles = 5; + + try (RaftClient client = cluster.createClient(leaderId)) { + for (int i = 0; i < 5; i++) { + Assertions.assertTrue(client.io().send( + new RaftTestUtil.SimpleMessage("init-" + i)).isSuccess()); + } + } + + for (int cycle = 0; cycle < numCycles; cycle++) { + LOG.info("=== Kill/Restart cycle {} ===", cycle); + final RaftPeerId followerId = cluster.getFollowers().get(0).getId(); + + cluster.killServer(followerId); + + try (RaftClient client = cluster.createClient(leaderId)) { + for (int i = 0; i < 5; i++) { + Assertions.assertTrue(client.io().send( + new RaftTestUtil.SimpleMessage("cycle" + cycle + "-" + i)).isSuccess()); + } + } + + cluster.restartServer(followerId, false); + + JavaUtils.attempt(() -> { + final RaftServer.Division f = cluster.getDivision(followerId); + Assertions.assertTrue(f.getInfo().getLastAppliedIndex() > 0, + "Follower " + followerId + " should have applied entries"); + }, 10, ONE_SECOND, "follower-catchup-" + cycle, LOG); + } + + // Verify all entries committed across the cluster + try (RaftClient client = cluster.createClient(leaderId)) { + final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("final")); + Assertions.assertTrue(reply.isSuccess()); + client.io().watch(reply.getLogIndex(), RaftProtos.ReplicationLevel.ALL_COMMITTED); + } + } } From e346a22dfb40dbd8ddf4e4afd8a6923db1c13f2f Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 11 Mar 2026 
18:26:32 +0800 Subject: [PATCH 10/14] Add test --- .../ratis/grpc/TestLogAppenderWithGrpc.java | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java index 92a85e230c..fb239b1d3a 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java @@ -47,8 +47,12 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import org.apache.ratis.server.impl.BlockRequestHandlingInjection; +import org.apache.ratis.util.CodeInjectionForTesting; + import static org.apache.ratis.RaftTestUtil.waitForLeader; public class TestLogAppenderWithGrpc @@ -203,6 +207,88 @@ private void runTestLogAppenderStreamCleanupOnRestart(MiniRaftClusterWithGrpc cl }, 20, ONE_SECOND, "old-appender-gc", LOG); } + /** + * Verify that the follower's ServerRequestStreamObserver cleans up previousOnNext + * when handleError is triggered. This injects failures at the APPEND_ENTRIES point + * on a specific follower, causing process(request) to fail and handleError to be called. + * Without the fix, previousOnNext retains the last PendingServerRequest (including the + * full AppendEntriesRequestProto with log entry data) after handleError closes the stream. 
+ */ + @ParameterizedTest + @MethodSource("data") + public void testFollowerHandleErrorCleanup(Boolean separateHeartbeat) throws Exception { + GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat); + runWithNewCluster(3, this::runTestFollowerHandleErrorCleanup); + } + + private void runTestFollowerHandleErrorCleanup(MiniRaftClusterWithGrpc cluster) throws Exception { + final RaftServer.Division leader = waitForLeader(cluster); + final RaftPeerId leaderId = leader.getId(); + + try (RaftClient client = cluster.createClient(leaderId)) { + for (int i = 0; i < 5; i++) { + Assertions.assertTrue(client.io().send( + new RaftTestUtil.SimpleMessage("init-" + i)).isSuccess()); + } + } + + final RaftPeerId followerId = cluster.getFollowers().get(0).getId(); + final String APPEND_ENTRIES = "RaftServerImpl.appendEntries"; + final AtomicBoolean shouldFail = new AtomicBoolean(false); + + CodeInjectionForTesting.put(APPEND_ENTRIES, (localId, remoteId, args) -> { + if (shouldFail.get() && localId.toString().equals(followerId.toString())) { + throw new RuntimeException("Injected failure for handleError test"); + } + return false; + }); + + try { + final int numCycles = 3; + for (int cycle = 0; cycle < numCycles; cycle++) { + LOG.info("=== HandleError cycle {} ===", cycle); + + shouldFail.set(true); + + try (RaftClient client = cluster.createClient(leaderId)) { + for (int i = 0; i < 5; i++) { + client.io().send(new RaftTestUtil.SimpleMessage("fail-" + cycle + "-" + i)); + } + } + + // Wait for the leader to detect the stream errors from the failing follower + JavaUtils.attempt(() -> { + final long leaderCommit = leader.getRaftLog().getLastCommittedIndex(); + Assertions.assertTrue(leaderCommit > 0); + }, 10, ONE_SECOND, "leader-commit-cycle-" + cycle, LOG); + + shouldFail.set(false); + + try (RaftClient client = cluster.createClient(leaderId)) { + for (int i = 0; i < 5; i++) { + Assertions.assertTrue(client.io().send( + new RaftTestUtil.SimpleMessage("recover-" 
+ cycle + "-" + i)).isSuccess()); + } + } + + final int c = cycle; + JavaUtils.attempt(() -> { + final RaftServer.Division f = cluster.getDivision(followerId); + Assertions.assertTrue(f.getInfo().getLastAppliedIndex() > 0, + "Follower " + followerId + " should recover after handleError"); + }, 10, ONE_SECOND, "follower-recover-" + c, LOG); + } + + try (RaftClient client = cluster.createClient(leaderId)) { + final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("final")); + Assertions.assertTrue(reply.isSuccess()); + client.io().watch(reply.getLogIndex(), RaftProtos.ReplicationLevel.ALL_COMMITTED); + } + } finally { + CodeInjectionForTesting.put(APPEND_ENTRIES, BlockRequestHandlingInjection.getInstance()); + } + } + /** * Reproduce the StreamObserver leak by repeatedly killing and restarting a follower. * Each kill triggers onError on the leader's response handler, calling From 538e9f25349df2c2028ec46152e973ad39e1a43a Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Wed, 11 Mar 2026 18:44:33 +0800 Subject: [PATCH 11/14] Add leak-detection counter to validate handleError cleanup Add a static counter in GrpcServerProtocolService that tracks handleError closures where previousOnNext was not cleaned up. The test asserts the counter remains zero, which fails without the previousOnNext.set(null) fix. 
Made-with: Cursor --- .../server/GrpcServerProtocolService.java | 7 +++++ .../ratis/grpc/TestLogAppenderWithGrpc.java | 5 ++++ .../GrpcServerProtocolServiceTestUtil.java | 30 +++++++++++++++++++ 3 files changed, 42 insertions(+) create mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/server/GrpcServerProtocolServiceTestUtil.java diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java index 88c1b6d03e..fcd9bf3693 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.grpc.server; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; import org.apache.ratis.grpc.GrpcUtil; @@ -46,6 +47,9 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class); + /** Counts handleError closures where previousOnNext was not cleaned up. 
*/ + static final AtomicInteger HANDLE_ERROR_LEAK_COUNT = new AtomicInteger(0); + private enum BatchLogKey implements BatchLogger.Key { COMPLETED_REQUEST, COMPLETED_REPLY @@ -116,6 +120,9 @@ private void handleError(Throwable e, REQUEST request) { if (isClosed.compareAndSet(false, true)) { previousOnNext.set(null); responseObserver.onError(wrapException(e, request)); + if (previousOnNext.get() != null) { + HANDLE_ERROR_LEAK_COUNT.incrementAndGet(); + } } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java index fb239b1d3a..024660572f 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java @@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import org.apache.ratis.grpc.server.GrpcServerProtocolServiceTestUtil; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.util.CodeInjectionForTesting; @@ -222,6 +223,7 @@ public void testFollowerHandleErrorCleanup(Boolean separateHeartbeat) throws Exc } private void runTestFollowerHandleErrorCleanup(MiniRaftClusterWithGrpc cluster) throws Exception { + GrpcServerProtocolServiceTestUtil.resetHandleErrorLeakCount(); final RaftServer.Division leader = waitForLeader(cluster); final RaftPeerId leaderId = leader.getId(); @@ -284,6 +286,9 @@ private void runTestFollowerHandleErrorCleanup(MiniRaftClusterWithGrpc cluster) Assertions.assertTrue(reply.isSuccess()); client.io().watch(reply.getLogIndex(), RaftProtos.ReplicationLevel.ALL_COMMITTED); } + + Assertions.assertEquals(0, GrpcServerProtocolServiceTestUtil.getHandleErrorLeakCount(), + "previousOnNext should be cleaned up in handleError to prevent memory leaks"); } finally { CodeInjectionForTesting.put(APPEND_ENTRIES, BlockRequestHandlingInjection.getInstance()); } 
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/server/GrpcServerProtocolServiceTestUtil.java b/ratis-test/src/test/java/org/apache/ratis/grpc/server/GrpcServerProtocolServiceTestUtil.java new file mode 100644 index 0000000000..52a537b558 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/server/GrpcServerProtocolServiceTestUtil.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.grpc.server; + +public final class GrpcServerProtocolServiceTestUtil { + private GrpcServerProtocolServiceTestUtil() {} + + public static int getHandleErrorLeakCount() { + return GrpcServerProtocolService.HANDLE_ERROR_LEAK_COUNT.get(); + } + + public static void resetHandleErrorLeakCount() { + GrpcServerProtocolService.HANDLE_ERROR_LEAK_COUNT.set(0); + } +} From ad3852850b19039ce911cacff3677203eb79fa03 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 12 Mar 2026 11:43:34 +0800 Subject: [PATCH 12/14] Improve the injection naming --- .../server/GrpcServerProtocolService.java | 13 ++++---- .../ratis/grpc/TestLogAppenderWithGrpc.java | 19 +++++++++--- .../GrpcServerProtocolServiceTestUtil.java | 30 ------------------- 3 files changed, 21 insertions(+), 41 deletions(-) delete mode 100644 ratis-test/src/test/java/org/apache/ratis/grpc/server/GrpcServerProtocolServiceTestUtil.java diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java index fcd9bf3693..41c89f7035 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -17,7 +17,6 @@ */ package org.apache.ratis.grpc.server; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; import org.apache.ratis.grpc.GrpcUtil; @@ -32,6 +31,8 @@ import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase; import org.apache.ratis.util.BatchLogger; +import org.apache.ratis.util.CodeInjectionForTesting; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.ProtoUtils; import org.slf4j.Logger; @@ -44,11 +45,11 @@ import 
java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { +public class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class); - /** Counts handleError closures where previousOnNext was not cleaned up. */ - static final AtomicInteger HANDLE_ERROR_LEAK_COUNT = new AtomicInteger(0); + public static final String GRPC_SERVER_HANDLE_ERROR = + JavaUtils.getClassSimpleName(GrpcServerProtocolService.class) + ".handleError"; private enum BatchLogKey implements BatchLogger.Key { COMPLETED_REQUEST, @@ -120,9 +121,7 @@ private void handleError(Throwable e, REQUEST request) { if (isClosed.compareAndSet(false, true)) { previousOnNext.set(null); responseObserver.onError(wrapException(e, request)); - if (previousOnNext.get() != null) { - HANDLE_ERROR_LEAK_COUNT.incrementAndGet(); - } + CodeInjectionForTesting.execute(GRPC_SERVER_HANDLE_ERROR, getId(), null, previousOnNext.get()); } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java index 024660572f..a0ee62bb27 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java @@ -48,13 +48,14 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import org.apache.ratis.grpc.server.GrpcServerProtocolServiceTestUtil; import org.apache.ratis.server.impl.BlockRequestHandlingInjection; import org.apache.ratis.util.CodeInjectionForTesting; import static org.apache.ratis.RaftTestUtil.waitForLeader; +import static 
org.apache.ratis.grpc.server.GrpcServerProtocolService.GRPC_SERVER_HANDLE_ERROR; public class TestLogAppenderWithGrpc extends LogAppenderTests @@ -223,7 +224,6 @@ public void testFollowerHandleErrorCleanup(Boolean separateHeartbeat) throws Exc } private void runTestFollowerHandleErrorCleanup(MiniRaftClusterWithGrpc cluster) throws Exception { - GrpcServerProtocolServiceTestUtil.resetHandleErrorLeakCount(); final RaftServer.Division leader = waitForLeader(cluster); final RaftPeerId leaderId = leader.getId(); @@ -237,6 +237,8 @@ private void runTestFollowerHandleErrorCleanup(MiniRaftClusterWithGrpc cluster) final RaftPeerId followerId = cluster.getFollowers().get(0).getId(); final String APPEND_ENTRIES = "RaftServerImpl.appendEntries"; final AtomicBoolean shouldFail = new AtomicBoolean(false); + final AtomicInteger handleErrorCount = new AtomicInteger(0); + final AtomicInteger leakCount = new AtomicInteger(0); CodeInjectionForTesting.put(APPEND_ENTRIES, (localId, remoteId, args) -> { if (shouldFail.get() && localId.toString().equals(followerId.toString())) { @@ -244,6 +246,13 @@ private void runTestFollowerHandleErrorCleanup(MiniRaftClusterWithGrpc cluster) } return false; }); + CodeInjectionForTesting.put(GRPC_SERVER_HANDLE_ERROR, (localId, remoteId, args) -> { + handleErrorCount.incrementAndGet(); + if (args != null && args.length > 0 && args[0] != null) { + leakCount.incrementAndGet(); + } + return false; + }); try { final int numCycles = 3; @@ -258,7 +267,6 @@ private void runTestFollowerHandleErrorCleanup(MiniRaftClusterWithGrpc cluster) } } - // Wait for the leader to detect the stream errors from the failing follower JavaUtils.attempt(() -> { final long leaderCommit = leader.getRaftLog().getLastCommittedIndex(); Assertions.assertTrue(leaderCommit > 0); @@ -287,10 +295,13 @@ private void runTestFollowerHandleErrorCleanup(MiniRaftClusterWithGrpc cluster) client.io().watch(reply.getLogIndex(), RaftProtos.ReplicationLevel.ALL_COMMITTED); } - 
Assertions.assertEquals(0, GrpcServerProtocolServiceTestUtil.getHandleErrorLeakCount(), + Assertions.assertTrue(handleErrorCount.get() > 0, + "handleError should have been triggered by the injected failures"); + Assertions.assertEquals(0, leakCount.get(), "previousOnNext should be cleaned up in handleError to prevent memory leaks"); } finally { CodeInjectionForTesting.put(APPEND_ENTRIES, BlockRequestHandlingInjection.getInstance()); + CodeInjectionForTesting.remove(GRPC_SERVER_HANDLE_ERROR); } } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/server/GrpcServerProtocolServiceTestUtil.java b/ratis-test/src/test/java/org/apache/ratis/grpc/server/GrpcServerProtocolServiceTestUtil.java deleted file mode 100644 index 52a537b558..0000000000 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/server/GrpcServerProtocolServiceTestUtil.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.ratis.grpc.server; - -public final class GrpcServerProtocolServiceTestUtil { - private GrpcServerProtocolServiceTestUtil() {} - - public static int getHandleErrorLeakCount() { - return GrpcServerProtocolService.HANDLE_ERROR_LEAK_COUNT.get(); - } - - public static void resetHandleErrorLeakCount() { - GrpcServerProtocolService.HANDLE_ERROR_LEAK_COUNT.set(0); - } -} From 196d195b1ba510a0b4a4ce20dc106cc08ddf3cae Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 12 Mar 2026 11:47:56 +0800 Subject: [PATCH 13/14] Don't make GrpcServerProtocolService public --- .../org/apache/ratis/grpc/server/GrpcServerProtocolService.java | 2 +- .../java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java index 41c89f7035..aca9357d9c 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -public class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { +class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class); public static final String GRPC_SERVER_HANDLE_ERROR = diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java index a0ee62bb27..061ceb1453 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java @@ -55,7 +55,6 @@ import 
org.apache.ratis.util.CodeInjectionForTesting; import static org.apache.ratis.RaftTestUtil.waitForLeader; -import static org.apache.ratis.grpc.server.GrpcServerProtocolService.GRPC_SERVER_HANDLE_ERROR; public class TestLogAppenderWithGrpc extends LogAppenderTests @@ -236,6 +235,7 @@ private void runTestFollowerHandleErrorCleanup(MiniRaftClusterWithGrpc cluster) final RaftPeerId followerId = cluster.getFollowers().get(0).getId(); final String APPEND_ENTRIES = "RaftServerImpl.appendEntries"; + final String GRPC_SERVER_HANDLE_ERROR = "GrpcServerProtocolService.handleError"; final AtomicBoolean shouldFail = new AtomicBoolean(false); final AtomicInteger handleErrorCount = new AtomicInteger(0); final AtomicInteger leakCount = new AtomicInteger(0); From 10fae388b47cd95ff2217bee92446235915f3a90 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Thu, 12 Mar 2026 11:55:02 +0800 Subject: [PATCH 14/14] CodeInjectionForTesting to return previous value to allow revert --- .../org/apache/ratis/util/CodeInjectionForTesting.java | 4 ++-- .../org/apache/ratis/grpc/TestLogAppenderWithGrpc.java | 7 +++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java index 112f6bd250..290fa4287b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java @@ -50,9 +50,9 @@ public interface Code { = new ConcurrentHashMap<>(); /** Put an injection point. */ - public static void put(String injectionPoint, Code code) { + public static Code put(String injectionPoint, Code code) { LOG.debug("put: {}, {}", injectionPoint, code); - INJECTION_POINTS.put(injectionPoint, code); + return INJECTION_POINTS.put(injectionPoint, code); } /** Execute the injected code, if there is any. 
*/ diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java index 061ceb1453..729c681fee 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java @@ -33,6 +33,7 @@ import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.CodeInjectionForTesting.Code; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Slf4jUtils; import org.junit.jupiter.api.Assertions; @@ -240,7 +241,7 @@ private void runTestFollowerHandleErrorCleanup(MiniRaftClusterWithGrpc cluster) final AtomicInteger handleErrorCount = new AtomicInteger(0); final AtomicInteger leakCount = new AtomicInteger(0); - CodeInjectionForTesting.put(APPEND_ENTRIES, (localId, remoteId, args) -> { + Code previousAECode = CodeInjectionForTesting.put(APPEND_ENTRIES, (localId, remoteId, args) -> { if (shouldFail.get() && localId.toString().equals(followerId.toString())) { throw new RuntimeException("Injected failure for handleError test"); } @@ -300,7 +301,9 @@ private void runTestFollowerHandleErrorCleanup(MiniRaftClusterWithGrpc cluster) Assertions.assertEquals(0, leakCount.get(), "previousOnNext should be cleaned up in handleError to prevent memory leaks"); } finally { - CodeInjectionForTesting.put(APPEND_ENTRIES, BlockRequestHandlingInjection.getInstance()); + if (previousAECode != null) { + CodeInjectionForTesting.put(APPEND_ENTRIES, previousAECode); + } CodeInjectionForTesting.remove(GRPC_SERVER_HANDLE_ERROR); } }