
coordinator,scheduler: add drain aware scheduling#4761

Open
hongyunyan wants to merge 6 commits into master from split/pr-4190-2c-coordinator-drain-runtime

Conversation

Collaborator

@hongyunyan hongyunyan commented Apr 7, 2026

What problem does this PR solve?

The dispatcher-drain work split out of #4190 still mixes coordinator scheduling and public API orchestration in one review path. This PR extracts the coordinator-side drain-aware scheduling layer so reviewers can focus on how maintainers are moved away from draining nodes after the maintainer runtime from #4760 is in place.

Issue Number: ref #3413

What is changed and how it works?

Background:

Motivation:

  • Separate coordinator-level maintainer placement rules from the maintainer-local dispatcher drain runtime.
  • Keep drain-aware balancing and drain-triggered maintainer moves in one review unit.
  • Let the final API/orchestration PR focus only on endpoint semantics and session control.

Summary:

  • add a dedicated coordinator drain scheduler
  • make coordinator basic scheduling skip draining and stopping nodes as placement targets
  • make coordinator balance scheduling exclude non-schedulable nodes and add a cooldown after drain completion
  • add focused coordinator scheduler tests for drain move generation and rebalance blocking

How it works:

  • coordinator liveness state identifies which nodes are still schedulable destinations
  • the drain scheduler proactively moves maintainers away from draining nodes
  • regular balance scheduling pauses while drain is active and resumes only after a cooldown window
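The move-target selection described above can be sketched as follows. This is a simplified stand-in, not the PR's actual code: `chooseLeastLoadedDest` exists in `coordinator/scheduler/drain.go`, but here string node IDs and a plain map replace the real `node.ID` and task-size types:

```go
package main

import "fmt"

// chooseLeastLoadedDest picks, among schedulable candidates (excluding
// the draining origin), the node currently holding the fewest maintainers.
// Hypothetical simplification of the coordinator's destination selection.
func chooseLeastLoadedDest(origin string, candidates []string, taskSize map[string]int) (string, bool) {
	best, found := "", false
	for _, c := range candidates {
		if c == origin {
			continue // never place work back onto the draining node
		}
		if !found || taskSize[c] < taskSize[best] {
			best, found = c, true
		}
	}
	return best, found
}

func main() {
	sizes := map[string]int{"node2": 3, "node3": 1}
	dest, ok := chooseLeastLoadedDest("node1", []string{"node2", "node3"}, sizes)
	fmt.Println(dest, ok) // node3 true
}
```

The `ok == false` case corresponds to "no schedulable destination remains", in which case the drain scheduler has to stop generating moves for this round.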

Check List

Tests

  • Unit test
    • go test ./coordinator/scheduler
    • go test -run '^$' ./coordinator

Questions

Will it cause performance regression or break compatibility?

This PR does not add a public API. It only changes coordinator scheduling behavior so work is not assigned back to nodes that are being drained or stopped.

Do you need to update user documentation, design documentation or monitoring documentation?

No additional user-facing documentation is needed for this split. It is an internal decomposition of the drain-capture implementation.

Release note

None

Summary by CodeRabbit

New Features

  • Added drain scheduler to automatically evacuate maintainers from nodes undergoing maintenance, with configurable batch sizing for controlled evacuation
  • Schedulers now coordinate around draining nodes to prevent scheduling conflicts and ensure stable operations during node evacuation

Improvements

  • Balance scheduler now pauses execution during node drain periods and resumes with a cooldown window to allow system stabilization
  • Enhanced operator tracking to accurately count move operations from specific nodes

@ti-chi-bot ti-chi-bot Bot added the release-note-none Denotes a PR that doesn't merit a release note. label Apr 7, 2026
@coderabbitai
Contributor

coderabbitai Bot commented Apr 7, 2026

📝 Walkthrough

Walkthrough

A drain-aware scheduler implementation is introduced alongside modifications to existing schedulers. A new drainScheduler evacuates maintainers from draining nodes, while basicScheduler and balanceScheduler are updated to respect drain state via a *drain.Controller. Helper functions filter schedulable nodes, and MoveMaintainerOperator exposes its origin node for operator tracking.

Changes

Cohort / File(s) Summary
Drain Scheduler Core
coordinator/scheduler/drain.go, coordinator/scheduler/drain_test.go
New drain scheduler implementation that evacuates maintainers from draining/stopping nodes with round-robin origin node rotation and least-loaded destination selection. Includes four test cases validating operator creation, in-flight operator skipping, unrelated operator isolation, and multi-node draining.
Drain Liveness Helpers
coordinator/scheduler/drain_liveness.go
Three filter functions for drain-aware scheduling: filterSchedulableNodeIDs, hasDrainingOrStoppingNode, and filterSchedulableAliveNodes to exclude unschedulable nodes during drain operations.
Existing Scheduler Integration
coordinator/scheduler/basic.go, coordinator/scheduler/balance.go, coordinator/scheduler/balance_test.go
Basic scheduler now filters candidate nodes via drain controller; balance scheduler accepts drain controller, gates execution when nodes are draining, enforces post-drain cooldown, and filters alive nodes. Comprehensive test suite validates drain blocking, cooldown behavior, and deferral logic.
Operator Extensions
coordinator/operator/operator_move.go, coordinator/operator/operator_controller.go, coordinator/operator/operator_controller_test.go
Added OriginNode() method to MoveMaintainerOperator for origin tracking. Added CountMoveMaintainerOperatorsFromNodes() to controller for querying in-flight operators from specific nodes. Includes unit test verifying count behavior.
Controller Wiring
coordinator/controller.go
Updated NewController to instantiate drainController and inject it into basic, balance, and new drain scheduler constructors; added drain scheduler to controller map.

Sequence Diagram(s)

sequenceDiagram
    actor Coordinator
    participant DrainCtrl as drain.Controller
    participant DrainSched as drainScheduler
    participant OpCtrl as OperatorController
    participant CfDB as ChangefeedDB
    participant NodeMgr as NodeManager

    Coordinator->>DrainCtrl: Observe node entering DRAINING state
    Coordinator->>DrainSched: Execute()
    
    DrainSched->>DrainCtrl: GetDrainingOrStoppingNodes()
    DrainCtrl-->>DrainSched: [node1, node2]
    
    DrainSched->>NodeMgr: GetAliveNodeIDs()
    NodeMgr-->>DrainSched: [node1, node2, node3]
    
    DrainSched->>DrainCtrl: Filter schedulable destinations
    DrainCtrl-->>DrainSched: [node2, node3]
    
    loop For each draining origin node
        DrainSched->>CfDB: GetMaintainersOnNode(origin)
        CfDB-->>DrainSched: [feed1, feed2, ...]
        
        loop For each maintainer
            DrainSched->>OpCtrl: CountMoveMaintainerOperatorsFromNodes([origin])
            OpCtrl-->>DrainSched: in-flight count
            
            alt No in-flight operator
                DrainSched->>DrainSched: chooseLeastLoadedDest()
                DrainSched->>OpCtrl: AddOperator(move from origin to dest)
                OpCtrl-->>DrainSched: operator created
            end
        end
    end
    
    DrainSched-->>Coordinator: Next execution time
sequenceDiagram
    actor Coordinator
    participant BalSched as balanceScheduler
    participant DrainCtrl as drain.Controller
    participant OpCtrl as OperatorController

    Coordinator->>BalSched: Execute()
    
    BalSched->>DrainCtrl: GetDrainingOrStoppingNodes()
    DrainCtrl-->>BalSched: node list
    
    alt Any node draining?
        BalSched->>BalSched: Set drainBalanceBlockedUntil = now + drainCooldown()
        BalSched-->>Coordinator: Return delayed next run (skip balance)
    else All nodes healthy?
        alt Within cooldown window?
            BalSched-->>Coordinator: Return delayed next run (skip balance)
        else Cooldown expired?
            BalSched->>DrainCtrl: Filter schedulable nodes
            DrainCtrl-->>BalSched: filtered alive nodes
            
            BalSched->>OpCtrl: CheckBalanceStatus(filtered nodes)
            OpCtrl-->>BalSched: balance needed?
            
            alt Balance needed
                BalSched->>OpCtrl: Balance(filtered nodes)
                OpCtrl-->>BalSched: operator created
            end
            
            BalSched-->>Coordinator: Schedule next balance check
        end
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

lgtm, approved

Suggested reviewers

  • wk989898
  • 3AceShowHand
  • flowbehappy

Poem

🐰 Hop hop, the drains are flowing clear,
Maintainers migrate without fear,
Round-robin nodes, light-load paths,
Cooldown whispers soothe the wrath!
🌊✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning — docstring coverage is 23.81%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
✅ Passed checks (4 passed)
  • Title check ✅ Passed — the title 'coordinator,scheduler: add drain aware scheduling' clearly and specifically describes the main change in the PR: adding drain-aware scheduling logic to the coordinator and scheduler components.
  • Linked Issues check ✅ Passed — check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check ✅ Passed — check skipped because no linked issues were found for this pull request.
  • Description check ✅ Passed — the PR description follows the repository template with all required sections: problem statement with linked issue, detailed technical explanation of changes, test coverage specifications, and answers to the questions checklist.


@ti-chi-bot ti-chi-bot Bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Apr 7, 2026

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a new drainScheduler and updates the balance and basic schedulers to handle node draining and stopping states. The changes ensure that maintainers are proactively moved away from draining nodes and that no new work is assigned to them. Additionally, a cooldown mechanism is added to the balance scheduler to prevent immediate rebalancing churn after a node finishes draining. The review feedback highlights opportunities to improve the efficiency of the drain scheduler's execution loop by reducing redundant database lookups and complexity, and suggests making the hardcoded 120-second cooldown period configurable.

Comment thread coordinator/scheduler/balance.go Outdated
	drainBalanceBlockedUntil time.Time
}

const drainBalanceCooldown = 120 * time.Second

medium

The cooldown period for rebalancing after a node drain is hardcoded to 120 seconds. This might be too long for some environments, leading to prolonged imbalance. Consider making this configurable or providing a justification for this specific duration.

Comment thread coordinator/scheduler/drain.go Outdated
	destCandidates []node.ID,
	nodeTaskSize map[node.ID]int,
) bool {
	maintainers := s.changefeedDB.GetByNodeID(origin)

medium

Calling GetByNodeID inside the scheduling loop is inefficient, especially when batchSize is large. This results in repeated database/map lookups and slice iterations for the same node. It is better to pre-fetch the maintainers for all draining nodes once at the beginning of the Execute method.

Suggested change
-maintainers := s.changefeedDB.GetByNodeID(origin)
+maintainers := maintainersMap[origin]
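The pre-fetch pass the suggestion implies could look like the following sketch; `prefetchMaintainers` and `getByNodeID` are hypothetical names standing in for a loop over `s.changefeedDB.GetByNodeID` at the top of `Execute`:

```go
package main

import "fmt"

// prefetchMaintainers looks up each draining node's maintainers once,
// before the scheduling loop, so the per-move path does only a map read.
func prefetchMaintainers(drainingNodes []string, getByNodeID func(string) []string) map[string][]string {
	maintainersMap := make(map[string][]string, len(drainingNodes))
	for _, n := range drainingNodes {
		maintainersMap[n] = getByNodeID(n)
	}
	return maintainersMap
}

func main() {
	db := map[string][]string{"node1": {"feed1", "feed2"}}
	m := prefetchMaintainers([]string{"node1"}, func(n string) []string { return db[n] })
	fmt.Println(m["node1"]) // [feed1 feed2]
}
```

The trade-off is a slightly staler view of the changefeed DB within one `Execute` call, which is usually acceptable because in-flight operators are re-checked per changefeed anyway.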

Comment thread coordinator/scheduler/drain.go Outdated
Comment on lines +134 to +146
	for _, cf := range maintainers {
		if s.operatorController.HasOperator(cf.ID.DisplayName) {
			continue
		}
		dest, ok := chooseLeastLoadedDest(origin, destCandidates, nodeTaskSize)
		if !ok {
			return false
		}
		if s.operatorController.AddOperator(operator.NewMoveMaintainerOperator(s.changefeedDB, cf, origin, dest)) {
			nodeTaskSize[dest]++
			return true
		}
	}

medium

The current implementation of scheduleOneFromNode iterates through the entire list of maintainers every time it is called, even if it only schedules one move. This leads to O(N · batchSize) complexity. Consider refactoring the logic to process multiple maintainers from a node in a single pass or maintaining a cursor to skip already-processed items.
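A single-pass variant of the kind the comment suggests could be shaped like this sketch; `scheduleFromNode` and its function-valued parameters are hypothetical simplifications of the operator-controller calls in the real scheduler:

```go
package main

import "fmt"

// scheduleFromNode walks the maintainer list once and creates up to
// batchSize moves, instead of re-scanning from the start for every
// scheduled move as the one-move-per-call shape does.
func scheduleFromNode(maintainers []string, hasOperator func(string) bool,
	addMove func(string) bool, batchSize int) int {
	scheduled := 0
	for _, cf := range maintainers {
		if scheduled >= batchSize {
			break
		}
		if hasOperator(cf) {
			continue // a move for this changefeed is already in flight
		}
		if addMove(cf) {
			scheduled++
		}
	}
	return scheduled
}

func main() {
	inflight := map[string]bool{"feed2": true}
	var moves []string
	n := scheduleFromNode([]string{"feed1", "feed2", "feed3"},
		func(cf string) bool { return inflight[cf] },
		func(cf string) bool { moves = append(moves, cf); return true },
		2)
	fmt.Println(n, moves) // 2 [feed1 feed3]
}
```

This keeps the whole batch O(N) per draining node while preserving the in-flight-operator skip.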

Base automatically changed from split/pr-4190-2b-maintainer-drain-runtime to master April 21, 2026 09:12
@hongyunyan hongyunyan force-pushed the split/pr-4190-2c-coordinator-drain-runtime branch from 3dc592f to 3e139a5 Compare April 21, 2026 09:42
@ti-chi-bot

ti-chi-bot Bot commented Apr 21, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign wk989898 for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (3)
coordinator/scheduler/balance.go (1)

115-120: Minor: the 1s fallback is reachable only from tests.

Production wires a non-zero balanceInterval, so the fallback is effectively test-only. Consider making the fallback explicit (e.g. a named defaultDrainCooldown constant) or validating checkBalanceInterval > 0 in the constructor, so the behavior is obvious at a glance.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@coordinator/scheduler/balance.go` around lines 115 - 120, The fallback to 1s
in balanceScheduler.drainCooldown() is only hit in tests; make the behavior
explicit by either introducing a named constant (e.g. defaultDrainCooldown) and
using that in drainCooldown(), or by validating/setting s.checkBalanceInterval >
0 in the balanceScheduler constructor (or factory that creates balanceScheduler)
so the zero-case can't occur at runtime; update references to
checkBalanceInterval and drainCooldown accordingly to use the constant or to
rely on the constructor guarantee.
coordinator/scheduler/balance_test.go (1)

34-181: LGTM, deterministic and targeted.

Tests correctly manipulate drainBalanceBlockedUntil to make the cooldown window deterministic and use require.WithinDuration with a generous tolerance for the interval-derivation case. A couple of optional tightenings you may consider:

  • setupCoordinatorSchedulerTestServices mutates the process-wide appcontext service registry with no cleanup; fine since these tests run serially, but a t.Cleanup that resets the registry would make this package safer against future test additions that call t.Parallel().
  • TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires relies on the interval == 0 fallback to 1s in drainCooldown(). Passing an explicit small interval instead would decouple the test from that fallback branch.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@coordinator/scheduler/balance_test.go` around lines 34 - 181, Tests mutate
the global appcontext registry and rely on NewBalanceScheduler(..., interval=0)
fallback; update setupCoordinatorSchedulerTestServices to register a t.Cleanup
that resets the process-wide appcontext service registry so tests are safe for
t.Parallel, and in
TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires call
NewBalanceScheduler with an explicit small interval (e.g., 10*time.Millisecond)
instead of 0 and continue using s.drainBalanceBlockedUntil to drive determinism;
touch setupCoordinatorSchedulerTestServices,
TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires,
NewBalanceScheduler invocation, and the drainBalanceBlockedUntil field to
implement these changes.
coordinator/scheduler/drain.go (1)

83-83: Sort destination candidates before least-loaded tie-breaking.

chooseLeastLoadedDest picks the first minimum, so equal-load destinations inherit the order from GetAliveNodeIDs(). If that order comes from a map, scheduling becomes nondeterministic for the common “all destinations have equal load” case.

♻️ Proposed deterministic tie-break
 	destCandidates := filterSchedulableNodeIDs(s.nodeManager.GetAliveNodeIDs(), s.liveness)
+	slices.Sort(destCandidates)
 
 	if len(destCandidates) == 0 {

Also applies to: 172-180

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@coordinator/scheduler/drain.go` at line 83, The destination candidate list
coming from filterSchedulableNodeIDs/GetAliveNodeIDs can be nondeterministic;
before calling chooseLeastLoadedDest you must sort destCandidates into a
deterministic order (e.g., lexicographic or numeric ID order) so
chooseLeastLoadedDest's "first minimum" tie-break is stable. Update the code
that builds destCandidates (the destCandidates := filterSchedulableNodeIDs(...)
call used before chooseLeastLoadedDest) to sort the slice in-place, and apply
the same sort to the other occurrence in this file that constructs
destCandidates for tie-breaking.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: cb2f9297-39af-4fae-aec1-01f035b13a14

📥 Commits

Reviewing files that changed from the base of the PR and between f195970 and 3e139a5.

📒 Files selected for processing (10)
  • coordinator/controller.go
  • coordinator/operator/operator_controller.go
  • coordinator/operator/operator_controller_test.go
  • coordinator/operator/operator_move.go
  • coordinator/scheduler/balance.go
  • coordinator/scheduler/balance_test.go
  • coordinator/scheduler/basic.go
  • coordinator/scheduler/drain.go
  • coordinator/scheduler/drain_liveness.go
  • coordinator/scheduler/drain_test.go

Comment on lines +23 to +34
func filterSchedulableNodeIDs(nodeIDs []node.ID, liveness *drain.Controller) []node.ID {
	if liveness == nil {
		return nodeIDs
	}
	filtered := nodeIDs[:0]
	for _, nodeID := range nodeIDs {
		if liveness.IsSchedulableDest(nodeID) {
			filtered = append(filtered, nodeID)
		}
	}
	return filtered
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
ast-grep --pattern $'func ($_ *NodeManager) GetAliveNodeIDs() $_ {
  $$$
}'
# Also check all call sites of filterSchedulableNodeIDs in case any caller relies on the original slice post-call.
rg -nP -C2 '\bfilterSchedulableNodeIDs\s*\('

Repository: pingcap/ticdc

Length of output: 1746


Allocate a new slice instead of aliasing the caller's backing array.

The current implementation uses filtered := nodeIDs[:0] to mutate the caller's slice backing array in-place. While GetAliveNodeIDs() currently returns a fresh copy, this pattern is a forward-compatibility hazard: if it is ever refactored to return an internal reference, the subsequent append would silently corrupt the NodeManager's node list. Additionally, filterSchedulableAliveNodes (below this function) correctly allocates a new map, so this helper should be symmetric and allocate a new slice for consistency and defensive clarity.

Proposed fix
 func filterSchedulableNodeIDs(nodeIDs []node.ID, liveness *drain.Controller) []node.ID {
 	if liveness == nil {
 		return nodeIDs
 	}
-	filtered := nodeIDs[:0]
+	filtered := make([]node.ID, 0, len(nodeIDs))
 	for _, nodeID := range nodeIDs {
 		if liveness.IsSchedulableDest(nodeID) {
 			filtered = append(filtered, nodeID)
 		}
 	}
 	return filtered
 }
🧰 Tools
🪛 golangci-lint (2.11.4)

[error] 30-30: : # github.com/pingcap/ticdc/cmd/kafka-consumer [github.com/pingcap/ticdc/cmd/kafka-consumer.test]
cmd/kafka-consumer/consumer.go:30:22: undefined: kafka.ConfigMap
cmd/kafka-consumer/consumer.go:39:22: undefined: kafka.NewAdminClient
cmd/kafka-consumer/consumer.go:49:41: undefined: kafka.ErrTransport
cmd/kafka-consumer/consumer.go:70:16: undefined: kafka.Consumer
cmd/kafka-consumer/consumer.go:76:22: undefined: kafka.ConfigMap
cmd/kafka-consumer/writer.go:45:24: undefined: kafka.Offset
cmd/kafka-consumer/writer.go:59:79: undefined: kafka.Offset
cmd/kafka-consumer/writer.go:364:67: undefined: kafka.Message
cmd/kafka-consumer/writer.go:563:90: undefined: kafka.Offset
cmd/kafka-consumer/writer.go:590:103: undefined: kafka.Offset
cmd/kafka-consumer/consumer.go:76:22: too many errors

(typecheck)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@coordinator/scheduler/drain_liveness.go` around lines 23 - 34,
filterSchedulableNodeIDs currently reuses the caller's backing array via
filtered := nodeIDs[:0]; change it to allocate a new slice (e.g.,
make([]node.ID, 0, len(nodeIDs))) so we don't mutate the caller's underlying
array, keep the nil liveness early-return, iterate over nodeIDs and append
nodeIDs that satisfy liveness.IsSchedulableDest(nodeID) into the newly allocated
filtered slice, and return that new slice.

Comment on lines +32 to +36
	mc := messaging.NewMockMessageCenter()
	appcontext.SetService(appcontext.MessageCenter, mc)

	nodeManager := watcher.NewNodeManager(nil, nil)
	appcontext.SetService(watcher.NodeManagerName, nodeManager)

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Inspect scheduler tests for appcontext service mutation and parallel tests.

rg -n -C3 'appcontext\.SetService|t\.Parallel\(' --type=go

Repository: pingcap/ticdc

Length of output: 50370


🏁 Script executed:

# Read the drain_test.go file to verify the SetService calls and cleanup patterns
cat -n coordinator/scheduler/drain_test.go | head -200

Repository: pingcap/ticdc

Length of output: 8670


🏁 Script executed:

# Check for t.Parallel() and t.Cleanup() patterns in drain_test.go
grep -n "t\.Parallel\|t\.Cleanup\|appcontext\.SetService" coordinator/scheduler/drain_test.go

Repository: pingcap/ticdc

Length of output: 540


🏁 Script executed:

# Check if other tests in the scheduler package use t.Parallel()
fd -e _test.go coordinator/scheduler/ -x grep -l "t\.Parallel" {}

Repository: pingcap/ticdc

Length of output: 450


🏁 Script executed:

# Check if other tests in the scheduler package use t.Parallel()
fd -e _test.go . coordinator/scheduler/ -x grep -l "t\.Parallel" {}

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

# Search the entire codebase for appcontext service restoration patterns
rg -A5 "appcontext\.SetService" --type=go -g "*_test.go" | grep -A5 "t\.Cleanup\|defer.*appcontext" | head -50

Repository: pingcap/ticdc

Length of output: 721


Add cleanup for appcontext services in these tests.

These tests mutate global services via appcontext.SetService() without restoring previous state. While the file currently has no t.Parallel() tests, this pattern still makes tests order-dependent under go test -shuffle. Add t.Cleanup() to each test to restore or clear services after the test completes.

Affected test functions
  • TestDrainSchedulerCreatesMoveOperators (lines 32–36)
  • TestDrainSchedulerSkipsChangefeedWithInflightOperator (lines 88–92)
  • TestDrainSchedulerIgnoresUnrelatedOperatorCapacity (lines 139–143)
  • TestDrainSchedulerRotatesAcrossDrainingNodes (lines 175–179)

Per the coding guidelines, favor deterministic tests that do not leave side effects across the test suite.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@coordinator/scheduler/drain_test.go` around lines 32 - 36, Each listed test
sets global services via appcontext.SetService (e.g., using
messaging.NewMockMessageCenter -> appcontext.MessageCenter and
watcher.NewNodeManager -> watcher.NodeManagerName) but never restores them; add
t.Cleanup callbacks in TestDrainSchedulerCreatesMoveOperators,
TestDrainSchedulerSkipsChangefeedWithInflightOperator,
TestDrainSchedulerIgnoresUnrelatedOperatorCapacity, and
TestDrainSchedulerRotatesAcrossDrainingNodes to restore the previous service
value (capture old := appcontext.GetService(key) or nil) or clear the service
after the test (call appcontext.SetService(key, old) or
appcontext.SetService(key, nil)) so global state is reset regardless of test
outcome.
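The save-and-restore pattern the review asks for can be sketched like this; the plain map and `setServiceWithCleanup` helper are hypothetical stand-ins for the real appcontext registry, and the returned function is what would be passed to `t.Cleanup`:

```go
package main

import "fmt"

var services = map[string]any{} // stand-in for the appcontext registry

// setServiceWithCleanup registers a service and returns a restore
// function suitable for t.Cleanup: it either puts back the previous
// value or clears the key if nothing was registered before.
func setServiceWithCleanup(key string, svc any) (restore func()) {
	old, existed := services[key]
	services[key] = svc
	return func() {
		if existed {
			services[key] = old
		} else {
			delete(services, key)
		}
	}
}

func main() {
	services["MessageCenter"] = "real"
	restore := setServiceWithCleanup("MessageCenter", "mock")
	fmt.Println(services["MessageCenter"]) // mock
	restore()
	fmt.Println(services["MessageCenter"]) // real
}
```

Inside a test this would read `t.Cleanup(setServiceWithCleanup(key, mock))`, so the global state is reset whether the test passes, fails, or is skipped.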
