You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
ZKSessionTest.testReacquireLeadershipAfterSessionLost can observe unstable metadata session events after a ZooKeeper session expires and PulsarZooKeeperClient creates a replacement client.
Failure test case1:
java.lang.AssertionError: expected [SessionReestablished] but found [ConnectionLost]
at org.testng.Assert.fail(Assert.java:111)
at org.testng.Assert.failNotEquals(Assert.java:1590)
at org.testng.Assert.assertEqualsImpl(Assert.java:150)
at org.testng.Assert.assertEquals(Assert.java:132)
at org.testng.Assert.assertEquals(Assert.java:644)
at org.apache.pulsar.metadata.ZKSessionTest.testReacquireLeadershipAfterSessionLost(ZKSessionTest.java:230)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:141)
at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Failure test case2:
java.util.concurrent.CompletionException: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:781)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$7(ZKMetadataStore.java:253)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:253)
at org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:524)
at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:702)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:541)
Caused by: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:528)
at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:252)
... 3 more
Caused by: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
at org.apache.zookeeper.KeeperException.create(KeeperException.java:133)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:53)
at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:518)
... 4 more
Cause 1: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:528)
at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:252)
at app//org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:524)
at app//org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:702)
at app//org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:541)
Caused by: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
at app//org.apache.zookeeper.KeeperException.create(KeeperException.java:133)
at app//org.apache.zookeeper.KeeperException.create(KeeperException.java:53)
at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:518)
... 4 more
The race happens during the handoff from the expired ZooKeeper instance to the new one:
ZooKeeper can deliver SyncConnected while the new client is still being constructed.
ZooKeeperWatcherBase forwards that session event to child watchers.
Those child watchers can run before PulsarZooKeeperClient publishes the new ZooKeeper handle.
During that window, follow-up operations can still be routed to the old expired handle, and an old async session probe can later overwrite the state of the newly established session.
This can produce extra or incomplete session transitions around ConnectionLost, SessionLost, Reconnected, and SessionReestablished.
Modifications
This change keeps the reconnect flow local to PulsarZooKeeperClient and ZKSessionWatcher.
PulsarZooKeeperClient now creates replacement ZooKeeper clients with a forwarding watcher instead of passing watcherManager directly.
The forwarding watcher waits until the new ZooKeeper handle has been published before forwarding events to watcherManager.
The new handle is published before releasing the forwarding watcher, and waitForConnection() runs after that release because it depends on the forwarded SyncConnected event.
ZKSessionWatcher records the session id used for its async exists("/") probe and only applies the probe result if the current session id still matches.
The session-id check and state transition are guarded by the same synchronized section so a stale probe cannot race with a new-session event and overwrite the new session state.
Verifying this change
Make sure that the change passes the CI checks.
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
oneby-wang
changed the title
Test reacquire leadership after session lost flaky test
[fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator()
Jun 1, 2026
oneby-wang
changed the title
[fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator()
[fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator
Jun 1, 2026
@lhotari
I noticed that after receiving a ZooKeeper Expired event, PulsarZooKeeperClient creates a new ZooKeeper instance while continuing to use the same watcherManager.
With the current direct forwarding approach, events from the new ZooKeeper instance are delayed until the new handle is published. However, if the old expired client still has any queued or late events delivered to the same watcherManager(I'm not sure if this will happen.), those stale events could still affect the session state and potentially make the state transitions inconsistent.
Do you think we should add a lightweight generation fencing mechanism here, so that only events from the currently active ZooKeeper instance are forwarded/processed, and stale events from previous instances are ignored?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
ZKSessionTest.testReacquireLeadershipAfterSessionLostcan observe unstable metadata session events after a ZooKeeper session expires andPulsarZooKeeperClientcreates a replacement client.Failure test case1:
Failure test case2:
The race happens during the handoff from the expired ZooKeeper instance to the new one:
ZooKeepercan deliverSyncConnectedwhile the new client is still being constructed.ZooKeeperWatcherBaseforwards that session event to child watchers.PulsarZooKeeperClientpublishes the newZooKeeperhandle.This can produce extra or incomplete session transitions around
ConnectionLost,SessionLost,Reconnected, andSessionReestablished.Modifications
This change keeps the reconnect flow local to
PulsarZooKeeperClientandZKSessionWatcher.PulsarZooKeeperClientnow creates replacement ZooKeeper clients with a forwarding watcher instead of passingwatcherManagerdirectly.watcherManager.waitForConnection()runs after that release because it depends on the forwardedSyncConnectedevent.ZKSessionWatcherrecords the session id used for its asyncexists("/")probe and only applies the probe result if the current session id still matches.Verifying this change
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes