From 77a9cfae9f086c9f7bc33dcb45ef77a137101698 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 8 Apr 2026 07:27:09 +0800 Subject: [PATCH 1/6] coordinator,scheduler: add drain aware scheduling --- coordinator/controller.go | 9 ++ coordinator/scheduler/balance.go | 26 +++- coordinator/scheduler/balance_test.go | 176 +++++++++++++++++++++++ coordinator/scheduler/basic.go | 6 +- coordinator/scheduler/drain.go | 177 ++++++++++++++++++++++++ coordinator/scheduler/drain_liveness.go | 58 ++++++++ coordinator/scheduler/drain_test.go | 136 ++++++++++++++++++ 7 files changed, 584 insertions(+), 4 deletions(-) create mode 100644 coordinator/scheduler/balance_test.go create mode 100644 coordinator/scheduler/drain.go create mode 100644 coordinator/scheduler/drain_liveness.go create mode 100644 coordinator/scheduler/drain_test.go diff --git a/coordinator/controller.go b/coordinator/controller.go index c02324469a..7e38cf2fd7 100644 --- a/coordinator/controller.go +++ b/coordinator/controller.go @@ -134,6 +134,14 @@ func NewController( batchSize, oc, changefeedDB, + drainController, + ), + scheduler.DrainScheduler: coscheduler.NewDrainScheduler( + selfNode.ID.String(), + batchSize, + oc, + changefeedDB, + drainController, ), scheduler.BalanceScheduler: coscheduler.NewBalanceScheduler( selfNode.ID.String(), @@ -141,6 +149,7 @@ func NewController( oc, changefeedDB, balanceInterval, + drainController, ), }), eventCh: eventCh, diff --git a/coordinator/scheduler/balance.go b/coordinator/scheduler/balance.go index 4593a95729..20cf15668e 100644 --- a/coordinator/scheduler/balance.go +++ b/coordinator/scheduler/balance.go @@ -18,6 +18,7 @@ import ( "time" "github.com/pingcap/ticdc/coordinator/changefeed" + "github.com/pingcap/ticdc/coordinator/drain" "github.com/pingcap/ticdc/coordinator/operator" appcontext "github.com/pingcap/ticdc/pkg/common/context" "github.com/pingcap/ticdc/pkg/node" @@ -33,6 +34,7 @@ type balanceScheduler struct { operatorController *operator.Controller changefeedDB *changefeed.ChangefeedDB nodeManager *watcher.NodeManager + liveness *drain.Controller random *rand.Rand lastRebalanceTime time.Time @@ -44,13 +46,18 @@ type balanceScheduler struct { // `Schedule`. // It speeds up rebalance. forceBalance bool + + drainBalanceBlockedUntil time.Time } +const drainBalanceCooldown = 120 * time.Second + func NewBalanceScheduler( id string, batchSize int, oc *operator.Controller, changefeedDB *changefeed.ChangefeedDB, balanceInterval time.Duration, + liveness *drain.Controller, ) *balanceScheduler { return &balanceScheduler{ id: id, @@ -61,14 +68,25 @@ func NewBalanceScheduler( nodeManager: appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName), checkBalanceInterval: balanceInterval, lastRebalanceTime: time.Now(), + liveness: liveness, } } func (s *balanceScheduler) Execute() time.Time { + now := time.Now() + if hasDrainingOrStoppingNode(s.liveness) { + // Pause regular balance scheduling while any node is draining/stopping. + s.drainBalanceBlockedUntil = now.Add(drainBalanceCooldown) + return now.Add(s.checkBalanceInterval) + } + if now.Before(s.drainBalanceBlockedUntil) { + // Keep a cooldown window after all draining/stopping nodes are gone + // to avoid immediate rebalance churn. + return now.Add(s.checkBalanceInterval) + } if !s.forceBalance && time.Since(s.lastRebalanceTime) < s.checkBalanceInterval { return s.lastRebalanceTime.Add(s.checkBalanceInterval) } - now := time.Now() if s.operatorController.OperatorSize() > 0 || s.changefeedDB.GetAbsentSize() > 0 { // not in stable schedule state, skip balance @@ -76,13 +94,15 @@ func (s *balanceScheduler) Execute() time.Time { } // check the balance status - moveSize := pkgScheduler.CheckBalanceStatus(s.changefeedDB.GetTaskSizePerNode(), s.nodeManager.GetAliveNodes()) + activeNodes := s.nodeManager.GetAliveNodes() + activeNodes = filterSchedulableAliveNodes(activeNodes, s.liveness) + moveSize := pkgScheduler.CheckBalanceStatus(s.changefeedDB.GetTaskSizePerNode(), activeNodes) if moveSize <= 0 { // fast check the balance status, no need to do the balance,skip return now.Add(s.checkBalanceInterval) } // balance changefeeds among the active nodes - movedSize := pkgScheduler.Balance(s.batchSize, s.random, s.nodeManager.GetAliveNodes(), s.changefeedDB.GetReplicating(), + movedSize := pkgScheduler.Balance(s.batchSize, s.random, activeNodes, s.changefeedDB.GetReplicating(), func(cf *changefeed.Changefeed, nodeID node.ID) bool { return s.operatorController.AddOperator(operator.NewMoveMaintainerOperator(s.changefeedDB, cf, cf.GetNodeID(), nodeID)) }) diff --git a/coordinator/scheduler/balance_test.go b/coordinator/scheduler/balance_test.go new file mode 100644 index 0000000000..93056f0314 --- /dev/null +++ b/coordinator/scheduler/balance_test.go @@ -0,0 +1,176 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "fmt" + "testing" + "time" + + "github.com/pingcap/ticdc/coordinator/changefeed" + "github.com/pingcap/ticdc/coordinator/drain" + "github.com/pingcap/ticdc/coordinator/operator" + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + appcontext "github.com/pingcap/ticdc/pkg/common/context" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/messaging" + "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/server/watcher" + "github.com/stretchr/testify/require" +) + +func TestBalanceSchedulerCreatesMoveOperators(t *testing.T) { + setupCoordinatorSchedulerTestServices() + mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter) + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + + nodeA := node.ID("node-a") + nodeB := node.ID("node-b") + nodeManager.GetAliveNodes()[nodeA] = &node.Info{ID: nodeA} + nodeManager.GetAliveNodes()[nodeB] = &node.Info{ID: nodeB} + + drainController := drain.NewController(mc) + db := changefeed.NewChangefeedDB(1) + addReplicatingMaintainer(t, db, "cf-a-1", nodeA) + addReplicatingMaintainer(t, db, "cf-a-2", nodeA) + + selfNode := &node.Info{ID: node.ID("coordinator")} + oc := operator.NewOperatorController(selfNode, db, nil, 10) + s := NewBalanceScheduler("test", 10, oc, db, 0, drainController) + _ = s.Execute() + + require.Equal(t, 1, oc.OperatorSize()) +} + +func TestBalanceSchedulerSkipsWhenDrainActive(t *testing.T) { + setupCoordinatorSchedulerTestServices() + mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter) + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + + drainingNode := node.ID("draining") + nodeA := node.ID("node-a") + nodeB := node.ID("node-b") + nodeManager.GetAliveNodes()[drainingNode] = &node.Info{ID: drainingNode} + nodeManager.GetAliveNodes()[nodeA] = &node.Info{ID: nodeA} + nodeManager.GetAliveNodes()[nodeB] = &node.Info{ID: nodeB} + + drainController := drain.NewController(mc) + drainController.ObserveHeartbeat(drainingNode, &heartbeatpb.NodeHeartbeat{ + Liveness: heartbeatpb.NodeLiveness_DRAINING, + NodeEpoch: 1, + }) + + db := changefeed.NewChangefeedDB(1) + addReplicatingMaintainer(t, db, "cf-drain", drainingNode) + addReplicatingMaintainer(t, db, "cf-a-1", nodeA) + addReplicatingMaintainer(t, db, "cf-a-2", nodeA) + addReplicatingMaintainer(t, db, "cf-a-3", nodeA) + addReplicatingMaintainer(t, db, "cf-a-4", nodeA) + + selfNode := &node.Info{ID: node.ID("coordinator")} + oc := operator.NewOperatorController(selfNode, db, nil, 10) + s := NewBalanceScheduler("test", 10, oc, db, 0, drainController) + _ = s.Execute() + + require.Equal(t, 0, oc.OperatorSize()) +} + +func TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires(t *testing.T) { + setupCoordinatorSchedulerTestServices() + mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter) + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + + drainingA := node.ID("draining-a") + drainingB := node.ID("draining-b") + nodeA := node.ID("node-a") + nodeB := node.ID("node-b") + nodeManager.GetAliveNodes()[drainingA] = &node.Info{ID: drainingA} + nodeManager.GetAliveNodes()[drainingB] = &node.Info{ID: drainingB} + nodeManager.GetAliveNodes()[nodeA] = &node.Info{ID: nodeA} + nodeManager.GetAliveNodes()[nodeB] = &node.Info{ID: nodeB} + + drainController := drain.NewController(mc) + drainController.ObserveHeartbeat(drainingA, &heartbeatpb.NodeHeartbeat{ + Liveness: heartbeatpb.NodeLiveness_DRAINING, + NodeEpoch: 1, + }) + drainController.ObserveHeartbeat(drainingB, &heartbeatpb.NodeHeartbeat{ + Liveness: heartbeatpb.NodeLiveness_DRAINING, + NodeEpoch: 1, + }) + + db := changefeed.NewChangefeedDB(1) + addReplicatingMaintainer(t, db, "cf-a-1", nodeA) + addReplicatingMaintainer(t, db, "cf-a-2", nodeA) + addReplicatingMaintainer(t, db, "cf-a-3", nodeA) + addReplicatingMaintainer(t, db, "cf-a-4", nodeA) + + selfNode := &node.Info{ID: node.ID("coordinator")} + oc := operator.NewOperatorController(selfNode, db, nil, 10) + s := NewBalanceScheduler("test", 10, oc, db, 0, drainController) + s.drainBalanceBlockedUntil = time.Time{} + + // Any draining node should block balance. + _ = s.Execute() + require.Equal(t, 0, oc.OperatorSize()) + + // One node remains draining, balance is still blocked. + drainController.ObserveHeartbeat(drainingA, &heartbeatpb.NodeHeartbeat{ + Liveness: heartbeatpb.NodeLiveness_ALIVE, + NodeEpoch: 2, + }) + _ = s.Execute() + require.Equal(t, 0, oc.OperatorSize()) + + // After all nodes leave draining, cooldown still blocks balance. + drainController.ObserveHeartbeat(drainingB, &heartbeatpb.NodeHeartbeat{ + Liveness: heartbeatpb.NodeLiveness_ALIVE, + NodeEpoch: 2, + }) + s.drainBalanceBlockedUntil = time.Now().Add(100 * time.Millisecond) + _ = s.Execute() + require.Equal(t, 0, oc.OperatorSize()) + + s.drainBalanceBlockedUntil = time.Now().Add(-time.Millisecond) + _ = s.Execute() + require.Greater(t, oc.OperatorSize(), 0) +} + +func setupCoordinatorSchedulerTestServices() { + mc := messaging.NewMockMessageCenter() + appcontext.SetService(appcontext.MessageCenter, mc) + nodeManager := watcher.NewNodeManager(nil, nil) + appcontext.SetService(watcher.NodeManagerName, nodeManager) +} + +func addReplicatingMaintainer( + t *testing.T, + db *changefeed.ChangefeedDB, + name string, + nodeID node.ID, +) common.ChangeFeedID { + t.Helper() + + cfID := common.NewChangeFeedIDWithName(name, common.DefaultKeyspaceName) + info := &config.ChangeFeedInfo{ + ChangefeedID: cfID, + SinkURI: fmt.Sprintf("blackhole://%s", name), + Config: config.GetDefaultReplicaConfig(), + State: config.StateNormal, + } + cf := changefeed.NewChangefeed(cfID, info, 1, false) + db.AddReplicatingMaintainer(cf, nodeID) + return cfID +} diff --git a/coordinator/scheduler/basic.go b/coordinator/scheduler/basic.go index 301a75f86d..90a828a132 100644 --- a/coordinator/scheduler/basic.go +++ b/coordinator/scheduler/basic.go @@ -17,6 +17,7 @@ import ( "time" "github.com/pingcap/ticdc/coordinator/changefeed" + "github.com/pingcap/ticdc/coordinator/drain" "github.com/pingcap/ticdc/coordinator/operator" appcontext "github.com/pingcap/ticdc/pkg/common/context" "github.com/pingcap/ticdc/pkg/node" @@ -35,12 +36,14 @@ type basicScheduler struct { operatorController *operator.Controller changefeedDB *changefeed.ChangefeedDB nodeManager *watcher.NodeManager + liveness *drain.Controller } func NewBasicScheduler( id string, batchSize int, oc *operator.Controller, changefeedDB *changefeed.ChangefeedDB, + liveness *drain.Controller, ) *basicScheduler { return &basicScheduler{ id: id, @@ -48,6 +51,7 @@ func NewBasicScheduler( operatorController: oc, changefeedDB: changefeedDB, nodeManager: appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName), + liveness: liveness, } } @@ -74,7 +78,7 @@ func (s *basicScheduler) doBasicSchedule(availableSize int) { absentChangefeeds := s.changefeedDB.GetAbsentByGroup(id, availableSize) nodeTaskSize := s.changefeedDB.GetTaskSizePerNodeByGroup(id) // add the absent node to the node size map - nodeIDs := s.nodeManager.GetAliveNodeIDs() + nodeIDs := filterSchedulableNodeIDs(s.nodeManager.GetAliveNodeIDs(), s.liveness) nodeSize := make(map[node.ID]int) for _, id := range nodeIDs { nodeSize[id] = nodeTaskSize[id] diff --git a/coordinator/scheduler/drain.go b/coordinator/scheduler/drain.go new file mode 100644 index 0000000000..7ddad026a0 --- /dev/null +++ b/coordinator/scheduler/drain.go @@ -0,0 +1,177 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package scheduler + +import ( + "math" + "slices" + "time" + + "github.com/pingcap/log" + "github.com/pingcap/ticdc/coordinator/changefeed" + "github.com/pingcap/ticdc/coordinator/drain" + "github.com/pingcap/ticdc/coordinator/operator" + appcontext "github.com/pingcap/ticdc/pkg/common/context" + "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/server/watcher" + "go.uber.org/zap" +) + +// drainScheduler generates move operators to move maintainers out of draining nodes. +// It skips changefeeds that already have in-flight operators. +type drainScheduler struct { + id string + batchSize int + + operatorController *operator.Controller + changefeedDB *changefeed.ChangefeedDB + nodeManager *watcher.NodeManager + liveness *drain.Controller + + // rrCursor rotates the starting draining node to avoid starving nodes later in the list. + rrCursor int +} + +// NewDrainScheduler creates a scheduler that migrates maintainers away from draining nodes. +func NewDrainScheduler( + id string, + batchSize int, + oc *operator.Controller, + changefeedDB *changefeed.ChangefeedDB, + liveness *drain.Controller, +) *drainScheduler { + return &drainScheduler{ + id: id, + batchSize: batchSize, + operatorController: oc, + changefeedDB: changefeedDB, + nodeManager: appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName), + liveness: liveness, + } +} + +// Execute schedules move operators from draining nodes to schedulable destination nodes. +// It limits scheduling by available operator slots and returns the next run time. +func (s *drainScheduler) Execute() time.Time { + availableSize := s.batchSize - s.operatorController.OperatorSize() + if availableSize <= 0 { + return time.Now().Add(time.Millisecond * 200) + } + + if s.liveness == nil { + return time.Now().Add(time.Second) + } + + now := time.Now() + drainingNodes := s.liveness.GetDrainingOrStoppingNodes() + if len(drainingNodes) == 0 { + return now.Add(time.Second) + } + slices.Sort(drainingNodes) + + destCandidates := filterSchedulableNodeIDs(s.nodeManager.GetAliveNodeIDs(), s.liveness) + + if len(destCandidates) == 0 { + log.Info("no alive destination node for drain", + zap.String("schedulerID", s.id), + zap.Int("drainingNodeCount", len(drainingNodes))) + return now.Add(time.Second) + } + + nodeTaskSize := s.changefeedDB.GetTaskSizePerNode() + scheduled := 0 + + if s.rrCursor >= len(drainingNodes) { + s.rrCursor = 0 + } + + for scheduled < availableSize { + progress := false + for i := 0; i < len(drainingNodes) && scheduled < availableSize; i++ { + origin := drainingNodes[(s.rrCursor+i)%len(drainingNodes)] + if s.scheduleOneFromNode(origin, destCandidates, nodeTaskSize) { + scheduled++ + progress = true + } + } + s.rrCursor = (s.rrCursor + 1) % len(drainingNodes) + if !progress { + break + } + } + + if scheduled > 0 { + log.Info("drain scheduler created move operators", + zap.Int("scheduled", scheduled), + zap.Int("drainingNodeCount", len(drainingNodes))) + } + + return now.Add(time.Millisecond * 200) +} + +// scheduleOneFromNode tries to schedule one maintainer move from origin. +// It skips changefeeds that already have in-flight operators. +func (s *drainScheduler) scheduleOneFromNode( + origin node.ID, + destCandidates []node.ID, + nodeTaskSize map[node.ID]int, +) bool { + maintainers := s.changefeedDB.GetByNodeID(origin) + if len(maintainers) == 0 { + return false + } + + for _, cf := range maintainers { + if s.operatorController.HasOperator(cf.ID.DisplayName) { + continue + } + dest, ok := chooseLeastLoadedDest(origin, destCandidates, nodeTaskSize) + if !ok { + return false + } + if s.operatorController.AddOperator(operator.NewMoveMaintainerOperator(s.changefeedDB, cf, origin, dest)) { + nodeTaskSize[dest]++ + return true + } + } + return false +} + +// chooseLeastLoadedDest selects the destination with the smallest task count, excluding origin. +func chooseLeastLoadedDest( + origin node.ID, + destCandidates []node.ID, + nodeTaskSize map[node.ID]int, +) (node.ID, bool) { + minSize := math.MaxInt + var chosen node.ID + for _, id := range destCandidates { + if id == origin { + continue + } + size := nodeTaskSize[id] + if size < minSize { + minSize = size + chosen = id + } + } + if chosen.IsEmpty() { + return "", false + } + return chosen, true +} + +// Name returns the scheduler name used by scheduler controller and logs. +func (s *drainScheduler) Name() string { + return "drain-scheduler" +} diff --git a/coordinator/scheduler/drain_liveness.go b/coordinator/scheduler/drain_liveness.go new file mode 100644 index 0000000000..950fcdbcea --- /dev/null +++ b/coordinator/scheduler/drain_liveness.go @@ -0,0 +1,58 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package scheduler + +import ( + "github.com/pingcap/ticdc/coordinator/drain" + "github.com/pingcap/ticdc/pkg/node" +) + +// filterSchedulableNodeIDs removes nodes that are draining, stopping, or stale +// so drain scheduling never moves work onto a node that should no longer accept +// new maintainers. +func filterSchedulableNodeIDs(nodeIDs []node.ID, liveness *drain.Controller) []node.ID { + if liveness == nil { + return nodeIDs + } + filtered := nodeIDs[:0] + for _, nodeID := range nodeIDs { + if liveness.IsSchedulableDest(nodeID) { + filtered = append(filtered, nodeID) + } + } + return filtered +} + +// hasDrainingOrStoppingNode reports whether drain-aware schedulers should keep +// considering node evacuation in the current tick. +func hasDrainingOrStoppingNode(liveness *drain.Controller) bool { + return liveness != nil && len(liveness.GetDrainingOrStoppingNodes()) > 0 +} + +// filterSchedulableAliveNodes applies the same destination constraint to the +// alive-node map used by balance schedulers. +func filterSchedulableAliveNodes( + nodes map[node.ID]*node.Info, + liveness *drain.Controller, +) map[node.ID]*node.Info { + if liveness == nil { + return nodes + } + filtered := make(map[node.ID]*node.Info, len(nodes)) + for id, info := range nodes { + if liveness.IsSchedulableDest(id) { + filtered[id] = info + } + } + return filtered +} diff --git a/coordinator/scheduler/drain_test.go b/coordinator/scheduler/drain_test.go new file mode 100644 index 0000000000..17aa8d3b34 --- /dev/null +++ b/coordinator/scheduler/drain_test.go @@ -0,0 +1,136 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +package scheduler + +import ( + "testing" + + "github.com/pingcap/ticdc/coordinator/changefeed" + "github.com/pingcap/ticdc/coordinator/drain" + "github.com/pingcap/ticdc/coordinator/operator" + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + appcontext "github.com/pingcap/ticdc/pkg/common/context" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/messaging" + "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/ticdc/server/watcher" + "github.com/stretchr/testify/require" +) + +func TestDrainSchedulerCreatesMoveOperators(t *testing.T) { + mc := messaging.NewMockMessageCenter() + appcontext.SetService(appcontext.MessageCenter, mc) + + nodeManager := watcher.NewNodeManager(nil, nil) + appcontext.SetService(watcher.NodeManagerName, nodeManager) + + origin := node.ID("origin") + destHot := node.ID("dest-hot") + destCold := node.ID("dest-cold") + nodeManager.GetAliveNodes()[origin] = &node.Info{ID: origin} + nodeManager.GetAliveNodes()[destHot] = &node.Info{ID: destHot} + nodeManager.GetAliveNodes()[destCold] = &node.Info{ID: destCold} + + drainController := drain.NewController(mc) + drainController.ObserveHeartbeat(origin, &heartbeatpb.NodeHeartbeat{ + Liveness: heartbeatpb.NodeLiveness_DRAINING, + NodeEpoch: 1, + }) + + db := changefeed.NewChangefeedDB(1) + cfID := common.NewChangeFeedIDWithName("cf1", common.DefaultKeyspaceName) + info := &config.ChangeFeedInfo{ + ChangefeedID: cfID, + SinkURI: "blackhole://", + Config: config.GetDefaultReplicaConfig(), + State: config.StateNormal, + } + cf := changefeed.NewChangefeed(cfID, info, 1, false) + db.AddReplicatingMaintainer(cf, origin) + + // Create load on one destination to ensure the scheduler deterministically chooses the least loaded node. + loadInfo := &config.ChangeFeedInfo{ + SinkURI: "blackhole://load", + Config: config.GetDefaultReplicaConfig(), + State: config.StateNormal, + } + loadInfo1 := *loadInfo + loadInfo1.ChangefeedID = common.NewChangeFeedIDWithName("cf-load-1", common.DefaultKeyspaceName) + db.AddReplicatingMaintainer(changefeed.NewChangefeed(loadInfo1.ChangefeedID, &loadInfo1, 1, false), destHot) + loadInfo2 := *loadInfo + loadInfo2.ChangefeedID = common.NewChangeFeedIDWithName("cf-load-2", common.DefaultKeyspaceName) + db.AddReplicatingMaintainer(changefeed.NewChangefeed(loadInfo2.ChangefeedID, &loadInfo2, 1, false), destHot) + + selfNode := &node.Info{ID: node.ID("coordinator")} + oc := operator.NewOperatorController(selfNode, db, nil, 10) + + s := NewDrainScheduler("test", 10, oc, db, drainController) + _ = s.Execute() + + require.Equal(t, 1, oc.OperatorSize()) + op := oc.GetOperator(cfID) + require.NotNil(t, op) + require.ElementsMatch(t, []node.ID{origin, destCold}, op.AffectedNodes()) +} + +func TestDrainSchedulerSkipsChangefeedWithInflightOperator(t *testing.T) { + mc := messaging.NewMockMessageCenter() + appcontext.SetService(appcontext.MessageCenter, mc) + + nodeManager := watcher.NewNodeManager(nil, nil) + appcontext.SetService(watcher.NodeManagerName, nodeManager) + + origin := node.ID("origin") + dest := node.ID("dest") + nodeManager.GetAliveNodes()[origin] = &node.Info{ID: origin} + nodeManager.GetAliveNodes()[dest] = &node.Info{ID: dest} + + drainController := drain.NewController(mc) + drainController.ObserveHeartbeat(origin, &heartbeatpb.NodeHeartbeat{ + Liveness: heartbeatpb.NodeLiveness_DRAINING, + NodeEpoch: 1, + }) + + db := changefeed.NewChangefeedDB(1) + cfID1 := common.NewChangeFeedIDWithName("cf1", common.DefaultKeyspaceName) + cfID2 := common.NewChangeFeedIDWithName("cf2", common.DefaultKeyspaceName) + + info := &config.ChangeFeedInfo{ + SinkURI: "blackhole://", + Config: config.GetDefaultReplicaConfig(), + State: config.StateNormal, + } + + info1 := *info + info1.ChangefeedID = cfID1 + cf1 := changefeed.NewChangefeed(cfID1, &info1, 1, false) + db.AddReplicatingMaintainer(cf1, origin) + + info2 := *info + info2.ChangefeedID = cfID2 + cf2 := changefeed.NewChangefeed(cfID2, &info2, 1, false) + db.AddReplicatingMaintainer(cf2, origin) + + selfNode := &node.Info{ID: node.ID("coordinator")} + oc := operator.NewOperatorController(selfNode, db, nil, 10) + + require.True(t, oc.AddOperator(operator.NewMoveMaintainerOperator(db, cf1, origin, dest))) + require.Equal(t, 1, oc.OperatorSize()) + + s := NewDrainScheduler("test", 2, oc, db, drainController) + _ = s.Execute() + + require.Equal(t, 2, oc.OperatorSize()) + require.NotNil(t, oc.GetOperator(cfID2)) +} From 13176fb2966f87f36bad33362097ec4ec6c28e44 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 8 Apr 2026 08:23:02 +0800 Subject: [PATCH 2/6] coordinator/scheduler: tighten drain scheduling loops --- coordinator/scheduler/balance.go | 13 ++++++--- coordinator/scheduler/balance_test.go | 32 ++++++++++++++++++++++ coordinator/scheduler/drain.go | 39 ++++++++++++++++++--------- 3 files changed, 68 insertions(+), 16 deletions(-) diff --git a/coordinator/scheduler/balance.go b/coordinator/scheduler/balance.go index 20cf15668e..4ed24fee85 100644 --- a/coordinator/scheduler/balance.go +++ b/coordinator/scheduler/balance.go @@ -50,8 +50,6 @@ type balanceScheduler struct { drainBalanceBlockedUntil time.Time } -const drainBalanceCooldown = 120 * time.Second - func NewBalanceScheduler( id string, batchSize int, oc *operator.Controller, @@ -76,7 +74,9 @@ func (s *balanceScheduler) Execute() time.Time { now := time.Now() if hasDrainingOrStoppingNode(s.liveness) { // Pause regular balance scheduling while any node is draining/stopping. - s.drainBalanceBlockedUntil = now.Add(drainBalanceCooldown) + // Reuse the configured balance interval as the post-drain quiet period so + // operators can settle before regular rebalance resumes. + s.drainBalanceBlockedUntil = now.Add(s.drainCooldown()) return now.Add(s.checkBalanceInterval) } if now.Before(s.drainBalanceBlockedUntil) { @@ -112,6 +112,13 @@ func (s *balanceScheduler) Execute() time.Time { return now.Add(s.checkBalanceInterval) } +func (s *balanceScheduler) drainCooldown() time.Duration { + if s.checkBalanceInterval > 0 { + return s.checkBalanceInterval + } + return time.Second +} + func (s *balanceScheduler) Name() string { return "balance-scheduler" } diff --git a/coordinator/scheduler/balance_test.go b/coordinator/scheduler/balance_test.go index 93056f0314..cd185ca08c 100644 --- a/coordinator/scheduler/balance_test.go +++ b/coordinator/scheduler/balance_test.go @@ -148,6 +148,38 @@ func TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires(t *testing require.Greater(t, oc.OperatorSize(), 0) } +func TestBalanceSchedulerUsesBalanceIntervalAsDrainCooldown(t *testing.T) { + setupCoordinatorSchedulerTestServices() + mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter) + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + + drainingNode := node.ID("draining") + nodeA := node.ID("node-a") + nodeB := node.ID("node-b") + nodeManager.GetAliveNodes()[drainingNode] = &node.Info{ID: drainingNode} + nodeManager.GetAliveNodes()[nodeA] = &node.Info{ID: nodeA} + nodeManager.GetAliveNodes()[nodeB] = &node.Info{ID: nodeB} + + drainController := drain.NewController(mc) + drainController.ObserveHeartbeat(drainingNode, &heartbeatpb.NodeHeartbeat{ + Liveness: heartbeatpb.NodeLiveness_DRAINING, + NodeEpoch: 1, + }) + + db := changefeed.NewChangefeedDB(1) + addReplicatingMaintainer(t, db, "cf-a-1", nodeA) + addReplicatingMaintainer(t, db, "cf-a-2", nodeA) + + selfNode := &node.Info{ID: node.ID("coordinator")} + oc := operator.NewOperatorController(selfNode, db, nil, 10) + interval := 200 * time.Millisecond + s := NewBalanceScheduler("test", 10, oc, db, interval, drainController) + + start := time.Now() + _ = s.Execute() + require.WithinDuration(t, start.Add(interval), s.drainBalanceBlockedUntil, 50*time.Millisecond) +} + func setupCoordinatorSchedulerTestServices() { mc := messaging.NewMockMessageCenter() appcontext.SetService(appcontext.MessageCenter, mc) diff --git a/coordinator/scheduler/drain.go b/coordinator/scheduler/drain.go index 7ddad026a0..2ead3795f3 100644 --- a/coordinator/scheduler/drain.go +++ b/coordinator/scheduler/drain.go @@ -89,6 +89,11 @@ func (s *drainScheduler) Execute() time.Time { } nodeTaskSize := s.changefeedDB.GetTaskSizePerNode() + maintainersByNode := make(map[node.ID][]*changefeed.Changefeed, len(drainingNodes)) + nextMaintainerIndex := make(map[node.ID]int, len(drainingNodes)) + for _, origin := range drainingNodes { + maintainersByNode[origin] = s.changefeedDB.GetByNodeID(origin) + } scheduled := 0 if s.rrCursor >= len(drainingNodes) { @@ -99,7 +104,15 @@ func (s *drainScheduler) Execute() time.Time { progress := false for i := 0; i < len(drainingNodes) && scheduled < availableSize; i++ { origin := drainingNodes[(s.rrCursor+i)%len(drainingNodes)] - if s.scheduleOneFromNode(origin, destCandidates, nodeTaskSize) { + nextIndex, ok := s.scheduleOneFromNode( + origin, + maintainersByNode[origin], + nextMaintainerIndex[origin], + destCandidates, + nodeTaskSize, + ) + nextMaintainerIndex[origin] = nextIndex + if ok { scheduled++ progress = true } @@ -119,32 +132,32 @@ func (s *drainScheduler) Execute() time.Time { return now.Add(time.Millisecond * 200) } -// scheduleOneFromNode tries to schedule one maintainer move from origin. -// It skips changefeeds that already have in-flight operators. +// scheduleOneFromNode tries to schedule one maintainer move from origin, +// continuing from nextIndex within the pre-fetched maintainer slice. It skips +// changefeeds that already have in-flight operators. func (s *drainScheduler) scheduleOneFromNode( origin node.ID, + maintainers []*changefeed.Changefeed, + nextIndex int, destCandidates []node.ID, nodeTaskSize map[node.ID]int, -) bool { - maintainers := s.changefeedDB.GetByNodeID(origin) - if len(maintainers) == 0 { - return false - } - - for _, cf := range maintainers { +) (int, bool) { + for nextIndex < len(maintainers) { + cf := maintainers[nextIndex] + nextIndex++ if s.operatorController.HasOperator(cf.ID.DisplayName) { continue } dest, ok := chooseLeastLoadedDest(origin, destCandidates, nodeTaskSize) if !ok { - return false + return nextIndex, false } if s.operatorController.AddOperator(operator.NewMoveMaintainerOperator(s.changefeedDB, cf, origin, dest)) { nodeTaskSize[dest]++ - return true + return nextIndex, true } } - return false + return nextIndex, false } // chooseLeastLoadedDest selects the destination with the smallest task count, excluding origin. From 2249143b77a47097c0e5627d80dde86b4608b8fc Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Mon, 20 Apr 2026 14:13:01 +0800 Subject: [PATCH 3/6] coordinator,scheduler: avoid drain starvation --- coordinator/operator/operator_controller.go | 23 ++++++++++++ .../operator/operator_controller_test.go | 22 ++++++++++++ coordinator/operator/operator_move.go | 8 +++++ coordinator/scheduler/drain.go | 11 +++--- coordinator/scheduler/drain_test.go | 36 +++++++++++++++++++ 5 files changed, 94 insertions(+), 6 deletions(-) diff --git a/coordinator/operator/operator_controller.go b/coordinator/operator/operator_controller.go index d6fd7a3095..2b76cc3bc9 100644 --- a/coordinator/operator/operator_controller.go +++ b/coordinator/operator/operator_controller.go @@ -232,6 +232,29 @@ func (oc *Controller) OperatorSize() int { return len(oc.operators) } +// CountMoveMaintainerOperatorsFromNodes returns the number of in-flight move +// operators whose origin is one of the given nodes. +func (oc *Controller) CountMoveMaintainerOperatorsFromNodes(origins []node.ID) int { + oc.mu.RLock() + defer oc.mu.RUnlock() + + count := 0 + for _, op := range oc.operators { + moveOp, ok := op.OP.(*MoveMaintainerOperator) + if !ok { + continue + } + origin := moveOp.OriginNode() + for _, candidate := range origins { + if origin == candidate { + count++ + break + } + } + } + return count +} + // HasOperatorInvolvingNode returns true if any in-flight operator affects n. func (oc *Controller) HasOperatorInvolvingNode(n node.ID) bool { oc.mu.RLock() diff --git a/coordinator/operator/operator_controller_test.go b/coordinator/operator/operator_controller_test.go index 5afc45a924..c86e8358a1 100644 --- a/coordinator/operator/operator_controller_test.go +++ b/coordinator/operator/operator_controller_test.go @@ -127,6 +127,28 @@ func TestController_HasOperatorInvolvingNode(t *testing.T) { require.False(t, oc.HasOperatorInvolvingNode("n3")) } +func TestController_CountMoveMaintainerOperatorsFromNodes(t *testing.T) { + changefeedDB := changefeed.NewChangefeedDB(1216) + ctrl := gomock.NewController(t) + backend := mock_changefeed.NewMockBackend(ctrl) + oc, self, nodeManager := newOperatorControllerForTest(t, changefeedDB, backend) + dest := node.NewInfo("localhost:8301", "") + nodeManager.GetAliveNodes()[dest.ID] = dest + + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + cf := changefeed.NewChangefeed(cfID, &config.ChangeFeedInfo{ + ChangefeedID: cfID, + Config: config.GetDefaultReplicaConfig(), + SinkURI: "mysql://127.0.0.1:3306", + }, 1, true) + changefeedDB.AddReplicatingMaintainer(cf, self.ID) + + require.True(t, oc.AddOperator(NewMoveMaintainerOperator(changefeedDB, cf, self.ID, dest.ID))) + + require.Equal(t, 1, oc.CountMoveMaintainerOperatorsFromNodes([]node.ID{self.ID})) + require.Equal(t, 0, oc.CountMoveMaintainerOperatorsFromNodes([]node.ID{"n3"})) +} + func TestController_StopChangefeedDuringAddOperator(t *testing.T) { // Setup test environment changefeedDB := changefeed.NewChangefeedDB(1216) diff --git a/coordinator/operator/operator_move.go b/coordinator/operator/operator_move.go index 560b1a1088..9c15c936af 100644 --- a/coordinator/operator/operator_move.go +++ b/coordinator/operator/operator_move.go @@ -144,6 +144,14 @@ func (m *MoveMaintainerOperator) AffectedNodes() []node.ID { return []node.ID{m.origin, m.dest} } +// OriginNode returns the source node of the move. +func (m *MoveMaintainerOperator) OriginNode() node.ID { + m.lck.Lock() + defer m.lck.Unlock() + + return m.origin +} + func (m *MoveMaintainerOperator) ID() common.ChangeFeedID { return m.changefeed.ID } diff --git a/coordinator/scheduler/drain.go b/coordinator/scheduler/drain.go index 2ead3795f3..715588d1cd 100644 --- a/coordinator/scheduler/drain.go +++ b/coordinator/scheduler/drain.go @@ -61,13 +61,8 @@ func NewDrainScheduler( } // Execute schedules move operators from draining nodes to schedulable destination nodes. -// It limits scheduling by available operator slots and returns the next run time. +// It limits drain scheduling by drain move slots and returns the next run time. func (s *drainScheduler) Execute() time.Time { - availableSize := s.batchSize - s.operatorController.OperatorSize() - if availableSize <= 0 { - return time.Now().Add(time.Millisecond * 200) - } - if s.liveness == nil { return time.Now().Add(time.Second) } @@ -78,6 +73,10 @@ func (s *drainScheduler) Execute() time.Time { return now.Add(time.Second) } slices.Sort(drainingNodes) + availableSize := s.batchSize - s.operatorController.CountMoveMaintainerOperatorsFromNodes(drainingNodes) + if availableSize <= 0 { + return now.Add(time.Millisecond * 200) + } destCandidates := filterSchedulableNodeIDs(s.nodeManager.GetAliveNodeIDs(), s.liveness) diff --git a/coordinator/scheduler/drain_test.go b/coordinator/scheduler/drain_test.go index 17aa8d3b34..2f6cfcc2f8 100644 --- a/coordinator/scheduler/drain_test.go +++ b/coordinator/scheduler/drain_test.go @@ -134,3 +134,39 @@ func TestDrainSchedulerSkipsChangefeedWithInflightOperator(t *testing.T) { require.Equal(t, 2, oc.OperatorSize()) require.NotNil(t, oc.GetOperator(cfID2)) } + +func TestDrainSchedulerIgnoresUnrelatedOperatorCapacity(t *testing.T) { + mc := messaging.NewMockMessageCenter() + appcontext.SetService(appcontext.MessageCenter, mc) + + nodeManager := watcher.NewNodeManager(nil, nil) + appcontext.SetService(watcher.NodeManagerName, nodeManager) + + origin := node.ID("origin") + other := node.ID("other") + dest := node.ID("dest") + nodeManager.GetAliveNodes()[origin] = &node.Info{ID: origin} + nodeManager.GetAliveNodes()[other] = &node.Info{ID: other} + nodeManager.GetAliveNodes()[dest] = &node.Info{ID: dest} + + drainController := drain.NewController(mc) + drainController.ObserveHeartbeat(origin, &heartbeatpb.NodeHeartbeat{ + Liveness: heartbeatpb.NodeLiveness_DRAINING, + NodeEpoch: 1, + }) + + db := changefeed.NewChangefeedDB(1) + cfID := addReplicatingMaintainer(t, db, "cf-drain", origin) + otherID := addReplicatingMaintainer(t, db, "cf-other", other) + otherCF := db.GetByID(otherID) + + selfNode := &node.Info{ID: node.ID("coordinator")} + oc := operator.NewOperatorController(selfNode, db, nil, 10) + require.True(t, oc.AddOperator(operator.NewMoveMaintainerOperator(db, otherCF, other, dest))) + + s := NewDrainScheduler("test", 1, oc, db, drainController) + _ = s.Execute() + + require.Equal(t, 2, oc.OperatorSize()) + require.NotNil(t, oc.GetOperator(cfID)) +} From 3e139a5b5b335960b78b17c7853f3e1ebdd5db5d Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 21 Apr 2026 17:33:44 +0800 Subject: [PATCH 4/6] coordinator,scheduler: tighten drain scheduling readability --- coordinator/operator/operator_controller.go | 16 ++++--- coordinator/scheduler/drain.go | 2 + coordinator/scheduler/drain_test.go | 47 +++++++++++++++++++++ 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/coordinator/operator/operator_controller.go b/coordinator/operator/operator_controller.go index 2b76cc3bc9..2b168b09c9 100644 --- a/coordinator/operator/operator_controller.go +++ b/coordinator/operator/operator_controller.go @@ -238,18 +238,22 @@ func (oc *Controller) CountMoveMaintainerOperatorsFromNodes(origins []node.ID) i oc.mu.RLock() defer oc.mu.RUnlock() + if len(origins) == 0 { + return 0 + } + originSet := make(map[node.ID]struct{}, len(origins)) + for _, origin := range origins { + originSet[origin] = struct{}{} + } + count := 0 for _, op := range oc.operators { moveOp, ok := op.OP.(*MoveMaintainerOperator) if !ok { continue } - origin := moveOp.OriginNode() - for _, candidate := range origins { - if origin == candidate { - count++ - break - } + if _, ok := originSet[moveOp.OriginNode()]; ok { + count++ } } return count diff --git a/coordinator/scheduler/drain.go b/coordinator/scheduler/drain.go index 715588d1cd..5345c0a301 100644 --- a/coordinator/scheduler/drain.go +++ b/coordinator/scheduler/drain.go @@ -73,6 +73,8 @@ func (s *drainScheduler) Execute() time.Time { return now.Add(time.Second) } slices.Sort(drainingNodes) + // Drain moves get their own slot budget so unrelated operators do not stall + // evacuation progress while a node is draining. availableSize := s.batchSize - s.operatorController.CountMoveMaintainerOperatorsFromNodes(drainingNodes) if availableSize <= 0 { return now.Add(time.Millisecond * 200) diff --git a/coordinator/scheduler/drain_test.go b/coordinator/scheduler/drain_test.go index 2f6cfcc2f8..fa20f54509 100644 --- a/coordinator/scheduler/drain_test.go +++ b/coordinator/scheduler/drain_test.go @@ -170,3 +170,50 @@ func TestDrainSchedulerIgnoresUnrelatedOperatorCapacity(t *testing.T) { require.Equal(t, 2, oc.OperatorSize()) require.NotNil(t, oc.GetOperator(cfID)) } + +func TestDrainSchedulerRotatesAcrossDrainingNodes(t *testing.T) { + mc := messaging.NewMockMessageCenter() + appcontext.SetService(appcontext.MessageCenter, mc) + + nodeManager := watcher.NewNodeManager(nil, nil) + appcontext.SetService(watcher.NodeManagerName, nodeManager) + + originA := node.ID("origin-a") + originB := node.ID("origin-b") + dest := node.ID("dest") + nodeManager.GetAliveNodes()[originA] = &node.Info{ID: originA} + nodeManager.GetAliveNodes()[originB] = &node.Info{ID: originB} + nodeManager.GetAliveNodes()[dest] = &node.Info{ID: dest} + + drainController := drain.NewController(mc) + drainController.ObserveHeartbeat(originA, &heartbeatpb.NodeHeartbeat{ + Liveness: heartbeatpb.NodeLiveness_DRAINING, + NodeEpoch: 1, + }) + drainController.ObserveHeartbeat(originB, &heartbeatpb.NodeHeartbeat{ + Liveness: heartbeatpb.NodeLiveness_DRAINING, + NodeEpoch: 1, + }) + + db := changefeed.NewChangefeedDB(1) + cfA := addReplicatingMaintainer(t, db, "cf-a", originA) + cfB := addReplicatingMaintainer(t, db, "cf-b", originB) + + selfNode := &node.Info{ID: node.ID("coordinator")} + oc := operator.NewOperatorController(selfNode, db, nil, 10) + s := NewDrainScheduler("test", 1, oc, db, drainController) + + _ = s.Execute() + + first := oc.GetOperator(cfA) + require.NotNil(t, first) + require.Nil(t, oc.GetOperator(cfB)) + + first.OnTaskRemoved() + _ = oc.Execute() + require.Equal(t, 0, oc.OperatorSize()) + + _ = s.Execute() + + require.NotNil(t, oc.GetOperator(cfB)) +} From 6b7550cc95092f984b375b5867d9f5038b525bd4 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 22 Apr 2026 09:59:16 +0800 Subject: [PATCH 5/6] coordinator,scheduler: clarify drain block window semantics --- coordinator/scheduler/balance.go | 10 +++++----- coordinator/scheduler/balance_test.go | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/coordinator/scheduler/balance.go b/coordinator/scheduler/balance.go index 4ed24fee85..d7d57fc66c 100644 --- a/coordinator/scheduler/balance.go +++ b/coordinator/scheduler/balance.go @@ -73,15 +73,15 @@ func NewBalanceScheduler( func (s *balanceScheduler) Execute() time.Time { now := time.Now() if hasDrainingOrStoppingNode(s.liveness) { - // Pause regular balance scheduling while any node is draining/stopping. - // Reuse the configured balance interval as the post-drain quiet period so - // operators can settle before regular rebalance resumes. + // Pause regular balance scheduling while any node is observed draining or + // stopping. Each observation extends the block window by one balance + // interval so regular rebalance does not race with evacuation progress. s.drainBalanceBlockedUntil = now.Add(s.drainCooldown()) return now.Add(s.checkBalanceInterval) } if now.Before(s.drainBalanceBlockedUntil) { - // Keep a cooldown window after all draining/stopping nodes are gone - // to avoid immediate rebalance churn. + // If drain disappears before the previously extended block window expires, + // keep skipping regular rebalance until that window elapses. return now.Add(s.checkBalanceInterval) } if !s.forceBalance && time.Since(s.lastRebalanceTime) < s.checkBalanceInterval { diff --git a/coordinator/scheduler/balance_test.go b/coordinator/scheduler/balance_test.go index cd185ca08c..2c386afb61 100644 --- a/coordinator/scheduler/balance_test.go +++ b/coordinator/scheduler/balance_test.go @@ -87,7 +87,7 @@ func TestBalanceSchedulerSkipsWhenDrainActive(t *testing.T) { require.Equal(t, 0, oc.OperatorSize()) } -func TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires(t *testing.T) { +func TestBalanceSchedulerSkipsUntilObservedDrainBlockWindowExpires(t *testing.T) { setupCoordinatorSchedulerTestServices() mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter) nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) @@ -122,11 +122,11 @@ func TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires(t *testing s := NewBalanceScheduler("test", 10, oc, db, 0, drainController) s.drainBalanceBlockedUntil = time.Time{} - // Any draining node should block balance. + // Any observed draining node should block balance. _ = s.Execute() require.Equal(t, 0, oc.OperatorSize()) - // One node remains draining, balance is still blocked. + // One node remains draining, so another observation keeps extending the block window. drainController.ObserveHeartbeat(drainingA, &heartbeatpb.NodeHeartbeat{ Liveness: heartbeatpb.NodeLiveness_ALIVE, NodeEpoch: 2, @@ -134,7 +134,7 @@ func TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires(t *testing _ = s.Execute() require.Equal(t, 0, oc.OperatorSize()) - // After all nodes leave draining, cooldown still blocks balance. + // After drain disappears, the previously extended block window still blocks balance. drainController.ObserveHeartbeat(drainingB, &heartbeatpb.NodeHeartbeat{ Liveness: heartbeatpb.NodeLiveness_ALIVE, NodeEpoch: 2, @@ -148,7 +148,7 @@ func TestBalanceSchedulerSkipsUntilAllDrainCompleteAndCooldownExpires(t *testing require.Greater(t, oc.OperatorSize(), 0) } -func TestBalanceSchedulerUsesBalanceIntervalAsDrainCooldown(t *testing.T) { +func TestBalanceSchedulerUsesBalanceIntervalAsDrainBlockWindow(t *testing.T) { setupCoordinatorSchedulerTestServices() mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter) nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) From 31b896e2e9e1643135661ddd31a6b89fe962d6d3 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 22 Apr 2026 10:00:58 +0800 Subject: [PATCH 6/6] coordinator,scheduler: document drain scheduling flow --- coordinator/scheduler/drain.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/coordinator/scheduler/drain.go b/coordinator/scheduler/drain.go index 5345c0a301..cdce89068c 100644 --- a/coordinator/scheduler/drain.go +++ b/coordinator/scheduler/drain.go @@ -68,6 +68,9 @@ func (s *drainScheduler) Execute() time.Time { } now := time.Now() + // Snapshot the nodes that still need evacuation in this tick. Execute works + // from a stable, sorted slice so round-robin fairness is deterministic even + // if the backing drain state changes between ticks. drainingNodes := s.liveness.GetDrainingOrStoppingNodes() if len(drainingNodes) == 0 { return now.Add(time.Second) @@ -89,6 +92,11 @@ func (s *drainScheduler) Execute() time.Time { return now.Add(time.Second) } + // Build per-node snapshots once for this tick: + // - nodeTaskSize is mutated in memory after each accepted move so later picks + // in the same tick observe the updated load distribution. + // - maintainersByNode avoids repeatedly querying ChangefeedDB while walking + // the draining nodes in round-robin order. nodeTaskSize := s.changefeedDB.GetTaskSizePerNode() maintainersByNode := make(map[node.ID][]*changefeed.Changefeed, len(drainingNodes)) nextMaintainerIndex := make(map[node.ID]int, len(drainingNodes)) @@ -101,6 +109,9 @@ func (s *drainScheduler) Execute() time.Time { s.rrCursor = 0 } + // Walk draining nodes in rounds until we either consume the dedicated move + // budget or make no progress. Each round starts from rrCursor so a busy node + // does not monopolize the first scheduling chance forever. for scheduled < availableSize { progress := false for i := 0; i < len(drainingNodes) && scheduled < availableSize; i++ { @@ -146,9 +157,14 @@ func (s *drainScheduler) scheduleOneFromNode( for nextIndex < len(maintainers) { cf := maintainers[nextIndex] nextIndex++ + // Skip changefeeds that are already being added, moved, or stopped by + // another operator. Drain scheduling only fills genuinely free slots. if s.operatorController.HasOperator(cf.ID.DisplayName) { continue } + // Choose against the mutable nodeTaskSize snapshot so multiple moves + // created in the same tick spread across destinations instead of all + // targeting the same initially cold node. dest, ok := chooseLeastLoadedDest(origin, destCandidates, nodeTaskSize) if !ok { return nextIndex, false