From 6a991cc86d86a542ac567c8d60c2deb41949becf Mon Sep 17 00:00:00 2001 From: hutiefang Date: Sun, 21 Jun 2026 11:19:09 +0800 Subject: [PATCH] fix(collector): cancel duplicate job timeouts --- .../collector/timer/TimerDispatcher.java | 10 +- .../collector/timer/TimerDispatcherTest.java | 91 ++++++++++++++++++- 2 files changed, 98 insertions(+), 3 deletions(-) diff --git a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/timer/TimerDispatcher.java b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/timer/TimerDispatcher.java index a8d6a600021..c75e14220e3 100644 --- a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/timer/TimerDispatcher.java +++ b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/timer/TimerDispatcher.java @@ -118,14 +118,14 @@ public void addJob(Job addJob, CollectResponseEventListener eventListener) { if (addJob.isCyclic()) { Long nextExecutionTime = getNextExecutionInterval(addJob); Timeout timeout = wheelTimer.newTimeout(timerJob, nextExecutionTime, TimeUnit.SECONDS); - currentCyclicTaskMap.put(addJob.getId(), timeout); + cancelPreviousTimeout(currentCyclicTaskMap.put(addJob.getId(), timeout)); } else { for (Metrics metric : addJob.getMetrics()) { metric.setInterval(0L); } addJob.setIntervals(new ConcurrentLinkedDeque<>(List.of(0L))); Timeout timeout = wheelTimer.newTimeout(timerJob, addJob.getInterval(), TimeUnit.SECONDS); - currentTempTaskMap.put(addJob.getId(), timeout); + cancelPreviousTimeout(currentTempTaskMap.put(addJob.getId(), timeout)); eventListeners.put(addJob.getId(), eventListener); } } @@ -199,6 +199,12 @@ public void destroy() throws Exception { this.wheelTimer.stop(); } + private void cancelPreviousTimeout(Timeout previousTimeout) { + if (previousTimeout != null) { + previousTimeout.cancel(); + } + } + public Long getNextExecutionInterval(Job job) { if (ScheduleTypeEnum.CRON.getType().equals(job.getScheduleType()) && job.getCronExpression() != null && !job.getCronExpression().isEmpty()) { try { diff --git a/hertzbeat-collector/hertzbeat-collector-common/src/test/java/org/apache/hertzbeat/collector/timer/TimerDispatcherTest.java b/hertzbeat-collector/hertzbeat-collector-common/src/test/java/org/apache/hertzbeat/collector/timer/TimerDispatcherTest.java index 4554ca8f0a9..c0d609a687c 100644 --- a/hertzbeat-collector/hertzbeat-collector-common/src/test/java/org/apache/hertzbeat/collector/timer/TimerDispatcherTest.java +++ b/hertzbeat-collector/hertzbeat-collector-common/src/test/java/org/apache/hertzbeat/collector/timer/TimerDispatcherTest.java @@ -17,8 +17,14 @@ package org.apache.hertzbeat.collector.timer; +import java.lang.reflect.Field; +import java.util.List; +import java.util.Map; import org.apache.hertzbeat.collector.constants.ScheduleTypeEnum; +import org.apache.hertzbeat.collector.dispatch.entrance.internal.CollectResponseEventListener; import org.apache.hertzbeat.common.entity.job.Job; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.timer.Timeout; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.InjectMocks; @@ -26,6 +32,9 @@ import org.mockito.MockitoAnnotations; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.when; /** @@ -179,4 +188,84 @@ void testGetNextExecutionIntervalWithCronAndNullExpression() { // Verify - Should fall back to interval value assertEquals(180L, result); } -} \ No newline at end of file + + @Test + void testAddCyclicJobCancelsPreviousTimeoutForSameJob() { + Job firstJob = Job.builder() + .id(1L) + .app("test") + .isCyclic(true) + .configmap(List.of()) + .metrics(List.of(Metrics.builder().interval(60L).build())) + .build(); + timerDispatcher.addJob(firstJob, null); + + Timeout firstTimeout = currentCyclicTaskMap().get(1L); + + Job updatedJob = Job.builder() + .id(1L) + .app("test") + .isCyclic(true) + .configmap(List.of()) + .metrics(List.of(Metrics.builder().interval(60L).build())) + .build(); + timerDispatcher.addJob(updatedJob, null); + + Timeout updatedTimeout = currentCyclicTaskMap().get(1L); + assertNotSame(firstTimeout, updatedTimeout); + assertTrue(firstTimeout.isCancelled()); + assertFalse(updatedTimeout.isCancelled()); + } + + @Test + void testAddTemporaryJobCancelsPreviousTimeoutForSameJob() { + Job firstJob = Job.builder() + .id(1L) + .app("test") + .isCyclic(false) + .configmap(List.of()) + .metrics(List.of(Metrics.builder().interval(60L).build())) + .build(); + timerDispatcher.addJob(firstJob, new CollectResponseEventListener() { + }); + + Timeout firstTimeout = currentTempTaskMap().get(1L); + + Job updatedJob = Job.builder() + .id(1L) + .app("test") + .isCyclic(false) + .configmap(List.of()) + .metrics(List.of(Metrics.builder().interval(60L).build())) + .build(); + timerDispatcher.addJob(updatedJob, new CollectResponseEventListener() { + }); + + Timeout updatedTimeout = currentTempTaskMap().get(1L); + assertNotSame(firstTimeout, updatedTimeout); + assertTrue(firstTimeout.isCancelled()); + assertFalse(updatedTimeout.isCancelled()); + } + + @SuppressWarnings("unchecked") + private Map currentCyclicTaskMap() { + try { + Field field = TimerDispatcher.class.getDeclaredField("currentCyclicTaskMap"); + field.setAccessible(true); + return (Map) field.get(timerDispatcher); + } catch (ReflectiveOperationException e) { + throw new AssertionError(e); + } + } + + @SuppressWarnings("unchecked") + private Map currentTempTaskMap() { + try { + Field field = TimerDispatcher.class.getDeclaredField("currentTempTaskMap"); + field.setAccessible(true); + return (Map) field.get(timerDispatcher); + } catch (ReflectiveOperationException e) { + throw new AssertionError(e); + } + } +}