diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java b/store/src/test/java/org/apache/rocketmq/store/HATest.java index 5623adb64fa..8bdb82000b8 100644 --- a/store/src/test/java/org/apache/rocketmq/store/HATest.java +++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java @@ -25,6 +25,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.time.Duration; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -38,6 +39,7 @@ import org.apache.rocketmq.store.config.BrokerRole; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.ha.HAConnection; import org.apache.rocketmq.store.ha.HAConnectionState; import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.After; @@ -104,6 +106,7 @@ public void init() throws Exception { slaveMessageStore.start(); slaveMessageStore.updateHaMasterAddress("127.0.0.1:" + masterMessageStoreConfig.getHaListenPort()); await().atMost(6, SECONDS).until(() -> slaveMessageStore.getHaService().getHAClient().getCurrentState() == HAConnectionState.TRANSFER); + await().atMost(6, SECONDS).until(this::isSlaveReadyForReplication); } @Test @@ -281,6 +284,20 @@ private MessageExtBrokerInner buildMessage() { return msg; } + private boolean isSlaveReadyForReplication() { + if (slaveMessageStore.getHaService().getHAClient().getCurrentState() != HAConnectionState.TRANSFER) { + return false; + } + + long slaveMaxOffset = slaveMessageStore.getMaxPhyOffset(); + List connections = messageStore.getHaService().getConnectionList(); + synchronized (connections) { + return connections.stream().anyMatch(connection -> + connection.getCurrentState() == HAConnectionState.TRANSFER + && connection.getSlaveAckOffset() >= slaveMaxOffset); + } + } + private boolean isCommitLogAvailable(DefaultMessageStore store) { try { Field serviceField = store.getClass().getDeclaredField("reputMessageService");