Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -122,9 +123,34 @@ public ZooKeeper call() throws KeeperException, InterruptedException {
log.info().attr("connectString", connectString).log("Reconnecting zookeeper");
// close the previous one
closeZkHandle();

// ZooKeeper can deliver SyncConnected after createZooKeeper() returns but before zk.set(newZk)
// publishes the new instance. Hold these events until the new instance is published, so child
// watchers never observe a new-session event while PulsarZooKeeperClient still points at the
// old handle.
CountDownLatch newZkSetLatch = new CountDownLatch(1);
Watcher forwardEventsWatcher = event -> {
try {
boolean awaited = newZkSetLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS);
if (!awaited) {
log.warn().attr("event", event)
.log("Timed out waiting for ZooKeeper instance to be published before "
+ "forwarding event");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn()
.attr("event", event)
.exception(e)
.log("Interrupted while waiting for ZooKeeper instance to be published");
return;
}
watcherManager.process(event);
};

ZooKeeper newZk;
try {
newZk = createZooKeeper();
newZk = createZooKeeper(forwardEventsWatcher);
} catch (IOException | QuorumPeerConfig.ConfigException e) {
log.error()
.attr("connectString", connectString)
Expand All @@ -133,8 +159,12 @@ public ZooKeeper call() throws KeeperException, InterruptedException {
.log("Failed to create zookeeper instance");
throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
}
waitForConnection();

// Publish the new instance before releasing the forwarding watcher. waitForConnection() must
// happen after countDown(), since it depends on the forwarded SyncConnected event.
zk.set(newZk);
newZkSetLatch.countDown();
waitForConnection();
log.info()
.attr("sessionId", Long.toHexString(newZk.getSessionId()))
.attr("connectString", connectString)
Expand Down Expand Up @@ -363,12 +393,12 @@ public void waitForConnection() throws KeeperException, InterruptedException {
}

@SuppressWarnings("deprecation")
protected ZooKeeper createZooKeeper() throws IOException, QuorumPeerConfig.ConfigException {
protected ZooKeeper createZooKeeper(Watcher watcher) throws IOException, QuorumPeerConfig.ConfigException {
if (null != configPath) {
return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode,
return new ZooKeeper(connectString, sessionTimeoutMs, watcher, allowReadOnlyMode,
new ZKClientConfig(configPath));
}
return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode);
return new ZooKeeper(connectString, sessionTimeoutMs, watcher, allowReadOnlyMode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void close() throws Exception {
// in the future.
private void checkConnectionStatus() {
try {
long checkedSessionId = zk.getSessionId();
CompletableFuture<Watcher.Event.KeeperState> future = new CompletableFuture<>();
zk.exists("/", false, (StatCallback) (rc, path, ctx, stat) -> {
switch (KeeperException.Code.get(rc)) {
Expand All @@ -112,7 +113,7 @@ private void checkConnectionStatus() {
zkClientState = Watcher.Event.KeeperState.Disconnected;
}

checkState(zkClientState);
checkStateIfSameSession(checkedSessionId, zkClientState);
} catch (RejectedExecutionException | InterruptedException e) {
task.cancel(true);
} catch (Throwable t) {
Expand All @@ -130,6 +131,24 @@ synchronized void setSessionInvalid() {
currentStatus = SessionEvent.SessionLost;
}

// PulsarZooKeeperClient publishes the new ZooKeeper instance before forwarding the corresponding session event to
// watcherManager, so zk.set(newZk) happens-before this watcher observes the new-session event. Keep the session-id
// check and state transition in the same synchronized section to prevent stale async probes from racing with that
// event and overwriting the state of the newly established session.
private synchronized void checkStateIfSameSession(long checkedSessionId,
Watcher.Event.KeeperState zkClientState) {
long currentSessionId = zk.getSessionId();
if (checkedSessionId != currentSessionId) {
log.warn()
.attr("checkedSessionId", checkedSessionId)
.attr("currentSessionId", currentSessionId)
.attr("zkClientState", zkClientState)
.log("Ignoring ZooKeeper session state from a stale session");
return;
}
checkState(zkClientState);
}

private synchronized void checkState(Watcher.Event.KeeperState zkClientState) {
switch (zkClientState) {
case Expired:
Expand Down