coordinator,scheduler: add drain aware scheduling #4761

hongyunyan wants to merge 6 commits into master from
Conversation
📝 Walkthrough

A drain-aware scheduler implementation is introduced alongside modifications to existing schedulers.

Changes
Sequence Diagram(s)

sequenceDiagram
actor Coordinator
participant DrainCtrl as drain.Controller
participant DrainSched as drainScheduler
participant OpCtrl as OperatorController
participant CfDB as ChangefeedDB
participant NodeMgr as NodeManager
Coordinator->>DrainCtrl: Observe node entering DRAINING state
Coordinator->>DrainSched: Execute()
DrainSched->>DrainCtrl: GetDrainingOrStoppingNodes()
DrainCtrl-->>DrainSched: [node1, node2]
DrainSched->>NodeMgr: GetAliveNodeIDs()
NodeMgr-->>DrainSched: [node1, node2, node3]
DrainSched->>DrainCtrl: Filter schedulable destinations
DrainCtrl-->>DrainSched: [node2, node3]
loop For each draining origin node
DrainSched->>CfDB: GetMaintainersOnNode(origin)
CfDB-->>DrainSched: [feed1, feed2, ...]
loop For each maintainer
DrainSched->>OpCtrl: CountMoveMaintainerOperatorsFromNodes([origin])
OpCtrl-->>DrainSched: in-flight count
alt No in-flight operator
DrainSched->>DrainSched: chooseLeastLoadedDest()
DrainSched->>OpCtrl: AddOperator(move from origin to dest)
OpCtrl-->>DrainSched: operator created
end
end
end
DrainSched-->>Coordinator: Next execution time
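As a rough illustration of the drain pass in the diagram above, here is a self-contained sketch. Plain strings stand in for `node.ID`, and the callback parameters (`schedulable`, `maintainersOn`, `hasOperator`, `addMove`) are hypothetical stand-ins for the real `drain.Controller`, `ChangefeedDB`, and `OperatorController` APIs, not the PR's actual signatures.

```go
package main

import "fmt"

// scheduleDrain mirrors the diagrammed flow: filter destinations once, then
// for each draining origin move its maintainers to the least-loaded
// schedulable node, skipping changefeeds that already have an operator.
func scheduleDrain(
	draining []string, // nodes being drained or stopped
	alive []string, // all alive nodes
	schedulable func(string) bool, // destination filter
	maintainersOn func(string) []string, // maintainers per node
	hasOperator func(string) bool, // in-flight operator check
	taskSize map[string]int, // current load per node
	addMove func(cf, origin, dest string) bool,
) {
	// Build the destination candidate list once per pass.
	var dests []string
	for _, n := range alive {
		if schedulable(n) {
			dests = append(dests, n)
		}
	}
	for _, origin := range draining {
		for _, cf := range maintainersOn(origin) {
			if hasOperator(cf) {
				continue // an operator is already moving this changefeed
			}
			// Pick the least-loaded destination (first minimum wins).
			dest, best := "", -1
			for _, d := range dests {
				if d == origin {
					continue
				}
				if best < 0 || taskSize[d] < best {
					dest, best = d, taskSize[d]
				}
			}
			if dest != "" && addMove(cf, origin, dest) {
				taskSize[dest]++
			}
		}
	}
}

func main() {
	taskSize := map[string]int{"node2": 1, "node3": 0}
	var moves []string
	scheduleDrain(
		[]string{"node1"},
		[]string{"node1", "node2", "node3"},
		func(n string) bool { return n != "node1" },
		func(n string) []string {
			if n == "node1" {
				return []string{"feed1", "feed2"}
			}
			return nil
		},
		func(string) bool { return false },
		taskSize,
		func(cf, origin, dest string) bool {
			moves = append(moves, cf+"->"+dest)
			return true
		},
	)
	fmt.Println(moves) // feed1 goes to the emptier node3, feed2 to node2
}
```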
sequenceDiagram
actor Coordinator
participant BalSched as balanceScheduler
participant DrainCtrl as drain.Controller
participant OpCtrl as OperatorController
Coordinator->>BalSched: Execute()
BalSched->>DrainCtrl: GetDrainingOrStoppingNodes()
DrainCtrl-->>BalSched: node list
alt Any node draining?
BalSched->>BalSched: Set drainBalanceBlockedUntil = now + drainCooldown()
BalSched-->>Coordinator: Return delayed next run (skip balance)
else All nodes healthy?
alt Within cooldown window?
BalSched-->>Coordinator: Return delayed next run (skip balance)
else Cooldown expired?
BalSched->>DrainCtrl: Filter schedulable nodes
DrainCtrl-->>BalSched: filtered alive nodes
BalSched->>OpCtrl: CheckBalanceStatus(filtered nodes)
OpCtrl-->>BalSched: balance needed?
alt Balance needed
BalSched->>OpCtrl: Balance(filtered nodes)
OpCtrl-->>BalSched: operator created
end
BalSched-->>Coordinator: Schedule next balance check
end
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 4 passed, ❌ 1 failed

❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Code Review
This pull request introduces a new drainScheduler and updates the balance and basic schedulers to handle node draining and stopping states. The changes ensure that maintainers are proactively moved away from draining nodes and that no new work is assigned to them. Additionally, a cooldown mechanism is added to the balance scheduler to prevent immediate rebalancing churn after a node finishes draining. The review feedback highlights opportunities to improve the efficiency of the drain scheduler's execution loop by reducing redundant database lookups and complexity, and suggests making the hardcoded 120-second cooldown period configurable.
	drainBalanceBlockedUntil time.Time
}

const drainBalanceCooldown = 120 * time.Second
	destCandidates []node.ID,
	nodeTaskSize map[node.ID]int,
) bool {
	maintainers := s.changefeedDB.GetByNodeID(origin)
Calling GetByNodeID inside the scheduling loop is inefficient, especially when batchSize is large. This results in repeated database/map lookups and slice iterations for the same node. It is better to pre-fetch the maintainers for all draining nodes once at the beginning of the Execute method.
- maintainers := s.changefeedDB.GetByNodeID(origin)
+ maintainers := maintainersMap[origin]
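The suggested prefetch could look like the sketch below; `prefetchMaintainers` and the `getByNodeID` callback are illustrative stand-ins for the real `ChangefeedDB.GetByNodeID` lookup, assumed here to return a stable snapshot per `Execute` call.

```go
package main

import "fmt"

// prefetchMaintainers fetches the maintainer list for every draining node
// once, up front, so the scheduling loop reads from a map instead of
// repeating the DB lookup on each iteration.
func prefetchMaintainers(draining []string, getByNodeID func(string) []string) map[string][]string {
	m := make(map[string][]string, len(draining))
	for _, origin := range draining {
		m[origin] = getByNodeID(origin)
	}
	return m
}

func main() {
	calls := 0
	lookup := func(n string) []string {
		calls++
		return []string{n + "-feed"}
	}
	maintainersMap := prefetchMaintainers([]string{"n1", "n2"}, lookup)
	// The scheduling loop now reads from the map; no further lookups occur.
	for i := 0; i < 10; i++ {
		_ = maintainersMap["n1"]
	}
	fmt.Println(calls) // one lookup per draining node, regardless of loop count
}
```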
	for _, cf := range maintainers {
		if s.operatorController.HasOperator(cf.ID.DisplayName) {
			continue
		}
		dest, ok := chooseLeastLoadedDest(origin, destCandidates, nodeTaskSize)
		if !ok {
			return false
		}
		if s.operatorController.AddOperator(operator.NewMoveMaintainerOperator(s.changefeedDB, cf, origin, dest)) {
			nodeTaskSize[dest]++
			return true
		}
	}
The current implementation of scheduleOneFromNode iterates through the entire list of maintainers every time it is called, even if it only schedules one move. This leads to repeated full scans of the same maintainer list when several moves are scheduled in one pass.
Force-pushed 3dc592f to 3e139a5 (compare)
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
The full list of commands accepted by this bot can be found here.

Details
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (3)
coordinator/scheduler/balance.go (1)

115-120: Minor: the `1s` fallback is reachable only from tests.

Production wires a non-zero `balanceInterval`, so the fallback is effectively test-only. Consider making the fallback explicit (e.g. a named `defaultDrainCooldown` constant) or validating `checkBalanceInterval > 0` in the constructor, so the behavior is obvious at a glance.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@coordinator/scheduler/balance.go` around lines 115-120: The fallback to 1s in balanceScheduler.drainCooldown() is only hit in tests; make the behavior explicit by either introducing a named constant (e.g. defaultDrainCooldown) and using that in drainCooldown(), or by validating/setting s.checkBalanceInterval > 0 in the balanceScheduler constructor (or factory that creates balanceScheduler) so the zero-case can't occur at runtime; update references to checkBalanceInterval and drainCooldown accordingly to use the constant or to rely on the constructor guarantee.

coordinator/scheduler/balance_test.go (1)

34-181: LGTM, deterministic and targeted.

Tests correctly manipulate `drainBalanceBlockedUntil` to make the cooldown window deterministic and use `require.WithinDuration` with a generous tolerance for the interval-derivation case. A couple of optional tightenings you may consider:

- `setupCoordinatorSchedulerTestServices` mutates the process-wide `appcontext` service registry with no cleanup; fine since these tests run serially, but a `t.Cleanup` that resets the registry would make this package safer against future test additions that call `t.Parallel()`.
- `TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires` relies on the `interval == 0` fallback to `1s` in `drainCooldown()`. Passing an explicit small interval instead would decouple the test from that fallback branch.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@coordinator/scheduler/balance_test.go` around lines 34-181: Tests mutate the global appcontext registry and rely on the NewBalanceScheduler(..., interval=0) fallback; update setupCoordinatorSchedulerTestServices to register a t.Cleanup that resets the process-wide appcontext service registry so tests are safe for t.Parallel, and in TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires call NewBalanceScheduler with an explicit small interval (e.g., 10*time.Millisecond) instead of 0 and continue using s.drainBalanceBlockedUntil to drive determinism; touch setupCoordinatorSchedulerTestServices, TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires, the NewBalanceScheduler invocation, and the drainBalanceBlockedUntil field to implement these changes.

coordinator/scheduler/drain.go (1)

83-83: Sort destination candidates before least-loaded tie-breaking.

`chooseLeastLoadedDest` picks the first minimum, so equal-load destinations inherit the order from `GetAliveNodeIDs()`. If that order comes from a map, scheduling becomes nondeterministic for the common "all destinations have equal load" case.

♻️ Proposed deterministic tie-break

  destCandidates := filterSchedulableNodeIDs(s.nodeManager.GetAliveNodeIDs(), s.liveness)
+ slices.Sort(destCandidates)
  if len(destCandidates) == 0 {

Also applies to: 172-180

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@coordinator/scheduler/drain.go` at line 83: The destination candidate list coming from filterSchedulableNodeIDs/GetAliveNodeIDs can be nondeterministic; before calling chooseLeastLoadedDest you must sort destCandidates into a deterministic order (e.g., lexicographic or numeric ID order) so chooseLeastLoadedDest's "first minimum" tie-break is stable. Update the code that builds destCandidates (the destCandidates := filterSchedulableNodeIDs(...) call used before chooseLeastLoadedDest) to sort the slice in-place, and apply the same sort to the other occurrence in this file that constructs destCandidates for tie-breaking.
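A sketch of the suggested fix, with plain strings in place of `node.ID`; the helper below is an illustrative reimplementation of the tie-break behavior described above, not the PR's actual code.

```go
package main

import (
	"fmt"
	"slices"
)

// chooseLeastLoadedDest sorts the candidate list first, so the "first
// minimum wins" rule yields a stable pick even when the input order comes
// from a map and every destination has equal load.
func chooseLeastLoadedDest(origin string, candidates []string, taskSize map[string]int) (string, bool) {
	slices.Sort(candidates) // deterministic order regardless of map-derived input
	dest, best := "", -1
	for _, c := range candidates {
		if c == origin {
			continue
		}
		if best < 0 || taskSize[c] < best {
			dest, best = c, taskSize[c]
		}
	}
	return dest, dest != ""
}

func main() {
	// With equal load everywhere, sorting makes the pick deterministic:
	// n2 is chosen no matter how the candidate slice was ordered.
	dest, ok := chooseLeastLoadedDest("n1", []string{"n3", "n2"}, map[string]int{"n2": 0, "n3": 0})
	fmt.Println(dest, ok)
}
```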
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@coordinator/scheduler/drain_liveness.go`:
- Around line 23-34: filterSchedulableNodeIDs currently reuses the caller's
backing array via filtered := nodeIDs[:0]; change it to allocate a new slice
(e.g., make([]node.ID, 0, len(nodeIDs))) so we don't mutate the caller's
underlying array, keep the nil liveness early-return, iterate over nodeIDs and
append nodeIDs that satisfy liveness.IsSchedulableDest(nodeID) into the newly
allocated filtered slice, and return that new slice.
In `@coordinator/scheduler/drain_test.go`:
- Around line 32-36: Each listed test sets global services via
appcontext.SetService (e.g., using messaging.NewMockMessageCenter ->
appcontext.MessageCenter and watcher.NewNodeManager -> watcher.NodeManagerName)
but never restores them; add t.Cleanup callbacks in
TestDrainSchedulerCreatesMoveOperators,
TestDrainSchedulerSkipsChangefeedWithInflightOperator,
TestDrainSchedulerIgnoresUnrelatedOperatorCapacity, and
TestDrainSchedulerRotatesAcrossDrainingNodes to restore the previous service
value (capture old := appcontext.GetService(key) or nil) or clear the service
after the test (call appcontext.SetService(key, old) or
appcontext.SetService(key, nil)) so global state is reset regardless of test
outcome.
---
Nitpick comments:
In `@coordinator/scheduler/balance_test.go`:
- Around line 34-181: Tests mutate the global appcontext registry and rely on
NewBalanceScheduler(..., interval=0) fallback; update
setupCoordinatorSchedulerTestServices to register a t.Cleanup that resets the
process-wide appcontext service registry so tests are safe for t.Parallel, and
in TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires call
NewBalanceScheduler with an explicit small interval (e.g., 10*time.Millisecond)
instead of 0 and continue using s.drainBalanceBlockedUntil to drive determinism;
touch setupCoordinatorSchedulerTestServices,
TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires,
NewBalanceScheduler invocation, and the drainBalanceBlockedUntil field to
implement these changes.
In `@coordinator/scheduler/balance.go`:
- Around line 115-120: The fallback to 1s in balanceScheduler.drainCooldown() is
only hit in tests; make the behavior explicit by either introducing a named
constant (e.g. defaultDrainCooldown) and using that in drainCooldown(), or by
validating/setting s.checkBalanceInterval > 0 in the balanceScheduler
constructor (or factory that creates balanceScheduler) so the zero-case can't
occur at runtime; update references to checkBalanceInterval and drainCooldown
accordingly to use the constant or to rely on the constructor guarantee.
In `@coordinator/scheduler/drain.go`:
- Line 83: The destination candidate list coming from
filterSchedulableNodeIDs/GetAliveNodeIDs can be nondeterministic; before calling
chooseLeastLoadedDest you must sort destCandidates into a deterministic order
(e.g., lexicographic or numeric ID order) so chooseLeastLoadedDest's "first
minimum" tie-break is stable. Update the code that builds destCandidates (the
destCandidates := filterSchedulableNodeIDs(...) call used before
chooseLeastLoadedDest) to sort the slice in-place, and apply the same sort to
the other occurrence in this file that constructs destCandidates for
tie-breaking.
📒 Files selected for processing (10)

- coordinator/controller.go
- coordinator/operator/operator_controller.go
- coordinator/operator/operator_controller_test.go
- coordinator/operator/operator_move.go
- coordinator/scheduler/balance.go
- coordinator/scheduler/balance_test.go
- coordinator/scheduler/basic.go
- coordinator/scheduler/drain.go
- coordinator/scheduler/drain_liveness.go
- coordinator/scheduler/drain_test.go
func filterSchedulableNodeIDs(nodeIDs []node.ID, liveness *drain.Controller) []node.ID {
	if liveness == nil {
		return nodeIDs
	}
	filtered := nodeIDs[:0]
	for _, nodeID := range nodeIDs {
		if liveness.IsSchedulableDest(nodeID) {
			filtered = append(filtered, nodeID)
		}
	}
	return filtered
}
Allocate a new slice instead of aliasing the caller's backing array.
The current implementation uses filtered := nodeIDs[:0] to mutate the caller's slice backing array in-place. While GetAliveNodeIDs() currently returns a fresh copy, this pattern is a forward-compatibility hazard: if it is ever refactored to return an internal reference, the subsequent append would silently corrupt the NodeManager's node list. Additionally, filterSchedulableAliveNodes (below this function) correctly allocates a new map, so this helper should be symmetric and allocate a new slice for consistency and defensive clarity.
Proposed fix
func filterSchedulableNodeIDs(nodeIDs []node.ID, liveness *drain.Controller) []node.ID {
if liveness == nil {
return nodeIDs
}
- filtered := nodeIDs[:0]
+ filtered := make([]node.ID, 0, len(nodeIDs))
for _, nodeID := range nodeIDs {
if liveness.IsSchedulableDest(nodeID) {
filtered = append(filtered, nodeID)
}
}
return filtered
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@coordinator/scheduler/drain_liveness.go` around lines 23 - 34,
filterSchedulableNodeIDs currently reuses the caller's backing array via
filtered := nodeIDs[:0]; change it to allocate a new slice (e.g.,
make([]node.ID, 0, len(nodeIDs))) so we don't mutate the caller's underlying
array, keep the nil liveness early-return, iterate over nodeIDs and append
nodeIDs that satisfy liveness.IsSchedulableDest(nodeID) into the newly allocated
filtered slice, and return that new slice.
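To make the hazard concrete, here is a small standalone demonstration of why `filtered := nodeIDs[:0]` is risky; the two filter functions are illustrative, with `string` IDs standing in for `node.ID`.

```go
package main

import "fmt"

// filterAliasing re-slices the input to length zero, so every append writes
// through the shared backing array and overwrites the caller's data in place.
func filterAliasing(ids []string, keep func(string) bool) []string {
	filtered := ids[:0] // shares ids' backing array
	for _, id := range ids {
		if keep(id) {
			filtered = append(filtered, id)
		}
	}
	return filtered
}

// filterCopying allocates a fresh backing array, leaving the input untouched.
func filterCopying(ids []string, keep func(string) bool) []string {
	filtered := make([]string, 0, len(ids))
	for _, id := range ids {
		if keep(id) {
			filtered = append(filtered, id)
		}
	}
	return filtered
}

func main() {
	keep := func(s string) bool { return s != "drop" }

	a := []string{"drop", "n1", "n2"}
	filterAliasing(a, keep)
	fmt.Println(a) // the caller's slice was silently rewritten: [n1 n2 n2]

	b := []string{"drop", "n1", "n2"}
	filterCopying(b, keep)
	fmt.Println(b) // unchanged: [drop n1 n2]
}
```

This is exactly the forward-compatibility concern: if `GetAliveNodeIDs()` ever returned an internal slice, the aliasing variant would corrupt the NodeManager's own node list.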
mc := messaging.NewMockMessageCenter()
appcontext.SetService(appcontext.MessageCenter, mc)

nodeManager := watcher.NewNodeManager(nil, nil)
appcontext.SetService(watcher.NodeManagerName, nodeManager)
Add cleanup for appcontext services in these tests.
These tests mutate global services via appcontext.SetService() without restoring previous state. While the file currently has no t.Parallel() tests, this pattern still makes tests order-dependent under go test -shuffle. Add t.Cleanup() to each test to restore or clear services after the test completes.
Affected test functions
- TestDrainSchedulerCreatesMoveOperators (lines 32–36)
- TestDrainSchedulerSkipsChangefeedWithInflightOperator (lines 88–92)
- TestDrainSchedulerIgnoresUnrelatedOperatorCapacity (lines 139–143)
- TestDrainSchedulerRotatesAcrossDrainingNodes (lines 175–179)
Per the coding guidelines, favor deterministic tests that do not leave side effects across the test suite.
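The save-and-restore pattern being asked for can be sketched as follows, using a plain map in place of the real `appcontext` registry (whose exact API is not shown here); in a test, the returned restore function would be passed to `t.Cleanup` so it runs even when the test fails.

```go
package main

import "fmt"

// A toy stand-in for the process-wide appcontext service registry.
var services = map[string]any{}

func setService(name string, svc any) { services[name] = svc }
func getService(name string) any      { return services[name] }

// setServiceForTest swaps in a service and returns a restore func suitable
// for t.Cleanup(restore): it reinstates the previous value, or deletes the
// entry if the service was absent before the test.
func setServiceForTest(name string, svc any) (restore func()) {
	old, existed := services[name]
	setService(name, svc)
	return func() {
		if existed {
			services[name] = old
		} else {
			delete(services, name)
		}
	}
}

func main() {
	setService("MessageCenter", "real")
	restore := setServiceForTest("MessageCenter", "mock")
	fmt.Println(getService("MessageCenter")) // mock
	restore()                                // t.Cleanup would call this
	fmt.Println(getService("MessageCenter")) // real
}
```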
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@coordinator/scheduler/drain_test.go` around lines 32 - 36, Each listed test
sets global services via appcontext.SetService (e.g., using
messaging.NewMockMessageCenter -> appcontext.MessageCenter and
watcher.NewNodeManager -> watcher.NodeManagerName) but never restores them; add
t.Cleanup callbacks in TestDrainSchedulerCreatesMoveOperators,
TestDrainSchedulerSkipsChangefeedWithInflightOperator,
TestDrainSchedulerIgnoresUnrelatedOperatorCapacity, and
TestDrainSchedulerRotatesAcrossDrainingNodes to restore the previous service
value (capture old := appcontext.GetService(key) or nil) or clear the service
after the test (call appcontext.SetService(key, old) or
appcontext.SetService(key, nil)) so global state is reset regardless of test
outcome.
What problem does this PR solve?
The dispatcher-drain work split out of #4190 still mixes coordinator scheduling and public API orchestration in one review path. This PR extracts the coordinator-side drain-aware scheduling layer so reviewers can focus on how maintainers are moved away from draining nodes after the maintainer runtime from #4760 is in place.
Issue Number: ref #3413
What is changed and how it works?
Background:
Motivation:
Summary:
How it works:
Check List
Tests
go test ./coordinator/scheduler
go test -run '^$' ./coordinator

Questions
Will it cause performance regression or break compatibility?
This PR does not add a public API. It only changes coordinator scheduling behavior so work is not assigned back to nodes that are being drained or stopped.
Do you need to update user documentation, design documentation or monitoring documentation?
No additional user-facing documentation is needed for this split. It is an internal decomposition of the drain-capture implementation.
Release note
Summary by CodeRabbit
New Features
Improvements