diff --git a/coordinator/controller.go b/coordinator/controller.go
index c02324469a..7e38cf2fd7 100644
--- a/coordinator/controller.go
+++ b/coordinator/controller.go
@@ -134,6 +134,14 @@ func NewController(
 			batchSize,
 			oc,
 			changefeedDB,
+			drainController,
+		),
+		scheduler.DrainScheduler: coscheduler.NewDrainScheduler(
+			selfNode.ID.String(),
+			batchSize,
+			oc,
+			changefeedDB,
+			drainController,
 		),
 		scheduler.BalanceScheduler: coscheduler.NewBalanceScheduler(
 			selfNode.ID.String(),
@@ -141,6 +149,7 @@ func NewController(
 			oc,
 			changefeedDB,
 			balanceInterval,
+			drainController,
 		),
 	}),
 	eventCh: eventCh,
diff --git a/coordinator/operator/operator_controller.go b/coordinator/operator/operator_controller.go
index d6fd7a3095..2b168b09c9 100644
--- a/coordinator/operator/operator_controller.go
+++ b/coordinator/operator/operator_controller.go
@@ -232,6 +232,33 @@ func (oc *Controller) OperatorSize() int {
 	return len(oc.operators)
 }
 
+// CountMoveMaintainerOperatorsFromNodes returns the number of in-flight move
+// operators whose origin is one of the given nodes.
+func (oc *Controller) CountMoveMaintainerOperatorsFromNodes(origins []node.ID) int {
+	oc.mu.RLock()
+	defer oc.mu.RUnlock()
+
+	if len(origins) == 0 {
+		return 0
+	}
+	originSet := make(map[node.ID]struct{}, len(origins))
+	for _, origin := range origins {
+		originSet[origin] = struct{}{}
+	}
+
+	count := 0
+	for _, op := range oc.operators {
+		moveOp, ok := op.OP.(*MoveMaintainerOperator)
+		if !ok {
+			continue
+		}
+		if _, ok := originSet[moveOp.OriginNode()]; ok {
+			count++
+		}
+	}
+	return count
+}
+
 // HasOperatorInvolvingNode returns true if any in-flight operator affects n.
 func (oc *Controller) HasOperatorInvolvingNode(n node.ID) bool {
 	oc.mu.RLock()
diff --git a/coordinator/operator/operator_controller_test.go b/coordinator/operator/operator_controller_test.go
index 5afc45a924..c86e8358a1 100644
--- a/coordinator/operator/operator_controller_test.go
+++ b/coordinator/operator/operator_controller_test.go
@@ -127,6 +127,28 @@ func TestController_HasOperatorInvolvingNode(t *testing.T) {
 	require.False(t, oc.HasOperatorInvolvingNode("n3"))
 }
 
+func TestController_CountMoveMaintainerOperatorsFromNodes(t *testing.T) {
+	changefeedDB := changefeed.NewChangefeedDB(1216)
+	ctrl := gomock.NewController(t)
+	backend := mock_changefeed.NewMockBackend(ctrl)
+	oc, self, nodeManager := newOperatorControllerForTest(t, changefeedDB, backend)
+	dest := node.NewInfo("localhost:8301", "")
+	nodeManager.GetAliveNodes()[dest.ID] = dest
+
+	cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
+	cf := changefeed.NewChangefeed(cfID, &config.ChangeFeedInfo{
+		ChangefeedID: cfID,
+		Config:       config.GetDefaultReplicaConfig(),
+		SinkURI:      "mysql://127.0.0.1:3306",
+	}, 1, true)
+	changefeedDB.AddReplicatingMaintainer(cf, self.ID)
+
+	require.True(t, oc.AddOperator(NewMoveMaintainerOperator(changefeedDB, cf, self.ID, dest.ID)))
+
+	require.Equal(t, 1, oc.CountMoveMaintainerOperatorsFromNodes([]node.ID{self.ID}))
+	require.Equal(t, 0, oc.CountMoveMaintainerOperatorsFromNodes([]node.ID{"n3"}))
+}
+
 func TestController_StopChangefeedDuringAddOperator(t *testing.T) {
 	// Setup test environment
 	changefeedDB := changefeed.NewChangefeedDB(1216)
diff --git a/coordinator/operator/operator_move.go b/coordinator/operator/operator_move.go
index 560b1a1088..9c15c936af 100644
--- a/coordinator/operator/operator_move.go
+++ b/coordinator/operator/operator_move.go
@@ -144,6 +144,14 @@ func (m *MoveMaintainerOperator) AffectedNodes() []node.ID {
 	return []node.ID{m.origin, m.dest}
 }
 
+// OriginNode returns the source node of the move.
+func (m *MoveMaintainerOperator) OriginNode() node.ID {
+	m.lck.Lock()
+	defer m.lck.Unlock()
+
+	return m.origin
+}
+
 func (m *MoveMaintainerOperator) ID() common.ChangeFeedID {
 	return m.changefeed.ID
 }
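An aside on the counting helper above: it is a plain set-membership scan over the in-flight operators. The sketch below restates it as a self-contained program, with plain strings standing in for node.ID and a hypothetical move struct in place of MoveMaintainerOperator; it is illustrative only, not part of the patch.

package main

import "fmt"

// move is a hypothetical stand-in for an in-flight MoveMaintainerOperator.
type move struct{ origin, dest string }

// countFromNodes mirrors CountMoveMaintainerOperatorsFromNodes: build a set of
// origins once, then count the moves whose origin is in that set.
func countFromNodes(moves []move, origins []string) int {
	if len(origins) == 0 {
		return 0
	}
	originSet := make(map[string]struct{}, len(origins))
	for _, o := range origins {
		originSet[o] = struct{}{}
	}
	count := 0
	for _, m := range moves {
		if _, ok := originSet[m.origin]; ok {
			count++
		}
	}
	return count
}

func main() {
	moves := []move{{"n1", "n2"}, {"n3", "n2"}, {"n1", "n3"}}
	fmt.Println(countFromNodes(moves, []string{"n1"})) // 2
	fmt.Println(countFromNodes(moves, []string{"n9"})) // 0
}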
diff --git a/coordinator/scheduler/balance.go b/coordinator/scheduler/balance.go
index 4593a95729..d7d57fc66c 100644
--- a/coordinator/scheduler/balance.go
+++ b/coordinator/scheduler/balance.go
@@ -18,6 +18,7 @@ import (
 	"time"
 
 	"github.com/pingcap/ticdc/coordinator/changefeed"
+	"github.com/pingcap/ticdc/coordinator/drain"
 	"github.com/pingcap/ticdc/coordinator/operator"
 	appcontext "github.com/pingcap/ticdc/pkg/common/context"
 	"github.com/pingcap/ticdc/pkg/node"
@@ -33,6 +34,7 @@ type balanceScheduler struct {
 	operatorController *operator.Controller
 	changefeedDB       *changefeed.ChangefeedDB
 	nodeManager        *watcher.NodeManager
+	liveness           *drain.Controller
 
 	random            *rand.Rand
 	lastRebalanceTime time.Time
@@ -44,6 +46,8 @@ type balanceScheduler struct {
 	// `Schedule`.
 	// It speeds up rebalance.
 	forceBalance bool
+
+	drainBalanceBlockedUntil time.Time
 }
 
 func NewBalanceScheduler(
@@ -51,6 +55,7 @@ func NewBalanceScheduler(
 	oc *operator.Controller,
 	changefeedDB *changefeed.ChangefeedDB,
 	balanceInterval time.Duration,
+	liveness *drain.Controller,
 ) *balanceScheduler {
 	return &balanceScheduler{
 		id: id,
@@ -61,14 +66,27 @@ func NewBalanceScheduler(
 		nodeManager:          appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName),
 		checkBalanceInterval: balanceInterval,
 		lastRebalanceTime:    time.Now(),
+		liveness:             liveness,
 	}
 }
 
 func (s *balanceScheduler) Execute() time.Time {
+	now := time.Now()
+	if hasDrainingOrStoppingNode(s.liveness) {
+		// Pause regular balance scheduling while any node is observed draining or
+		// stopping. Each observation extends the block window by one balance
+		// interval so regular rebalance does not race with evacuation progress.
+		s.drainBalanceBlockedUntil = now.Add(s.drainCooldown())
+		return now.Add(s.checkBalanceInterval)
+	}
+	if now.Before(s.drainBalanceBlockedUntil) {
+		// If drain disappears before the previously extended block window expires,
+		// keep skipping regular rebalance until that window elapses.
+		return now.Add(s.checkBalanceInterval)
+	}
 	if !s.forceBalance && time.Since(s.lastRebalanceTime) < s.checkBalanceInterval {
 		return s.lastRebalanceTime.Add(s.checkBalanceInterval)
 	}
-	now := time.Now()
 
 	if s.operatorController.OperatorSize() > 0 || s.changefeedDB.GetAbsentSize() > 0 {
 		// not in stable schedule state, skip balance
@@ -76,13 +94,15 @@ func (s *balanceScheduler) Execute() time.Time {
 	}
 
 	// check the balance status
-	moveSize := pkgScheduler.CheckBalanceStatus(s.changefeedDB.GetTaskSizePerNode(), s.nodeManager.GetAliveNodes())
+	activeNodes := s.nodeManager.GetAliveNodes()
+	activeNodes = filterSchedulableAliveNodes(activeNodes, s.liveness)
+	moveSize := pkgScheduler.CheckBalanceStatus(s.changefeedDB.GetTaskSizePerNode(), activeNodes)
 	if moveSize <= 0 {
 		// fast check the balance status, no need to do the balance,skip
 		return now.Add(s.checkBalanceInterval)
 	}
 	// balance changefeeds among the active nodes
-	movedSize := pkgScheduler.Balance(s.batchSize, s.random, s.nodeManager.GetAliveNodes(), s.changefeedDB.GetReplicating(),
+	movedSize := pkgScheduler.Balance(s.batchSize, s.random, activeNodes, s.changefeedDB.GetReplicating(),
 		func(cf *changefeed.Changefeed, nodeID node.ID) bool {
 			return s.operatorController.AddOperator(operator.NewMoveMaintainerOperator(s.changefeedDB, cf, cf.GetNodeID(), nodeID))
 		})
@@ -92,6 +112,13 @@ func (s *balanceScheduler) Execute() time.Time {
 	return now.Add(s.checkBalanceInterval)
 }
 
+func (s *balanceScheduler) drainCooldown() time.Duration {
+	if s.checkBalanceInterval > 0 {
+		return s.checkBalanceInterval
+	}
+	return time.Second
+}
+
 func (s *balanceScheduler) Name() string {
 	return "balance-scheduler"
 }
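The drain pause window added to Execute is easier to study in isolation. Below is a runnable sketch of just that bookkeeping; blockWindow is a hypothetical stand-in for the drainBalanceBlockedUntil field plus drainCooldown, including the one-second fallback for a non-positive interval.

package main

import (
	"fmt"
	"time"
)

// blockWindow models the drain-pause bookkeeping: each observation of a
// draining node extends the window by one interval, and balance stays
// skipped until the window elapses even after the drain clears.
type blockWindow struct {
	interval     time.Duration
	blockedUntil time.Time
}

// cooldown mirrors drainCooldown: fall back to one second when the
// configured balance interval is not positive.
func (w *blockWindow) cooldown() time.Duration {
	if w.interval > 0 {
		return w.interval
	}
	return time.Second
}

// observe reports whether regular balance should be skipped this tick.
func (w *blockWindow) observe(now time.Time, drainActive bool) bool {
	if drainActive {
		w.blockedUntil = now.Add(w.cooldown())
		return true
	}
	return now.Before(w.blockedUntil)
}

func main() {
	w := &blockWindow{interval: 200 * time.Millisecond}
	now := time.Now()
	fmt.Println(w.observe(now, true))                             // true: drain active, window extended
	fmt.Println(w.observe(now.Add(100*time.Millisecond), false))  // true: window not yet elapsed
	fmt.Println(w.observe(now.Add(300*time.Millisecond), false))  // false: window elapsed
}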
diff --git a/coordinator/scheduler/balance_test.go b/coordinator/scheduler/balance_test.go
new file mode 100644
index 0000000000..2c386afb61
--- /dev/null
+++ b/coordinator/scheduler/balance_test.go
@@ -0,0 +1,209 @@
+// Copyright 2026 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package scheduler
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/pingcap/ticdc/coordinator/changefeed"
+	"github.com/pingcap/ticdc/coordinator/drain"
+	"github.com/pingcap/ticdc/coordinator/operator"
+	"github.com/pingcap/ticdc/heartbeatpb"
+	"github.com/pingcap/ticdc/pkg/common"
+	appcontext "github.com/pingcap/ticdc/pkg/common/context"
+	"github.com/pingcap/ticdc/pkg/config"
+	"github.com/pingcap/ticdc/pkg/messaging"
+	"github.com/pingcap/ticdc/pkg/node"
+	"github.com/pingcap/ticdc/server/watcher"
+	"github.com/stretchr/testify/require"
+)
+
+func TestBalanceSchedulerCreatesMoveOperators(t *testing.T) {
+	setupCoordinatorSchedulerTestServices()
+	mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
+	nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
+
+	nodeA := node.ID("node-a")
+	nodeB := node.ID("node-b")
+	nodeManager.GetAliveNodes()[nodeA] = &node.Info{ID: nodeA}
+	nodeManager.GetAliveNodes()[nodeB] = &node.Info{ID: nodeB}
+
+	drainController := drain.NewController(mc)
+	db := changefeed.NewChangefeedDB(1)
+	addReplicatingMaintainer(t, db, "cf-a-1", nodeA)
+	addReplicatingMaintainer(t, db, "cf-a-2", nodeA)
+
+	selfNode := &node.Info{ID: node.ID("coordinator")}
+	oc := operator.NewOperatorController(selfNode, db, nil, 10)
+	s := NewBalanceScheduler("test", 10, oc, db, 0, drainController)
+	_ = s.Execute()
+
+	require.Equal(t, 1, oc.OperatorSize())
+}
+
+func TestBalanceSchedulerSkipsWhenDrainActive(t *testing.T) {
+	setupCoordinatorSchedulerTestServices()
+	mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
+	nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
+
+	drainingNode := node.ID("draining")
+	nodeA := node.ID("node-a")
+	nodeB := node.ID("node-b")
+	nodeManager.GetAliveNodes()[drainingNode] = &node.Info{ID: drainingNode}
+	nodeManager.GetAliveNodes()[nodeA] = &node.Info{ID: nodeA}
+	nodeManager.GetAliveNodes()[nodeB] = &node.Info{ID: nodeB}
+
+	drainController := drain.NewController(mc)
+	drainController.ObserveHeartbeat(drainingNode, &heartbeatpb.NodeHeartbeat{
+		Liveness:  heartbeatpb.NodeLiveness_DRAINING,
+		NodeEpoch: 1,
+	})
+
+	db := changefeed.NewChangefeedDB(1)
+	addReplicatingMaintainer(t, db, "cf-drain", drainingNode)
+	addReplicatingMaintainer(t, db, "cf-a-1", nodeA)
+	addReplicatingMaintainer(t, db, "cf-a-2", nodeA)
+	addReplicatingMaintainer(t, db, "cf-a-3", nodeA)
+	addReplicatingMaintainer(t, db, "cf-a-4", nodeA)
+
+	selfNode := &node.Info{ID: node.ID("coordinator")}
+	oc := operator.NewOperatorController(selfNode, db, nil, 10)
+	s := NewBalanceScheduler("test", 10, oc, db, 0, drainController)
+	_ = s.Execute()
+
+	require.Equal(t, 0, oc.OperatorSize())
+}
+
+func TestBalanceSchedulerSkipsUntilObservedDrainBlockWindowExpires(t *testing.T) {
+	setupCoordinatorSchedulerTestServices()
+	mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
+	nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
+
+	drainingA := node.ID("draining-a")
+	drainingB := node.ID("draining-b")
+	nodeA := node.ID("node-a")
+	nodeB := node.ID("node-b")
+	nodeManager.GetAliveNodes()[drainingA] = &node.Info{ID: drainingA}
+	nodeManager.GetAliveNodes()[drainingB] = &node.Info{ID: drainingB}
+	nodeManager.GetAliveNodes()[nodeA] = &node.Info{ID: nodeA}
+	nodeManager.GetAliveNodes()[nodeB] = &node.Info{ID: nodeB}
+
+	drainController := drain.NewController(mc)
+	drainController.ObserveHeartbeat(drainingA, &heartbeatpb.NodeHeartbeat{
+		Liveness:  heartbeatpb.NodeLiveness_DRAINING,
+		NodeEpoch: 1,
+	})
+	drainController.ObserveHeartbeat(drainingB, &heartbeatpb.NodeHeartbeat{
+		Liveness:  heartbeatpb.NodeLiveness_DRAINING,
+		NodeEpoch: 1,
+	})
+
+	db := changefeed.NewChangefeedDB(1)
+	addReplicatingMaintainer(t, db, "cf-a-1", nodeA)
+	addReplicatingMaintainer(t, db, "cf-a-2", nodeA)
+	addReplicatingMaintainer(t, db, "cf-a-3", nodeA)
+	addReplicatingMaintainer(t, db, "cf-a-4", nodeA)
+
+	selfNode := &node.Info{ID: node.ID("coordinator")}
+	oc := operator.NewOperatorController(selfNode, db, nil, 10)
+	s := NewBalanceScheduler("test", 10, oc, db, 0, drainController)
+	s.drainBalanceBlockedUntil = time.Time{}
+
+	// Any observed draining node should block balance.
+	_ = s.Execute()
+	require.Equal(t, 0, oc.OperatorSize())
+
+	// One node remains draining, so another observation keeps extending the block window.
+	drainController.ObserveHeartbeat(drainingA, &heartbeatpb.NodeHeartbeat{
+		Liveness:  heartbeatpb.NodeLiveness_ALIVE,
+		NodeEpoch: 2,
+	})
+	_ = s.Execute()
+	require.Equal(t, 0, oc.OperatorSize())
+
+	// After drain disappears, the previously extended block window still blocks balance.
+	drainController.ObserveHeartbeat(drainingB, &heartbeatpb.NodeHeartbeat{
+		Liveness:  heartbeatpb.NodeLiveness_ALIVE,
+		NodeEpoch: 2,
+	})
+	s.drainBalanceBlockedUntil = time.Now().Add(100 * time.Millisecond)
+	_ = s.Execute()
+	require.Equal(t, 0, oc.OperatorSize())
+
+	s.drainBalanceBlockedUntil = time.Now().Add(-time.Millisecond)
+	_ = s.Execute()
+	require.Greater(t, oc.OperatorSize(), 0)
+}
+
+func TestBalanceSchedulerUsesBalanceIntervalAsDrainBlockWindow(t *testing.T) {
+	setupCoordinatorSchedulerTestServices()
+	mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
+	nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
+
+	drainingNode := node.ID("draining")
+	nodeA := node.ID("node-a")
+	nodeB := node.ID("node-b")
+	nodeManager.GetAliveNodes()[drainingNode] = &node.Info{ID: drainingNode}
+	nodeManager.GetAliveNodes()[nodeA] = &node.Info{ID: nodeA}
+	nodeManager.GetAliveNodes()[nodeB] = &node.Info{ID: nodeB}
+
+	drainController := drain.NewController(mc)
+	drainController.ObserveHeartbeat(drainingNode, &heartbeatpb.NodeHeartbeat{
+		Liveness:  heartbeatpb.NodeLiveness_DRAINING,
+		NodeEpoch: 1,
+	})
+
+	db := changefeed.NewChangefeedDB(1)
+	addReplicatingMaintainer(t, db, "cf-a-1", nodeA)
+	addReplicatingMaintainer(t, db, "cf-a-2", nodeA)
+
+	selfNode := &node.Info{ID: node.ID("coordinator")}
+	oc := operator.NewOperatorController(selfNode, db, nil, 10)
+	interval := 200 * time.Millisecond
+	s := NewBalanceScheduler("test", 10, oc, db, interval, drainController)
+
+	start := time.Now()
+	_ = s.Execute()
+	require.WithinDuration(t, start.Add(interval), s.drainBalanceBlockedUntil, 50*time.Millisecond)
+}
+
+func setupCoordinatorSchedulerTestServices() {
+	mc := messaging.NewMockMessageCenter()
+	appcontext.SetService(appcontext.MessageCenter, mc)
+	nodeManager := watcher.NewNodeManager(nil, nil)
+	appcontext.SetService(watcher.NodeManagerName, nodeManager)
+}
+
+func addReplicatingMaintainer(
+	t *testing.T,
+	db *changefeed.ChangefeedDB,
+	name string,
+	nodeID node.ID,
+) common.ChangeFeedID {
+	t.Helper()
+
+	cfID := common.NewChangeFeedIDWithName(name, common.DefaultKeyspaceName)
+	info := &config.ChangeFeedInfo{
+		ChangefeedID: cfID,
+		SinkURI:      fmt.Sprintf("blackhole://%s", name),
+		Config:       config.GetDefaultReplicaConfig(),
+		State:        config.StateNormal,
+	}
+	cf := changefeed.NewChangefeed(cfID, info, 1, false)
+	db.AddReplicatingMaintainer(cf, nodeID)
+	return cfID
+}
diff --git a/coordinator/scheduler/basic.go b/coordinator/scheduler/basic.go
index 301a75f86d..90a828a132 100644
--- a/coordinator/scheduler/basic.go
+++ b/coordinator/scheduler/basic.go
@@ -17,6 +17,7 @@ import (
 	"time"
 
 	"github.com/pingcap/ticdc/coordinator/changefeed"
+	"github.com/pingcap/ticdc/coordinator/drain"
 	"github.com/pingcap/ticdc/coordinator/operator"
 	appcontext "github.com/pingcap/ticdc/pkg/common/context"
 	"github.com/pingcap/ticdc/pkg/node"
@@ -35,12 +36,14 @@ type basicScheduler struct {
 	operatorController *operator.Controller
 	changefeedDB       *changefeed.ChangefeedDB
 	nodeManager        *watcher.NodeManager
+	liveness           *drain.Controller
 }
 
 func NewBasicScheduler(
 	id string, batchSize int,
 	oc *operator.Controller,
 	changefeedDB *changefeed.ChangefeedDB,
+	liveness *drain.Controller,
 ) *basicScheduler {
 	return &basicScheduler{
 		id: id,
@@ -48,6 +51,7 @@ func NewBasicScheduler(
 		operatorController: oc,
 		changefeedDB:       changefeedDB,
 		nodeManager:        appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName),
+		liveness:           liveness,
 	}
 }
 
@@ -74,7 +78,7 @@ func (s *basicScheduler) doBasicSchedule(availableSize int) {
 	absentChangefeeds := s.changefeedDB.GetAbsentByGroup(id, availableSize)
 	nodeTaskSize := s.changefeedDB.GetTaskSizePerNodeByGroup(id)
 	// add the absent node to the node size map
-	nodeIDs := s.nodeManager.GetAliveNodeIDs()
+	nodeIDs := filterSchedulableNodeIDs(s.nodeManager.GetAliveNodeIDs(), s.liveness)
 	nodeSize := make(map[node.ID]int)
 	for _, id := range nodeIDs {
 		nodeSize[id] = nodeTaskSize[id]
diff --git a/coordinator/scheduler/drain.go b/coordinator/scheduler/drain.go
new file mode 100644
index 0000000000..cdce89068c
--- /dev/null
+++ b/coordinator/scheduler/drain.go
@@ -0,0 +1,209 @@
+// Copyright 2026 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package scheduler
+
+import (
+	"math"
+	"slices"
+	"time"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/ticdc/coordinator/changefeed"
+	"github.com/pingcap/ticdc/coordinator/drain"
+	"github.com/pingcap/ticdc/coordinator/operator"
+	appcontext "github.com/pingcap/ticdc/pkg/common/context"
+	"github.com/pingcap/ticdc/pkg/node"
+	"github.com/pingcap/ticdc/server/watcher"
+	"go.uber.org/zap"
+)
+
+// drainScheduler generates move operators that evacuate maintainers from draining nodes.
+// It skips changefeeds that already have in-flight operators.
+type drainScheduler struct {
+	id        string
+	batchSize int
+
+	operatorController *operator.Controller
+	changefeedDB       *changefeed.ChangefeedDB
+	nodeManager        *watcher.NodeManager
+	liveness           *drain.Controller
+
+	// rrCursor rotates the starting draining node to avoid starving nodes later in the list.
+	rrCursor int
+}
+
+// NewDrainScheduler creates a scheduler that migrates maintainers away from draining nodes.
+func NewDrainScheduler(
+	id string,
+	batchSize int,
+	oc *operator.Controller,
+	changefeedDB *changefeed.ChangefeedDB,
+	liveness *drain.Controller,
+) *drainScheduler {
+	return &drainScheduler{
+		id:                 id,
+		batchSize:          batchSize,
+		operatorController: oc,
+		changefeedDB:       changefeedDB,
+		nodeManager:        appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName),
+		liveness:           liveness,
+	}
+}
+
+// Execute schedules move operators from draining nodes to schedulable destination nodes.
+// It caps new moves with a dedicated drain move budget and returns the next run time.
+func (s *drainScheduler) Execute() time.Time {
+	if s.liveness == nil {
+		return time.Now().Add(time.Second)
+	}
+
+	now := time.Now()
+	// Snapshot the nodes that still need evacuation in this tick. Execute works
+	// from a stable, sorted slice so round-robin fairness is deterministic even
+	// if the backing drain state changes between ticks.
+	drainingNodes := s.liveness.GetDrainingOrStoppingNodes()
+	if len(drainingNodes) == 0 {
+		return now.Add(time.Second)
+	}
+	slices.Sort(drainingNodes)
+	// Drain moves get their own slot budget so unrelated operators do not stall
+	// evacuation progress while a node is draining.
+	availableSize := s.batchSize - s.operatorController.CountMoveMaintainerOperatorsFromNodes(drainingNodes)
+	if availableSize <= 0 {
+		return now.Add(time.Millisecond * 200)
+	}
+
+	destCandidates := filterSchedulableNodeIDs(s.nodeManager.GetAliveNodeIDs(), s.liveness)
+
+	if len(destCandidates) == 0 {
+		log.Info("no alive destination node for drain",
+			zap.String("schedulerID", s.id),
+			zap.Int("drainingNodeCount", len(drainingNodes)))
+		return now.Add(time.Second)
+	}
+
+	// Build per-node snapshots once for this tick:
+	//   - nodeTaskSize is mutated in memory after each accepted move so later picks
+	//     in the same tick observe the updated load distribution.
+	//   - maintainersByNode avoids repeatedly querying ChangefeedDB while walking
+	//     the draining nodes in round-robin order.
+	nodeTaskSize := s.changefeedDB.GetTaskSizePerNode()
+	maintainersByNode := make(map[node.ID][]*changefeed.Changefeed, len(drainingNodes))
+	nextMaintainerIndex := make(map[node.ID]int, len(drainingNodes))
+	for _, origin := range drainingNodes {
+		maintainersByNode[origin] = s.changefeedDB.GetByNodeID(origin)
+	}
+	scheduled := 0
+
+	if s.rrCursor >= len(drainingNodes) {
+		s.rrCursor = 0
+	}
+
+	// Walk draining nodes in rounds until we either consume the dedicated move
+	// budget or make no progress. Each round starts from rrCursor so a busy node
+	// does not monopolize the first scheduling chance forever.
+	for scheduled < availableSize {
+		progress := false
+		for i := 0; i < len(drainingNodes) && scheduled < availableSize; i++ {
+			origin := drainingNodes[(s.rrCursor+i)%len(drainingNodes)]
+			nextIndex, ok := s.scheduleOneFromNode(
+				origin,
+				maintainersByNode[origin],
+				nextMaintainerIndex[origin],
+				destCandidates,
+				nodeTaskSize,
+			)
+			nextMaintainerIndex[origin] = nextIndex
+			if ok {
+				scheduled++
+				progress = true
+			}
+		}
+		s.rrCursor = (s.rrCursor + 1) % len(drainingNodes)
+		if !progress {
+			break
+		}
+	}
+
+	if scheduled > 0 {
+		log.Info("drain scheduler created move operators",
+			zap.Int("scheduled", scheduled),
+			zap.Int("drainingNodeCount", len(drainingNodes)))
+	}
+
+	return now.Add(time.Millisecond * 200)
+}
+
+// scheduleOneFromNode tries to schedule one maintainer move from origin,
+// continuing from nextIndex within the pre-fetched maintainer slice. It skips
+// changefeeds that already have in-flight operators.
+func (s *drainScheduler) scheduleOneFromNode(
+	origin node.ID,
+	maintainers []*changefeed.Changefeed,
+	nextIndex int,
+	destCandidates []node.ID,
+	nodeTaskSize map[node.ID]int,
+) (int, bool) {
+	for nextIndex < len(maintainers) {
+		cf := maintainers[nextIndex]
+		nextIndex++
+		// Skip changefeeds that are already being added, moved, or stopped by
+		// another operator. Drain scheduling only fills genuinely free slots.
+		if s.operatorController.HasOperator(cf.ID.DisplayName) {
+			continue
+		}
+		// Choose against the mutable nodeTaskSize snapshot so multiple moves
+		// created in the same tick spread across destinations instead of all
+		// targeting the same initially cold node.
+		dest, ok := chooseLeastLoadedDest(origin, destCandidates, nodeTaskSize)
+		if !ok {
+			return nextIndex, false
+		}
+		if s.operatorController.AddOperator(operator.NewMoveMaintainerOperator(s.changefeedDB, cf, origin, dest)) {
+			nodeTaskSize[dest]++
+			return nextIndex, true
+		}
+	}
+	return nextIndex, false
+}
+
+// chooseLeastLoadedDest selects the destination with the smallest task count, excluding origin.
+func chooseLeastLoadedDest(
+	origin node.ID,
+	destCandidates []node.ID,
+	nodeTaskSize map[node.ID]int,
+) (node.ID, bool) {
+	minSize := math.MaxInt
+	var chosen node.ID
+	for _, id := range destCandidates {
+		if id == origin {
+			continue
+		}
+		size := nodeTaskSize[id]
+		if size < minSize {
+			minSize = size
+			chosen = id
+		}
+	}
+	if chosen.IsEmpty() {
+		return "", false
+	}
+	return chosen, true
+}
+
+// Name returns the scheduler name used by scheduler controller and logs.
+func (s *drainScheduler) Name() string {
+	return "drain-scheduler"
+}
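The destination choice in drain.go is a plain minimum scan over a mutable load snapshot. Below is a self-contained rendering of that pick, with string IDs standing in for node.ID (hypothetical names, not patch code); the caller's bump of the chosen node's count is what spreads several moves made in the same tick across destinations.

package main

import (
	"fmt"
	"math"
)

// chooseLeastLoadedDest picks the candidate with the smallest task count,
// never the draining origin itself.
func chooseLeastLoadedDest(origin string, candidates []string, taskSize map[string]int) (string, bool) {
	minSize := math.MaxInt
	chosen := ""
	for _, id := range candidates {
		if id == origin {
			continue
		}
		if size := taskSize[id]; size < minSize {
			minSize = size
			chosen = id
		}
	}
	return chosen, chosen != ""
}

func main() {
	load := map[string]int{"hot": 2, "cold": 0}
	dest, ok := chooseLeastLoadedDest("origin", []string{"origin", "hot", "cold"}, load)
	fmt.Println(dest, ok) // cold true
	// Mirroring scheduleOneFromNode: bump the chosen node's count so the next
	// pick in the same tick sees the updated load and moves on to "hot" if
	// "cold" catches up.
	load[dest]++
}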
diff --git a/coordinator/scheduler/drain_liveness.go b/coordinator/scheduler/drain_liveness.go
new file mode 100644
index 0000000000..950fcdbcea
--- /dev/null
+++ b/coordinator/scheduler/drain_liveness.go
@@ -0,0 +1,60 @@
+// Copyright 2026 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package scheduler
+
+import (
+	"github.com/pingcap/ticdc/coordinator/drain"
+	"github.com/pingcap/ticdc/pkg/node"
+)
+
+// filterSchedulableNodeIDs removes nodes that are draining, stopping, or stale
+// so schedulers never move work onto a node that should no longer accept new
+// maintainers.
+func filterSchedulableNodeIDs(nodeIDs []node.ID, liveness *drain.Controller) []node.ID {
+	if liveness == nil {
+		return nodeIDs
+	}
+	filtered := nodeIDs[:0]
+	for _, nodeID := range nodeIDs {
+		if liveness.IsSchedulableDest(nodeID) {
+			filtered = append(filtered, nodeID)
+		}
+	}
+	return filtered
+}
+
+// hasDrainingOrStoppingNode reports whether drain-aware schedulers should keep
+// considering node evacuation in the current tick.
+func hasDrainingOrStoppingNode(liveness *drain.Controller) bool {
+	return liveness != nil && len(liveness.GetDrainingOrStoppingNodes()) > 0
+}
+
+// filterSchedulableAliveNodes applies the same destination constraint to the
+// alive-node map used by the balance scheduler.
+func filterSchedulableAliveNodes(
+	nodes map[node.ID]*node.Info,
+	liveness *drain.Controller,
+) map[node.ID]*node.Info {
+	if liveness == nil {
+		return nodes
+	}
+	filtered := make(map[node.ID]*node.Info, len(nodes))
+	for id, info := range nodes {
+		if liveness.IsSchedulableDest(id) {
+			filtered[id] = info
+		}
+	}
+	return filtered
+}
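One subtlety worth noting for reviewers: filterSchedulableNodeIDs filters in place through nodeIDs[:0], so it reuses and partially overwrites the caller's backing array, and callers must treat the input slice as consumed. A generic, runnable sketch of the same idiom (names here are illustrative only):

package main

import "fmt"

// filterInPlace keeps the elements accepted by keep, writing them back into
// the input slice's backing array; the original slice must not be used after.
func filterInPlace(ids []string, keep func(string) bool) []string {
	out := ids[:0]
	for _, id := range ids {
		if keep(id) {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	ids := []string{"node-a", "draining-b", "node-c"}
	ids = filterInPlace(ids, func(id string) bool { return id != "draining-b" })
	fmt.Println(ids) // [node-a node-c]
}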
diff --git a/coordinator/scheduler/drain_test.go b/coordinator/scheduler/drain_test.go
new file mode 100644
index 0000000000..fa20f54509
--- /dev/null
+++ b/coordinator/scheduler/drain_test.go
@@ -0,0 +1,221 @@
+// Copyright 2026 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package scheduler
+
+import (
+	"testing"
+
+	"github.com/pingcap/ticdc/coordinator/changefeed"
+	"github.com/pingcap/ticdc/coordinator/drain"
+	"github.com/pingcap/ticdc/coordinator/operator"
+	"github.com/pingcap/ticdc/heartbeatpb"
+	"github.com/pingcap/ticdc/pkg/common"
+	appcontext "github.com/pingcap/ticdc/pkg/common/context"
+	"github.com/pingcap/ticdc/pkg/config"
+	"github.com/pingcap/ticdc/pkg/messaging"
+	"github.com/pingcap/ticdc/pkg/node"
+	"github.com/pingcap/ticdc/server/watcher"
+	"github.com/stretchr/testify/require"
+)
+
+func TestDrainSchedulerCreatesMoveOperators(t *testing.T) {
+	mc := messaging.NewMockMessageCenter()
+	appcontext.SetService(appcontext.MessageCenter, mc)
+
+	nodeManager := watcher.NewNodeManager(nil, nil)
+	appcontext.SetService(watcher.NodeManagerName, nodeManager)
+
+	origin := node.ID("origin")
+	destHot := node.ID("dest-hot")
+	destCold := node.ID("dest-cold")
+	nodeManager.GetAliveNodes()[origin] = &node.Info{ID: origin}
+	nodeManager.GetAliveNodes()[destHot] = &node.Info{ID: destHot}
+	nodeManager.GetAliveNodes()[destCold] = &node.Info{ID: destCold}
+
+	drainController := drain.NewController(mc)
+	drainController.ObserveHeartbeat(origin, &heartbeatpb.NodeHeartbeat{
+		Liveness:  heartbeatpb.NodeLiveness_DRAINING,
+		NodeEpoch: 1,
+	})
+
+	db := changefeed.NewChangefeedDB(1)
+	cfID := common.NewChangeFeedIDWithName("cf1", common.DefaultKeyspaceName)
+	info := &config.ChangeFeedInfo{
+		ChangefeedID: cfID,
+		SinkURI:      "blackhole://",
+		Config:       config.GetDefaultReplicaConfig(),
+		State:        config.StateNormal,
+	}
+	cf := changefeed.NewChangefeed(cfID, info, 1, false)
+	db.AddReplicatingMaintainer(cf, origin)
+
+	// Create load on one destination to ensure the scheduler deterministically chooses the least loaded node.
+	loadInfo := &config.ChangeFeedInfo{
+		SinkURI: "blackhole://load",
+		Config:  config.GetDefaultReplicaConfig(),
+		State:   config.StateNormal,
+	}
+	loadInfo1 := *loadInfo
+	loadInfo1.ChangefeedID = common.NewChangeFeedIDWithName("cf-load-1", common.DefaultKeyspaceName)
+	db.AddReplicatingMaintainer(changefeed.NewChangefeed(loadInfo1.ChangefeedID, &loadInfo1, 1, false), destHot)
+	loadInfo2 := *loadInfo
+	loadInfo2.ChangefeedID = common.NewChangeFeedIDWithName("cf-load-2", common.DefaultKeyspaceName)
+	db.AddReplicatingMaintainer(changefeed.NewChangefeed(loadInfo2.ChangefeedID, &loadInfo2, 1, false), destHot)
+
+	selfNode := &node.Info{ID: node.ID("coordinator")}
+	oc := operator.NewOperatorController(selfNode, db, nil, 10)
+
+	s := NewDrainScheduler("test", 10, oc, db, drainController)
+	_ = s.Execute()
+
+	require.Equal(t, 1, oc.OperatorSize())
+	op := oc.GetOperator(cfID)
+	require.NotNil(t, op)
+	require.ElementsMatch(t, []node.ID{origin, destCold}, op.AffectedNodes())
+}
+
+func TestDrainSchedulerSkipsChangefeedWithInflightOperator(t *testing.T) {
+	mc := messaging.NewMockMessageCenter()
+	appcontext.SetService(appcontext.MessageCenter, mc)
+
+	nodeManager := watcher.NewNodeManager(nil, nil)
+	appcontext.SetService(watcher.NodeManagerName, nodeManager)
+
+	origin := node.ID("origin")
+	dest := node.ID("dest")
+	nodeManager.GetAliveNodes()[origin] = &node.Info{ID: origin}
+	nodeManager.GetAliveNodes()[dest] = &node.Info{ID: dest}
+
+	drainController := drain.NewController(mc)
+	drainController.ObserveHeartbeat(origin, &heartbeatpb.NodeHeartbeat{
+		Liveness:  heartbeatpb.NodeLiveness_DRAINING,
+		NodeEpoch: 1,
+	})
+
+	db := changefeed.NewChangefeedDB(1)
+	cfID1 := common.NewChangeFeedIDWithName("cf1", common.DefaultKeyspaceName)
+	cfID2 := common.NewChangeFeedIDWithName("cf2", common.DefaultKeyspaceName)
+
+	info := &config.ChangeFeedInfo{
+		SinkURI: "blackhole://",
+		Config:  config.GetDefaultReplicaConfig(),
+		State:   config.StateNormal,
+	}
+
+	info1 := *info
+	info1.ChangefeedID = cfID1
+	cf1 := changefeed.NewChangefeed(cfID1, &info1, 1, false)
+	db.AddReplicatingMaintainer(cf1, origin)
+
+	info2 := *info
+	info2.ChangefeedID = cfID2
+	cf2 := changefeed.NewChangefeed(cfID2, &info2, 1, false)
+	db.AddReplicatingMaintainer(cf2, origin)
+
+	selfNode := &node.Info{ID: node.ID("coordinator")}
+	oc := operator.NewOperatorController(selfNode, db, nil, 10)
+
+	require.True(t, oc.AddOperator(operator.NewMoveMaintainerOperator(db, cf1, origin, dest)))
+	require.Equal(t, 1, oc.OperatorSize())
+
+	s := NewDrainScheduler("test", 2, oc, db, drainController)
+	_ = s.Execute()
+
+	require.Equal(t, 2, oc.OperatorSize())
+	require.NotNil(t, oc.GetOperator(cfID2))
+}
+
+func TestDrainSchedulerIgnoresUnrelatedOperatorCapacity(t *testing.T) {
+	mc := messaging.NewMockMessageCenter()
+	appcontext.SetService(appcontext.MessageCenter, mc)
+
+	nodeManager := watcher.NewNodeManager(nil, nil)
+	appcontext.SetService(watcher.NodeManagerName, nodeManager)
+
+	origin := node.ID("origin")
+	other := node.ID("other")
+	dest := node.ID("dest")
+	nodeManager.GetAliveNodes()[origin] = &node.Info{ID: origin}
+	nodeManager.GetAliveNodes()[other] = &node.Info{ID: other}
+	nodeManager.GetAliveNodes()[dest] = &node.Info{ID: dest}
+
+	drainController := drain.NewController(mc)
+	drainController.ObserveHeartbeat(origin, &heartbeatpb.NodeHeartbeat{
+		Liveness:  heartbeatpb.NodeLiveness_DRAINING,
+		NodeEpoch: 1,
+	})
+
+	db := changefeed.NewChangefeedDB(1)
+	cfID := addReplicatingMaintainer(t, db, "cf-drain", origin)
+	otherID := addReplicatingMaintainer(t, db, "cf-other", other)
+	otherCF := db.GetByID(otherID)
+
+	selfNode := &node.Info{ID: node.ID("coordinator")}
+	oc := operator.NewOperatorController(selfNode, db, nil, 10)
+	require.True(t, oc.AddOperator(operator.NewMoveMaintainerOperator(db, otherCF, other, dest)))
+
+	s := NewDrainScheduler("test", 1, oc, db, drainController)
+	_ = s.Execute()
+
+	require.Equal(t, 2, oc.OperatorSize())
+	require.NotNil(t, oc.GetOperator(cfID))
+}
+
+func TestDrainSchedulerRotatesAcrossDrainingNodes(t *testing.T) {
+	mc := messaging.NewMockMessageCenter()
+	appcontext.SetService(appcontext.MessageCenter, mc)
+
+	nodeManager := watcher.NewNodeManager(nil, nil)
+	appcontext.SetService(watcher.NodeManagerName, nodeManager)
+
+	originA := node.ID("origin-a")
+	originB := node.ID("origin-b")
+	dest := node.ID("dest")
+	nodeManager.GetAliveNodes()[originA] = &node.Info{ID: originA}
+	nodeManager.GetAliveNodes()[originB] = &node.Info{ID: originB}
+	nodeManager.GetAliveNodes()[dest] = &node.Info{ID: dest}
+
+	drainController := drain.NewController(mc)
+	drainController.ObserveHeartbeat(originA, &heartbeatpb.NodeHeartbeat{
+		Liveness:  heartbeatpb.NodeLiveness_DRAINING,
+		NodeEpoch: 1,
+	})
+	drainController.ObserveHeartbeat(originB, &heartbeatpb.NodeHeartbeat{
+		Liveness:  heartbeatpb.NodeLiveness_DRAINING,
+		NodeEpoch: 1,
+	})
+
+	db := changefeed.NewChangefeedDB(1)
+	cfA := addReplicatingMaintainer(t, db, "cf-a", originA)
+	cfB := addReplicatingMaintainer(t, db, "cf-b", originB)
+
+	selfNode := &node.Info{ID: node.ID("coordinator")}
+	oc := operator.NewOperatorController(selfNode, db, nil, 10)
+	s := NewDrainScheduler("test", 1, oc, db, drainController)
+
+	_ = s.Execute()
+
+	first := oc.GetOperator(cfA)
+	require.NotNil(t, first)
+	require.Nil(t, oc.GetOperator(cfB))
+
+	first.OnTaskRemoved()
+	_ = oc.Execute()
+	require.Equal(t, 0, oc.OperatorSize())
+
+	_ = s.Execute()
+
+	require.NotNil(t, oc.GetOperator(cfB))
+}
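The rotation asserted by TestDrainSchedulerRotatesAcrossDrainingNodes falls out of the budgeted round-robin walk in drainScheduler.Execute. The sketch below models that walk with simplified, hypothetical types: pending counts the maintainers left on each draining node, and schedule stands in for scheduleOneFromNode.

package main

import "fmt"

// scheduleRound walks the draining nodes starting from a rotating cursor,
// stopping when the move budget is spent or a full round makes no progress.
func scheduleRound(draining []string, budget int, cursor *int, schedule func(string) bool) int {
	scheduled := 0
	for scheduled < budget {
		progress := false
		for i := 0; i < len(draining) && scheduled < budget; i++ {
			origin := draining[(*cursor+i)%len(draining)]
			if schedule(origin) {
				scheduled++
				progress = true
			}
		}
		// Advance the cursor so the next tick starts from a different node.
		*cursor = (*cursor + 1) % len(draining)
		if !progress {
			break
		}
	}
	return scheduled
}

func main() {
	pending := map[string]int{"node-a": 2, "node-b": 1}
	cursor := 0
	n := scheduleRound([]string{"node-a", "node-b"}, 2, &cursor, func(origin string) bool {
		if pending[origin] == 0 {
			return false
		}
		pending[origin]--
		return true
	})
	fmt.Println(n, cursor) // 2 1: one move from each node, cursor advanced
}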