diff --git a/components/camel-hazelcast/pom.xml b/components/camel-hazelcast/pom.xml index 2511a42c396d7..cb4f246d76392 100644 --- a/components/camel-hazelcast/pom.xml +++ b/components/camel-hazelcast/pom.xml @@ -79,6 +79,11 @@ ${project.version} test + + org.assertj + assertj-core + test + diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java index 05ae33a243ba2..7d841943c2b4a 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java @@ -16,14 +16,13 @@ */ package org.apache.camel.component.hazelcast.policy; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.stream.IntStream; import com.hazelcast.config.Config; import com.hazelcast.core.Hazelcast; @@ -32,12 +31,14 @@ import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.test.infra.hazelcast.services.HazelcastService; import org.apache.camel.test.infra.hazelcast.services.HazelcastServiceFactory; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.assertj.core.api.Assertions.assertThat; + /** * Integration test for {@link HazelcastRoutePolicy} that verifies leader election and route management using Hazelcast * distributed locks. @@ -45,35 +46,37 @@ public class HazelcastRoutePolicyIT { private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastRoutePolicyIT.class); + private static final List CLIENTS = List.of("0", "1", "2"); @RegisterExtension public static HazelcastService hazelcastService = HazelcastServiceFactory.createService(); - private static final List CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).toList(); - private static final List RESULTS = new ArrayList<>(); - private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2); - private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size()); - @Test - public void test() throws Exception { + @Timeout(value = 2, unit = TimeUnit.MINUTES) + public void testLeaderElectionWithMultipleNodes() throws Exception { + List results = new CopyOnWriteArrayList<>(); + CountDownLatch latch = new CountDownLatch(CLIENTS.size()); + ExecutorService executor = Executors.newFixedThreadPool(CLIENTS.size()); + for (String id : CLIENTS) { - SCHEDULER.submit(() -> run(id)); + executor.submit(() -> run(id, results, latch)); } - LATCH.await(1, TimeUnit.MINUTES); - SCHEDULER.shutdownNow(); + assertThat(latch.await(1, TimeUnit.MINUTES)).as("All nodes should complete within timeout").isTrue(); + executor.shutdown(); + assertThat(executor.awaitTermination(30, TimeUnit.SECONDS)).as("Executor should terminate cleanly").isTrue(); - Assertions.assertEquals(CLIENTS.size(), RESULTS.size()); - Assertions.assertTrue(RESULTS.containsAll(CLIENTS)); + assertThat(results).containsExactlyInAnyOrderElementsOf(CLIENTS); } - private static void run(String id) { + private static void run(String id, List results, CountDownLatch latch) { + HazelcastInstance instance = null; try { int events = ThreadLocalRandom.current().nextInt(2, 6); CountDownLatch contextLatch = new CountDownLatch(events); Config config = hazelcastService.createConfiguration(null, 0, "node-" + id, "set"); - HazelcastInstance instance = Hazelcast.newHazelcastInstance(config); + instance = Hazelcast.newHazelcastInstance(config); HazelcastRoutePolicy policy = new HazelcastRoutePolicy(instance); policy.setLockMapName("camel-route-policy"); @@ -81,36 +84,41 @@ private static void run(String id) { policy.setLockValue("node-" + id); policy.setTryLockTimeout(5, TimeUnit.SECONDS); - DefaultCamelContext context = new DefaultCamelContext(); - context.disableJMX(); - context.getCamelContextExtension().setName("context-" + id); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() { - from("timer:hazelcast?delay=1000&period=1000") - .routeId("route-" + id) - .routePolicy(policy) - .log("From ${routeId}") - .process(e -> contextLatch.countDown()); + try (DefaultCamelContext context = new DefaultCamelContext()) { + context.disableJMX(); + context.getCamelContextExtension().setName("context-" + id); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("timer:hazelcast?delay=1000&period=1000") + .routeId("route-" + id) + .routePolicy(policy) + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); + } + }); + + // Deterministic staggered startup based on node index + Thread.sleep(Integer.parseInt(id) * 200L); + + LOGGER.info("Starting CamelContext on node: {}", id); + context.start(); + LOGGER.info("Started CamelContext on node: {}", id); + + if (contextLatch.await(30, TimeUnit.SECONDS)) { + LOGGER.info("Node {} completed {} events successfully", id, events); + results.add(id); + } else { + LOGGER.warn("Node {} timed out waiting for route events (expected {} events)", id, events); } - }); - - // Staggered startup - Thread.sleep(ThreadLocalRandom.current().nextInt(500)); - - LOGGER.info("Starting CamelContext on node: {}", id); - context.start(); - LOGGER.info("Started CamelContext on node: {}", id); - - contextLatch.await(30, TimeUnit.SECONDS); - - LOGGER.info("Shutting down node {}", id); - RESULTS.add(id); - context.stop(); - instance.shutdown(); - LATCH.countDown(); + } } catch (Exception e) { - LOGGER.warn("{}", e.getMessage(), e); + LOGGER.warn("Node {} failed: {}", id, e.getMessage(), e); + } finally { + if (instance != null) { + instance.shutdown(); + } + latch.countDown(); } } }