diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index aba28ec1fc357..2b39e1bf7cad5 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -1174,7 +1174,7 @@ public String toString() { } @Override - public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx) { + public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object context) { final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setStats) { final VoidCallback vCb = new VoidCallback() { @@ -1185,7 +1185,7 @@ public void processResult(int rc, String path, Object ctx) { if (allowRetry(worker, rc)) { backOffAndRetry(that, worker.nextRetryWaitTime()); } else { - vCb.processResult(rc, basePath, ctx); + cb.processResult(rc, path, context); } } @@ -1195,15 +1195,15 @@ public void processResult(int rc, String path, Object ctx) { void zkRun() { ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { - PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode, cb, ctx); + PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode, vCb, worker); } else { - zkHandle.addWatch(basePath, watcher, mode, cb, ctx); + zkHandle.addWatch(basePath, watcher, mode, vCb, worker); } } @Override public String toString() { - return String.format("setData (%s, mode = %s)", basePath, mode.name()); + return String.format("addWatch (%s, mode = %s)", basePath, mode.name()); } }; // execute it immediately diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index 9d2d805c07a89..d23a1b9557219 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -19,6 +19,12 @@ package org.apache.pulsar.metadata; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -40,6 +46,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; @@ -49,6 +56,7 @@ import lombok.Cleanup; import lombok.CustomLog; import lombok.SneakyThrows; +import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStore; @@ -65,6 +73,9 @@ import org.apache.pulsar.metadata.impl.PulsarZooKeeperClient; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStore; +import org.apache.zookeeper.AddWatchMode; +import org.apache.zookeeper.AsyncCallback.VoidCallback; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; @@ -548,6 +559,53 @@ public void testZkLoadConfigFromFile() throws Exception { assertFalse(zooKeeper.getClientConfig().isSaslClientEnabled()); } + @Test + @SuppressWarnings("unchecked") + public void testAsyncAddWatchRetriesWithWrapperCallback() throws Exception { + String path = newKey(); + @Cleanup + PulsarZooKeeperClient zkClient = PulsarZooKeeperClient.newBuilder() + .connectString(zks.getConnectionString()) + .sessionTimeoutMs(3000) + .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(0, 0, 3)) + .build(); + + ZooKeeper mockZk = mock(ZooKeeper.class); + AtomicInteger attempts = new AtomicInteger(); + doAnswer(invocation -> { + // The wrapper callback should consume this recoverable failure and retry the addWatch operation. + int rc = attempts.incrementAndGet() == 1 + ? KeeperException.Code.CONNECTIONLOSS.intValue() + : KeeperException.Code.OK.intValue(); + String callbackPath = invocation.getArgument(0); + VoidCallback callback = invocation.getArgument(3); + Object callbackContext = invocation.getArgument(4); + callback.processResult(rc, callbackPath, callbackContext); + return null; + }).when(mockZk).addWatch(eq(path), any(Watcher.class), eq(AddWatchMode.PERSISTENT_RECURSIVE), + any(VoidCallback.class), any()); + + // Force the Pulsar wrapper to delegate the async addWatch call to our controlled ZooKeeper instance. + var zooKeeperRef = (AtomicReference) WhiteboxImpl.getInternalState(zkClient, "zk"); + zooKeeperRef.set(mockZk); + + CountDownLatch callbackCalled = new CountDownLatch(1); + AtomicInteger callbackRc = new AtomicInteger(Integer.MIN_VALUE); + zkClient.addWatch(path, event -> { + }, AddWatchMode.PERSISTENT_RECURSIVE, (rc, callbackPath, ctx) -> { + callbackRc.set(rc); + callbackCalled.countDown(); + }, null); + + assertTrue(callbackCalled.await(5, TimeUnit.SECONDS)); + + // The caller should only see the final successful result after the retry, not the first CONNECTIONLOSS. + assertEquals(callbackRc.get(), KeeperException.Code.OK.intValue()); + assertEquals(attempts.get(), 2); + verify(mockZk, times(2)).addWatch(eq(path), any(Watcher.class), eq(AddWatchMode.PERSISTENT_RECURSIVE), + any(VoidCallback.class), any()); + } + @Test public void testOxiaLoadConfigFromFile() throws Exception { final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");