Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,14 @@ public void addJob(Job addJob, CollectResponseEventListener eventListener) {
if (addJob.isCyclic()) {
Long nextExecutionTime = getNextExecutionInterval(addJob);
Timeout timeout = wheelTimer.newTimeout(timerJob, nextExecutionTime, TimeUnit.SECONDS);
currentCyclicTaskMap.put(addJob.getId(), timeout);
cancelPreviousTimeout(currentCyclicTaskMap.put(addJob.getId(), timeout));
} else {
for (Metrics metric : addJob.getMetrics()) {
metric.setInterval(0L);
}
addJob.setIntervals(new ConcurrentLinkedDeque<>(List.of(0L)));
Timeout timeout = wheelTimer.newTimeout(timerJob, addJob.getInterval(), TimeUnit.SECONDS);
currentTempTaskMap.put(addJob.getId(), timeout);
cancelPreviousTimeout(currentTempTaskMap.put(addJob.getId(), timeout));
eventListeners.put(addJob.getId(), eventListener);
}
}
Expand Down Expand Up @@ -199,6 +199,12 @@ public void destroy() throws Exception {
this.wheelTimer.stop();
}

private void cancelPreviousTimeout(Timeout previousTimeout) {
if (previousTimeout != null) {
previousTimeout.cancel();
}
}

public Long getNextExecutionInterval(Job job) {
if (ScheduleTypeEnum.CRON.getType().equals(job.getScheduleType()) && job.getCronExpression() != null && !job.getCronExpression().isEmpty()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,24 @@

package org.apache.hertzbeat.collector.timer;

import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import org.apache.hertzbeat.collector.constants.ScheduleTypeEnum;
import org.apache.hertzbeat.collector.dispatch.entrance.internal.CollectResponseEventListener;
import org.apache.hertzbeat.common.entity.job.Job;
import org.apache.hertzbeat.common.entity.job.Metrics;
import org.apache.hertzbeat.common.timer.Timeout;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

/**
Expand Down Expand Up @@ -179,4 +188,84 @@ void testGetNextExecutionIntervalWithCronAndNullExpression() {
// Verify - Should fall back to interval value
assertEquals(180L, result);
}
}

@Test
void testAddCyclicJobCancelsPreviousTimeoutForSameJob() {
Job firstJob = Job.builder()
.id(1L)
.app("test")
.isCyclic(true)
.configmap(List.of())
.metrics(List.of(Metrics.builder().interval(60L).build()))
.build();
timerDispatcher.addJob(firstJob, null);

Timeout firstTimeout = currentCyclicTaskMap().get(1L);

Job updatedJob = Job.builder()
.id(1L)
.app("test")
.isCyclic(true)
.configmap(List.of())
.metrics(List.of(Metrics.builder().interval(60L).build()))
.build();
timerDispatcher.addJob(updatedJob, null);

Timeout updatedTimeout = currentCyclicTaskMap().get(1L);
assertNotSame(firstTimeout, updatedTimeout);
assertTrue(firstTimeout.isCancelled());
assertFalse(updatedTimeout.isCancelled());
}

@Test
void testAddTemporaryJobCancelsPreviousTimeoutForSameJob() {
Job firstJob = Job.builder()
.id(1L)
.app("test")
.isCyclic(false)
.configmap(List.of())
.metrics(List.of(Metrics.builder().interval(60L).build()))
.build();
timerDispatcher.addJob(firstJob, new CollectResponseEventListener() {
});

Timeout firstTimeout = currentTempTaskMap().get(1L);

Job updatedJob = Job.builder()
.id(1L)
.app("test")
.isCyclic(false)
.configmap(List.of())
.metrics(List.of(Metrics.builder().interval(60L).build()))
.build();
timerDispatcher.addJob(updatedJob, new CollectResponseEventListener() {
});

Timeout updatedTimeout = currentTempTaskMap().get(1L);
assertNotSame(firstTimeout, updatedTimeout);
assertTrue(firstTimeout.isCancelled());
assertFalse(updatedTimeout.isCancelled());
}

@SuppressWarnings("unchecked")
private Map<Long, Timeout> currentCyclicTaskMap() {
try {
Field field = TimerDispatcher.class.getDeclaredField("currentCyclicTaskMap");
field.setAccessible(true);
return (Map<Long, Timeout>) field.get(timerDispatcher);
} catch (ReflectiveOperationException e) {
throw new AssertionError(e);
}
}

@SuppressWarnings("unchecked")
private Map<Long, Timeout> currentTempTaskMap() {
try {
Field field = TimerDispatcher.class.getDeclaredField("currentTempTaskMap");
field.setAccessible(true);
return (Map<Long, Timeout>) field.get(timerDispatcher);
} catch (ReflectiveOperationException e) {
throw new AssertionError(e);
}
}
}
Loading