diff --git a/components/camel-hazelcast/pom.xml b/components/camel-hazelcast/pom.xml
index 2511a42c396d7..cb4f246d76392 100644
--- a/components/camel-hazelcast/pom.xml
+++ b/components/camel-hazelcast/pom.xml
@@ -79,6 +79,11 @@
            <version>${project.version}</version>
            <scope>test</scope>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java
index 05ae33a243ba2..7d841943c2b4a 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java
@@ -16,14 +16,13 @@
*/
package org.apache.camel.component.hazelcast.policy;
-import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.stream.IntStream;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
@@ -32,12 +31,14 @@
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.test.infra.hazelcast.services.HazelcastService;
import org.apache.camel.test.infra.hazelcast.services.HazelcastServiceFactory;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.assertj.core.api.Assertions.assertThat;
+
/**
* Integration test for {@link HazelcastRoutePolicy} that verifies leader election and route management using Hazelcast
* distributed locks.
@@ -45,35 +46,37 @@
public class HazelcastRoutePolicyIT {
private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastRoutePolicyIT.class);
+    private static final List<String> CLIENTS = List.of("0", "1", "2");
@RegisterExtension
public static HazelcastService hazelcastService = HazelcastServiceFactory.createService();
-    private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).toList();
-    private static final List<String> RESULTS = new ArrayList<>();
- private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2);
- private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size());
-
@Test
- public void test() throws Exception {
+ @Timeout(value = 2, unit = TimeUnit.MINUTES)
+ public void testLeaderElectionWithMultipleNodes() throws Exception {
+        List<String> results = new CopyOnWriteArrayList<>();
+ CountDownLatch latch = new CountDownLatch(CLIENTS.size());
+ ExecutorService executor = Executors.newFixedThreadPool(CLIENTS.size());
+
for (String id : CLIENTS) {
- SCHEDULER.submit(() -> run(id));
+ executor.submit(() -> run(id, results, latch));
}
- LATCH.await(1, TimeUnit.MINUTES);
- SCHEDULER.shutdownNow();
+ assertThat(latch.await(1, TimeUnit.MINUTES)).as("All nodes should complete within timeout").isTrue();
+ executor.shutdown();
+ assertThat(executor.awaitTermination(30, TimeUnit.SECONDS)).as("Executor should terminate cleanly").isTrue();
- Assertions.assertEquals(CLIENTS.size(), RESULTS.size());
- Assertions.assertTrue(RESULTS.containsAll(CLIENTS));
+ assertThat(results).containsExactlyInAnyOrderElementsOf(CLIENTS);
}
- private static void run(String id) {
+    private static void run(String id, List<String> results, CountDownLatch latch) {
+ HazelcastInstance instance = null;
try {
int events = ThreadLocalRandom.current().nextInt(2, 6);
CountDownLatch contextLatch = new CountDownLatch(events);
Config config = hazelcastService.createConfiguration(null, 0, "node-" + id, "set");
- HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
+ instance = Hazelcast.newHazelcastInstance(config);
HazelcastRoutePolicy policy = new HazelcastRoutePolicy(instance);
policy.setLockMapName("camel-route-policy");
@@ -81,36 +84,41 @@ private static void run(String id) {
policy.setLockValue("node-" + id);
policy.setTryLockTimeout(5, TimeUnit.SECONDS);
- DefaultCamelContext context = new DefaultCamelContext();
- context.disableJMX();
- context.getCamelContextExtension().setName("context-" + id);
- context.addRoutes(new RouteBuilder() {
- @Override
- public void configure() {
- from("timer:hazelcast?delay=1000&period=1000")
- .routeId("route-" + id)
- .routePolicy(policy)
- .log("From ${routeId}")
- .process(e -> contextLatch.countDown());
+ try (DefaultCamelContext context = new DefaultCamelContext()) {
+ context.disableJMX();
+ context.getCamelContextExtension().setName("context-" + id);
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("timer:hazelcast?delay=1000&period=1000")
+ .routeId("route-" + id)
+ .routePolicy(policy)
+ .log("From ${routeId}")
+ .process(e -> contextLatch.countDown());
+ }
+ });
+
+ // Deterministic staggered startup based on node index
+ Thread.sleep(Integer.parseInt(id) * 200L);
+
+ LOGGER.info("Starting CamelContext on node: {}", id);
+ context.start();
+ LOGGER.info("Started CamelContext on node: {}", id);
+
+ if (contextLatch.await(30, TimeUnit.SECONDS)) {
+ LOGGER.info("Node {} completed {} events successfully", id, events);
+ results.add(id);
+ } else {
+ LOGGER.warn("Node {} timed out waiting for route events (expected {} events)", id, events);
}
- });
-
- // Staggered startup
- Thread.sleep(ThreadLocalRandom.current().nextInt(500));
-
- LOGGER.info("Starting CamelContext on node: {}", id);
- context.start();
- LOGGER.info("Started CamelContext on node: {}", id);
-
- contextLatch.await(30, TimeUnit.SECONDS);
-
- LOGGER.info("Shutting down node {}", id);
- RESULTS.add(id);
- context.stop();
- instance.shutdown();
- LATCH.countDown();
+ }
} catch (Exception e) {
- LOGGER.warn("{}", e.getMessage(), e);
+ LOGGER.warn("Node {} failed: {}", id, e.getMessage(), e);
+ } finally {
+ if (instance != null) {
+ instance.shutdown();
+ }
+ latch.countDown();
}
}
}