From a73bf623417b624b22f4f45d02c30710ba707b27 Mon Sep 17 00:00:00 2001 From: wlwilliamx Date: Wed, 15 Apr 2026 15:29:23 +0800 Subject: [PATCH] maintainer: quiesce control plane during remove handoff Freeze ordinary scheduling once RemoveMaintainer starts so the old maintainer can only finish the DDL trigger close path. This avoids late heartbeat, barrier, node-change, and operator activity from recreating dispatchers after shutdown begins. --- maintainer/maintainer.go | 28 +++- maintainer/maintainer_controller.go | 26 ++- maintainer/maintainer_test.go | 153 ++++++++++++++++++ maintainer/operator/operator_controller.go | 113 +++++++++++-- .../operator/operator_controller_test.go | 100 ++++++++++++ 5 files changed, 403 insertions(+), 17 deletions(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 15a16cd984..bc2e1c98f1 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -345,6 +345,9 @@ func (m *Maintainer) HandleEvent(event *Event) bool { } func (m *Maintainer) checkNodeChanged() { + if m.removing.Load() { + return + } m.nodeChanged.Lock() defer m.nodeChanged.Unlock() if m.nodeChanged.changed { @@ -502,6 +505,13 @@ func (m *Maintainer) onRemoveMaintainer(cascade, changefeedRemoved bool) { m.removing.Store(true) m.cascadeRemoving.Store(cascade) m.changefeedRemoved.Store(changefeedRemoved) + // Freeze ordinary scheduling on the old maintainer before we start the close flow. + // Only the DDL trigger close operator is allowed to keep running. + allowedDispatcherIDs := []common.DispatcherID{m.ddlSpan.ID} + if m.enableRedo { + allowedDispatcherIDs = append(allowedDispatcherIDs, m.redoDDLSpan.ID) + } + m.controller.EnterRemovingMode(allowedDispatcherIDs...) closed := m.tryCloseChangefeed() if closed { m.removed.Store(true) @@ -812,6 +822,14 @@ func (m *Maintainer) onHeartbeatRequest(msg *messaging.TargetMessage) { // Process operator status updates AFTER checkpointTsByCapture is updated // This ensures when operators complete, checkpointTsByCapture already contains the complete heartbeat // Works with calCheckpointTs constraint ordering to prevent checkpoint advancing past new dispatcher startTs + if m.removing.Load() { + // Once RemoveMaintainer starts, we still need status updates for the close flow itself + // (for example DDL-trigger close operators reaching terminal states), but we must not run + // failover self-healing. A late Stopped/Working heartbeat from a closing dispatcher manager + // would otherwise mark spans absent or remove/recreate dispatchers after shutdown has begun. + m.controller.handleStatus(msg.From, req.Statuses, false) + return + } m.controller.HandleStatus(msg.From, req.Statuses) } @@ -828,7 +846,7 @@ func (m *Maintainer) onError(from node.ID, err *heartbeatpb.RunningError) { func (m *Maintainer) onBlockStateRequest(msg *messaging.TargetMessage) { // the barrier is not initialized - if !m.initialized.Load() { + if !m.initialized.Load() || m.removing.Load() { return } req := msg.Message[0].(*heartbeatpb.BlockStatusRequest) @@ -961,8 +979,12 @@ func (m *Maintainer) onMaintainerCloseResponse(from node.ID, response *heartbeat func (m *Maintainer) handleResendMessage() { // resend closing message - if m.removing.Load() && m.cascadeRemoving.Load() { - m.trySendMaintainerCloseRequestToAllNode() + if m.removing.Load() { + // After RemoveMaintainer starts, the old maintainer must stop resending bootstrap/barrier + // traffic. Otherwise stale control-plane messages can race with the new maintainer. + if m.cascadeRemoving.Load() { + m.trySendMaintainerCloseRequestToAllNode() + } return } // resend bootstrap message diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index 10258efd63..31931d69d2 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -138,8 +138,12 @@ func NewController(changefeedID common.ChangeFeedID, } } -// HandleStatus handle the status report from the node +// HandleStatus handles the status report from the node. func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) { + c.handleStatus(from, statusList, true) +} + +func (c *Controller) handleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus, allowSelfHealing bool) { // HandleStatus reconciles runtime dispatcher reports with maintainer-side state. // // In the steady state, spanController (desired tasks), operatorController (in-flight scheduling), @@ -151,6 +155,10 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableS // The rules below make the system converge: // 1) Orphan Working dispatcher without an operator => actively remove it to avoid leaks. // 2) Non-working dispatcher without an operator => mark the span absent so scheduler can recreate it. + // + // During maintainer removal we still need status bookkeeping so close/remove can observe terminal + // states, but we must disable the self-healing branches. Otherwise a late Stopped/Working heartbeat + // can recreate dispatchers for a changefeed that is already shutting down. for _, status := range statusList { dispatcherID := common.NewDispatcherIDFromPB(status.ID) operatorController := c.getOperatorController(status.Mode) @@ -159,6 +167,9 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableS operatorController.UpdateOperatorStatus(dispatcherID, from, status) stm := spanController.GetTaskByID(dispatcherID) if stm == nil { + if !allowSelfHealing { + continue + } // If maintainer doesn't know this dispatcherID, most statuses are late/outdated and can be ignored. // We only need to act when the runtime says the dispatcher is Working, because that implies there's // still an active dispatcher consuming resources and potentially producing output. @@ -189,6 +200,10 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableS } spanController.UpdateStatus(stm, status) + if !allowSelfHealing { + continue + } + // Fallback: dispatcher becomes non-working without an operator. // // In normal scheduling flow, a dispatcher should transition to Stopped/Removed as part of a maintainer @@ -256,6 +271,15 @@ func (c *Controller) RemoveNode(id node.ID) { c.operatorController.OnNodeRemoved(id) } +// EnterRemovingMode freezes normal scheduling on the old maintainer while keeping the +// DDL trigger dispatcher close path alive. +func (c *Controller) EnterRemovingMode(allowedDispatcherIDs ...common.DispatcherID) { + c.operatorController.QuiesceExcept(allowedDispatcherIDs...) + if c.redoOperatorController != nil { + c.redoOperatorController.QuiesceExcept(allowedDispatcherIDs...) + } +} + func (c *Controller) GetMinRedoCheckpointTs(minCheckpointTs uint64) uint64 { minCheckpointTsForOperator := c.redoOperatorController.GetMinCheckpointTs(minCheckpointTs) minCheckpointTsForSpan := c.redoSpanController.GetMinCheckpointTsForNonReplicatingSpans(minCheckpointTs) diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index 8c06121e47..7e8745e916 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -395,3 +395,156 @@ func TestMaintainer_GetMaintainerStatusUsesCommittedCheckpoint(t *testing.T) { require.Equal(t, uint64(20), status.CheckpointTs) require.Equal(t, uint64(50), status.LastSyncedTs) } + +func TestMaintainerHeartbeatDuringRemovingSkipsFailoverRecovery(t *testing.T) { + buildMaintainer := func(t *testing.T) (*Maintainer, *replica.SpanReplication, node.ID) { + t.Helper() + testutil.SetUpTestServices(t) + + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + captureID := node.ID("node1") + nodeManager.GetAliveNodes()[captureID] = &node.Info{ID: captureID} + + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + ddlDispatcherID := common.NewDispatcherID() + ddlSpan := replica.NewWorkingSpanReplication(cfID, ddlDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: ddlDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 10, + Mode: common.DefaultMode, + }, captureID, false) + refresher := replica.NewRegionCountRefresher(cfID, time.Minute) + controller := NewController(cfID, 10, &mockThreadPool{}, + config.GetDefaultReplicaConfig(), ddlSpan, nil, 1000, 0, refresher, common.DefaultKeyspace, false) + + totalSpan := common.TableIDToComparableSpan(common.DefaultKeyspaceID, 1) + dispatcherID := common.NewDispatcherID() + workingSpan := replica.NewWorkingSpanReplication(cfID, dispatcherID, + 1, + &heartbeatpb.TableSpan{ + TableID: totalSpan.TableID, + StartKey: totalSpan.StartKey, + EndKey: totalSpan.EndKey, + KeyspaceID: common.DefaultKeyspaceID, + }, &heartbeatpb.TableSpanStatus{ + ID: dispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 10, + Mode: common.DefaultMode, + }, captureID, false) + controller.spanController.AddReplicatingSpan(workingSpan) + + m := &Maintainer{ + changefeedID: cfID, + controller: controller, + checkpointTsByCapture: newWatermarkCaptureMap(), + redoTsByCapture: newWatermarkCaptureMap(), + statusChanged: atomic.NewBool(false), + } + m.watermark.Watermark = &heartbeatpb.Watermark{} + m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError) + m.initialized.Store(true) + return m, workingSpan, captureID + } + + makeHeartbeat := func(dispatcherID common.DispatcherID, from node.ID) *messaging.TargetMessage { + req := &heartbeatpb.HeartBeatRequest{ + Watermark: &heartbeatpb.Watermark{ + CheckpointTs: 20, + ResolvedTs: 20, + }, + Statuses: []*heartbeatpb.TableSpanStatus{ + { + ID: dispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Stopped, + CheckpointTs: 20, + Mode: common.DefaultMode, + }, + }, + } + return &messaging.TargetMessage{ + From: from, + Type: messaging.TypeHeartBeatRequest, + Message: []messaging.IOTypeT{req}, + } + } + + // Normal failover recovery should still mark a non-working span absent when the runtime + // reports Stopped but maintainer has no operator for it. + t.Run("normal maintainer still self heals", func(t *testing.T) { + m, workingSpan, captureID := buildMaintainer(t) + + m.onHeartbeatRequest(makeHeartbeat(workingSpan.ID, captureID)) + + require.Equal(t, 1, m.controller.spanController.GetAbsentSize()) + require.Equal(t, heartbeatpb.ComponentState_Stopped, workingSpan.GetStatus().ComponentStatus) + require.Equal(t, node.ID(""), workingSpan.GetNodeID()) + }) + + // When RemoveMaintainer has started, the same late Stopped heartbeat must only update runtime + // status bookkeeping. Re-marking the span absent here would let the scheduler recreate a + // dispatcher while the changefeed is shutting down. + t.Run("removing maintainer skips self healing", func(t *testing.T) { + m, workingSpan, captureID := buildMaintainer(t) + m.removing.Store(true) + + m.onHeartbeatRequest(makeHeartbeat(workingSpan.ID, captureID)) + + require.Equal(t, 0, m.controller.spanController.GetAbsentSize()) + require.Equal(t, heartbeatpb.ComponentState_Stopped, workingSpan.GetStatus().ComponentStatus) + require.Equal(t, captureID, workingSpan.GetNodeID()) + require.Zero(t, m.controller.operatorController.OperatorSize()) + }) +} + +func TestMaintainerRemovingSuppressesLegacyControlPlaneActions(t *testing.T) { + mockMC := messaging.NewMockMessageCenter() + nodeManager := watcher.NewNodeManager(nil, nil) + nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"} + nodeManager.GetAliveNodes()["node2"] = &node.Info{ID: "node2"} + + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + m := &Maintainer{ + changefeedID: cfID, + mc: mockMC, + nodeManager: nodeManager, + closedNodes: make(map[node.ID]struct{}), + statusChanged: atomic.NewBool(false), + postBootstrapMsg: &heartbeatpb.MaintainerPostBootstrapRequest{ + ChangefeedID: cfID.ToPB(), + }, + } + m.watermark.Watermark = &heartbeatpb.Watermark{CheckpointTs: 100, ResolvedTs: 100} + m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError) + m.initialized.Store(true) + m.removing.Store(true) + + // Removing maintainer must not keep resending bootstrap/post-bootstrap or barrier traffic. + // The only remaining control-plane action should be cascade close requests. + m.handleResendMessage() + require.Len(t, mockMC.GetMessageChannel(), 0) + + // Block status handling must also stop once removal starts, otherwise the old maintainer + // can still schedule DDL-driven add/remove operations after handoff begins. + m.onBlockStateRequest(&messaging.TargetMessage{ + From: "node1", + Type: messaging.TypeBlockStatusRequest, + Message: []messaging.IOTypeT{&heartbeatpb.BlockStatusRequest{ + ChangefeedID: cfID.ToPB(), + Mode: common.DefaultMode, + }}, + }) + require.Len(t, mockMC.GetMessageChannel(), 0) + + m.cascadeRemoving.Store(true) + m.handleResendMessage() + require.Len(t, mockMC.GetMessageChannel(), 2) + for i := 0; i < 2; i++ { + msg := <-mockMC.GetMessageChannel() + require.Equal(t, messaging.TypeMaintainerCloseRequest, msg.Type) + req := msg.Message[0].(*heartbeatpb.MaintainerCloseRequest) + require.Equal(t, cfID.ToPB(), req.ChangefeedID) + } +} diff --git a/maintainer/operator/operator_controller.go b/maintainer/operator/operator_controller.go index 5684611aea..d81c6afd3f 100644 --- a/maintainer/operator/operator_controller.go +++ b/maintainer/operator/operator_controller.go @@ -57,6 +57,12 @@ type Controller struct { operators map[common.DispatcherID]*operator.OperatorWithTime[common.DispatcherID, *heartbeatpb.TableSpanStatus] runningQueue operator.OperatorQueue[common.DispatcherID, *heartbeatpb.TableSpanStatus] mode int64 + // quiescing freezes ordinary operators while the old maintainer is being removed. + // Only dispatcher IDs in allowedOperatorIDs may continue to run, which keeps the + // DDL trigger dispatcher close path alive without letting stale schedulers recreate + // ordinary table dispatchers during handoff. + quiescing bool + allowedOperatorIDs map[common.DispatcherID]struct{} // lastWarnTime tracks the last warning time for each operator to avoid spam logs lastWarnTime map[common.DispatcherID]time.Time } @@ -69,19 +75,59 @@ func NewOperatorController( mode int64, ) *Controller { return &Controller{ - changefeedID: changefeedID, - batchSize: batchSize, - operators: make(map[common.DispatcherID]*operator.OperatorWithTime[common.DispatcherID, *heartbeatpb.TableSpanStatus]), - runningQueue: make(operator.OperatorQueue[common.DispatcherID, *heartbeatpb.TableSpanStatus], 0), - role: "maintainer", - spanController: spanController, - nodeManager: appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName), - messageCenter: appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter), - mode: mode, - lastWarnTime: make(map[common.DispatcherID]time.Time), + changefeedID: changefeedID, + batchSize: batchSize, + operators: make(map[common.DispatcherID]*operator.OperatorWithTime[common.DispatcherID, *heartbeatpb.TableSpanStatus]), + runningQueue: make(operator.OperatorQueue[common.DispatcherID, *heartbeatpb.TableSpanStatus], 0), + role: "maintainer", + spanController: spanController, + nodeManager: appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName), + messageCenter: appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter), + mode: mode, + allowedOperatorIDs: make(map[common.DispatcherID]struct{}), + lastWarnTime: make(map[common.DispatcherID]time.Time), } } +// QuiesceExcept freezes the controller so only the listed dispatcher IDs remain active. +// +// This is used when a maintainer enters removing mode. The old maintainer must stop +// issuing or advancing ordinary table operators, but the DDL trigger dispatcher close +// operator still needs to complete. +func (oc *Controller) QuiesceExcept(ids ...common.DispatcherID) { + oc.mu.Lock() + defer oc.mu.Unlock() + + oc.quiescing = true + clear(oc.allowedOperatorIDs) + for _, id := range ids { + if id.IsZero() { + continue + } + oc.allowedOperatorIDs[id] = struct{}{} + } +} + +func (oc *Controller) isOperatorAllowedLocked(id common.DispatcherID) bool { + if !oc.quiescing { + return true + } + _, ok := oc.allowedOperatorIDs[id] + return ok +} + +func (oc *Controller) isOperatorAllowed(id common.DispatcherID) bool { + oc.mu.RLock() + defer oc.mu.RUnlock() + return oc.isOperatorAllowedLocked(id) +} + +func (oc *Controller) isQuiescing() bool { + oc.mu.RLock() + defer oc.mu.RUnlock() + return oc.quiescing +} + // Execute poll the operator from the queue and execute it // It will be called in the thread pool. func (oc *Controller) Execute() time.Time { @@ -143,6 +189,15 @@ func (oc *Controller) RemoveTasksByTableIDs(tables ...int64) { // AddOperator adds an operator to the controller, if the operator already exists, return false. func (oc *Controller) AddOperator(op operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) bool { oc.mu.RLock() + if !oc.isOperatorAllowedLocked(op.ID()) { + oc.mu.RUnlock() + log.Info("add operator failed, controller is quiescing", + zap.String("role", oc.role), + zap.Stringer("changefeedID", oc.changefeedID), + zap.String("dispatcherID", op.ID().String()), + zap.String("operator", op.String())) + return false + } if old, ok := oc.operators[op.ID()]; ok { oc.mu.RUnlock() log.Info("add operator failed, operator already exists", @@ -161,11 +216,13 @@ func (oc *Controller) AddOperator(op operator.Operator[common.DispatcherID, *hea zap.String("operator", op.String())) return false } - oc.pushOperator(op) - return true + return oc.pushOperator(op) } func (oc *Controller) UpdateOperatorStatus(id common.DispatcherID, from node.ID, status *heartbeatpb.TableSpanStatus) { + if !oc.isOperatorAllowed(id) { + return + } oc.mu.RLock() op, ok := oc.operators[id] oc.mu.RUnlock() @@ -179,6 +236,9 @@ func (oc *Controller) UpdateOperatorStatus(id common.DispatcherID, from node.ID, // the controller will mark all spans on the node as absent if no operator is handling it, // then the controller will notify all operators. func (oc *Controller) OnNodeRemoved(n node.ID) { + if oc.isQuiescing() { + return + } for _, span := range oc.spanController.GetTaskByNodeID(n) { oc.mu.RLock() _, ok := oc.operators[span.ID] @@ -216,6 +276,9 @@ func (oc *Controller) GetMinCheckpointTs(minCheckpointTs uint64) uint64 { ops := oc.GetAllOperators() for _, op := range ops { + if !oc.isOperatorAllowed(op.ID()) { + continue + } if op.BlockTsForward() { spanReplication := oc.spanController.GetTaskByID(op.ID()) if spanReplication == nil { @@ -246,6 +309,13 @@ func (oc *Controller) pollQueueingOperator() ( op := item.OP opID := op.ID() oc.mu.Unlock() + if !oc.isOperatorAllowed(opID) { + oc.mu.Lock() + item.NotifyAt = time.Now().Add(time.Millisecond * 500) + heap.Push(&oc.runningQueue, item) + oc.mu.Unlock() + return nil, true + } if item.IsRemoved.Load() { return nil, true } @@ -327,6 +397,14 @@ func (oc *Controller) cancelOperator(opID common.DispatcherID) { } func (oc *Controller) removeReplicaSet(op *removeDispatcherOperator) { + if !oc.isOperatorAllowed(op.ID()) { + log.Info("skip remove operator while controller is quiescing", + zap.String("role", oc.role), + zap.Stringer("changefeedID", oc.changefeedID), + zap.String("dispatcherID", op.ID().String()), + zap.String("operator", op.String())) + return + } oc.mu.RLock() old, ok := oc.operators[op.ID()] oc.mu.RUnlock() @@ -343,7 +421,15 @@ func (oc *Controller) removeReplicaSet(op *removeDispatcherOperator) { } // pushOperator add an operator to the controller queue. -func (oc *Controller) pushOperator(op operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) { +func (oc *Controller) pushOperator(op operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) bool { + if !oc.isOperatorAllowed(op.ID()) { + log.Info("skip operator while controller is quiescing", + zap.String("role", oc.role), + zap.Stringer("changefeedID", oc.changefeedID), + zap.String("dispatcherID", op.ID().String()), + zap.String("operator", op.String())) + return false + } log.Info("add operator to running queue", zap.String("role", oc.role), zap.Stringer("changefeedID", oc.changefeedID), @@ -367,6 +453,7 @@ func (oc *Controller) pushOperator(op operator.Operator[common.DispatcherID, *he metrics.OperatorCount.WithLabelValues(common.DefaultKeyspaceName, oc.changefeedID.Name(), op.Type(), common.StringMode(oc.mode)).Inc() metrics.TotalOperatorCount.WithLabelValues(common.DefaultKeyspaceName, oc.changefeedID.Name(), op.Type(), common.StringMode(oc.mode)).Inc() + return true } func (oc *Controller) checkAffectedNodes(op operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) { diff --git a/maintainer/operator/operator_controller_test.go b/maintainer/operator/operator_controller_test.go index 4909d5dc06..0cbef850f9 100644 --- a/maintainer/operator/operator_controller_test.go +++ b/maintainer/operator/operator_controller_test.go @@ -21,6 +21,8 @@ import ( "unsafe" "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/maintainer/replica" + maintainertestutil "github.com/pingcap/ticdc/maintainer/testutil" "github.com/pingcap/ticdc/pkg/common" appcontext "github.com/pingcap/ticdc/pkg/common/context" "github.com/pingcap/ticdc/pkg/messaging" @@ -101,6 +103,35 @@ func (o *neverFinishOperator) OnTaskRemoved() {} func (o *neverFinishOperator) String() string { return "never-finish" } func (o *neverFinishOperator) BlockTsForward() bool { return false } +type countingOperator struct { + id common.DispatcherID + targetNode node.ID + blockTsForward bool + scheduleCount syncatomic.Int32 + checkCount syncatomic.Int32 + nodeRemovedCount syncatomic.Int32 +} + +func (o *countingOperator) ID() common.DispatcherID { return o.id } +func (o *countingOperator) Type() string { return "add" } +func (o *countingOperator) Start() {} +func (o *countingOperator) Schedule() *messaging.TargetMessage { + o.scheduleCount.Add(1) + return messaging.NewSingleTargetMessage(o.targetNode, messaging.MaintainerManagerTopic, &heartbeatpb.RemoveMaintainerRequest{}) +} +func (o *countingOperator) IsFinished() bool { return false } +func (o *countingOperator) PostFinish() {} +func (o *countingOperator) Check(node.ID, *heartbeatpb.TableSpanStatus) { + o.checkCount.Add(1) +} +func (o *countingOperator) OnNodeRemove(node.ID) { + o.nodeRemovedCount.Add(1) +} +func (o *countingOperator) AffectedNodes() []node.ID { return []node.ID{o.targetNode} } +func (o *countingOperator) OnTaskRemoved() {} +func (o *countingOperator) String() string { return "counting-operator" } +func (o *countingOperator) BlockTsForward() bool { return o.blockTsForward } + func setAliveNodes(nodeManager *watcher.NodeManager, alive map[node.ID]*node.Info) { type nodeMap = map[node.ID]*node.Info v := reflect.ValueOf(nodeManager).Elem().FieldByName("nodes") @@ -202,3 +233,72 @@ func TestController_RemoveReplicaSet_ReplacesRemoveOperatorOnTaskRemoved(t *test require.Equal(t, int32(0), postFinishCount.Load()) require.NotNil(t, oc.GetOperator(replicaSet.ID)) } + +func TestController_QuiesceExceptFreezesNonAllowedOperators(t *testing.T) { + messageCenter := messaging.NewMockMessageCenter() + appcontext.SetService(appcontext.MessageCenter, messageCenter) + + spanController, changefeedID, replicaSet, nodeA, _ := setupTestEnvironment(t) + spanController.AddReplicatingSpan(replicaSet) + + allowedID := common.NewDispatcherID() + allowedReplica := setupReplicaSetWithID(t, changefeedID, allowedID, nodeA) + spanController.AddReplicatingSpan(allowedReplica) + + blockedID := common.NewDispatcherID() + blockedReplica := setupReplicaSetWithID(t, changefeedID, blockedID, nodeA) + spanController.AddReplicatingSpan(blockedReplica) + + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + setAliveNodes(nodeManager, map[node.ID]*node.Info{nodeA: {ID: nodeA}}) + + oc := NewOperatorController(changefeedID, spanController, 10, common.DefaultMode) + allowedOp := &countingOperator{id: allowedID, targetNode: nodeA, blockTsForward: true} + blockedOp := &countingOperator{id: blockedID, targetNode: nodeA, blockTsForward: true} + require.True(t, oc.AddOperator(allowedOp)) + require.True(t, oc.AddOperator(blockedOp)) + + oc.QuiesceExcept(allowedID) + + oc.UpdateOperatorStatus(allowedID, nodeA, &heartbeatpb.TableSpanStatus{ID: allowedID.ToPB()}) + oc.UpdateOperatorStatus(blockedID, nodeA, &heartbeatpb.TableSpanStatus{ID: blockedID.ToPB()}) + require.Equal(t, int32(1), allowedOp.checkCount.Load()) + require.Equal(t, int32(0), blockedOp.checkCount.Load()) + + oc.OnNodeRemoved(nodeA) + require.Equal(t, int32(0), allowedOp.nodeRemovedCount.Load()) + require.Equal(t, int32(0), blockedOp.nodeRemovedCount.Load()) + require.Equal(t, 0, spanController.GetAbsentSize()) + + newBlockedID := common.NewDispatcherID() + newBlockedReplica := setupReplicaSetWithID(t, changefeedID, newBlockedID, nodeA) + spanController.AddReplicatingSpan(newBlockedReplica) + require.False(t, oc.AddOperator(&countingOperator{id: newBlockedID, targetNode: nodeA})) + + require.Equal(t, uint64(10), oc.GetMinCheckpointTs(^uint64(0))) + + next := oc.Execute() + require.False(t, next.IsZero()) + require.Equal(t, int32(1), allowedOp.scheduleCount.Load()) + require.Equal(t, int32(0), blockedOp.scheduleCount.Load()) + require.Len(t, messageCenter.GetMessageChannel(), 1) + require.Equal(t, 2, oc.OperatorSize()) +} + +func setupReplicaSetWithID( + t *testing.T, + changefeedID common.ChangeFeedID, + dispatcherID common.DispatcherID, + nodeID node.ID, +) *replica.SpanReplication { + t.Helper() + + tableID := int64(dispatcherID.Low + 100) + span := maintainertestutil.GetTableSpanByID(tableID) + return replica.NewWorkingSpanReplication(changefeedID, dispatcherID, 1, span, &heartbeatpb.TableSpanStatus{ + ID: dispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 10, + Mode: common.DefaultMode, + }, nodeID, false) +}