Skip to content

[fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator#25910

Open
oneby-wang wants to merge 6 commits into
apache:masterfrom
oneby-wang:testReacquireLeadershipAfterSessionLost_flaky_test
Open

[fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator#25910
oneby-wang wants to merge 6 commits into
apache:masterfrom
oneby-wang:testReacquireLeadershipAfterSessionLost_flaky_test

Conversation

@oneby-wang
Copy link
Copy Markdown
Contributor

Motivation

ZKSessionTest.testReacquireLeadershipAfterSessionLost can observe unstable metadata session events after a ZooKeeper session expires and PulsarZooKeeperClient creates a replacement client.

Failure test case1:

java.lang.AssertionError: expected [SessionReestablished] but found [ConnectionLost]
	at org.testng.Assert.fail(Assert.java:111)
	at org.testng.Assert.failNotEquals(Assert.java:1590)
	at org.testng.Assert.assertEqualsImpl(Assert.java:150)
	at org.testng.Assert.assertEquals(Assert.java:132)
	at org.testng.Assert.assertEquals(Assert.java:644)
	at org.apache.pulsar.metadata.ZKSessionTest.testReacquireLeadershipAfterSessionLost(ZKSessionTest.java:230)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:141)
	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Failure test case2:

java.util.concurrent.CompletionException: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
	at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:781)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$7(ZKMetadataStore.java:253)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:253)
	at org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:524)
	at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:702)
	at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:541)
Caused by: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:528)
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:252)
	... 3 more
Caused by: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:133)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:53)
	at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:518)
	... 4 more
Cause 1: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
	at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:528)
	at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:252)
	at app//org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:524)
	at app//org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:702)
	at app//org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:541)
Caused by: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
	at app//org.apache.zookeeper.KeeperException.create(KeeperException.java:133)
	at app//org.apache.zookeeper.KeeperException.create(KeeperException.java:53)
	at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:518)
	... 4 more

The race happens during the handoff from the expired ZooKeeper instance to the new one:

  • ZooKeeper can deliver SyncConnected while the new client is still being constructed.
  • ZooKeeperWatcherBase forwards that session event to child watchers.
  • Those child watchers can run before PulsarZooKeeperClient publishes the new ZooKeeper handle.
  • During that window, follow-up operations can still be routed to the old expired handle, and an old async session probe can later overwrite the state of the newly established session.

This can produce extra or incomplete session transitions around ConnectionLost, SessionLost, Reconnected, and SessionReestablished.

Modifications

This change keeps the reconnect flow local to PulsarZooKeeperClient and ZKSessionWatcher.

  • PulsarZooKeeperClient now creates replacement ZooKeeper clients with a forwarding watcher instead of passing watcherManager directly.
  • The forwarding watcher waits until the new ZooKeeper handle has been published before forwarding events to watcherManager.
  • The new handle is published before releasing the forwarding watcher, and waitForConnection() runs after that release because it depends on the forwarded SyncConnected event.
  • ZKSessionWatcher records the session id used for its async exists("/") probe and only applies the probe result if the current session id still matches.
  • The session-id check and state transition are guarded by the same synchronized section so a stale probe cannot race with a new-session event and overwrite the new session state.

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

@oneby-wang oneby-wang changed the title Test reacquire leadership after session lost flaky test [fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator() Jun 1, 2026
@oneby-wang oneby-wang changed the title [fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator() [fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator Jun 1, 2026
@oneby-wang
Copy link
Copy Markdown
Contributor Author

oneby-wang commented Jun 1, 2026

@lhotari
I noticed that after receiving a ZooKeeper Expired event, PulsarZooKeeperClient creates a new ZooKeeper instance while continuing to use the same watcherManager.

With the current direct forwarding approach, events from the new ZooKeeper instance are delayed until the new handle is published. However, if the old expired client still has any queued or late events delivered to the same watcherManager(I'm not sure if this will happen.), those stale events could still affect the session state and potentially make the state transitions inconsistent.

Do you think we should add a lightweight generation fencing mechanism here, so that only events from the currently active ZooKeeper instance are forwarded/processed, and stale events from previous instances are ignored?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants