From 59d8425e546f141ff0aa4da23ae8bcbcf144f84f Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Thu, 19 Mar 2026 15:51:05 +0100 Subject: [PATCH 1/5] CAMEL-22524: Add HazelcastRoutePolicy integration test Create automated IT that tests HazelcastRoutePolicy leader election using embedded Hazelcast. The test runs 3 concurrent nodes with staggered startup, each with its own HazelcastInstance and route policy, verifying that distributed lock-based leader election works correctly and all nodes eventually execute their routes. Co-Authored-By: Claude Opus 4.6 --- .../policy/HazelcastRoutePolicyIT.java | 116 ++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java new file mode 100644 index 0000000000000..05ae33a243ba2 --- /dev/null +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hazelcast.policy; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import com.hazelcast.config.Config; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.test.infra.hazelcast.services.HazelcastService; +import org.apache.camel.test.infra.hazelcast.services.HazelcastServiceFactory; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for {@link HazelcastRoutePolicy} that verifies leader election and route management using Hazelcast + * distributed locks. + */ +public class HazelcastRoutePolicyIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastRoutePolicyIT.class); + + @RegisterExtension + public static HazelcastService hazelcastService = HazelcastServiceFactory.createService(); + + private static final List CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).toList(); + private static final List RESULTS = new ArrayList<>(); + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2); + private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size()); + + @Test + public void test() throws Exception { + for (String id : CLIENTS) { + SCHEDULER.submit(() -> run(id)); + } + + LATCH.await(1, TimeUnit.MINUTES); + SCHEDULER.shutdownNow(); + + Assertions.assertEquals(CLIENTS.size(), RESULTS.size()); + Assertions.assertTrue(RESULTS.containsAll(CLIENTS)); + } + + private static void run(String id) { + try { + int events = ThreadLocalRandom.current().nextInt(2, 6); + CountDownLatch contextLatch = new CountDownLatch(events); + + Config config = hazelcastService.createConfiguration(null, 0, "node-" + id, "set"); + HazelcastInstance instance = Hazelcast.newHazelcastInstance(config); + + HazelcastRoutePolicy policy = new HazelcastRoutePolicy(instance); + policy.setLockMapName("camel-route-policy"); + policy.setLockKey("my-lock"); + policy.setLockValue("node-" + id); + policy.setTryLockTimeout(5, TimeUnit.SECONDS); + + DefaultCamelContext context = new DefaultCamelContext(); + context.disableJMX(); + context.getCamelContextExtension().setName("context-" + id); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("timer:hazelcast?delay=1000&period=1000") + .routeId("route-" + id) + .routePolicy(policy) + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); + } + }); + + // Staggered startup + Thread.sleep(ThreadLocalRandom.current().nextInt(500)); + + LOGGER.info("Starting CamelContext on node: {}", id); + context.start(); + LOGGER.info("Started CamelContext on node: {}", id); + + contextLatch.await(30, TimeUnit.SECONDS); + + LOGGER.info("Shutting down node {}", id); + RESULTS.add(id); + context.stop(); + instance.shutdown(); + LATCH.countDown(); + } catch (Exception e) { + LOGGER.warn("{}", e.getMessage(), e); + } + } +} From 43441b88b0c863ee6d519ea763cb7445f71f81a9 Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Mon, 23 Mar 2026 22:25:18 +0100 Subject: [PATCH 2/5] CAMEL-22524: Address review feedback on HazelcastRoutePolicyIT - Simplify CLIENTS list to List.of() instead of IntStream - Use AssertJ containsExactlyInAnyOrderElementsOf for better error messages Co-Authored-By: Claude Opus 4.6 --- .../hazelcast/policy/HazelcastRoutePolicyIT.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java index 05ae33a243ba2..de1c9c1add74b 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java @@ -23,7 +23,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.stream.IntStream; import com.hazelcast.config.Config; import com.hazelcast.core.Hazelcast; @@ -32,12 +31,13 @@ import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.test.infra.hazelcast.services.HazelcastService; import org.apache.camel.test.infra.hazelcast.services.HazelcastServiceFactory; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.assertj.core.api.Assertions.assertThat; + /** * Integration test for {@link HazelcastRoutePolicy} that verifies leader election and route management using Hazelcast * distributed locks. @@ -49,7 +49,7 @@ public class HazelcastRoutePolicyIT { @RegisterExtension public static HazelcastService hazelcastService = HazelcastServiceFactory.createService(); - private static final List CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).toList(); + private static final List CLIENTS = List.of("0", "1", "2"); private static final List RESULTS = new ArrayList<>(); private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2); private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size()); @@ -63,8 +63,7 @@ public void test() throws Exception { LATCH.await(1, TimeUnit.MINUTES); SCHEDULER.shutdownNow(); - Assertions.assertEquals(CLIENTS.size(), RESULTS.size()); - Assertions.assertTrue(RESULTS.containsAll(CLIENTS)); + assertThat(RESULTS).containsExactlyInAnyOrderElementsOf(CLIENTS); } private static void run(String id) { From 9e8439776e2b0cae670abbc366a9f1b7ef06c9bf Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Mon, 23 Mar 2026 22:41:01 +0100 Subject: [PATCH 3/5] CAMEL-22524: Fix concurrency and reliability issues in HazelcastRoutePolicyIT - Use CopyOnWriteArrayList instead of ArrayList for thread-safe results collection - Move mutable state from static fields to local test method variables - Move latch.countDown() to finally block so it always fires (even on failure) - Add proper cleanup in finally block for context and hazelcast instance - Check latch.await() return value to fail fast on timeout - Use executor.awaitTermination() to ensure clean shutdown before assertions - Use newFixedThreadPool instead of newScheduledThreadPool (no scheduling needed) Co-Authored-By: Claude Opus 4.6 --- .../policy/HazelcastRoutePolicyIT.java | 47 +++++++++++-------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java index de1c9c1add74b..493f8b220efbc 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java @@ -16,11 +16,11 @@ */ package org.apache.camel.component.hazelcast.policy; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Integration test for {@link HazelcastRoutePolicy} that verifies leader election and route management using Hazelcast @@ -45,34 +46,37 @@ public class HazelcastRoutePolicyIT { private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastRoutePolicyIT.class); + private static final List CLIENTS = List.of("0", "1", "2"); @RegisterExtension public static HazelcastService hazelcastService = HazelcastServiceFactory.createService(); - private static final List CLIENTS = List.of("0", "1", "2"); - private static final List RESULTS = new ArrayList<>(); - private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2); - private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size()); - @Test public void test() throws Exception { + List results = new CopyOnWriteArrayList<>(); + CountDownLatch latch = new CountDownLatch(CLIENTS.size()); + ExecutorService executor = Executors.newFixedThreadPool(CLIENTS.size()); + for (String id : CLIENTS) { - SCHEDULER.submit(() -> run(id)); + executor.submit(() -> run(id, results, latch)); } - LATCH.await(1, TimeUnit.MINUTES); - SCHEDULER.shutdownNow(); + assertTrue(latch.await(1, TimeUnit.MINUTES), "All nodes should complete within timeout"); + executor.shutdown(); + assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS), "Executor should terminate cleanly"); - assertThat(RESULTS).containsExactlyInAnyOrderElementsOf(CLIENTS); + assertThat(results).containsExactlyInAnyOrderElementsOf(CLIENTS); } - private static void run(String id) { + private static void run(String id, List results, CountDownLatch latch) { + DefaultCamelContext context = null; + HazelcastInstance instance = null; try { int events = ThreadLocalRandom.current().nextInt(2, 6); CountDownLatch contextLatch = new CountDownLatch(events); Config config = hazelcastService.createConfiguration(null, 0, "node-" + id, "set"); - HazelcastInstance instance = Hazelcast.newHazelcastInstance(config); + instance = Hazelcast.newHazelcastInstance(config); HazelcastRoutePolicy policy = new HazelcastRoutePolicy(instance); policy.setLockMapName("camel-route-policy"); @@ -80,7 +84,7 @@ private static void run(String id) { policy.setLockValue("node-" + id); policy.setTryLockTimeout(5, TimeUnit.SECONDS); - DefaultCamelContext context = new DefaultCamelContext(); + context = new DefaultCamelContext(); context.disableJMX(); context.getCamelContextExtension().setName("context-" + id); context.addRoutes(new RouteBuilder() { @@ -104,12 +108,17 @@ public void configure() { contextLatch.await(30, TimeUnit.SECONDS); LOGGER.info("Shutting down node {}", id); - RESULTS.add(id); - context.stop(); - instance.shutdown(); - LATCH.countDown(); + results.add(id); } catch (Exception e) { - LOGGER.warn("{}", e.getMessage(), e); + LOGGER.warn("Node {} failed: {}", id, e.getMessage(), e); + } finally { + if (context != null) { + context.stop(); + } + if (instance != null) { + instance.shutdown(); + } + latch.countDown(); } } } From 9111f6fe5b8be8286d4018ca565d3897cee44c07 Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Wed, 25 Mar 2026 18:16:27 +0100 Subject: [PATCH 4/5] CAMEL-22524: Fix HazelcastRoutePolicy IT review issues - Add assertj-core test dependency (fixes compilation) - Check contextLatch.await return value to verify leader election - Rename test to testLeaderElectionWithMultipleNodes - Add @Timeout annotation as CI safety net Co-Authored-By: Claude Opus 4.6 --- components/camel-hazelcast/pom.xml | 5 +++++ .../hazelcast/policy/HazelcastRoutePolicyIT.java | 14 +++++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/components/camel-hazelcast/pom.xml b/components/camel-hazelcast/pom.xml index 2511a42c396d7..cb4f246d76392 100644 --- a/components/camel-hazelcast/pom.xml +++ b/components/camel-hazelcast/pom.xml @@ -79,6 +79,11 @@ ${project.version} test + + org.assertj + assertj-core + test + diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java index 493f8b220efbc..f707f5e896acf 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java @@ -32,6 +32,7 @@ import org.apache.camel.test.infra.hazelcast.services.HazelcastService; import org.apache.camel.test.infra.hazelcast.services.HazelcastServiceFactory; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +53,8 @@ public class HazelcastRoutePolicyIT { public static HazelcastService hazelcastService = HazelcastServiceFactory.createService(); @Test - public void test() throws Exception { + @Timeout(value = 2, unit = TimeUnit.MINUTES) + public void testLeaderElectionWithMultipleNodes() throws Exception { List results = new CopyOnWriteArrayList<>(); CountDownLatch latch = new CountDownLatch(CLIENTS.size()); ExecutorService executor = Executors.newFixedThreadPool(CLIENTS.size()); @@ -105,10 +107,12 @@ public void configure() { context.start(); LOGGER.info("Started CamelContext on node: {}", id); - contextLatch.await(30, TimeUnit.SECONDS); - - LOGGER.info("Shutting down node {}", id); - results.add(id); + if (contextLatch.await(30, TimeUnit.SECONDS)) { + LOGGER.info("Node {} completed successfully", id); + results.add(id); + } else { + LOGGER.warn("Node {} timed out waiting for route events", id); + } } catch (Exception e) { LOGGER.warn("Node {} failed: {}", id, e.getMessage(), e); } finally { From af9b662c146f82a8160f5a695579cee03b0d9d47 Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Thu, 2 Apr 2026 14:03:12 +0200 Subject: [PATCH 5/5] CAMEL-22524: Improve HazelcastRoutePolicy IT code quality - Use AssertJ consistently instead of mixing JUnit assertTrue - Use try-with-resources for CamelContext cleanup - Use deterministic staggered startup delays - Include event count in timeout log messages Co-Authored-By: Claude Opus 4.6 --- .../policy/HazelcastRoutePolicyIT.java | 60 +++++++++---------- 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java index f707f5e896acf..7d841943c2b4a 100644 --- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java @@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; /** * Integration test for {@link HazelcastRoutePolicy} that verifies leader election and route management using Hazelcast @@ -63,15 +62,14 @@ public void testLeaderElectionWithMultipleNodes() throws Exception { executor.submit(() -> run(id, results, latch)); } - assertTrue(latch.await(1, TimeUnit.MINUTES), "All nodes should complete within timeout"); + assertThat(latch.await(1, TimeUnit.MINUTES)).as("All nodes should complete within timeout").isTrue(); executor.shutdown(); - assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS), "Executor should terminate cleanly"); + assertThat(executor.awaitTermination(30, TimeUnit.SECONDS)).as("Executor should terminate cleanly").isTrue(); assertThat(results).containsExactlyInAnyOrderElementsOf(CLIENTS); } private static void run(String id, List results, CountDownLatch latch) { - DefaultCamelContext context = null; HazelcastInstance instance = null; try { int events = ThreadLocalRandom.current().nextInt(2, 6); @@ -86,39 +84,37 @@ private static void run(String id, List results, CountDownLatch latch) { policy.setLockValue("node-" + id); policy.setTryLockTimeout(5, TimeUnit.SECONDS); - context = new DefaultCamelContext(); - context.disableJMX(); - context.getCamelContextExtension().setName("context-" + id); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() { - from("timer:hazelcast?delay=1000&period=1000") - .routeId("route-" + id) - .routePolicy(policy) - .log("From ${routeId}") - .process(e -> contextLatch.countDown()); + try (DefaultCamelContext context = new DefaultCamelContext()) { + context.disableJMX(); + context.getCamelContextExtension().setName("context-" + id); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("timer:hazelcast?delay=1000&period=1000") + .routeId("route-" + id) + .routePolicy(policy) + .log("From ${routeId}") + .process(e -> contextLatch.countDown()); + } + }); + + // Deterministic staggered startup based on node index + Thread.sleep(Integer.parseInt(id) * 200L); + + LOGGER.info("Starting CamelContext on node: {}", id); + context.start(); + LOGGER.info("Started CamelContext on node: {}", id); + + if (contextLatch.await(30, TimeUnit.SECONDS)) { + LOGGER.info("Node {} completed {} events successfully", id, events); + results.add(id); + } else { + LOGGER.warn("Node {} timed out waiting for route events (expected {} events)", id, events); } - }); - - // Staggered startup - Thread.sleep(ThreadLocalRandom.current().nextInt(500)); - - LOGGER.info("Starting CamelContext on node: {}", id); - context.start(); - LOGGER.info("Started CamelContext on node: {}", id); - - if (contextLatch.await(30, TimeUnit.SECONDS)) { - LOGGER.info("Node {} completed successfully", id); - results.add(id); - } else { - LOGGER.warn("Node {} timed out waiting for route events", id); } } catch (Exception e) { LOGGER.warn("Node {} failed: {}", id, e.getMessage(), e); } finally { - if (context != null) { - context.stop(); - } if (instance != null) { instance.shutdown(); }