diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index aba28ec1fc357..c54bfafff3eac 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; @@ -122,9 +123,34 @@ public ZooKeeper call() throws KeeperException, InterruptedException { log.info().attr("connectString", connectString).log("Reconnecting zookeeper"); // close the previous one closeZkHandle(); + + // ZooKeeper can deliver SyncConnected after createZooKeeper() returns but before zk.set(newZk) + // publishes the new instance. Hold these events until the new instance is published, so child + // watchers never observe a new-session event while PulsarZooKeeperClient still points at the + // old handle. + CountDownLatch newZkSetLatch = new CountDownLatch(1); + Watcher forwardEventsWatcher = event -> { + try { + boolean awaited = newZkSetLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS); + if (!awaited) { + log.warn().attr("event", event) + .log("Timed out waiting for ZooKeeper instance to be published before " + + "forwarding event"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn() + .attr("event", event) + .exception(e) + .log("Interrupted while waiting for ZooKeeper instance to be published"); + return; + } + watcherManager.process(event); + }; + ZooKeeper newZk; try { - newZk = createZooKeeper(); + newZk = createZooKeeper(forwardEventsWatcher); } catch (IOException | QuorumPeerConfig.ConfigException e) { log.error() .attr("connectString", connectString) @@ -133,8 +159,12 @@ public ZooKeeper call() throws KeeperException, InterruptedException { .log("Failed to create zookeeper instance"); throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); } - waitForConnection(); + + // Publish the new instance before releasing the forwarding watcher. waitForConnection() must + // happen after countDown(), since it depends on the forwarded SyncConnected event. zk.set(newZk); + newZkSetLatch.countDown(); + waitForConnection(); log.info() .attr("sessionId", Long.toHexString(newZk.getSessionId())) .attr("connectString", connectString) @@ -363,12 +393,12 @@ public void waitForConnection() throws KeeperException, InterruptedException { } @SuppressWarnings("deprecation") - protected ZooKeeper createZooKeeper() throws IOException, QuorumPeerConfig.ConfigException { + protected ZooKeeper createZooKeeper(Watcher watcher) throws IOException, QuorumPeerConfig.ConfigException { if (null != configPath) { - return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode, + return new ZooKeeper(connectString, sessionTimeoutMs, watcher, allowReadOnlyMode, new ZKClientConfig(configPath)); } - return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode); + return new ZooKeeper(connectString, sessionTimeoutMs, watcher, allowReadOnlyMode); } @Override diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java index e5726865d2c5f..08d87eb94b274 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java @@ -87,6 +87,7 @@ public void close() throws Exception { // in the future. private void checkConnectionStatus() { try { + long checkedSessionId = zk.getSessionId(); CompletableFuture future = new CompletableFuture<>(); zk.exists("/", false, (StatCallback) (rc, path, ctx, stat) -> { switch (KeeperException.Code.get(rc)) { @@ -112,7 +113,7 @@ private void checkConnectionStatus() { zkClientState = Watcher.Event.KeeperState.Disconnected; } - checkState(zkClientState); + checkStateIfSameSession(checkedSessionId, zkClientState); } catch (RejectedExecutionException | InterruptedException e) { task.cancel(true); } catch (Throwable t) { @@ -130,6 +131,24 @@ synchronized void setSessionInvalid() { currentStatus = SessionEvent.SessionLost; } + // PulsarZooKeeperClient publishes the new ZooKeeper instance before forwarding the corresponding session event to + // watcherManager, so zk.set(newZk) happens-before this watcher observes the new-session event. Keep the session-id + // check and state transition in the same synchronized section to prevent stale async probes from racing with that + // event and overwriting the state of the newly established session. + private synchronized void checkStateIfSameSession(long checkedSessionId, + Watcher.Event.KeeperState zkClientState) { + long currentSessionId = zk.getSessionId(); + if (checkedSessionId != currentSessionId) { + log.warn() + .attr("checkedSessionId", checkedSessionId) + .attr("currentSessionId", currentSessionId) + .attr("zkClientState", zkClientState) + .log("Ignoring ZooKeeper session state from a stale session"); + return; + } + checkState(zkClientState); + } + private synchronized void checkState(Watcher.Event.KeeperState zkClientState) { switch (zkClientState) { case Expired: