From e9ed0c7c9d7b461aeaf0b845f3e9deefd923e074 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 31 May 2026 08:56:18 +0800 Subject: [PATCH 1/6] Reproduce flaky test --- .../apache/pulsar/metadata/impl/PulsarZooKeeperClient.java | 6 ++++++ .../test/java/org/apache/pulsar/metadata/ZKSessionTest.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index aba28ec1fc357..f64ef22d771b5 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -134,6 +134,12 @@ public ZooKeeper call() throws KeeperException, InterruptedException { throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); } waitForConnection(); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; + } zk.set(newZk); log.info() .attr("sessionId", Long.toHexString(newZk.getSessionId())) diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java index 59752abfc7e46..0ef8e6f34e593 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java @@ -187,7 +187,7 @@ public void testReacquireLocksAfterSessionLost() throws Exception { assertFalse(lock.getLockExpiredFuture().isDone()); } - @Test + @Test(invocationCount = 100) public void testReacquireLeadershipAfterSessionLost() throws Exception { // --- init @Cleanup From 80684b1d1bbd490cb4c74b25896e171ce73609b1 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 31 May 2026 22:23:29 +0800 Subject: [PATCH 2/6] Fix race condition --- .../metadata/impl/PulsarZooKeeperClient.java | 47 +++++++++++++++---- .../metadata/impl/ZKSessionWatcher.java | 17 ++++++- .../apache/pulsar/metadata/ZKSessionTest.java | 4 +- 3 files changed, 56 insertions(+), 12 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index f64ef22d771b5..b62258f842482 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; @@ -121,10 +122,36 @@ public void run() { public ZooKeeper call() throws KeeperException, InterruptedException { log.info().attr("connectString", connectString).log("Reconnecting zookeeper"); // close the previous one + ZooKeeper previousZk = zk.get(); closeZkHandle(); + AtomicReference newZkRef = new AtomicReference<>(); + CountDownLatch newZkCreated = new CountDownLatch(1); + Watcher publishingWatcher = event -> { + ZooKeeper newZk = newZkRef.get(); + if (newZk == null) { + try { + if (!newZkCreated.await(sessionTimeoutMs, TimeUnit.MILLISECONDS)) { + log.warn().attr("event", event) + .log("Failed to publish ZooKeeper handle before processing event"); + return; + } + newZk = newZkRef.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + + if (newZk != null) { + zk.compareAndSet(previousZk, newZk); + if (zk.get() == newZk) { + watcherManager.process(event); + } + } + }; ZooKeeper newZk; try { - newZk = createZooKeeper(); + newZk = createZooKeeper(publishingWatcher); } catch (IOException | QuorumPeerConfig.ConfigException e) { log.error() .attr("connectString", connectString) @@ -133,14 +160,9 @@ public ZooKeeper call() throws KeeperException, InterruptedException { .log("Failed to create zookeeper instance"); throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); } + newZkRef.set(newZk); + newZkCreated.countDown(); waitForConnection(); - try { - Thread.sleep(10); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw e; - } - zk.set(newZk); log.info() .attr("sessionId", Long.toHexString(newZk.getSessionId())) .attr("connectString", connectString) @@ -370,11 +392,16 @@ public void waitForConnection() throws KeeperException, InterruptedException { @SuppressWarnings("deprecation") protected ZooKeeper createZooKeeper() throws IOException, QuorumPeerConfig.ConfigException { + return createZooKeeper(watcherManager); + } + + @SuppressWarnings("deprecation") + protected ZooKeeper createZooKeeper(Watcher watcher) throws IOException, QuorumPeerConfig.ConfigException { if (null != configPath) { - return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode, + return new ZooKeeper(connectString, sessionTimeoutMs, watcher, allowReadOnlyMode, new ZKClientConfig(configPath)); } - return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode); + return new ZooKeeper(connectString, sessionTimeoutMs, watcher, allowReadOnlyMode); } @Override diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java index e5726865d2c5f..2f617ed99dc41 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java @@ -87,6 +87,7 @@ public void close() throws Exception { // in the future. private void checkConnectionStatus() { try { + long checkedSessionId = zk.getSessionId(); CompletableFuture future = new CompletableFuture<>(); zk.exists("/", false, (StatCallback) (rc, path, ctx, stat) -> { switch (KeeperException.Code.get(rc)) { @@ -112,7 +113,7 @@ private void checkConnectionStatus() { zkClientState = Watcher.Event.KeeperState.Disconnected; } - checkState(zkClientState); + checkStateIfSameSession(checkedSessionId, zkClientState); } catch (RejectedExecutionException | InterruptedException e) { task.cancel(true); } catch (Throwable t) { @@ -130,6 +131,20 @@ synchronized void setSessionInvalid() { currentStatus = SessionEvent.SessionLost; } + private synchronized void checkStateIfSameSession(long checkedSessionId, + Watcher.Event.KeeperState zkClientState) { + long currentSessionId = zk.getSessionId(); + if (checkedSessionId != currentSessionId) { + log.warn() + .attr("checkedSessionId", checkedSessionId) + .attr("currentSessionId", currentSessionId) + .attr("zkClientState", zkClientState) + .log("Ignoring ZooKeeper session state from a stale session"); + return; + } + checkState(zkClientState); + } + private synchronized void checkState(Watcher.Event.KeeperState zkClientState) { switch (zkClientState) { case Expired: diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java index 0ef8e6f34e593..385c769eb7b41 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java @@ -187,7 +187,7 @@ public void testReacquireLocksAfterSessionLost() throws Exception { assertFalse(lock.getLockExpiredFuture().isDone()); } - @Test(invocationCount = 100) + @Test public void testReacquireLeadershipAfterSessionLost() throws Exception { // --- init @Cleanup @@ -228,6 +228,8 @@ public void testReacquireLeadershipAfterSessionLost() throws Exception { assertEquals(e, SessionEvent.Reconnected); e = sessionEvents.poll(10, TimeUnit.SECONDS); assertEquals(e, SessionEvent.SessionReestablished); + e = sessionEvents.poll(1, TimeUnit.SECONDS); + assertNull(e); Awaitility.await().atMost(Duration.ofSeconds(15)) .untilAsserted(()-> assertEquals(le1.getState(), LeaderElectionState.Leading)); assertTrue(store.get(path).join().isPresent()); From 2eb9cb597aa0a15c43983b51890526aa7a2db6a6 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 1 Jun 2026 10:20:22 +0800 Subject: [PATCH 3/6] Fix race condition using simpler way --- .../metadata/impl/PulsarZooKeeperClient.java | 64 ++++++++++--------- .../metadata/impl/ZKSessionWatcher.java | 4 ++ .../apache/pulsar/metadata/ZKSessionTest.java | 2 - 3 files changed, 38 insertions(+), 32 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index b62258f842482..1c5ac88c0c35e 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -122,36 +122,35 @@ public void run() { public ZooKeeper call() throws KeeperException, InterruptedException { log.info().attr("connectString", connectString).log("Reconnecting zookeeper"); // close the previous one - ZooKeeper previousZk = zk.get(); closeZkHandle(); - AtomicReference newZkRef = new AtomicReference<>(); - CountDownLatch newZkCreated = new CountDownLatch(1); - Watcher publishingWatcher = event -> { - ZooKeeper newZk = newZkRef.get(); - if (newZk == null) { - try { - if (!newZkCreated.await(sessionTimeoutMs, TimeUnit.MILLISECONDS)) { - log.warn().attr("event", event) - .log("Failed to publish ZooKeeper handle before processing event"); - return; - } - newZk = newZkRef.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - } - if (newZk != null) { - zk.compareAndSet(previousZk, newZk); - if (zk.get() == newZk) { - watcherManager.process(event); + // ZooKeeper can deliver SyncConnected while createZooKeeper() is still constructing the + // client. Hold these events until the new instance is published, so child watchers never + // observe a new-session event while PulsarZooKeeperClient still points at the old handle. + CountDownLatch newZkSetLatch = new CountDownLatch(1); + Watcher forwardEventsWatcher = event -> { + try { + boolean awaited = newZkSetLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS); + if (!awaited) { + log.warn().attr("event", event) + .log("Timed out waiting for ZooKeeper instance to be published before " + + "forwarding event"); + return; } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn() + .attr("event", event) + .exception(e) + .log("Interrupted while waiting for ZooKeeper instance to be published"); + return; } + watcherManager.process(event); }; + ZooKeeper newZk; try { - newZk = createZooKeeper(publishingWatcher); + newZk = createZooKeeper(forwardEventsWatcher); } catch (IOException | QuorumPeerConfig.ConfigException e) { log.error() .attr("connectString", connectString) @@ -160,8 +159,18 @@ public ZooKeeper call() throws KeeperException, InterruptedException { .log("Failed to create zookeeper instance"); throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); } - newZkRef.set(newZk); - newZkCreated.countDown(); + + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; + } + + // Publish the new instance before releasing the forwarding watcher. waitForConnection() must + // happen after countDown(), since it depends on the forwarded SyncConnected event. + zk.set(newZk); + newZkSetLatch.countDown(); waitForConnection(); log.info() .attr("sessionId", Long.toHexString(newZk.getSessionId())) @@ -390,11 +399,6 @@ public void waitForConnection() throws KeeperException, InterruptedException { watcherManager.waitForConnection(); } - @SuppressWarnings("deprecation") - protected ZooKeeper createZooKeeper() throws IOException, QuorumPeerConfig.ConfigException { - return createZooKeeper(watcherManager); - } - @SuppressWarnings("deprecation") protected ZooKeeper createZooKeeper(Watcher watcher) throws IOException, QuorumPeerConfig.ConfigException { if (null != configPath) { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java index 2f617ed99dc41..08d87eb94b274 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java @@ -131,6 +131,10 @@ synchronized void setSessionInvalid() { currentStatus = SessionEvent.SessionLost; } + // PulsarZooKeeperClient publishes the new ZooKeeper instance before forwarding the corresponding session event to + // watcherManager, so zk.set(newZk) happens-before this watcher observes the new-session event. Keep the session-id + // check and state transition in the same synchronized section to prevent stale async probes from racing with that + // event and overwriting the state of the newly established session. private synchronized void checkStateIfSameSession(long checkedSessionId, Watcher.Event.KeeperState zkClientState) { long currentSessionId = zk.getSessionId(); diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java index 385c769eb7b41..59752abfc7e46 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java @@ -228,8 +228,6 @@ public void testReacquireLeadershipAfterSessionLost() throws Exception { assertEquals(e, SessionEvent.Reconnected); e = sessionEvents.poll(10, TimeUnit.SECONDS); assertEquals(e, SessionEvent.SessionReestablished); - e = sessionEvents.poll(1, TimeUnit.SECONDS); - assertNull(e); Awaitility.await().atMost(Duration.ofSeconds(15)) .untilAsserted(()-> assertEquals(le1.getState(), LeaderElectionState.Leading)); assertTrue(store.get(path).join().isPresent()); From 0bdec6d989342d52f4bb1dc1e318a42f891376d5 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 1 Jun 2026 11:12:51 +0800 Subject: [PATCH 4/6] Remove sleep code --- .../apache/pulsar/metadata/impl/PulsarZooKeeperClient.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index 1c5ac88c0c35e..9c98ff8d9153a 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -160,13 +160,6 @@ public ZooKeeper call() throws KeeperException, InterruptedException { throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); } - try { - Thread.sleep(10); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw e; - } - // Publish the new instance before releasing the forwarding watcher. waitForConnection() must // happen after countDown(), since it depends on the forwarded SyncConnected event. zk.set(newZk); From 02e669021d54a27fb59fe1c06d1da5ebb5635a11 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 1 Jun 2026 15:22:28 +0800 Subject: [PATCH 5/6] Modify comment --- .../apache/pulsar/metadata/impl/PulsarZooKeeperClient.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index 9c98ff8d9153a..8e70c1369c754 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -124,9 +124,10 @@ public ZooKeeper call() throws KeeperException, InterruptedException { // close the previous one closeZkHandle(); - // ZooKeeper can deliver SyncConnected while createZooKeeper() is still constructing the - // client. Hold these events until the new instance is published, so child watchers never - // observe a new-session event while PulsarZooKeeperClient still points at the old handle. + // ZooKeeper can deliver SyncConnected after createZooKeeper() returns but before zk.set(newZk) + // publishes the new instance. Hold these events until the new instance is published, so child + // watchers never observe a new-session event while PulsarZooKeeperClient still points at the + // old handle. CountDownLatch newZkSetLatch = new CountDownLatch(1); Watcher forwardEventsWatcher = event -> { try { From 45dd9eaff4a8689905829f03c53430c5bd33c0dd Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 1 Jun 2026 17:05:43 +0800 Subject: [PATCH 6/6] Address pr comment --- .../org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index 8e70c1369c754..c54bfafff3eac 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -136,7 +136,6 @@ public ZooKeeper call() throws KeeperException, InterruptedException { log.warn().attr("event", event) .log("Timed out waiting for ZooKeeper instance to be published before " + "forwarding event"); - return; } } catch (InterruptedException e) { Thread.currentThread().interrupt();