diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index f857cfaa436b..e08a977f6464 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -503,29 +503,67 @@ public void handle() final int desiredTaskCount = computeDesiredTaskCount.call(); final int currentTaskCount = getCurrentTaskCount(); - if (desiredTaskCount <= 0) { - return; - } - ServiceMetricEvent.Builder event = ServiceMetricEvent.builder() .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) .setDimension(DruidMetrics.DATASOURCE, dataSource) .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()); - // 1) This should already be handled by the auto-scaler implementation, but make sure we catch/record these for auditability - if (desiredTaskCount == currentTaskCount) { + // Auto-scaler contract: a negative return is pathological (scaler couldn't compute a + // useful answer, e.g. insufficient metrics). + if (desiredTaskCount < 0) { log.warn( - "Skipping scaling for supervisor[%s] for dataSource[%s]: already at desired task count [%d]", + "Auto-scaler returned pathological taskCount[%d] for supervisor[%s] for dataSource[%s]; skipping scale.", + desiredTaskCount, + supervisorId, + dataSource + ); + emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, "Auto-scaler failed to compute a task count") + .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); + return; + } + + // Clamp the scaler's preferred count to the supervisor's configured bounds. The scaler + // is responsible only for computing what it wants; bounds enforcement and the + // corresponding skip-reason emissions live here. 
Partition count is a hard ceiling — we + // cannot usefully run more tasks than partitions for stream ingestion — so it caps both + // configured min and max (protects against misconfigurations where an operator-set min + // exceeds partitionCount, which would otherwise force us to allocate idle tasks). + final int partitionCount = getPartitionCount(); + final int rawMin = autoScalerConfig.getTaskCountMin(); + final int rawMax = autoScalerConfig.getTaskCountMax(); + final int taskCountMin = partitionCount > 0 ? Math.min(rawMin, partitionCount) : rawMin; + final int taskCountMax = partitionCount > 0 ? Math.min(rawMax, partitionCount) : rawMax; + final int clampedTaskCount = Math.min(taskCountMax, Math.max(taskCountMin, desiredTaskCount)); + + // When the clamped value equals the current count there is nothing to do. Emit a specific + // skip reason reflecting *why* we stopped: at a configured bound if clamping was the cause, + // otherwise the scaler simply prefers the status quo. + if (clampedTaskCount == currentTaskCount) { + final String skipReason; + if (desiredTaskCount > taskCountMax) { + skipReason = "Already at max task count"; + } else if (desiredTaskCount < taskCountMin) { + skipReason = "Already at min task count"; + } else { + skipReason = "desired capacity reached"; + } + log.info( + "Skipping scaling for supervisor[%s] for dataSource[%s]: [%s] (scaler wants [%d], current [%d], bounds [%d,%d])", supervisorId, dataSource, - desiredTaskCount + skipReason, + desiredTaskCount, + currentTaskCount, + taskCountMin, + taskCountMax ); - emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, "desired capacity reached") + // Emit the *unclamped* desired count so operators see what the scaler actually wants. + emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, skipReason) .setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); return; } - // 2) Make sure we wait for any pending completion tasks to finish. 
+ // Make sure we wait for any pending completion tasks to finish. // At this point there could be 3 generations of tasks: pending completion tasks (old generation), running tasks (current generation), and (after our scale) pending tasks (new generation). // We want to avoid killing any old generation tasks preemptively, as that might cause the current generation tasks' offsets to become invalid. for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) { @@ -546,32 +584,34 @@ public void handle() } } - // 3) Make sure we are not breaching any scaling cooldown limits. + // Make sure we are not breaching any scaling cooldown limits. // Scaling operations are disruptive — scale-down in particular can leave the supervisor // under-resourced while it recovers from lag induced by the scale event, so callers may // configure a longer cooldown for scale-down than for scale-up. Both directions are measured against the same // last-scale timestamp so that a rapid up/down oscillation is still subject to the appropriate cooldown, - // regardless of which direction triggered last. + // regardless of which direction triggered last. Direction is computed against the + // *clamped* value — that is the scale we would actually apply. final ScaleDirection scaleDirection; final long cooldownMillis; - if (desiredTaskCount > currentTaskCount) { + if (clampedTaskCount > currentTaskCount) { scaleDirection = ScaleDirection.SCALE_UP; cooldownMillis = autoScalerConfig.getMinScaleUpDelay().getMillis(); - } else { // desiredTaskCount < currentTaskCount + } else { // clampedTaskCount < currentTaskCount scaleDirection = ScaleDirection.SCALE_DOWN; cooldownMillis = autoScalerConfig.getMinScaleDownDelay().getMillis(); } if (nowTime - dynamicTriggerLastScaleRunTime < cooldownMillis) { log.info( - "DynamicAllocationTasksNotice submitted again in [%d]ms, [%s] cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! 
desired task count is [%d], current task count is [%d]", + "DynamicAllocationTasksNotice submitted again in [%d]ms, [%s] cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! scaler wants [%d] (clamped [%d]), current task count is [%d]", nowTime - dynamicTriggerLastScaleRunTime, scaleDirection, cooldownMillis, supervisorId, dataSource, desiredTaskCount, + clampedTaskCount, currentTaskCount ); @@ -583,10 +623,12 @@ public void handle() return; } - // At this point, we can reasonably attempt a scaling action, so emit our required task count + // At this point we can reasonably attempt a scaling action. Emit the scaler's unclamped + // preferred count as the operator hint: that number tells operators "the scaler wants + // taskCount[N]" regardless of whether we can give it the full amount. emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount)); - boolean allocationSuccess = changeTaskCount(desiredTaskCount); + boolean allocationSuccess = changeTaskCount(clampedTaskCount); if (allocationSuccess) { onSuccessfulScale.run(); dynamicTriggerLastScaleRunTime = nowTime; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index fa7db4f6928c..757551c12492 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -166,61 +166,47 @@ public int computeTaskCountForRollover() } } + /** + * Returns the task count the scaler wants to reach — the "optimal" count computed from current + * metrics, unclamped by the autoscaler's min/max bounds. 
The supervisor is responsible for + * clamping the return value to {@code [taskCountMin, taskCountMax]} and for deciding whether + * to actually scale. + *

+ * Contract: + *

+ */ public int computeTaskCountForScaleAction() { lastKnownMetrics = collectMetrics(); final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics); - int currentTaskCount = supervisor.getIoConfig().getTaskCount(); - - // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries. - // There might be a configuration instance with a handwritten taskCount that is outside the boundaries. - final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin() - || currentTaskCount > config.getTaskCountMax(); - if (isTaskCountOutOfBounds) { - currentTaskCount = Math.min(config.getTaskCountMax(), Math.max(config.getTaskCountMin(), currentTaskCount)); + if (optimalTaskCount <= 0) { + // computeOptimalTaskCount returns -1 for pathological metrics states; propagate it. + return -1; } - // Perform scale-up actions; scale-down actions only if configured. - final int taskCount; - - // If task count is out of bounds, scale to the configured boundary - // regardless of optimal task count, to get back to a safe state. 
- if (isTaskCountOutOfBounds) { - taskCount = currentTaskCount; - log.info("Task count for supervisor[%s] was out of bounds [%d,%d], urgently scaling from [%d] to [%d].", - supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, currentTaskCount); - } else if (optimalTaskCount > currentTaskCount) { - taskCount = optimalTaskCount; - log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount); - } else if (!config.isScaleDownOnTaskRolloverOnly() - && optimalTaskCount < currentTaskCount - && optimalTaskCount > 0) { - taskCount = optimalTaskCount; - log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).", supervisorId, currentTaskCount, taskCount); - } else { - taskCount = -1; - log.debug("No scaling required for supervisor[%s]", supervisorId); - - // Emit metrics for scaling skip reasons; in case of min == max, signaling reaching - // max task count has bigger priority for the external observers / trackers - if (optimalTaskCount >= config.getTaskCountMax() || currentTaskCount == config.getTaskCountMax()) { - emitter.emit(getMetricBuilder() - .setDimension( - SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION, - "Already at max task count" - ) - .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, currentTaskCount)); - } else if (optimalTaskCount == config.getTaskCountMin() || currentTaskCount == config.getTaskCountMin()) { - emitter.emit(getMetricBuilder() - .setDimension( - SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION, - "Already at min task count" - ) - .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, currentTaskCount)); - } + final int currentTaskCount = supervisor.getIoConfig().getTaskCount(); + + if (config.isScaleDownOnTaskRolloverOnly() && optimalTaskCount < currentTaskCount) { + // In rollover-only scale-down mode, the scaler's preferred count is the current count — it + // explicitly does 
not want a proactive scale-down here. + return currentTaskCount; } - return taskCount; + + log.info( + "CostBasedAutoScaler for supervisor[%s] wants taskCount[%d] (current[%d]).", + supervisorId, + optimalTaskCount, + currentTaskCount + ); + return optimalTaskCount; } public CostBasedAutoScalerConfig getConfig() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index b96a1de7bd41..e39aa35b4e93 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -31,8 +31,6 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; -import org.apache.druid.query.DruidMetrics; import org.apache.druid.utils.CollectionUtils; import java.util.ArrayList; @@ -52,7 +50,6 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler private final SeekableStreamSupervisor supervisor; private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig; private final ServiceEmitter emitter; - private final ServiceMetricEvent.Builder metricBuilder; private static final ReentrantLock LOCK = new ReentrantLock(true); @@ -77,10 +74,6 @@ public LagBasedAutoScaler( this.spec = spec; this.supervisor = supervisor; this.emitter = emitter; - metricBuilder = ServiceMetricEvent.builder() - .setDimension(DruidMetrics.SUPERVISOR_ID, spec.getId()) - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .setDimension(DruidMetrics.STREAM, this.supervisor.getIoConfig().getStream()); } @Override @@ -198,26 +191,26 
@@ private Runnable computeAndCollectLag() } /** - * This method determines whether to do scale actions based on collected lag points. - * The current algorithm of scale is straightforward: + * Returns the task count this scaler wants to reach based on collected lag points. The value + * is the scaler's preferred count, unclamped by the autoscaler's min/max bounds — the supervisor + * is responsible for clamping to {@code [taskCountMin, taskCountMax]} and for deciding whether + * to actually scale. + *

+ * Contract: *

- * - * @param lags the lag metrics of Stream (Kafka/Kinesis) - * @return Integer, target number of tasksCount. -1 means skip scale action. + * The partition count remains a hard upper cap here because task count > partition count is + * nonsensical for stream ingestion and is not a min/max-config concern. */ @VisibleForTesting int computeDesiredTaskCount(List lags) { - // if the supervisor is not suspended, ensure required tasks are running - // if suspended, ensure tasks have been requested to gracefully stop log.debug( "Computing the desired task count for supervisor[%s], based on following lags : [%s]", spec.getId(), @@ -237,74 +230,31 @@ int computeDesiredTaskCount(List lags) double beyondProportion = beyond * 1.0 / metricsCount; double withinProportion = within * 1.0 / metricsCount; - log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for supervisor[%s].", beyondProportion, - withinProportion, spec.getId() + log.debug( + "Calculated beyondProportion is [%s] and withinProportion is [%s] for supervisor[%s].", + beyondProportion, + withinProportion, + spec.getId() ); - int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount(); - int desiredActiveTaskCount; final int partitionCount = supervisor.getPartitionCount(); if (partitionCount <= 0) { log.warn("Partition number for supervisor[%s] <= 0 ? how can it be?", spec.getId()); return -1; } - final int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount); - final int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount); - - // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries. - // There might be a configuration instance with a handwritten taskCount that is outside the boundaries. - // If that is happening, take the bound and return early. 
- final boolean isTaskCountOutOfBounds = currentActiveTaskCount < actualTaskCountMin - || currentActiveTaskCount > actualTaskCountMax; - if (isTaskCountOutOfBounds) { - currentActiveTaskCount = Math.min(actualTaskCountMax, Math.max(actualTaskCountMin, currentActiveTaskCount)); - return currentActiveTaskCount; - } + final int currentActiveTaskCount = supervisor.getIoConfig().getTaskCount(); if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) { - // Do Scale out - final int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep(); - if (currentActiveTaskCount == actualTaskCountMax) { - log.debug( - "CurrentActiveTaskCount reached task count Max limit, skipping scale out action for supervisor[%s].", - spec.getId() - ); - emitter.emit(metricBuilder - .setDimension( - SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION, - "Already at max task count" - ) - .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount)); - return -1; - } else { - desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax); - } - return desiredActiveTaskCount; + // scale-out: step up from current, capped by partition count (scaler-internal constraint) + return Math.min(currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep(), partitionCount); } - if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) { - // Do Scale in - final int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep(); - if (currentActiveTaskCount == actualTaskCountMin) { - log.debug( - "CurrentActiveTaskCount reached task count Min limit[%d], skipping scale in action for supervisor[%s].", - actualTaskCountMin, - spec.getId() - ); - emitter.emit(metricBuilder - .setDimension( - SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION, - "Already at min task count" - ) - .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount)); - return -1; - } 
else { - desiredActiveTaskCount = Math.max(taskCount, actualTaskCountMin); - } - return desiredActiveTaskCount; + // scale-in: step down from current (the supervisor will clamp to taskCountMin) + return currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep(); } - return -1; + // Neither trigger fired; the scaler's preferred count is the current count. + return currentActiveTaskCount; } public LagBasedAutoScalerConfig getAutoScalerConfig() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java index bf3d5f9d71e4..69b183e23a19 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java @@ -397,8 +397,11 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes(); EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + // Use taskCountMax=10 so both the scaler and the supervisor's bounds agree and multiple + // scale-out iterations have headroom before hitting max (which is what lets us observe the + // "Scale cooldown not elapsed yet" skip reason emitted by the supervisor on later iterations). 
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); - EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true, 10)).anyTimes(); EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); @@ -652,10 +655,41 @@ public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedExce @Test public void testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanPartitions() throws InterruptedException { + // Verifies that when the operator misconfigures taskCountMin above partitionCount, the + // supervisor's partition-count ceiling brings both bounds down to partitionCount and any + // scale action settles at partitionCount. + final Map misconfiguredProps = getScaleInProperties(); + misconfiguredProps.put("taskCountMax", 20); + misconfiguredProps.put("taskCountMin", 15); + final AutoScalerConfig misconfiguredAutoScalerConfig = + mapper.convertValue(misconfiguredProps, AutoScalerConfig.class); + EasyMock.expect(spec.getId()).andReturn(SUPERVISOR).anyTimes(); EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); - EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, false)).anyTimes(); + // Use an ioConfig whose autoScalerConfig matches the test's intent (min=15, max=20) so the + // supervisor's bounds and the scaler's bounds agree. 
+ EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig( + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), + 1, + null, + new Period("PT1H"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + misconfiguredAutoScalerConfig, + LagAggregator.DEFAULT, + null, + null, + null, + null + ) + { + }).anyTimes(); EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); @@ -671,33 +705,29 @@ public void testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanParti EasyMock.replay(taskMaster); TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10); - Map modifiedScaleInProps = getScaleInProperties(); - - modifiedScaleInProps.put("taskCountMax", 20); - modifiedScaleInProps.put("taskCountMin", 15); LagBasedAutoScaler autoScaler = new LagBasedAutoScaler( supervisor, mapper.convertValue( - modifiedScaleInProps, + misconfiguredProps, LagBasedAutoScalerConfig.class ), spec, emitter ); - // enable autoscaler so that taskcount config will be ignored and the init value of taskCount will use taskCountMin. - Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount()); + // Initial taskCount comes from the ioConfig's autoScalerConfig.taskCountMin (15) since + // taskCount was passed null in the ioConfig above. + Assert.assertEquals(15, (int) supervisor.getIoConfig().getTaskCount()); supervisor.getIoConfig().setTaskCount(2); - // When supervisor.start(); autoScaler.start(); supervisor.runInternal(); Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount()); Thread.sleep(2000); - // Then + // Supervisor caps min/max at partitionCount=10, so the first scale settles at partitionCount. 
Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount()); emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1); @@ -1436,6 +1466,11 @@ private void mockIngestionSchema() } private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scaleOut) + { + return getIOConfig(taskCount, scaleOut, 2); + } + + private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scaleOut, int maxTaskCount) { if (scaleOut) { return new SeekableStreamSupervisorIOConfig( @@ -1450,7 +1485,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal new Period("PT30M"), null, null, - mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class), + mapper.convertValue(getScaleOutProperties(maxTaskCount), AutoScalerConfig.class), LagAggregator.DEFAULT, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 5f6986551a76..c06508213176 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -3326,6 +3326,18 @@ public SupervisorStateManager.State getState() { return state; } + + /** + * The shared record-supplier mock in this test returns a single-partition stream, which would + * otherwise cause the supervisor's partition-count ceiling in DynamicAllocationTasksNotice to + * clamp every cooldown-test scale down to 1. Report a large partition count so the cooldown + * tests can exercise bounds at the autoscaler-config level only. 
+ */ + @Override + public int getPartitionCount() + { + return 1_000; + } } private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor @@ -3687,6 +3699,126 @@ public void testDynamicAllocationScaleDownBlockedWhenCooldownNotElapsed() assertScaleSkipped(events.get(1), 1, "Scale cooldown not elapsed yet"); } + @Test + public void testDynamicAllocationClampsDesiredAboveMaxToMax() + { + // Scaler returns 20, but taskCountMax=10. Supervisor must clamp and scale to 10. + final StubServiceEmitter scalingEmitter = + setupSupervisorForAutoScalingTest(0L, 0L, 3, 1, 10); + final TestSeekableStreamSupervisor supervisor = + new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING); + + supervisor.handleDynamicAllocationTasksNotice(() -> 20, () -> {}, scalingEmitter); + + // Task count is clamped to max (10), not the scaler's desired (20). + Assert.assertEquals(10, supervisor.getIoConfig().getTaskCount().intValue()); + + final List events = + scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC); + Assert.assertEquals(1, events.size()); + // The emitted metric value reflects the scaler's unclamped desired (the operator hint), not + // the clamped value the supervisor actually applied. + assertScaledToTaskCount(events.get(0), 20); + } + + @Test + public void testDynamicAllocationClampsDesiredBelowMinToMin() + { + // Scaler returns 0, but taskCountMin=2. Supervisor must clamp and scale to 2. 
+ final StubServiceEmitter scalingEmitter = + setupSupervisorForAutoScalingTest(0L, 0L, 5, 2, 10); + final TestSeekableStreamSupervisor supervisor = + new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING); + + supervisor.handleDynamicAllocationTasksNotice(() -> 0, () -> {}, scalingEmitter); + + Assert.assertEquals(2, supervisor.getIoConfig().getTaskCount().intValue()); + + final List events = + scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC); + Assert.assertEquals(1, events.size()); + assertScaledToTaskCount(events.get(0), 0); + } + + @Test + public void testDynamicAllocationEmitsAlreadyAtMaxWhenCurrentIsAtMaxAndDesiredAboveMax() + { + // Current (10) is already at configured max (10). Scaler wants 15 (above max). Supervisor + // clamps to 10 which equals current -> emits "Already at max task count" skip reason. + final StubServiceEmitter scalingEmitter = + setupSupervisorForAutoScalingTest(0L, 0L, 10, 1, 10); + final TestSeekableStreamSupervisor supervisor = + new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING); + + supervisor.handleDynamicAllocationTasksNotice(() -> 15, () -> {}, scalingEmitter); + + Assert.assertEquals("Task count must not change when at max", 10, supervisor.getIoConfig().getTaskCount().intValue()); + + final List events = + scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC); + Assert.assertEquals(1, events.size()); + assertScaleSkipped(events.get(0), 15, "Already at max task count"); + } + + @Test + public void testDynamicAllocationEmitsAlreadyAtMinWhenCurrentIsAtMinAndDesiredBelowMin() + { + // Current (2) is already at configured min (2). Scaler wants 0 (below min). Supervisor clamps + // to 2 which equals current -> emits "Already at min task count" skip reason. 
+ final StubServiceEmitter scalingEmitter = + setupSupervisorForAutoScalingTest(0L, 0L, 2, 2, 10); + final TestSeekableStreamSupervisor supervisor = + new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING); + + supervisor.handleDynamicAllocationTasksNotice(() -> 0, () -> {}, scalingEmitter); + + Assert.assertEquals("Task count must not change when at min", 2, supervisor.getIoConfig().getTaskCount().intValue()); + + final List events = + scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC); + Assert.assertEquals(1, events.size()); + assertScaleSkipped(events.get(0), 0, "Already at min task count"); + } + + @Test + public void testDynamicAllocationEmitsDesiredCapacityReachedWhenDesiredEqualsCurrent() + { + // Scaler returns a value equal to current and within bounds -> "desired capacity reached". + final StubServiceEmitter scalingEmitter = + setupSupervisorForAutoScalingTest(0L, 0L, 5, 1, 10); + final TestSeekableStreamSupervisor supervisor = + new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING); + + supervisor.handleDynamicAllocationTasksNotice(() -> 5, () -> {}, scalingEmitter); + + Assert.assertEquals(5, supervisor.getIoConfig().getTaskCount().intValue()); + + final List events = + scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC); + Assert.assertEquals(1, events.size()); + assertScaleSkipped(events.get(0), 5, "desired capacity reached"); + } + + @Test + public void testDynamicAllocationEmitsPathologicalSkipReasonWhenScalerReturnsNegative() + { + // Scaler contract: -1 means "I could not compute a useful answer". Supervisor must not scale + // and must emit a skip reason surfacing the scaler's failure. 
+ final StubServiceEmitter scalingEmitter = + setupSupervisorForAutoScalingTest(0L, 0L, 5, 1, 10); + final TestSeekableStreamSupervisor supervisor = + new StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.BasicState.RUNNING); + + supervisor.handleDynamicAllocationTasksNotice(() -> -1, () -> {}, scalingEmitter); + + Assert.assertEquals("Task count must not change on pathological return", 5, supervisor.getIoConfig().getTaskCount().intValue()); + + final List events = + scalingEmitter.getMetricEvents(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC); + Assert.assertEquals(1, events.size()); + assertScaleSkipped(events.get(0), -1, "Auto-scaler failed to compute a task count"); + } + /** * Asserts that a required-tasks emission represents an scale event: it carries the standard * supervisor/datasource/stream dims, no scalingSkipReason dim, and the metric value matches the @@ -3753,10 +3885,23 @@ private StubServiceEmitter setupSupervisorForAutoScalingTest( long minScaleDownDelayMillis, int initialTaskCount ) + { + return setupSupervisorForAutoScalingTest(minScaleUpDelayMillis, minScaleDownDelayMillis, initialTaskCount, 1, 100); + } + + private StubServiceEmitter setupSupervisorForAutoScalingTest( + long minScaleUpDelayMillis, + long minScaleDownDelayMillis, + int initialTaskCount, + int taskCountMin, + int taskCountMax + ) { final AutoScalerConfig autoScalerConfig = testAutoScalerConfig( minScaleUpDelayMillis, - minScaleDownDelayMillis + minScaleDownDelayMillis, + taskCountMin, + taskCountMax ); final SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig( initialTaskCount, @@ -3770,6 +3915,16 @@ private StubServiceEmitter setupSupervisorForAutoScalingTest( * Returns a minimal test-only {@link AutoScalerConfig} */ private static AutoScalerConfig testAutoScalerConfig(long minScaleUpDelayMillis, long minScaleDownDelayMillis) + { + return testAutoScalerConfig(minScaleUpDelayMillis, minScaleDownDelayMillis, 1, 100); + } + + private 
static AutoScalerConfig testAutoScalerConfig( + long minScaleUpDelayMillis, + long minScaleDownDelayMillis, + int taskCountMin, + int taskCountMax + ) { return new AutoScalerConfig() { @@ -3800,13 +3955,13 @@ public Duration getMinScaleDownDelay() @Override public int getTaskCountMax() { - return 100; + return taskCountMax; } @Override public int getTaskCountMin() { - return 1; + return taskCountMin; } @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java index 6466b0966e8b..e4527a44699c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java @@ -23,13 +23,10 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.joda.time.Duration; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import java.util.List; @@ -38,7 +35,6 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class CostBasedAutoScalerMockTest @@ -97,8 +93,10 @@ public void testScaleUpWhenOptimalGreaterThanCurrent() } @Test - public void testNoOpWhenOptimalEqualsCurrent() + public void 
testReturnsOptimalWhenOptimalEqualsCurrent() { + // Scaler contract: return the optimal count the scaler wants, regardless of current/bounds. + // The supervisor handles the "equal to current -> no scale" decision. CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter)); int currentTaskCount = 25; @@ -109,7 +107,7 @@ public void testNoOpWhenOptimalEqualsCurrent() int result = autoScaler.computeTaskCountForScaleAction(); - Assert.assertEquals("Should return -1 when it equals current (no change needed)", -1, result); + Assert.assertEquals("Scaler should return its optimal count even when it equals current", optimalCount, result); } @Test @@ -176,8 +174,10 @@ public void testScaleUpFromMinimumTasks() } @Test - public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin() + public void testReturnsUnclampedOptimalBelowMin() { + // Scaler no longer clamps to the autoScalerConfig bounds — the supervisor does. + // Verify the scaler returns the raw optimal even when it is below taskCountMin. CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder() .taskCountMax(100) .taskCountMin(50) @@ -186,25 +186,25 @@ public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin() CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter)); final int configuredTaskCount = 1; - final int taskCountMin = 50; + final int belowMinOptimal = 49; // 1 below taskCountMin; expect unchanged through scaler - // Mock computeOptimalTaskCount to return a value different from the boundary, - // so the assertion proves the boundary clamping path was taken. 
- doReturn(taskCountMin - 1).when(autoScaler).computeOptimalTaskCount(any()); + doReturn(belowMinOptimal).when(autoScaler).computeOptimalTaskCount(any()); setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 1000.0, 0.2); final int result = autoScaler.computeTaskCountForScaleAction(); Assert.assertEquals( - "Should scale to taskCountMin when the configured task count is below the minimum boundary", - taskCountMin, + "Scaler should return unclamped optimal; clamping is a supervisor concern", + belowMinOptimal, result ); } @Test - public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax() + public void testReturnsUnclampedOptimalAboveMax() { + // Scaler no longer clamps to the autoScalerConfig bounds — the supervisor does. + // Verify the scaler returns the raw optimal even when it is above taskCountMax. CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder() .taskCountMax(50) .taskCountMin(1) @@ -213,18 +213,16 @@ public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax() CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter)); final int configuredTaskCount = 100; - final int taskCountMax = 50; + final int aboveMaxOptimal = 51; // 1 above taskCountMax; expect unchanged through scaler - // Mock computeOptimalTaskCount to return a value different from the boundary, - // so the assertion proves the boundary clamping path was taken. 
- doReturn(taskCountMax + 1).when(autoScaler).computeOptimalTaskCount(any()); + doReturn(aboveMaxOptimal).when(autoScaler).computeOptimalTaskCount(any()); setupMocksForMetricsCollection(autoScaler, configuredTaskCount, 10.0, 0.8); final int result = autoScaler.computeTaskCountForScaleAction(); Assert.assertEquals( - "Should scale to taskCountMax when the configured task count is above the maximum boundary", - taskCountMax, + "Scaler should return unclamped optimal; clamping is a supervisor concern", + aboveMaxOptimal, result ); } @@ -292,6 +290,9 @@ public void testBoundaryConditionOptimalEqualsCurrentMinusOne() @Test public void testScaleDownBlockedWhenScaleDownOnRolloverOnlyEnabled() { + // When scaleDownDuringTaskRolloverOnly is true and the optimal would be a scale-down, the + // scaler's "preferred" count is to stay put — it signals that by returning the current count. + // The supervisor interprets equal-to-current as "desired capacity reached" and skips. CostBasedAutoScalerConfig rolloverOnlyConfig = CostBasedAutoScalerConfig.builder() .taskCountMax(100) .taskCountMin(1) @@ -314,8 +315,8 @@ public void testScaleDownBlockedWhenScaleDownOnRolloverOnlyEnabled() setupMocksForMetricsCollection(autoScaler, currentTaskCount, 10.0, 0.9); Assert.assertEquals( - "Should return -1 when scaleDownDuringTaskRolloverOnly is true", - -1, + "Should return current count (no-op signal) when scaleDownDuringTaskRolloverOnly suppresses the scale-down", + currentTaskCount, autoScaler.computeTaskCountForScaleAction() ); } @@ -354,88 +355,8 @@ public void testScaleDownAllowedDuringRolloverWhenScaleDownOnRolloverOnlyEnabled ); } - @Test - public void testEmitsMaxTaskCountSkipReasonWhenCurrentIsAtMax() - { - CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(10) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .build(); - CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter)); - - 
final int currentTaskCount = 10; // already at max
-    doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
-    setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
-
-    Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
-
-    @SuppressWarnings("unchecked")
-    ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor = ArgumentCaptor.forClass(ServiceEventBuilder.class);
-    verify(mockEmitter).emit(captor.capture());
-    Assert.assertEquals(
-        "Should emit 'Already at max task count' skip reason when current task count is at maximum",
-        "Already at max task count",
-        ((ServiceMetricEvent.Builder) captor.getValue())
-            .getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
-    );
-  }
-
-  @Test
-  public void testEmitsMinTaskCountSkipReasonWhenCurrentIsAtMin()
-  {
-    CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
-                                                                       .taskCountMax(100)
-                                                                       .taskCountMin(10)
-                                                                       .enableTaskAutoScaler(true)
-                                                                       .build();
-    CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
-
-    final int currentTaskCount = 10; // already at min
-    doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
-    setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
-
-    Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
-
-    @SuppressWarnings("unchecked")
-    ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor = ArgumentCaptor.forClass(ServiceEventBuilder.class);
-    verify(mockEmitter).emit(captor.capture());
-    Assert.assertEquals(
-        "Should emit 'Already at min task count' skip reason when current task count is at minimum",
-        "Already at min task count",
-        ((ServiceMetricEvent.Builder) captor.getValue())
-            .getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
-    );
-  }
-
-  @Test
-  public void testMaxSkipReasonTakesPriorityWhenMinEqualsMax()
-  {
-    // When min == max, current is simultaneously at both bounds.
-    // The comment in the production code states that signaling max has higher priority.
-    CostBasedAutoScalerConfig boundedConfig = CostBasedAutoScalerConfig.builder()
-                                                                       .taskCountMax(5)
-                                                                       .taskCountMin(5)
-                                                                       .enableTaskAutoScaler(true)
-                                                                       .build();
-    CostBasedAutoScaler autoScaler = spy(new CostBasedAutoScaler(mockSupervisor, boundedConfig, mockSpec, mockEmitter));
-
-    final int currentTaskCount = 5; // at both min and max
-    doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
-    setupMocksForMetricsCollection(autoScaler, currentTaskCount, 100.0, 0.5);
-
-    Assert.assertEquals(-1, autoScaler.computeTaskCountForScaleAction());
-
-    @SuppressWarnings("unchecked")
-    ArgumentCaptor<ServiceEventBuilder<ServiceMetricEvent>> captor = ArgumentCaptor.forClass(ServiceEventBuilder.class);
-    verify(mockEmitter).emit(captor.capture());
-    Assert.assertEquals(
-        "Max skip reason should take priority over min skip reason when min equals max",
-        "Already at max task count",
-        ((ServiceMetricEvent.Builder) captor.getValue())
-            .getDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION)
-    );
-  }
+  // Skip-reason emissions ("Already at max/min task count") moved to SeekableStreamSupervisor —
+  // see SeekableStreamSupervisorStateTest for those assertions.
private void setupMocksForMetricsCollection( CostBasedAutoScaler autoScaler, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java index ffbfee77b723..32e2f378fdba 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerTest.java @@ -94,20 +94,28 @@ public void testScaleOutDoesNotReturnCountBelowTaskCountMin() } @Test - public void testReturnsTaskCountMinWhenConfiguredTaskCountIsBelowMin() + public void testReturnsUnclampedScaleOutStepEvenWhenCurrentIsBelowMin() { + // Scaler no longer clamps to taskCountMin; it just steps the current count up by scaleOutStep. + // The supervisor is responsible for clamping below-min results up to taskCountMin. when(mockIoConfig.getTaskCount()).thenReturn(1); when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT); - Assert.assertEquals(50, createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L))); + // current (1) + scaleOutStep (4) = 5 — below configured taskCountMin (50); not clamped here. + Assert.assertEquals(5, createAutoScaler().computeDesiredTaskCount(createLagSamples(2_000_001L))); } @Test - public void testReturnsTaskCountMaxWhenConfiguredTaskCountIsAboveMax() + public void testReturnsUnclampedScaleInStepEvenWhenCurrentIsAboveMax() { + // Scaler no longer clamps to taskCountMax; it just steps the current count down by scaleInStep. + // The supervisor is responsible for clamping above-max results down to taskCountMax. 
when(mockIoConfig.getTaskCount()).thenReturn(101); when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT); + // current (101) - scaleInStep (1) = 100 — above configured taskCountMax (100) is still not + // clamped here. (In this particular case the result happens to equal taskCountMax; we just + // want the scaler's raw computation.) Assert.assertEquals(100, createAutoScaler().computeDesiredTaskCount(createLagSamples(299_999L))); }