39 changes: 20 additions & 19 deletions docs/ingestion/supervisor.md
@@ -193,29 +193,30 @@ The following example shows a supervisor spec with `lagBased` autoscaler:
```
</details>

**2. Cost-based autoscaler strategy (experimental)**
**2. Cost-based autoscaler strategy**

An autoscaler which computes the required supervisor task count via a cost function based on ingestion lag and poll-to-idle ratio.
Task counts are selected from a bounded range derived from the current partitions-per-task ratio,
not strictly from factors/divisors of the partition count. This bounded partitions-per-task window enables gradual scaling while
avoiding large jumps and still allowing non-divisor task counts when needed.
The cost-based autoscaler picks the number of ingestion tasks that minimizes a combined cost score. The score has two components:

**It is experimental and the implementation details as well as cost function parameters are subject to change.**
- **Lag cost** — how long it would take to drain the current backlog at the observed processing rate. More tasks reduce this cost.
- **Idle cost** — how far the predicted idle ratio is from the target of roughly 25%. Tasks that are too busy (under-provisioned) or too idle (over-provisioned) both drive the score up; the ~25% target leaves headroom to absorb traffic spikes without wasting resources.

At every evaluation interval, Druid computes the score for each candidate task count and picks the one with the lowest total cost.
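
To make this concrete, here is a minimal sketch of how such a weighted score could be computed and compared across candidate task counts. The class name, method names, and the idle-ratio prediction are illustrative assumptions rather than Druid's internal API, and the shipped cost function expresses both terms in comparable units, which this sketch simplifies:

```java
// Illustrative sketch only; not Druid's internal cost function.
public class CostScoreSketch
{
  static final double TARGET_IDLE_RATIO = 0.25; // the ~25% idle sweet spot

  static double score(
      long backlogRecords,      // current aggregate lag
      double recordsPerTaskSec, // observed per-task processing rate
      double currentIdleRatio,  // observed poll-idle ratio
      int currentTaskCount,
      int candidateTaskCount,
      double lagWeight,
      double idleWeight
  )
  {
    // Lag cost: seconds needed to drain the backlog at the candidate task count.
    final double lagCost = backlogRecords / (recordsPerTaskSec * candidateTaskCount);

    // Idle cost: assume busy time shrinks proportionally as tasks are added,
    // then penalize the distance of the predicted idle ratio from the target.
    final double predictedBusy = (1.0 - currentIdleRatio) * currentTaskCount / candidateTaskCount;
    final double predictedIdle = Math.max(0.0, 1.0 - predictedBusy);
    final double idleCost = Math.abs(predictedIdle - TARGET_IDLE_RATIO);

    return lagWeight * lagCost + idleWeight * idleCost;
  }

  public static void main(String[] args)
  {
    // Evaluate each candidate and keep the one with the lowest total cost,
    // mirroring what the autoscaler does at every evaluation interval.
    int best = -1;
    double bestScore = Double.POSITIVE_INFINITY;
    for (int candidate : new int[]{1, 2, 3, 4, 6, 12}) {
      final double s = score(600_000L, 5_000.0, 0.10, 3, candidate, 0.4, 0.6);
      if (s < bestScore) {
        bestScore = s;
        best = candidate;
      }
    }
    System.out.println("Lowest-cost task count: " + best);
  }
}
```

With these made-up inputs the lag term dominates, so the cheapest candidate is the largest one; in practice the weights and the unit normalization of the real cost function keep the two terms in balance.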

Note: Kinesis is not supported yet; support is in progress.

Contributor:
Is it really in progress?

Contributor Author:

I need to verify whether anybody has a Kinesis workload with CBA working. If you want, we can remove that part.


The following table outlines the configuration properties related to the `costBased` autoscaler strategy:

| Property|Description|Required|Default|
|---------|-----------|--------|-------|
|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale action is triggered. | No | 600000 |
|`lagWeight`|The weight of extracted lag value in cost function.| No| 0.25 |
|`idleWeight`|The weight of extracted poll idle value in cost function. | No | 0.75 |
|`useTaskCountBoundaries`|Enables the bounded partitions-per-task window when selecting task counts.|No| `false` |
|`highLagThreshold`|Average partition lag threshold that triggers burst scale-up when set to a value greater than `0`. Set to a negative value to disable burst scale-up.|No|-1|
|`minScaleUpDelay`|Minimum cooldown duration after a scale-up action before the next scale-up is allowed, specified as an ISO-8601 duration string.|No||
|`minScaleDownDelay`|Minimum cooldown duration after a scale-down action before the next scale-down is allowed, specified as an ISO-8601 duration string.|No|`PT30M`|
|`scaleDownDuringTaskRolloverOnly`|Indicates whether task scaling down is limited to periods during task rollovers only.|No|`false`|
| Property | Description | Required | Default |
|----------|-------------|----------|---------------------------|
|`scaleActionPeriodMillis`|How often, in milliseconds, Druid evaluates whether to scale.|No| `600000` (10 min) |
|`lagWeight`|How much weight to give the lag cost relative to the idle cost. Higher values make the autoscaler more aggressive about adding tasks to drain backlog.|No| `0.4` |
|`idleWeight`|How much weight to give the idle cost relative to the lag cost. Higher values make the autoscaler more aggressive about removing over-provisioned tasks.|No| `0.6` |
|`useTaskCountBoundariesOnScaleUp`|Limits scale-up to a small step relative to the current task count, preventing large jumps. Disable to allow the autoscaler to jump directly to any task count.|No| `false` |
|`useTaskCountBoundariesOnScaleDown`|Limits scale-down to a small step relative to the current task count, preventing large drops. Disable to allow the autoscaler to drop directly to any task count.|No| `true` |
|`minScaleUpDelay`|Minimum cooldown after a scale-up before the next scale-up is allowed. Specified as an ISO-8601 duration.|No| `scaleActionPeriodMillis` |
|`minScaleDownDelay`|Minimum cooldown after a scale-down before the next scale-down is allowed. Specified as an ISO-8601 duration.|No| `PT30M` |
|`scaleDownDuringTaskRolloverOnly`|If `true`, scale-down actions are deferred until the next task rollover. This avoids disrupting in-progress ingestion.|No| `false` |

The following example shows a supervisor spec with `costBased` autoscaler:

@@ -231,10 +232,10 @@ The following example shows a supervisor spec with `costBased` autoscaler:
"autoScalerStrategy": "costBased",
"taskCountMin": 1,
"taskCountMax": 10,
"lagWeight": 0.4,
"idleWeight": 0.6,
"minScaleUpDelay": "PT10M",
"minScaleDownDelay": "PT30M",
"lagWeight": 0.1,
"idleWeight": 0.9
"minScaleDownDelay": "PT30M"
}
}
}
@@ -143,7 +143,7 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
}
});

// These values were carefully handpicked to allow that test to pass in a stable manner.
// These values were handpicked so that the test passes stably.
final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
.builder()
.enableTaskAutoScaler(true)
@@ -152,8 +152,8 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
.taskCountStart(lowInitialTaskCount)
.scaleActionPeriodMillis(500)
.minTriggerScaleActionFrequencyMillis(1000)
.lagWeight(0.2)
.idleWeight(0.8)
.lagWeight(0.8)

Contributor:
What will be the effect of the change to lag and idle weights?

Contributor Author:
It was passing without any problems under normal circumstances. The main idea of the change is to reduce the chance that the test fails to scale before the timeout due to CI CPU pressure.

.idleWeight(0.2)
.build();

final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisorWithAutoScaler(
@@ -192,11 +192,11 @@ public void test_autoScaler_scalesUpAndDown_withSlowPublish()

final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
.builder()
.enableTaskAutoScaler(true)
.taskCountMin(1)
.taskCountMax(4)
.lagWeight(1.0)
.idleWeight(1.0)
.enableTaskAutoScaler(true)
.lagWeight(0.5)
.idleWeight(0.5)
.minTriggerScaleActionFrequencyMillis(10L)
.scaleActionPeriodMillis(10L)
.minScaleDownDelay(Duration.standardSeconds(1))
@@ -39,21 +39,23 @@
import org.apache.druid.utils.CollectionUtils;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Cost-based auto-scaler for seekable stream supervisors.
* Uses a weighted cost function combining lag recovery time (seconds) and idleness cost (seconds)
* Uses a weighted cost function combining lag recovery time and idle-ratio cost
* to determine optimal task counts.
* <p>
* Candidate task counts are derived by scanning a bounded window of partitions-per-task (PPT) values
* around the current PPT, then converting those to task counts. This allows non-divisor task counts
* while keeping changes gradual (no large jumps).
* Candidate task counts are derived from possible partitions-per-task ratios, then converted
* to task counts. When configured, scale-up and scale-down can independently limit the evaluated
* candidates to a small window around the current task count to avoid large jumps.
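* <p>
* For example (an illustration derived from the candidate-generation code below), with 12
* partitions the full candidate set is {1, 2, 3, 4, 6, 12}; with a current count of 4 and the
* boundary limit of 2, scale-up evaluates at most two candidates above the current count
* (6 and 12) and scale-down at most two below (3 and 2).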
* <p>
* Scale-up and scale-down are both evaluated proactively.
* Future versions may perform scale-down on task rollover only.
* Scale-up is applied during regular scale-action checks. Scale-down is applied during regular
* checks unless {@link CostBasedAutoScalerConfig#isScaleDownOnTaskRolloverOnly()} defers it
* to task rollover.
*/
public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
{
@@ -64,21 +66,17 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
public static final String OPTIMAL_TASK_COUNT_METRIC = "task/autoScaler/costBased/optimalTaskCount";
public static final String INVALID_METRICS_COUNT = "task/autoScaler/costBased/invalidMetrics";

static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;

/**
* If average partition lag crosses this value and the processing rate is
* still zero, scaling actions are skipped and an alert is raised.
* Maximum number of candidate task counts to evaluate above or below the current task count
* when scale-up or scale-down boundaries are enabled.
*/
static final int MAX_IDLENESS_PARTITION_LAG = 10_000;
static final int BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK = 2;

/**
* Divisor for partition count in the K formula: K = (partitionCount / K_PARTITION_DIVISOR) / sqrt(currentTaskCount).
* This controls how aggressive the scaling is relative to partition count.
* That value was chosen by carefully analyzing the math model behind the implementation.
* If the average partition lag crosses this value and the processing rate is
* still zero, scaling actions are skipped and an alert is raised.
*/
static final double K_PARTITION_DIVISOR = 6.4;
static final int MAX_IDLENESS_PARTITION_LAG = 10_000;

private final String supervisorId;
private final SeekableStreamSupervisor supervisor;
@@ -176,7 +174,7 @@ public int computeTaskCountForScaleAction()
// Clamp the current task count to the configured boundaries if it falls outside them.
// A configuration with a handwritten taskCount might lie outside the boundaries.
final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
|| currentTaskCount > config.getTaskCountMax();
|| currentTaskCount > config.getTaskCountMax();
if (isTaskCountOutOfBounds) {
currentTaskCount = Math.min(config.getTaskCountMax(), Math.max(config.getTaskCountMin(), currentTaskCount));
}
@@ -188,16 +186,28 @@ public int computeTaskCountForScaleAction()
// regardless of optimal task count, to get back to a safe state.
if (isTaskCountOutOfBounds) {
taskCount = currentTaskCount;
log.info("Task count for supervisor[%s] was out of bounds [%d,%d], urgently scaling from [%d] to [%d].",
supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, currentTaskCount);
log.info(
"Task count for supervisor[%s] was out of bounds [%d,%d], urgently scaling from [%d] to [%d].",
supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, currentTaskCount
);
} else if (optimalTaskCount > currentTaskCount) {
taskCount = optimalTaskCount;
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount);
log.info(
"Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).",
supervisorId,
currentTaskCount,
taskCount
);
} else if (!config.isScaleDownOnTaskRolloverOnly()
&& optimalTaskCount < currentTaskCount
&& optimalTaskCount > 0) {
taskCount = optimalTaskCount;
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).", supervisorId, currentTaskCount, taskCount);
log.info(
"Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).",
supervisorId,
currentTaskCount,
taskCount
);
} else {
taskCount = -1;
log.debug("No scaling required for supervisor[%s]", supervisorId);
@@ -237,7 +247,7 @@ public CostBasedAutoScalerConfig getConfig()
* <li>Current task count already optimal</li>
* </ul>
*
* @return optimal task count for scale-up, or -1 if no scaling action needed
* @return optimal task count, or -1 if no scaling action is needed
*/
int computeOptimalTaskCount(CostMetrics metrics)
{
@@ -261,11 +271,8 @@ int computeOptimalTaskCount(CostMetrics metrics)
final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(
partitionCount,
currentTaskCount,
(long) metrics.getAggregateLag(),
config.getTaskCountMin(),
config.getTaskCountMax(),
config.shouldUseTaskCountBoundaries(),
config.getHighLagThreshold()
config.getTaskCountMax()
);

if (validTaskCounts.length == 0) {
@@ -290,9 +297,28 @@ int computeOptimalTaskCount(CostMetrics metrics)
config.getIdleWeight()
);

// Find the task count which reduces cost
// Find the evaluated task count with the lowest cost.
final CostResult[] costResults = new CostResult[validTaskCounts.length];
for (int i = 0; i < validTaskCounts.length; ++i) {
Arrays.fill(costResults, CostResult.INFINITE_COST);

int startIndex = 0;
int endIndex = validTaskCounts.length - 1;

if (config.shouldUseTaskCountBoundariesOnScaleUp()) {
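// validTaskCounts is generated in ascending order, so binarySearch is applicable here.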
int currentTaskCountIndex = Arrays.binarySearch(validTaskCounts, currentTaskCount);
endIndex = currentTaskCountIndex >= 0
? Math.min(currentTaskCountIndex + BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK, endIndex)
: endIndex;
}

if (config.shouldUseTaskCountBoundariesOnScaleDown()) {
int currentTaskCountIndex = Arrays.binarySearch(validTaskCounts, currentTaskCount);
startIndex = currentTaskCountIndex >= 0
? Math.max(currentTaskCountIndex - BOUNDARY_LIMIT_IN_PARTITIONS_PER_TASK, startIndex)
: startIndex;
}

for (int i = startIndex; i <= endIndex; ++i) {
final int taskCount = validTaskCounts[i];
CostResult costResult = costFunction.computeCost(metrics, taskCount, config);
double cost = costResult.totalCost();
@@ -317,57 +343,33 @@ int computeOptimalTaskCount(CostMetrics metrics)
);
}

// Scale up is performed eagerly.
// Scale-up is applied eagerly; scale-down may be deferred by computeTaskCountForScaleAction().
return optimalTaskCount;
}

/**
* Generates valid task counts based on partitions-per-task ratios.
* Generates valid task counts by converting every possible partitions-per-task ratio
* into a task count and filtering by configured min/max task count bounds.
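* <p>
* For example, with {@code partitionCount = 12}, {@code taskCountMin = 1}, and
* {@code taskCountMax = 10}, each partitions-per-task value from 12 down to 1 maps to
* {@code ceil(partitionCount / partitionsPerTask)}, yielding the candidates
* {1, 2, 3, 4, 6}; the task count 12 is dropped by the max bound.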
*
* @return sorted list of valid task counts within bounds
* @return list of valid task counts within bounds
*/
@SuppressWarnings({"ReassignedVariable"})
static int[] computeValidTaskCounts(
int partitionCount,
int currentTaskCount,
double aggregateLag,
int taskCountMin,
int taskCountMax,
boolean isTaskCountBoundariesEnabled,
int highLagThreshold
int taskCountMax
)
{
if (partitionCount <= 0 || currentTaskCount <= 0) {
return new int[]{};
}

IntSet result = new IntArraySet();
final int currentPartitionsPerTask = partitionCount / currentTaskCount;

// Minimum partitions per task correspond to the maximum number of tasks (scale up) and vice versa.
int minPartitionsPerTask = Math.min(1, partitionCount / taskCountMax);
int maxPartitionsPerTask = Math.max(partitionCount, partitionCount / taskCountMin);

if (isTaskCountBoundariesEnabled) {
maxPartitionsPerTask = Math.min(
partitionCount,
currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
);

int extraIncrease = 0;
if (highLagThreshold > 0) {
extraIncrease = computeExtraPPTIncrease(
highLagThreshold,
aggregateLag,
partitionCount,
currentTaskCount,
taskCountMax
);
}
int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + extraIncrease;
minPartitionsPerTask = Math.max(minPartitionsPerTask, currentPartitionsPerTask - effectiveMaxIncrease);
}

for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask
&& partitionsPerTask != 0; partitionsPerTask--) {
final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask;
@@ -378,50 +380,6 @@ static int[] computeValidTaskCounts(
return result.toIntArray();
}

/**
* Computes extra allowed increase in partitions-per-task in scenarios when the average per-partition lag
* is above the configured threshold.
* <p>
* This uses a logarithmic formula for consistent absolute growth:
* {@code deltaTasks = K * ln(lagSeverity)}
* where {@code K = (partitionCount / 6.4) / sqrt(currentTaskCount)}
* <p>
* This ensures that small taskCounts get a massive relative boost,
* while large taskCounts receive more measured, stable increases.
*/
static int computeExtraPPTIncrease(
double lagThreshold,
double aggregateLag,
int partitionCount,
int currentTaskCount,
int taskCountMax
)
{
if (partitionCount <= 0 || taskCountMax <= 0 || currentTaskCount <= 0) {
return 0;
}

final double lagPerPartition = aggregateLag / partitionCount;
if (lagPerPartition < lagThreshold) {
return 0;
}

final double lagSeverity = lagPerPartition / lagThreshold;

// Logarithmic growth: ln(lagSeverity) is positive when lagSeverity > 1
// First multiplier decreases with sqrt(currentTaskCount): aggressive when small, conservative when large
final double deltaTasks = (partitionCount / K_PARTITION_DIVISOR) / Math.sqrt(currentTaskCount) * Math.log(
lagSeverity);

final double targetTaskCount = Math.min(taskCountMax, (double) currentTaskCount + deltaTasks);

// Compute precise PPT reduction to avoid early integer truncation artifacts
final double currentPPT = (double) partitionCount / currentTaskCount;
final double targetPPT = (double) partitionCount / targetTaskCount;

return Math.max(0, (int) Math.floor(currentPPT - targetPPT));
}

/**
* Extracts the average poll-idle-ratio metric from task stats.
* This metric indicates how much time the consumer spends idle waiting for data.