diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md index ba92ed66b536..4e59f0fa5425 100644 --- a/docs/ingestion/supervisor.md +++ b/docs/ingestion/supervisor.md @@ -193,29 +193,30 @@ The following example shows a supervisor spec with `lagBased` autoscaler: ``` -**2. Cost-based autoscaler strategy (experimental)** +**2. Cost-based autoscaler strategy** -An autoscaler which computes the required supervisor task count via cost function based on ingestion lag and poll-to-idle ratio. -Task counts are selected from a bounded range derived from the current partitions-per-task ratio, -not strictly from factors/divisors of the partition count. This bounded partitions-per-task window enables gradual scaling while -voiding large jumps and still allowing non-divisor task counts when needed. +The cost-based autoscaler picks the number of ingestion tasks that minimizes a combined cost score. The score has two components: -**It is experimental and the implementation details as well as cost function parameters are subject to change.** +- **Lag cost** — how long it would take to drain the current backlog at the observed processing rate. More tasks reduce this cost. +- **Idle cost** — how far the predicted idle ratio is from the target of ~25%. Tasks that are too busy (under-provisioned) or too idle (over-provisioned) both drive the score up. +The sweet spot is roughly 25% idle, giving headroom to absorb traffic spikes without wasting resources. + +At every evaluation interval, Druid computes the score for each candidate task count and picks the one with the lowest total cost. Note: Kinesis is not supported yet, support is in progress. The following table outlines the configuration properties related to the `costBased` autoscaler strategy: -| Property|Description|Required|Default| -|---------|-----------|--------|-------| -|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale action is triggered. 
| No | 600000 | -|`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25 | -|`idleWeight`|The weight of extracted poll idle value in cost function. | No | 0.75 | -|`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when selecting task counts.|No| `false` | -|`highLagThreshold`|Average partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1| -|`minScaleUpDelay`|Minimum cooldown duration after a scale-up action before the next scale-up is allowed, specified as an ISO-8601 duration string.|No|| -|`minScaleDownDelay`|Minimum cooldown duration after a scale-down action before the next scale-down is allowed, specified as an ISO-8601 duration string.|No|`PT30M`| -|`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is limited to periods during task rollovers only.|No|`false`| +| Property | Description | Required | Default | +|----------|-------------|----------|---------------------------| +|`scaleActionPeriodMillis`|How often, in milliseconds, Druid evaluates whether to scale.|No| `600000` (10 min) | +|`lagWeight`|How much weight to give the lag cost relative to the idle cost. Higher values make the autoscaler more aggressive about adding tasks to drain backlog.|No| `0.4` | +|`idleWeight`|How much weight to give the idle cost relative to the lag cost. Higher values make the autoscaler more aggressive about removing over-provisioned tasks.|No| `0.6` | +|`useTaskCountBoundariesOnScaleUp`|Limits scale-up to a small step relative to the current task count, preventing large jumps. Disable to allow the autoscaler to jump directly to any task count.|No| `false` | +|`useTaskCountBoundariesOnScaleDown`|Limits scale-down to a small step relative to the current task count, preventing large drops. 
Disable to allow the autoscaler to drop directly to any task count.|No| `true` | +|`minScaleUpDelay`|Minimum cooldown after a scale-up before the next scale-up is allowed. Specified as an ISO-8601 duration.|No| `minTriggerScaleActionFrequencyMillis` | +|`minScaleDownDelay`|Minimum cooldown after a scale-down before the next scale-down is allowed. Specified as an ISO-8601 duration.|No| `PT30M` | +|`scaleDownDuringTaskRolloverOnly`|If `true`, scale-down actions are deferred until the next task rollover. This avoids disrupting in-progress ingestion.|No| `false` | The following example shows a supervisor spec with `costBased` autoscaler: @@ -231,10 +232,10 @@ The following example shows a supervisor spec with `costBased` autoscaler: "autoScalerStrategy": "costBased", "taskCountMin": 1, "taskCountMax": 10, + "lagWeight": 0.4, + "idleWeight": 0.6, "minScaleUpDelay": "PT10M", - "minScaleDownDelay": "PT30M", - "lagWeight": 0.1, - "idleWeight": 0.9 + "minScaleDownDelay": "PT30M" } } } diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java index f57bd113873b..7e63c3f4f257 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java @@ -143,7 +143,7 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp() } }); - // These values were carefully handpicked to allow that test to pass in a stable manner. + // These values were carefully handpicked to allow that test to pass stably. 
final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig .builder() .enableTaskAutoScaler(true) @@ -152,8 +152,8 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp() .taskCountStart(lowInitialTaskCount) .scaleActionPeriodMillis(500) .minTriggerScaleActionFrequencyMillis(1000) - .lagWeight(0.2) - .idleWeight(0.8) + .lagWeight(0.8) + .idleWeight(0.2) .build(); final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisorWithAutoScaler( @@ -192,11 +192,11 @@ public void test_autoScaler_scalesUpAndDown_withSlowPublish() final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig .builder() + .enableTaskAutoScaler(true) .taskCountMin(1) .taskCountMax(4) - .lagWeight(1.0) - .idleWeight(1.0) - .enableTaskAutoScaler(true) + .lagWeight(0.5) + .idleWeight(0.5) .minTriggerScaleActionFrequencyMillis(10L) .scaleActionPeriodMillis(10L) .minScaleDownDelay(Duration.standardSeconds(1)) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index fa7db4f6928c..9d50266cd735 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -39,21 +39,23 @@ import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * Cost-based auto-scaler for seekable stream supervisors. 
- * Uses a weighted cost function combining lag recovery time (seconds) and idleness cost (seconds) + * Uses a weighted cost function combining lag recovery time and idle-ratio cost * to determine optimal task counts. *
- * Candidate task counts are derived by scanning a bounded window of partitions-per-task (PPT) values - * around the current PPT, then converting those to task counts. This allows non-divisor task counts - * while keeping changes gradual (no large jumps). + * Candidate task counts are derived from possible partitions-per-task ratios, then converted + * to task counts. When configured, scale-up and scale-down can independently limit the evaluated + * candidates to a small window around the current task count to avoid large jumps. *
- * Scale-up and scale-down are both evaluated proactively. - * Future versions may perform scale-down on task rollover only. + * Scale-up is applied during regular scale-action checks. Scale-down is applied during regular + * checks unless {@link CostBasedAutoScalerConfig#isScaleDownOnTaskRolloverOnly()} defers it + * to task rollover. */ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler { @@ -64,21 +66,17 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler public static final String OPTIMAL_TASK_COUNT_METRIC = "task/autoScaler/costBased/optimalTaskCount"; public static final String INVALID_METRICS_COUNT = "task/autoScaler/costBased/invalidMetrics"; - static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2; - static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2; - /** - * If average partition lag crosses this value and the processing rate is - * still zero, scaling actions are skipped and an alert is raised. + * Maximum number of candidate task counts to evaluate above or below the current task count + * when scale-up or scale-down boundaries are enabled. */ - static final int MAX_IDLENESS_PARTITION_LAG = 10_000; + static final int BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK = 2; /** - * Divisor for partition count in the K formula: K = (partitionCount / K_PARTITION_DIVISOR) / sqrt(currentTaskCount). - * This controls how aggressive the scaling is relative to partition count. - * That value was chosen by carefully analyzing the math model behind the implementation. + * If the average partition lag crosses this value and the processing rate is + * still zero, scaling actions are skipped and an alert is raised. 
*/ - static final double K_PARTITION_DIVISOR = 6.4; + static final int MAX_IDLENESS_PARTITION_LAG = 10_000; private final String supervisorId; private final SeekableStreamSupervisor supervisor; @@ -176,7 +174,7 @@ public int computeTaskCountForScaleAction() // Take the current task count but clamp it to the configured boundaries if it is outside the boundaries. // There might be a configuration instance with a handwritten taskCount that is outside the boundaries. final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin() - || currentTaskCount > config.getTaskCountMax(); + || currentTaskCount > config.getTaskCountMax(); if (isTaskCountOutOfBounds) { currentTaskCount = Math.min(config.getTaskCountMax(), Math.max(config.getTaskCountMin(), currentTaskCount)); } @@ -188,16 +186,28 @@ public int computeTaskCountForScaleAction() // regardless of optimal task count, to get back to a safe state. if (isTaskCountOutOfBounds) { taskCount = currentTaskCount; - log.info("Task count for supervisor[%s] was out of bounds [%d,%d], urgently scaling from [%d] to [%d].", - supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, currentTaskCount); + log.info( + "Task count for supervisor[%s] was out of bounds [%d,%d], urgently scaling from [%d] to [%d].", + supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, currentTaskCount + ); } else if (optimalTaskCount > currentTaskCount) { taskCount = optimalTaskCount; - log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount); + log.info( + "Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", + supervisorId, + currentTaskCount, + taskCount + ); } else if (!config.isScaleDownOnTaskRolloverOnly() && optimalTaskCount < currentTaskCount && optimalTaskCount > 0) { taskCount = optimalTaskCount; - log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).", 
supervisorId, currentTaskCount, taskCount); + log.info( + "Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).", + supervisorId, + currentTaskCount, + taskCount + ); } else { taskCount = -1; log.debug("No scaling required for supervisor[%s]", supervisorId); @@ -237,7 +247,7 @@ public CostBasedAutoScalerConfig getConfig() *
- * This uses a logarithmic formula for consistent absolute growth: - * {@code deltaTasks = K * ln(lagSeverity)} - * where {@code K = (partitionCount / 6.4) / sqrt(currentTaskCount)} - *
- * This ensures that small taskCount's get a massive relative boost, - * while large taskCount's receive more measured, stable increases. - */ - static int computeExtraPPTIncrease( - double lagThreshold, - double aggregateLag, - int partitionCount, - int currentTaskCount, - int taskCountMax - ) - { - if (partitionCount <= 0 || taskCountMax <= 0 || currentTaskCount <= 0) { - return 0; - } - - final double lagPerPartition = aggregateLag / partitionCount; - if (lagPerPartition < lagThreshold) { - return 0; - } - - final double lagSeverity = lagPerPartition / lagThreshold; - - // Logarithmic growth: ln(lagSeverity) is positive when lagSeverity > 1 - // First multoplier decreases with sqrt(currentTaskCount): aggressive when small, conservative when large - final double deltaTasks = (partitionCount / K_PARTITION_DIVISOR) / Math.sqrt(currentTaskCount) * Math.log( - lagSeverity); - - final double targetTaskCount = Math.min(taskCountMax, (double) currentTaskCount + deltaTasks); - - // Compute precise PPT reduction to avoid early integer truncation artifacts - final double currentPPT = (double) partitionCount / currentTaskCount; - final double targetPPT = (double) partitionCount / targetTaskCount; - - return Math.max(0, (int) Math.floor(currentPPT - targetPPT)); - } - /** * Extracts the average poll-idle-ratio metric from task stats. * This metric indicates how much time the consumer spends idle waiting for data. 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java index b19a3e2cbbe8..68f9f5a5ad7e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.joda.time.Duration; @@ -37,16 +38,18 @@ /** * Configuration for cost-based auto-scaling of seekable stream supervisor tasks. * Uses a cost function combining lag and idle time metrics to determine optimal task counts. - * Task counts are selected from a bounded range derived from the current partitions-per-task (PPT) - * ratio, not strictly from factors/divisors of the partition count. This bounded PPT window enables - * gradual scaling while avoiding large jumps and still allowing non-divisor task counts when needed. + * Candidate task counts are derived from possible partitions-per-task ratios and are not limited + * to factors/divisors of the partition count. Optional scale-up and scale-down boundaries control + * how much of that candidate set is evaluated around the current task count. 
*/ @JsonInclude(JsonInclude.Include.NON_NULL) public class CostBasedAutoScalerConfig implements AutoScalerConfig { + private static final EmittingLogger LOG = new EmittingLogger(CostBasedAutoScalerConfig.class); + static final long DEFAULT_SCALE_ACTION_PERIOD_MILLIS = 10 * 60 * 1000; // 10 minutes - static final double DEFAULT_LAG_WEIGHT = 0.25; - static final double DEFAULT_IDLE_WEIGHT = 0.75; + static final double DEFAULT_LAG_WEIGHT = 0.4; + static final double DEFAULT_IDLE_WEIGHT = 0.6; static final Duration DEFAULT_MIN_SCALE_DELAY = Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS * 3); private final boolean enableTaskAutoScaler; @@ -59,12 +62,18 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig private final double lagWeight; private final double idleWeight; - private final boolean useTaskCountBoundaries; - private final int highLagThreshold; + private final boolean useTaskCountBoundariesOnScaleUp; + private final boolean useTaskCountBoundariesOnScaleDown; private final Duration minScaleUpDelay; private final Duration minScaleDownDelay; private final boolean scaleDownDuringTaskRolloverOnly; + /** + * Creates a new CostBasedAutoScalerConfig instance. + *
+ * Note: useTaskCountBoundaries and highLagThreshold are kept for backward compatibility, + * but effectively they are removed. + */ @JsonCreator public CostBasedAutoScalerConfig( @JsonProperty("taskCountMax") Integer taskCountMax, @@ -78,6 +87,8 @@ public CostBasedAutoScalerConfig( @Nullable @JsonProperty("idleWeight") Double idleWeight, @Nullable @JsonProperty("useTaskCountBoundaries") Boolean useTaskCountBoundaries, @Nullable @JsonProperty("highLagThreshold") Integer highLagThreshold, + @Nullable @JsonProperty("useTaskCountBoundariesOnScaleUp") Boolean useTaskCountBoundariesOnScaleUp, + @Nullable @JsonProperty("useTaskCountBoundariesOnScaleDown") Boolean useTaskCountBoundariesOnScaleDown, @Nullable @JsonProperty("minScaleUpDelay") Duration minScaleUpDelay, @Nullable @JsonProperty("minScaleDownDelay") Duration minScaleDownDelay, @Nullable @JsonProperty("scaleDownDuringTaskRolloverOnly") Boolean scaleDownDuringTaskRolloverOnly @@ -97,13 +108,24 @@ public CostBasedAutoScalerConfig( // Cost function weights with defaults this.lagWeight = Configs.valueOrDefault(lagWeight, DEFAULT_LAG_WEIGHT); this.idleWeight = Configs.valueOrDefault(idleWeight, DEFAULT_IDLE_WEIGHT); - this.useTaskCountBoundaries = Configs.valueOrDefault(useTaskCountBoundaries, false); - this.highLagThreshold = Configs.valueOrDefault(highLagThreshold, -1); - this.minScaleUpDelay = Configs.valueOrDefault(minScaleUpDelay, Duration.millis(this.minTriggerScaleActionFrequencyMillis)); + this.useTaskCountBoundariesOnScaleUp = Configs.valueOrDefault(useTaskCountBoundariesOnScaleUp, false); + this.useTaskCountBoundariesOnScaleDown = Configs.valueOrDefault(useTaskCountBoundariesOnScaleDown, true); + this.minScaleUpDelay = Configs.valueOrDefault( + minScaleUpDelay, + Duration.millis(this.minTriggerScaleActionFrequencyMillis) + ); this.minScaleDownDelay = Configs.valueOrDefault(minScaleDownDelay, DEFAULT_MIN_SCALE_DELAY); this.scaleDownDuringTaskRolloverOnly = 
Configs.valueOrDefault(scaleDownDuringTaskRolloverOnly, false); if (this.enableTaskAutoScaler) { + if (useTaskCountBoundaries != null) { + LOG.warn("useTaskCountBoundaries is removed, " + + "use useTaskCountBoundariesOnScaleUp and useTaskCountBoundariesOnScaleDown instead"); + } + if (highLagThreshold != null) { + LOG.warn("highLagThreshold is removed, the autoscaler behavior is good enough just with cost function"); + } + Preconditions.checkNotNull(taskCountMax, "taskCountMax is required when enableTaskAutoScaler is true"); Preconditions.checkNotNull(taskCountMin, "taskCountMin is required when enableTaskAutoScaler is true"); Preconditions.checkArgument(taskCountMax >= taskCountMin, "taskCountMax must be >= taskCountMin"); @@ -127,8 +149,14 @@ public CostBasedAutoScalerConfig( Preconditions.checkArgument(this.lagWeight >= 0, "lagWeight must be >= 0"); Preconditions.checkArgument(this.idleWeight >= 0, "idleWeight must be >= 0"); - Preconditions.checkArgument(this.minScaleUpDelay.getMillis() >= 0, "minScaleUpDelay must be a duration >= 0 millis"); - Preconditions.checkArgument(this.minScaleDownDelay.getMillis() >= 0, "minScaleDownDelay must be a duration >= 0 millis"); + Preconditions.checkArgument( + this.minScaleUpDelay.getMillis() >= 0, + "minScaleUpDelay must be a duration >= 0 millis" + ); + Preconditions.checkArgument( + this.minScaleDownDelay.getMillis() >= 0, + "minScaleDownDelay must be a duration >= 0 millis" + ); } /** @@ -203,21 +231,25 @@ public double getIdleWeight() } /** - * Enables or disables the use of task count boundaries derived from the current partitions-per-task (PPT) ratio. + * Enables or disables the task-count evaluation window when considering scale-up candidates. + * If enabled, the optimizer only evaluates a small number of candidate task counts above the current count, + * which prevents large scale-up jumps. 
*/ - @JsonProperty("useTaskCountBoundaries") - public boolean shouldUseTaskCountBoundaries() + @JsonProperty("useTaskCountBoundariesOnScaleUp") + public boolean shouldUseTaskCountBoundariesOnScaleUp() { - return useTaskCountBoundaries; + return useTaskCountBoundariesOnScaleUp; } /** - * Per-partition lag threshold allowing to activate a burst scaleup to eliminate high lag. + * Enables or disables the task-count evaluation window when considering scale-down candidates. + * If enabled, the optimizer only evaluates a small number of candidate task counts below the current count, + * which prevents large scale-down drops. */ - @JsonProperty("highLagThreshold") - public int getHighLagThreshold() + @JsonProperty("useTaskCountBoundariesOnScaleDown") + public boolean shouldUseTaskCountBoundariesOnScaleDown() { - return highLagThreshold; + return useTaskCountBoundariesOnScaleDown; } /** @@ -241,8 +273,8 @@ public Duration getMinScaleDownDelay() } /** - * Indicates whether task scaling down is limited to periods during task rollovers only. - * If set to {@code false}, allows scaling down during normal task run time. + * Indicates whether scale-down actions are deferred to task rollover. + * If set to {@code false}, scale-down can happen during regular scale-action checks. 
*/ @JsonProperty("scaleDownDuringTaskRolloverOnly") public boolean isScaleDownOnTaskRolloverOnly() @@ -275,13 +307,13 @@ public boolean equals(Object o) && scaleActionPeriodMillis == that.scaleActionPeriodMillis && Double.compare(that.lagWeight, lagWeight) == 0 && Double.compare(that.idleWeight, idleWeight) == 0 - && useTaskCountBoundaries == that.useTaskCountBoundaries + && useTaskCountBoundariesOnScaleUp == that.useTaskCountBoundariesOnScaleUp + && useTaskCountBoundariesOnScaleDown == that.useTaskCountBoundariesOnScaleDown && Objects.equals(minScaleUpDelay, that.minScaleUpDelay) && Objects.equals(minScaleDownDelay, that.minScaleDownDelay) && scaleDownDuringTaskRolloverOnly == that.scaleDownDuringTaskRolloverOnly && Objects.equals(taskCountStart, that.taskCountStart) - && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio) - && highLagThreshold == that.highLagThreshold; + && Objects.equals(stopTaskCountRatio, that.stopTaskCountRatio); } @Override @@ -297,8 +329,8 @@ public int hashCode() scaleActionPeriodMillis, lagWeight, idleWeight, - useTaskCountBoundaries, - highLagThreshold, + useTaskCountBoundariesOnScaleUp, + useTaskCountBoundariesOnScaleDown, minScaleUpDelay, minScaleDownDelay, scaleDownDuringTaskRolloverOnly @@ -318,8 +350,8 @@ public String toString() ", scaleActionPeriodMillis=" + scaleActionPeriodMillis + ", lagWeight=" + lagWeight + ", idleWeight=" + idleWeight + - ", useTaskCountBoundaries=" + useTaskCountBoundaries + - ", highLagThreshold=" + highLagThreshold + + ", useTaskCountBoundariesOnScaleUp=" + useTaskCountBoundariesOnScaleUp + + ", useTaskCountBoundariesOnScaleDown=" + useTaskCountBoundariesOnScaleDown + ", minScaleUpDelay=" + minScaleUpDelay + ", minScaleDownDelay=" + minScaleDownDelay + ", scaleDownDuringTaskRolloverOnly=" + scaleDownDuringTaskRolloverOnly + @@ -341,8 +373,8 @@ public static class Builder private Long scaleActionPeriodMillis; private Double lagWeight; private Double idleWeight; - private Boolean 
useTaskCountBoundaries; - private Integer highLagThreshold; + private Boolean useTaskCountBoundariesOnScaleUp; + private Boolean useTaskCountBoundariesOnScaleDown; private Duration minScaleUpDelay; private Duration minScaleDownDelay; private Boolean scaleDownDuringTaskRolloverOnly; @@ -423,15 +455,15 @@ public Builder scaleDownDuringTaskRolloverOnly(boolean scaleDownDuringTaskRollov return this; } - public Builder useTaskCountBoundaries(boolean useTaskCountBoundaries) + public Builder useTaskCountBoundariesOnScaleUp(boolean useTaskCountBoundariesOnScaleUp) { - this.useTaskCountBoundaries = useTaskCountBoundaries; + this.useTaskCountBoundariesOnScaleUp = useTaskCountBoundariesOnScaleUp; return this; } - public Builder highLagThreshold(int highLagThreshold) + public Builder useTaskCountBoundariesOnScaleDown(boolean useTaskCountBoundariesOnScaleDown) { - this.highLagThreshold = highLagThreshold; + this.useTaskCountBoundariesOnScaleDown = useTaskCountBoundariesOnScaleDown; return this; } @@ -447,8 +479,10 @@ public CostBasedAutoScalerConfig build() scaleActionPeriodMillis, lagWeight, idleWeight, - useTaskCountBoundaries, - highLagThreshold, + null, + null, + useTaskCountBoundariesOnScaleUp, + useTaskCountBoundariesOnScaleDown, minScaleUpDelay, minScaleDownDelay, scaleDownDuringTaskRolloverOnly diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java index 1ad946ff5175..a3925e282eff 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostResult.java @@ -21,10 +21,16 @@ /** * Holds the result of a cost computation from {@link WeightedCostFunction#computeCost}. - * All costs are measured in seconds. 
+ * Lag cost is based on recovery time; idle cost is based on the weighted idle-ratio penalty. */ public class CostResult { + static final CostResult INFINITE_COST = new CostResult( + Double.POSITIVE_INFINITY, + Double.POSITIVE_INFINITY, + Double.POSITIVE_INFINITY + ); + private final double totalCost; private final double lagCost; private final double idleCost; @@ -32,7 +38,7 @@ public class CostResult /** * @param totalCost the weighted sum of lagCost and idleCost * @param lagCost the weighted cost representing expected time (seconds) to recover current lag - * @param idleCost the weighted cost representing total compute time (seconds) wasted being idle per task duration + * @param idleCost the weighted cost representing the predicted idle-ratio penalty */ public CostResult(double totalCost, double lagCost, double idleCost) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java index 8fe4f9d8d060..fa60c7a8e8a2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java @@ -24,17 +24,23 @@ /** * Weighted cost function using compute time as the core metric. - * Costs represent actual time in seconds, making them intuitive and debuggable. - * Uses linear scaling without mode inversions for predictable behavior. + * Lag cost is based on recovery time in seconds; idle cost is a penalty derived from + * the predicted idle ratio. + * + *
Idle cost uses a U-shaped penalty with minimum at {@link #IDEAL_IDLE_RATIO}. + * This penalizes both under-provisioning (low idle, no safety margin, lag risk) and + * over-provisioning (high idle, wasted capacity), with asymmetric severity controlled by + * {@link #UNDER_PROVISIONING_PENALTY} and {@link #OVER_PROVISIONING_PENALTY}. */ public class WeightedCostFunction { private static final Logger log = new Logger(WeightedCostFunction.class); + /** * Multiplier for a lag amplification factor; it was carefully chosen * during extensive testing as the most balanced multiplier for high-lag recovery. */ - static final double LAG_AMPLIFICATION_MULTIPLIER = 0.05; + static final double LAG_AMPLIFICATION_MULTIPLIER = 0.4; /** * Minimum rate of processing for any task in records per second. This is used @@ -43,17 +49,34 @@ public class WeightedCostFunction */ static final double MIN_PROCESSING_RATE = 1_000; + /** + * Target idle ratio representing the optimal operating point for the U-shaped idle cost. + * At this ratio the idle cost is at its minimum; both lower (risk) and higher (waste) are penalized. + */ + static final double IDEAL_IDLE_RATIO = 0.25; + + /** + * Penalty magnitude applied when idle ratio is 0 (no safety margin). + * Controls the steepness of the U-shape on the under-provisioning side. + */ + static final double UNDER_PROVISIONING_PENALTY = 2.0; + + /** + * Penalty magnitude applied when idle ratio is 1 (fully wasted capacity). + * Controls the steepness of the U-shape on the over-provisioning side. + */ + static final double OVER_PROVISIONING_PENALTY = 1.0; + /** * Computes cost for a given task count using compute time metrics. *
- * Costs are measured in 'seconds': + * Cost components are derived from: *
- * Formula: {@code lagWeight * lagRecoveryTime + idleWeight * idlenessCost}. - * This approach directly connects costs to operational metrics. + * Formula: {@code lagWeight * lagRecoveryTime + idleWeight * idleRatioPenalty}. * * @return CostResult containing totalCost, lagCost, and idleCost, * or result with {@link Double#POSITIVE_INFINITY} for invalid inputs @@ -65,40 +88,68 @@ public CostResult computeCost( ) { if (metrics == null || config == null || proposedTaskCount <= 0 || metrics.getPartitionCount() <= 0) { - return new CostResult(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY); + return CostResult.INFINITE_COST; } final double avgProcessingRate = metrics.getAvgProcessingRate(); - final double lagRecoveryTime; if (avgProcessingRate < 0) { throw DruidException.defensive("Avg processing rate[%.2f] must not be negative.", avgProcessingRate); + } + + // Lag recovery time is decreasing by adding tasks and increasing by ejecting tasks. + // In case of increasing lag, we apply an amplification factor to reflect the urgency of addressing lag. + // Caution: we rely only on the metrics, the real issues may be absolutely different, up to hardware failure. + final double lagRecoveryTime; + if (metrics.getAggregateLag() <= 0) { + lagRecoveryTime = 0; } else { - // Lag recovery time is decreasing by adding tasks and increasing by ejecting tasks. - // In case of increasing lag, we apply an amplification factor to reflect the urgency of addressing lag. - // Caution: we rely only on the metrics, the real issues may be absolutely different, up to hardware failure. 
- if (metrics.getAggregateLag() <= 0) { - lagRecoveryTime = 0; + final double lagPerPartition = metrics.getAggregateLag() / metrics.getPartitionCount(); + final double amplification = Math.max(1.0, 1.0 + LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition)); + final double adjustedProcessingRate = Math.max(avgProcessingRate, MIN_PROCESSING_RATE); + lagRecoveryTime = metrics.getAggregateLag() * amplification / (proposedTaskCount * adjustedProcessingRate); + } + + // Capacity-based idle prediction. When the proposed count would oversaturate the cluster + // (busy work exceeds available capacity), the unmet demand becomes a virtual lag-recovery + // time on the same axis as real lag — so the optimizer treats predicted saturation as + // predicted lag, not as "perfect utilization". + final double currentPollIdleRatio = metrics.getPollIdleRatio(); + final int currentTaskCount = metrics.getCurrentTaskCount(); + final double predictedIdleRatio; + final double overrun; + if (currentPollIdleRatio < 0) { + predictedIdleRatio = 0.5; + overrun = 0.0; + } else if (currentTaskCount <= 0 || proposedTaskCount == currentTaskCount) { + predictedIdleRatio = currentPollIdleRatio; + overrun = 0.0; + } else { + final double busyFraction = 1.0 - currentPollIdleRatio; + final double taskRatio = (double) proposedTaskCount / currentTaskCount; + final double rawIdle = 1.0 - busyFraction / taskRatio; + if (rawIdle >= 0) { + predictedIdleRatio = Math.min(1.0, rawIdle); + overrun = 0.0; } else { - final double lagPerPartition = metrics.getAggregateLag() / metrics.getPartitionCount(); - final double amplification = Math.max(1.0, 1.0 + LAG_AMPLIFICATION_MULTIPLIER * Math.log(lagPerPartition)); - final double adjustedProcessingRate = Math.max(avgProcessingRate, MIN_PROCESSING_RATE); - lagRecoveryTime = metrics.getAggregateLag() * amplification / (proposedTaskCount * adjustedProcessingRate); + predictedIdleRatio = 0.0; + overrun = -rawIdle; } } + final double virtualLagRecoveryTime = overrun * 
metrics.getTaskDurationSeconds(); - final double predictedIdleRatio = estimateIdleRatio(metrics, proposedTaskCount); - final double idleCost = proposedTaskCount * predictedIdleRatio; - final double lagCost = config.getLagWeight() * lagRecoveryTime; + final double idleCost = uShapedIdleCost(predictedIdleRatio, proposedTaskCount); + final double lagCost = config.getLagWeight() * (lagRecoveryTime + virtualLagRecoveryTime); final double weightedIdleCost = config.getIdleWeight() * idleCost; final double cost = lagCost + weightedIdleCost; log.debug( "Cost for taskCount[%d]: lagCost[%.2fs], idleCost[%.2fs], " - + "predictedIdle[%.3f], finalCost[%.2fs]", + + "predictedIdle[%.3f], overrun[%.3f], finalCost[%.2fs]", proposedTaskCount, lagCost, weightedIdleCost, predictedIdleRatio, + overrun, cost ); @@ -106,33 +157,28 @@ public CostResult computeCost( } /** - * Estimates the idle ratio for a proposed task count with linear prediction. + * U-shaped idle cost with minimum at {@link #IDEAL_IDLE_RATIO}. * - * @param metrics current system metrics containing idle ratio and task count - * @param taskCount target task count to estimate an idle ratio for - * @return estimated idle ratio in range [0.0, 1.0] + *
+ * The ideal-idle baseline keeps cost non-zero at the optimum so the optimizer + * always has a finite trade-off against lag cost. */ - private double estimateIdleRatio(CostMetrics metrics, int taskCount) + double uShapedIdleCost(double predictedIdleRatio, int taskCount) { - final double currentPollIdleRatio = metrics.getPollIdleRatio(); - - if (currentPollIdleRatio < 0) { - // No idle data available, assume moderate idle - return 0.5; - } - - final int currentTaskCount = metrics.getCurrentTaskCount(); - if (currentTaskCount <= 0 || taskCount == currentTaskCount) { - return currentPollIdleRatio; + final double penalty; + if (predictedIdleRatio < IDEAL_IDLE_RATIO) { + final double norm = (IDEAL_IDLE_RATIO - predictedIdleRatio) / IDEAL_IDLE_RATIO; + penalty = UNDER_PROVISIONING_PENALTY * norm * norm; + } else { + final double norm = (predictedIdleRatio - IDEAL_IDLE_RATIO) / (1.0 - IDEAL_IDLE_RATIO); + penalty = OVER_PROVISIONING_PENALTY * norm * norm; } - - // Linear prediction (capacity-based) - existing logic - final double busyFraction = 1.0 - currentPollIdleRatio; - final double taskRatio = (double) taskCount / currentTaskCount; - final double linearPrediction = Math.max(0.0, Math.min(1.0, 1.0 - busyFraction / taskRatio)); - - // Clamp to valid range [0, 1] - return Math.max(0.0, linearPrediction); + return taskCount * (IDEAL_IDLE_RATIO + penalty); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java index 05724d1e8375..4755b3cf4963 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfigTest.java @@ -49,7 +49,6 @@ public void 
testSerdeWithAllProperties() throws Exception + " \"scaleActionPeriodMillis\": 60000,\n" + " \"lagWeight\": 0.6,\n" + " \"idleWeight\": 0.4,\n" - + " \"highLagThreshold\": 30000,\n" + " \"minScaleUpDelay\": \"PT5M\",\n" + " \"minScaleDownDelay\": \"PT10M\",\n" + " \"scaleDownDuringTaskRolloverOnly\": true\n" @@ -68,7 +67,8 @@ public void testSerdeWithAllProperties() throws Exception Assert.assertEquals(Duration.standardMinutes(5), config.getMinScaleUpDelay()); Assert.assertEquals(Duration.standardMinutes(10), config.getMinScaleDownDelay()); Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly()); - Assert.assertEquals(30000, config.getHighLagThreshold()); + Assert.assertFalse(config.shouldUseTaskCountBoundariesOnScaleUp()); + Assert.assertTrue(config.shouldUseTaskCountBoundariesOnScaleDown()); // Test serialization back to JSON String serialized = mapper.writeValueAsString(config); @@ -101,10 +101,10 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(Duration.millis(DEFAULT_SCALE_ACTION_PERIOD_MILLIS), config.getMinScaleUpDelay()); Assert.assertEquals(DEFAULT_MIN_SCALE_DELAY, config.getMinScaleDownDelay()); Assert.assertFalse(config.isScaleDownOnTaskRolloverOnly()); + Assert.assertFalse(config.shouldUseTaskCountBoundariesOnScaleUp()); + Assert.assertTrue(config.shouldUseTaskCountBoundariesOnScaleDown()); Assert.assertNull(config.getTaskCountStart()); Assert.assertNull(config.getStopTaskCountRatio()); - // When highLagThreshold is not set, it defaults to -1 (burst scale-up disabled) - Assert.assertEquals(-1, config.getHighLagThreshold()); } @Test @@ -185,10 +185,11 @@ public void testBuilder() .scaleActionPeriodMillis(60000L) .lagWeight(0.6) .idleWeight(0.4) + .useTaskCountBoundariesOnScaleUp(true) + .useTaskCountBoundariesOnScaleDown(true) .minScaleUpDelay(Duration.standardMinutes(5)) .minScaleDownDelay(Duration.standardMinutes(10)) .scaleDownDuringTaskRolloverOnly(true) - .highLagThreshold(30000) .build(); 
Assert.assertTrue(config.getEnableTaskAutoScaler()); @@ -199,10 +200,11 @@ public void testBuilder() Assert.assertEquals(60000L, config.getScaleActionPeriodMillis()); Assert.assertEquals(0.6, config.getLagWeight(), 0.001); Assert.assertEquals(0.4, config.getIdleWeight(), 0.001); + Assert.assertTrue(config.shouldUseTaskCountBoundariesOnScaleUp()); + Assert.assertTrue(config.shouldUseTaskCountBoundariesOnScaleDown()); Assert.assertEquals(Duration.standardMinutes(5), config.getMinScaleUpDelay()); Assert.assertEquals(Duration.standardMinutes(10), config.getMinScaleDownDelay()); Assert.assertTrue(config.isScaleDownOnTaskRolloverOnly()); - Assert.assertEquals(30000, config.getHighLagThreshold()); } @Test @@ -243,7 +245,10 @@ public void testScaleDelayDefaults() throws Exception .build(); Assert.assertEquals(Duration.standardMinutes(5), bothSet.getMinScaleUpDelay()); Assert.assertEquals(Duration.standardMinutes(20), bothSet.getMinScaleDownDelay()); - CostBasedAutoScalerConfig roundTripped = mapper.readValue(mapper.writeValueAsString(bothSet), CostBasedAutoScalerConfig.class); + CostBasedAutoScalerConfig roundTripped = mapper.readValue( + mapper.writeValueAsString(bothSet), + CostBasedAutoScalerConfig.class + ); Assert.assertEquals(bothSet, roundTripped); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java deleted file mode 100644 index 122a449c5f79..000000000000 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerHighLagScalingTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; - -import org.junit.Assert; -import org.junit.Test; - -/** - * Tests for {@link CostBasedAutoScaler#computeExtraPPTIncrease}. - *
- * The burst scaling uses a logarithmic formula: - * {@code deltaTasks = K * ln(lagSeverity)} - * where {@code K = (partitionCount / 6.4) / sqrt(currentTaskCount)}. - */ -public class CostBasedAutoScalerHighLagScalingTest -{ - private static final int LAG_THRESHOLD = 50_000; - private static final int PARTITION_COUNT = 48; - private static final int TASK_COUNT_MAX = 48; - - /** - * Tests scaling behavior across different lag levels and task counts. - *
- * Expected behavior for 48 partitions with threshold=50K: - *
- * | Current | Lag/Part | PPT reduction | Notes | - * |---------|----------|---------------|-----------------------------------------| - * | any | <50K | 0 | Below threshold | - * | any | =50K | 0 | ln(1) = 0 | - * | 1 | 100K | 40 | Significant boost for recovery | - * | 1 | 200K | 43 | Large boost | - * | 4 | 200K | 6 | Moderate boost | - * | 12 | 200K | 0 | Delta too small for PPT change | - * | 24 | 200K | 0 | Delta too small for PPT change | - *- */ - @Test - public void testComputeExtraPPTIncrease() - { - // Below threshold: no boost - Assert.assertEquals( - "Below threshold should not increase PPT", - 0, - CostBasedAutoScaler.computeExtraPPTIncrease( - LAG_THRESHOLD, - PARTITION_COUNT * 40_000L, - PARTITION_COUNT, - 4, - TASK_COUNT_MAX - ) - ); - - // At threshold (lagSeverity=1, ln(1)=0): no boost - Assert.assertEquals( - "At threshold (ln(1)=0) should not increase PPT", - 0, - CostBasedAutoScaler.computeExtraPPTIncrease( - LAG_THRESHOLD, - PARTITION_COUNT * 50_000L, - PARTITION_COUNT, - 4, - TASK_COUNT_MAX - ) - ); - - // C=1, 100K lag (2x threshold): significant boost for emergency recovery - int boost1_100k = CostBasedAutoScaler.computeExtraPPTIncrease( - LAG_THRESHOLD, - PARTITION_COUNT * 100_000L, - PARTITION_COUNT, - 1, - TASK_COUNT_MAX - ); - Assert.assertEquals("C=1, 100K lag boost", 40, boost1_100k); - - // C=1, 200K lag (4x threshold): large boost - int boost1_200k = CostBasedAutoScaler.computeExtraPPTIncrease( - LAG_THRESHOLD, - PARTITION_COUNT * 200_000L, - PARTITION_COUNT, - 1, - TASK_COUNT_MAX - ); - Assert.assertEquals("C=1, 200K lag boost", 43, boost1_200k); - - // C=4, 200K lag: moderate boost (K decreases with sqrt(C)) - int boost4_200k = CostBasedAutoScaler.computeExtraPPTIncrease( - LAG_THRESHOLD, - PARTITION_COUNT * 200_000L, - PARTITION_COUNT, - 4, - TASK_COUNT_MAX - ); - Assert.assertEquals("C=4, 200K lag should yield a modest PPT increase", 6, boost4_200k); - - // C=12, 200K lag: delta too small to change PPT - int boost12_200k = 
CostBasedAutoScaler.computeExtraPPTIncrease( - LAG_THRESHOLD, - PARTITION_COUNT * 200_000L, - PARTITION_COUNT, - 12, - TASK_COUNT_MAX - ); - Assert.assertEquals("C=12, 200K lag should not change PPT", 0, boost12_200k); - - // C=24, 200K lag: delta too small to change PPT - int boost24_200k = CostBasedAutoScaler.computeExtraPPTIncrease( - LAG_THRESHOLD, - PARTITION_COUNT * 200_000L, - PARTITION_COUNT, - 24, - TASK_COUNT_MAX - ); - Assert.assertEquals("C=24, 200K lag should not change PPT", 0, boost24_200k); - } - - @Test - public void testComputeExtraPPTIncreaseInvalidInputs() - { - Assert.assertEquals( - 0, - CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, 0, 4, 48) - ); - Assert.assertEquals( - 0, - CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, 48, 0, 48) - ); - Assert.assertEquals( - 0, - CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, 48, 4, 0) - ); - Assert.assertEquals( - 0, - CostBasedAutoScaler.computeExtraPPTIncrease(LAG_THRESHOLD, 1_000_000, -1, 4, 48) - ); - } -} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java index 4b5666a85ee0..56eacb43eabe 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java @@ -77,94 +77,62 @@ public void setUp() @Test public void testComputeValidTaskCounts() { - boolean useTaskCountBoundaries = true; - int highLagThreshold = 50_000; - // For 100 partitions at 25 tasks (4 partitions/task), valid counts include 25 and 34 - int[] validTaskCounts = computeValidTaskCounts( + final int[] validTaskCounts = computeValidTaskCounts( 100, 25, - 0L, 1, - 100, - 
useTaskCountBoundaries, - highLagThreshold + 100 ); Assert.assertTrue("Expected current task count to be included", contains(validTaskCounts, 25)); Assert.assertTrue("Expected next scale-up option (34) to be included", contains(validTaskCounts, 34)); // Single partition - int[] singlePartition = computeValidTaskCounts( + final int[] singlePartition = computeValidTaskCounts( 1, 1, - 0L, 1, - 100, - useTaskCountBoundaries, - highLagThreshold + 100 ); Assert.assertTrue("Single partition should yield at least one valid count", singlePartition.length > 0); Assert.assertTrue("Single partition should include task count 1", contains(singlePartition, 1)); // Current exceeds partitions - should still yield valid, deduplicated options - int[] exceedsPartitions = computeValidTaskCounts( + final int[] exceedsPartitions = computeValidTaskCounts( 2, 5, - 0L, 1, - 100, - useTaskCountBoundaries, - highLagThreshold + 100 ); Assert.assertEquals(2, exceedsPartitions.length); Assert.assertTrue(contains(exceedsPartitions, 1)); Assert.assertTrue(contains(exceedsPartitions, 2)); - // Lag expansion: low lag should not include max, high lag should allow aggressive scaling - int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 1, 30, useTaskCountBoundaries, highLagThreshold); - Assert.assertFalse("Low lag should not include max task count", contains(lowLagCounts, 30)); - Assert.assertTrue("Low lag should cap scale-up around 4 tasks", contains(lowLagCounts, 4)); - - // High lag uses logarithmic formula: K * ln(lagSeverity) where K = P/(6.4*sqrt(C)) - // For P=30, C=3, lagPerPartition=500K, threshold=50K: lagSeverity=10, K=2.7, delta=6.2 - // This allows controlled scaling to ~10-15 tasks (not all the way to max) - long highAggregateLag = 30L * 500_000L; - int[] highLagCounts = computeValidTaskCounts( - 30, - 3, - highAggregateLag, - 1, - 30, - useTaskCountBoundaries, - highLagThreshold - ); - Assert.assertTrue("High lag should allow scaling to 10 tasks", contains(highLagCounts, 10)); - 
Assert.assertTrue("High lag should allow scaling to 15 tasks", contains(highLagCounts, 15)); - Assert.assertFalse("High lag should not jump straight to max (30) from 3", contains(highLagCounts, 30)); + // Unbounded candidate generation includes both nearby and maximum task counts. + final int[] taskCounts = computeValidTaskCounts(30, 3, 1, 30); + Assert.assertTrue("Valid task counts should include max task count", contains(taskCounts, 30)); + Assert.assertTrue("Valid task counts should include nearby scale-up task count", contains(taskCounts, 4)); // Respects taskCountMax - int[] cappedCounts = computeValidTaskCounts( + final int[] cappedCounts = computeValidTaskCounts( 30, 4, - highAggregateLag, 1, - 3, - useTaskCountBoundaries, - highLagThreshold + 3 ); Assert.assertTrue("Should include taskCountMax when within bounds", contains(cappedCounts, 3)); Assert.assertFalse("Should not exceed taskCountMax", contains(cappedCounts, 4)); // Respects taskCountMin - filters out values below the minimum // With partitionCount=100, currentTaskCount=10, the computed range includes values like 8, 9, 10, 12, 13 - int[] minCappedCounts = computeValidTaskCounts(100, 10, 0L, 10, 100, useTaskCountBoundaries, highLagThreshold); + final int[] minCappedCounts = computeValidTaskCounts(100, 10, 10, 100); Assert.assertFalse("Should not include values below taskCountMin (8)", contains(minCappedCounts, 8)); Assert.assertFalse("Should not include values below taskCountMin (9)", contains(minCappedCounts, 9)); Assert.assertTrue("Should include values at taskCountMin (10)", contains(minCappedCounts, 10)); Assert.assertTrue("Should include values above taskCountMin (12)", contains(minCappedCounts, 12)); // Both bounds applied together - int[] bothBounds = computeValidTaskCounts(100, 10, 0L, 10, 12, useTaskCountBoundaries, highLagThreshold); + final int[] bothBounds = computeValidTaskCounts(100, 10, 10, 12); Assert.assertFalse("Should not include values below taskCountMin (8)", contains(bothBounds, 
8)); Assert.assertFalse("Should not include values below taskCountMin (9)", contains(bothBounds, 9)); Assert.assertFalse("Should not include values above taskCountMax (13)", contains(bothBounds, 13)); @@ -182,7 +150,7 @@ public void testComputeOptimalTaskCount() Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(100.0, -1, 100, 0.3))); // Negative pollIdleRatio (metric unavailable) should still allow scaling - int unavailableIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, -1.0)); + final int unavailableIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, -1.0)); MatcherAssert.assertThat( "Negative pollIdleRatio should not reject scaling", unavailableIdleResult, @@ -190,16 +158,84 @@ public void testComputeOptimalTaskCount() ); // High idle (underutilized) - should scale down - int scaleDownResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8)); + final int scaleDownResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8)); Assert.assertTrue("Expected scale-down when idle ratio is high (>0.6)", scaleDownResult < 25); // Very high idle with high task count - should scale down - int highIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9)); + final int highIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9)); Assert.assertTrue("High idle should not suggest scale-up", highIdleResult <= 50); - // With low idle and balanced weights, the algorithm should not scale up aggressively - int lowIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1)); - Assert.assertTrue("With low idle and balanced weights, avoid aggressive scale-up", lowIdleResult <= 25); + // With idle below ideal (0.1 < 0.25), U-shaped cost penalizes under-provisioning, + // driving a moderate scale-up toward the ideal operating point. 
+ final int lowIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1)); + Assert.assertTrue( + "Low idle below ideal should drive scale-up toward ideal operating point", + lowIdleResult > 25 + ); + } + + @Test + public void testComputeOptimalTaskCountLimitsTaskCountJumps() + { + final CostBasedAutoScalerConfig boundedScaleUpConfig = CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(1.0) + .idleWeight(0.0) + .useTaskCountBoundariesOnScaleUp(true) + .build(); + final CostBasedAutoScaler boundedScaleUpScaler = createAutoScaler(boundedScaleUpConfig); + + Assert.assertEquals( + "Scale-up should only evaluate two task-count candidates above the current count", + 13, + boundedScaleUpScaler.computeOptimalTaskCount(createMetrics(100_000.0, 10, 100, 0.25)) + ); + + final CostBasedAutoScalerConfig unboundedScaleUpConfig = CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(1.0) + .idleWeight(0.0) + .build(); + final CostBasedAutoScaler unboundedScaleUpScaler = createAutoScaler(unboundedScaleUpConfig); + Assert.assertEquals( + "Without scale-up boundaries, lag-only optimization should jump to max task count", + 100, + unboundedScaleUpScaler.computeOptimalTaskCount(createMetrics(100_000.0, 10, 100, 0.25)) + ); + + final CostBasedAutoScalerConfig boundedScaleDownConfig = CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.0) + .idleWeight(1.0) + .useTaskCountBoundariesOnScaleDown(true) + .build(); + final CostBasedAutoScaler boundedScaleDownScaler = createAutoScaler(boundedScaleDownConfig); + + Assert.assertEquals( + "Scale-down should only evaluate two task-count candidates below the current count", + 34, + boundedScaleDownScaler.computeOptimalTaskCount(createMetrics(0.0, 100, 100, 0.9)) + ); + + final CostBasedAutoScalerConfig 
unboundedScaleDownConfig = CostBasedAutoScalerConfig.builder() + .taskCountMax(25) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.0) + .idleWeight(1.0) + .build(); + final CostBasedAutoScaler unboundedScaleDownScaler = createAutoScaler(unboundedScaleDownConfig); + Assert.assertEquals( + "Without scale-down boundaries, idle-only optimization may select a much lower task count", + 1, + unboundedScaleDownScaler.computeOptimalTaskCount(createMetrics(0.0, 100, 100, 0.9)) + ); } @Test @@ -531,6 +567,21 @@ private CostMetrics createMetrics( ); } + private CostBasedAutoScaler createAutoScaler(CostBasedAutoScalerConfig config) + { + final SupervisorSpec mockSupervisorSpec = Mockito.mock(SupervisorSpec.class); + final SeekableStreamSupervisor mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class); + final ServiceEmitter mockEmitter = Mockito.mock(ServiceEmitter.class); + final SeekableStreamSupervisorIOConfig mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class); + + when(mockSupervisorSpec.getId()).thenReturn("test-supervisor"); + when(mockSupervisorSpec.getDataSources()).thenReturn(List.of("test-datasource")); + when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig); + when(mockIoConfig.getStream()).thenReturn("test-stream"); + + return new CostBasedAutoScaler(mockSupervisor, config, mockSupervisorSpec, mockEmitter); + } + private boolean contains(int[] array, int value) { for (int i : array) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java index f96df30049e6..8d5448065fa0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java @@ -195,10 +195,10 @@ public void testNegativeProcessingRate_throwsDefensiveException() } @Test - public void testIdleCostMonotonicWithTaskCount() + public void testIdleCostIsUShapedAroundIdealRatio() { - // Test that idle cost increases monotonically with task count. - // With fixed load, adding more tasks means each task has less work, so idle increases. + // U-shaped cost: minimum near IDEAL_IDLE_RATIO=0.25, higher on both sides. + // Current: 10 tasks with 25% idle (already at ideal). CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder() .taskCountMax(100) .taskCountMin(1) @@ -207,18 +207,19 @@ public void testIdleCostMonotonicWithTaskCount() .idleWeight(1.0) .build(); - // Current: 10 tasks with 40% idle (60% busy) - CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4); + CostMetrics metrics = createMetrics(0.0, 10, 100, 0.25); + + // At current (idle=0.25=ideal): baseline cost only, penalty=0 + double costAtIdeal = costFunction.computeCost(metrics, 10, idleOnlyConfig).totalCost(); + + // Scale down → predicted idle falls below ideal → under-provisioning penalty + double costScaleDown = costFunction.computeCost(metrics, 5, idleOnlyConfig).totalCost(); - double costAt5 = costFunction.computeCost(metrics, 5, idleOnlyConfig).totalCost(); - double costAt10 = costFunction.computeCost(metrics, 10, idleOnlyConfig).totalCost(); - double costAt15 = costFunction.computeCost(metrics, 15, idleOnlyConfig).totalCost(); - double costAt20 = costFunction.computeCost(metrics, 20, idleOnlyConfig).totalCost(); + // Scale up → predicted idle rises above ideal → over-provisioning penalty + double costScaleUp = costFunction.computeCost(metrics, 20, idleOnlyConfig).totalCost(); - // Monotonically increasing idle cost as tasks increase - Assert.assertTrue("cost(5) < cost(10)", costAt5 < costAt10); - Assert.assertTrue("cost(10) < 
cost(15)", costAt10 < costAt15); - Assert.assertTrue("cost(15) < cost(20)", costAt15 < costAt20); + Assert.assertTrue("scale-down costs more than ideal", costScaleDown > costAtIdeal); + Assert.assertTrue("scale-up costs more than ideal", costScaleUp > costAtIdeal); } @Test @@ -238,8 +239,10 @@ public void testIdleRatioClampingAtBoundaries() CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4); double costAt2 = costFunction.computeCost(metrics, 2, idleOnlyConfig).totalCost(); - // idlenessCost = taskCount * 0.0 (clamped) = 0 - Assert.assertEquals("Idle cost should be 0 when predicted idle is clamped to 0", 0.0, costAt2, 0.0001); + // idle = 0: max under-provisioning penalty; cost = taskCount * (IDEAL_IDLE_RATIO + UNDER_PROVISIONING_PENALTY) + double expectedAt2 = 2 * (WeightedCostFunction.IDEAL_IDLE_RATIO + WeightedCostFunction.UNDER_PROVISIONING_PENALTY); + Assert.assertEquals("Idle cost at clamped-to-zero idle ratio should reflect full under-provisioning penalty", + expectedAt2, costAt2, 0.0001); // Extreme scale-up shouldn't exceed 1.0 for idle ratio // 10 tasks → 100 tasks with 10% idle @@ -267,11 +270,13 @@ public void testIdleRatioWithMissingData() double cost10 = costFunction.computeCost(missingIdleData, 10, idleOnlyConfig).totalCost(); double cost20 = costFunction.computeCost(missingIdleData, 20, idleOnlyConfig).totalCost(); - // With missing data, predicted idle = 0.5 for all task counts - // idlenessCost at 10 = 10 * 0.5 = 5 - // idlenessCost at 20 = 20 * 0.5 = 10 - Assert.assertEquals("Cost at 10 tasks with missing idle data", 10 * 0.5, cost10, 0.0001); - Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * 0.5, cost20, 0.0001); + // With missing data, predicted idle = 0.5 for all task counts regardless of proposed count. 
+ // U-shaped cost at idle=0.5: idle > IDEAL_IDLE_RATIO(0.25), norm=(0.5-0.25)/0.75=1/3, penalty = OVER_PROVISIONING_PENALTY * (1/3)^2 + double expectedCostPerTask = WeightedCostFunction.IDEAL_IDLE_RATIO + + WeightedCostFunction.OVER_PROVISIONING_PENALTY * (1.0 / 3.0) * (1.0 / 3.0); + Assert.assertEquals("Cost at 10 tasks with missing idle data", 10 * expectedCostPerTask, cost10, 0.0001); + Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * expectedCostPerTask, cost20, 0.0001); + Assert.assertEquals("Cost scales linearly with task count at fixed idle ratio", 2 * cost10, cost20, 0.0001); } @Test @@ -338,6 +343,51 @@ public void testAmplificationGrowsWithLag() } + @Test + public void testUShapedIdleCostFormula() + { + int n = 10; + + // At ideal ratio: penalty = 0, cost = n * IDEAL_IDLE_RATIO + Assert.assertEquals( + n * WeightedCostFunction.IDEAL_IDLE_RATIO, + costFunction.uShapedIdleCost(WeightedCostFunction.IDEAL_IDLE_RATIO, n), + 1e-9 + ); + + // At idle = 0 (fully under-provisioned): norm = 1, penalty = UNDER_PROVISIONING_PENALTY + Assert.assertEquals( + n * (WeightedCostFunction.IDEAL_IDLE_RATIO + WeightedCostFunction.UNDER_PROVISIONING_PENALTY), + costFunction.uShapedIdleCost(0.0, n), + 1e-9 + ); + + // At idle = 1 (fully over-provisioned): norm = 1, penalty = OVER_PROVISIONING_PENALTY + Assert.assertEquals( + n * (WeightedCostFunction.IDEAL_IDLE_RATIO + WeightedCostFunction.OVER_PROVISIONING_PENALTY), + costFunction.uShapedIdleCost(1.0, n), + 1e-9 + ); + + // Both extremes exceed the ideal cost + double idealCost = costFunction.uShapedIdleCost(WeightedCostFunction.IDEAL_IDLE_RATIO, n); + Assert.assertTrue("idle=0 costs more than ideal", costFunction.uShapedIdleCost(0.0, n) > idealCost); + Assert.assertTrue("idle=1 costs more than ideal", costFunction.uShapedIdleCost(1.0, n) > idealCost); + + // Under-provisioning is penalized more than over-provisioning (UNDER > OVER) + Assert.assertTrue( + "under-provisioning penalty exceeds over-provisioning penalty", + 
costFunction.uShapedIdleCost(0.0, n) > costFunction.uShapedIdleCost(1.0, n) + ); + + // Cost scales linearly with task count at any fixed idle ratio + Assert.assertEquals( + 2 * costFunction.uShapedIdleCost(0.5, n), + costFunction.uShapedIdleCost(0.5, 2 * n), + 1e-9 + ); + } + private CostMetrics createMetrics( double avgPartitionLag, int currentTaskCount,