@@ -503,29 +503,67 @@ public void handle()
final int desiredTaskCount = computeDesiredTaskCount.call();
final int currentTaskCount = getCurrentTaskCount();

if (desiredTaskCount <= 0) {
return;
}

ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());

// 1) This should already be handled by the auto-scaler implementation, but make sure we catch/record these for auditability
if (desiredTaskCount == currentTaskCount) {
// Auto-scaler contract: a negative return is pathological (scaler couldn't compute a
// useful answer, e.g. insufficient metrics).
if (desiredTaskCount < 0) {
log.warn(
"Skipping scaling for supervisor[%s] for dataSource[%s]: already at desired task count [%d]",
"Auto-scaler returned pathological taskCount[%d] for supervisor[%s] for dataSource[%s]; skipping scale.",
desiredTaskCount,
supervisorId,
dataSource
);
emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, "Auto-scaler failed to compute a task count")
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
return;
}

// Clamp the scaler's preferred count to the supervisor's configured bounds. The scaler
// is responsible only for computing what it wants; bounds enforcement and the
// corresponding skip-reason emissions live here. Partition count is a hard ceiling — we
// cannot usefully run more tasks than partitions for stream ingestion — so it caps both
// configured min and max (protects against misconfigurations where an operator-set min
// exceeds partitionCount, which would otherwise force us to allocate idle tasks).
final int partitionCount = getPartitionCount();
final int rawMin = autoScalerConfig.getTaskCountMin();
final int rawMax = autoScalerConfig.getTaskCountMax();
final int taskCountMin = partitionCount > 0 ? Math.min(rawMin, partitionCount) : rawMin;
final int taskCountMax = partitionCount > 0 ? Math.min(rawMax, partitionCount) : rawMax;
final int clampedTaskCount = Math.min(taskCountMax, Math.max(taskCountMin, desiredTaskCount));
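// Illustrative numbers (hypothetical, not part of this change): with partitionCount = 10,
// rawMin = 2 and rawMax = 20, the effective bounds are [2, 10]; a scaler request of 25 clamps
// to 10 and a request of 1 clamps to 2. With a misconfigured rawMin = 16, the effective min
// becomes 10, so we never allocate more tasks than partitions.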

// When the clamped value equals the current count there is nothing to do. Emit a specific
// skip reason reflecting *why* we stopped: at a configured bound if clamping was the cause,
// otherwise the scaler simply prefers the status quo.
if (clampedTaskCount == currentTaskCount) {
final String skipReason;
if (desiredTaskCount > taskCountMax) {
skipReason = "Already at max task count";
} else if (desiredTaskCount < taskCountMin) {
skipReason = "Already at min task count";
} else {
skipReason = "desired capacity reached";
}
log.info(
"Skipping scaling for supervisor[%s] for dataSource[%s]: [%s] (scaler wants [%d], current [%d], bounds [%d,%d])",
supervisorId,
dataSource,
desiredTaskCount
skipReason,
desiredTaskCount,
currentTaskCount,
taskCountMin,
taskCountMax
);
emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, "desired capacity reached")
// Emit the *unclamped* desired count so operators see what the scaler actually wants.
emitter.emit(event.setDimension(AUTOSCALER_SKIP_REASON_DIMENSION, skipReason)
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
return;
}

// 2) Make sure we wait for any pending completion tasks to finish.
// Make sure we wait for any pending completion tasks to finish.
// At this point there could be 3 generations of tasks: pending completion tasks (old generation), running tasks (current generation), and (after our scale) pending tasks (new generation).
// We want to avoid killing any old generation tasks preemptively, as that might cause the current generation tasks' offsets to become invalid.
for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) {
@@ -546,32 +584,34 @@ public void handle()
}
}

// 3) Make sure we are not breaching any scaling cooldown limits.
// Make sure we are not breaching any scaling cooldown limits.
// Scaling operations are disruptive — scale-down in particular can leave the supervisor
// under-resourced while it recovers from lag induced by the scale event, so callers may
// configure a longer cooldown for scale-down than for scale-up. Both directions are measured against the same
// last-scale timestamp so that a rapid up/down oscillation is still subject to the appropriate cooldown,
// regardless of which direction triggered last.
// regardless of which direction triggered last. Direction is computed against the
// *clamped* value — that is the scale we would actually apply.
final ScaleDirection scaleDirection;
final long cooldownMillis;

if (desiredTaskCount > currentTaskCount) {
if (clampedTaskCount > currentTaskCount) {
scaleDirection = ScaleDirection.SCALE_UP;
cooldownMillis = autoScalerConfig.getMinScaleUpDelay().getMillis();
} else { // desiredTaskCount < currentTaskCount
} else { // clampedTaskCount < currentTaskCount
scaleDirection = ScaleDirection.SCALE_DOWN;
cooldownMillis = autoScalerConfig.getMinScaleDownDelay().getMillis();
}
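// Worked example (hypothetical config): with minScaleUpDelay = 2m and minScaleDownDelay = 10m,
// and the last scale action 3 minutes ago, a scale-up proceeds (3m >= 2m) while a scale-down is
// skipped (3m < 10m). Both directions compare against the same dynamicTriggerLastScaleRunTime,
// so an up/down oscillation cannot bypass the longer scale-down cooldown.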

if (nowTime - dynamicTriggerLastScaleRunTime < cooldownMillis) {
log.info(
"DynamicAllocationTasksNotice submitted again in [%d]ms, [%s] cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! desired task count is [%d], current task count is [%d]",
"DynamicAllocationTasksNotice submitted again in [%d]ms, [%s] cooldown is [%d]ms for supervisor[%s] for dataSource[%s], skipping it! scaler wants [%d] (clamped [%d]), current task count is [%d]",
nowTime - dynamicTriggerLastScaleRunTime,
scaleDirection,
cooldownMillis,
supervisorId,
dataSource,
desiredTaskCount,
clampedTaskCount,
currentTaskCount
);

@@ -583,10 +623,12 @@ public void handle()
return;
}

// At this point, we can reasonably attempt a scaling action, so emit our required task count
// At this point we can reasonably attempt a scaling action. Emit the scaler's unclamped
// preferred count as the operator hint: that number tells operators "the scaler wants
// taskCount[N]" regardless of whether we can give it the full amount.
emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));

boolean allocationSuccess = changeTaskCount(desiredTaskCount);
boolean allocationSuccess = changeTaskCount(clampedTaskCount);
if (allocationSuccess) {
onSuccessfulScale.run();
dynamicTriggerLastScaleRunTime = nowTime;
@@ -166,61 +166,47 @@ public int computeTaskCountForRollover()
}
}

/**
* Returns the task count the scaler wants to reach — the "optimal" count computed from current
* metrics, unclamped by the autoscaler's min/max bounds. The supervisor is responsible for
* clamping the return value to {@code [taskCountMin, taskCountMax]} and for deciding whether
* to actually scale.
* <p>
* Contract:
* <ul>
* <li>Returns {@code -1} only for pathological cases (metrics unavailable / optimal cannot be
* computed). This signals to the supervisor that there is no useful hint for operators.</li>
* <li>Returns the current task count when scale-down is configured to happen on rollover only
* and the optimal would otherwise be a scale-down — the scaler's "preferred" count in that
* mode is to stay put.</li>
* <li>Otherwise returns the optimal task count unchanged.</li>
* </ul>
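* <p>
* Illustrative example (hypothetical numbers): with {@code taskCountMin = 2}, {@code taskCountMax = 20}
* and a current task count of 8, a computed optimal of 30 is returned as 30 and the supervisor
* clamps it to 20 before acting; a computed optimal of 5 with {@code scaleDownOnTaskRolloverOnly = true}
* is returned as 8 (stay put); an optimal of -1 is propagated as -1.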
*/
public int computeTaskCountForScaleAction()
{
lastKnownMetrics = collectMetrics();

final int optimalTaskCount = computeOptimalTaskCount(lastKnownMetrics);
int currentTaskCount = supervisor.getIoConfig().getTaskCount();

// Take the current task count but clamp it to the configured boundaries if it is outside the boundaries.
// There might be a configuration instance with a handwritten taskCount that is outside the boundaries.
final boolean isTaskCountOutOfBounds = currentTaskCount < config.getTaskCountMin()
|| currentTaskCount > config.getTaskCountMax();
if (isTaskCountOutOfBounds) {
currentTaskCount = Math.min(config.getTaskCountMax(), Math.max(config.getTaskCountMin(), currentTaskCount));
if (optimalTaskCount <= 0) {
// computeOptimalTaskCount returns -1 for pathological metrics states; propagate it.
return -1;
}

// Perform scale-up actions; scale-down actions only if configured.
final int taskCount;

// If task count is out of bounds, scale to the configured boundary
// regardless of optimal task count, to get back to a safe state.
if (isTaskCountOutOfBounds) {
taskCount = currentTaskCount;
log.info("Task count for supervisor[%s] was out of bounds [%d,%d], urgently scaling from [%d] to [%d].",
supervisorId, config.getTaskCountMin(), config.getTaskCountMax(), currentTaskCount, currentTaskCount);
} else if (optimalTaskCount > currentTaskCount) {
taskCount = optimalTaskCount;
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale up).", supervisorId, currentTaskCount, taskCount);
} else if (!config.isScaleDownOnTaskRolloverOnly()
&& optimalTaskCount < currentTaskCount
&& optimalTaskCount > 0) {
taskCount = optimalTaskCount;
log.info("Updating taskCount for supervisor[%s] from [%d] to [%d] (scale down).", supervisorId, currentTaskCount, taskCount);
} else {
taskCount = -1;
log.debug("No scaling required for supervisor[%s]", supervisorId);

// Emit metrics for scaling skip reasons; in case of min == max, signaling reaching
// max task count has bigger priority for the external observers / trackers
if (optimalTaskCount >= config.getTaskCountMax() || currentTaskCount == config.getTaskCountMax()) {
emitter.emit(getMetricBuilder()
.setDimension(
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
"Already at max task count"
)
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, currentTaskCount));
} else if (optimalTaskCount == config.getTaskCountMin() || currentTaskCount == config.getTaskCountMin()) {
emitter.emit(getMetricBuilder()
.setDimension(
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
"Already at min task count"
)
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, currentTaskCount));
}
final int currentTaskCount = supervisor.getIoConfig().getTaskCount();

if (config.isScaleDownOnTaskRolloverOnly() && optimalTaskCount < currentTaskCount) {
// In rollover-only scale-down mode, the scaler's preferred count is the current count — it
// explicitly does not want a proactive scale-down here.
return currentTaskCount;
}
return taskCount;

log.info(
"CostBasedAutoScaler for supervisor[%s] wants taskCount[%d] (current[%d]).",
supervisorId,
optimalTaskCount,
currentTaskCount
);
return optimalTaskCount;
}

public CostBasedAutoScalerConfig getConfig()