Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ func (m *Maintainer) HandleEvent(event *Event) bool {
}

func (m *Maintainer) checkNodeChanged() {
if m.removing.Load() {
return
}
m.nodeChanged.Lock()
defer m.nodeChanged.Unlock()
if m.nodeChanged.changed {
Expand Down Expand Up @@ -502,6 +505,13 @@ func (m *Maintainer) onRemoveMaintainer(cascade, changefeedRemoved bool) {
m.removing.Store(true)
m.cascadeRemoving.Store(cascade)
m.changefeedRemoved.Store(changefeedRemoved)
// Freeze ordinary scheduling on the old maintainer before we start the close flow.
// Only the DDL trigger close operator is allowed to keep running.
allowedDispatcherIDs := []common.DispatcherID{m.ddlSpan.ID}
if m.enableRedo {
allowedDispatcherIDs = append(allowedDispatcherIDs, m.redoDDLSpan.ID)
}
m.controller.EnterRemovingMode(allowedDispatcherIDs...)
closed := m.tryCloseChangefeed()
if closed {
m.removed.Store(true)
Expand Down Expand Up @@ -812,6 +822,14 @@ func (m *Maintainer) onHeartbeatRequest(msg *messaging.TargetMessage) {
// Process operator status updates AFTER checkpointTsByCapture is updated
// This ensures when operators complete, checkpointTsByCapture already contains the complete heartbeat
// Works with calCheckpointTs constraint ordering to prevent checkpoint advancing past new dispatcher startTs
if m.removing.Load() {
// Once RemoveMaintainer starts, we still need status updates for the close flow itself
// (for example DDL-trigger close operators reaching terminal states), but we must not run
// failover self-healing. A late Stopped/Working heartbeat from a closing dispatcher manager
// would otherwise mark spans absent or remove/recreate dispatchers after shutdown has begun.
m.controller.handleStatus(msg.From, req.Statuses, false)
return
}
m.controller.HandleStatus(msg.From, req.Statuses)
}

Expand All @@ -828,7 +846,7 @@ func (m *Maintainer) onError(from node.ID, err *heartbeatpb.RunningError) {

func (m *Maintainer) onBlockStateRequest(msg *messaging.TargetMessage) {
// the barrier is not initialized
if !m.initialized.Load() {
if !m.initialized.Load() || m.removing.Load() {
return
}
req := msg.Message[0].(*heartbeatpb.BlockStatusRequest)
Expand Down Expand Up @@ -961,8 +979,12 @@ func (m *Maintainer) onMaintainerCloseResponse(from node.ID, response *heartbeat

func (m *Maintainer) handleResendMessage() {
// resend closing message
if m.removing.Load() && m.cascadeRemoving.Load() {
m.trySendMaintainerCloseRequestToAllNode()
if m.removing.Load() {
// After RemoveMaintainer starts, the old maintainer must stop resending bootstrap/barrier
// traffic. Otherwise stale control-plane messages can race with the new maintainer.
if m.cascadeRemoving.Load() {
m.trySendMaintainerCloseRequestToAllNode()
}
return
}
// resend bootstrap message
Expand Down
26 changes: 25 additions & 1 deletion maintainer/maintainer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,12 @@ func NewController(changefeedID common.ChangeFeedID,
}
}

// HandleStatus handle the status report from the node
// HandleStatus handles the status report from the node. It runs the full
// reconciliation path, i.e. handleStatus with self-healing enabled.
func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus) {
	const allowSelfHealing = true
	c.handleStatus(from, statusList, allowSelfHealing)
}

func (c *Controller) handleStatus(from node.ID, statusList []*heartbeatpb.TableSpanStatus, allowSelfHealing bool) {
// HandleStatus reconciles runtime dispatcher reports with maintainer-side state.
//
// In the steady state, spanController (desired tasks), operatorController (in-flight scheduling),
Expand All @@ -151,6 +155,10 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableS
// The rules below make the system converge:
// 1) Orphan Working dispatcher without an operator => actively remove it to avoid leaks.
// 2) Non-working dispatcher without an operator => mark the span absent so scheduler can recreate it.
//
// During maintainer removal we still need status bookkeeping so close/remove can observe terminal
// states, but we must disable the self-healing branches. Otherwise a late Stopped/Working heartbeat
// can recreate dispatchers for a changefeed that is already shutting down.
for _, status := range statusList {
dispatcherID := common.NewDispatcherIDFromPB(status.ID)
operatorController := c.getOperatorController(status.Mode)
Expand All @@ -159,6 +167,9 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableS
operatorController.UpdateOperatorStatus(dispatcherID, from, status)
stm := spanController.GetTaskByID(dispatcherID)
if stm == nil {
if !allowSelfHealing {
continue
}
// If maintainer doesn't know this dispatcherID, most statuses are late/outdated and can be ignored.
// We only need to act when the runtime says the dispatcher is Working, because that implies there's
// still an active dispatcher consuming resources and potentially producing output.
Expand Down Expand Up @@ -189,6 +200,10 @@ func (c *Controller) HandleStatus(from node.ID, statusList []*heartbeatpb.TableS
}
spanController.UpdateStatus(stm, status)

if !allowSelfHealing {
continue
}

// Fallback: dispatcher becomes non-working without an operator.
//
// In normal scheduling flow, a dispatcher should transition to Stopped/Removed as part of a maintainer
Expand Down Expand Up @@ -256,6 +271,15 @@ func (c *Controller) RemoveNode(id node.ID) {
c.operatorController.OnNodeRemoved(id)
}

// EnterRemovingMode freezes normal scheduling on the old maintainer while keeping the
// DDL trigger dispatcher close path alive. Every operator whose dispatcher is not listed
// in allowedDispatcherIDs is quiesced, on the primary operator controller and, when redo
// is configured, on the redo operator controller as well.
func (c *Controller) EnterRemovingMode(allowedDispatcherIDs ...common.DispatcherID) {
	c.operatorController.QuiesceExcept(allowedDispatcherIDs...)
	if redo := c.redoOperatorController; redo != nil {
		redo.QuiesceExcept(allowedDispatcherIDs...)
	}
}

func (c *Controller) GetMinRedoCheckpointTs(minCheckpointTs uint64) uint64 {
minCheckpointTsForOperator := c.redoOperatorController.GetMinCheckpointTs(minCheckpointTs)
minCheckpointTsForSpan := c.redoSpanController.GetMinCheckpointTsForNonReplicatingSpans(minCheckpointTs)
Expand Down
153 changes: 153 additions & 0 deletions maintainer/maintainer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,156 @@ func TestMaintainer_GetMaintainerStatusUsesCommittedCheckpoint(t *testing.T) {
require.Equal(t, uint64(20), status.CheckpointTs)
require.Equal(t, uint64(50), status.LastSyncedTs)
}

// TestMaintainerHeartbeatDuringRemovingSkipsFailoverRecovery verifies that once
// RemoveMaintainer has started (m.removing is set), a late Stopped heartbeat only
// updates runtime status bookkeeping and no longer triggers failover self-healing,
// while a maintainer in the normal state still self-heals by marking the span absent.
func TestMaintainerHeartbeatDuringRemovingSkipsFailoverRecovery(t *testing.T) {
	// buildMaintainer assembles a minimal Maintainer with a DDL span and one
	// working span replication assigned to capture "node1".
	buildMaintainer := func(t *testing.T) (*Maintainer, *replica.SpanReplication, node.ID) {
		t.Helper()
		testutil.SetUpTestServices(t)

		nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
		captureID := node.ID("node1")
		nodeManager.GetAliveNodes()[captureID] = &node.Info{ID: captureID}

		cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
		ddlDispatcherID := common.NewDispatcherID()
		ddlSpan := replica.NewWorkingSpanReplication(cfID, ddlDispatcherID,
			common.DDLSpanSchemaID,
			common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{
				ID:              ddlDispatcherID.ToPB(),
				ComponentStatus: heartbeatpb.ComponentState_Working,
				CheckpointTs:    10,
				Mode:            common.DefaultMode,
			}, captureID, false)
		refresher := replica.NewRegionCountRefresher(cfID, time.Minute)
		controller := NewController(cfID, 10, &mockThreadPool{},
			config.GetDefaultReplicaConfig(), ddlSpan, nil, 1000, 0, refresher, common.DefaultKeyspace, false)

		// A single table span in Working state on captureID; the heartbeat below
		// will report it as Stopped.
		totalSpan := common.TableIDToComparableSpan(common.DefaultKeyspaceID, 1)
		dispatcherID := common.NewDispatcherID()
		workingSpan := replica.NewWorkingSpanReplication(cfID, dispatcherID,
			1,
			&heartbeatpb.TableSpan{
				TableID:    totalSpan.TableID,
				StartKey:   totalSpan.StartKey,
				EndKey:     totalSpan.EndKey,
				KeyspaceID: common.DefaultKeyspaceID,
			}, &heartbeatpb.TableSpanStatus{
				ID:              dispatcherID.ToPB(),
				ComponentStatus: heartbeatpb.ComponentState_Working,
				CheckpointTs:    10,
				Mode:            common.DefaultMode,
			}, captureID, false)
		controller.spanController.AddReplicatingSpan(workingSpan)

		m := &Maintainer{
			changefeedID:          cfID,
			controller:            controller,
			checkpointTsByCapture: newWatermarkCaptureMap(),
			redoTsByCapture:       newWatermarkCaptureMap(),
			statusChanged:         atomic.NewBool(false),
		}
		m.watermark.Watermark = &heartbeatpb.Watermark{}
		m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError)
		m.initialized.Store(true)
		return m, workingSpan, captureID
	}

	// makeHeartbeat builds a heartbeat from the given node reporting the
	// dispatcher as Stopped at checkpoint ts 20.
	makeHeartbeat := func(dispatcherID common.DispatcherID, from node.ID) *messaging.TargetMessage {
		req := &heartbeatpb.HeartBeatRequest{
			Watermark: &heartbeatpb.Watermark{
				CheckpointTs: 20,
				ResolvedTs:   20,
			},
			Statuses: []*heartbeatpb.TableSpanStatus{
				{
					ID:              dispatcherID.ToPB(),
					ComponentStatus: heartbeatpb.ComponentState_Stopped,
					CheckpointTs:    20,
					Mode:            common.DefaultMode,
				},
			},
		}
		return &messaging.TargetMessage{
			From:    from,
			Type:    messaging.TypeHeartBeatRequest,
			Message: []messaging.IOTypeT{req},
		}
	}

	// Normal failover recovery should still mark a non-working span absent when the runtime
	// reports Stopped but maintainer has no operator for it.
	t.Run("normal maintainer still self heals", func(t *testing.T) {
		m, workingSpan, captureID := buildMaintainer(t)

		m.onHeartbeatRequest(makeHeartbeat(workingSpan.ID, captureID))

		// Self-healing marked the span absent and detached it from the node.
		require.Equal(t, 1, m.controller.spanController.GetAbsentSize())
		require.Equal(t, heartbeatpb.ComponentState_Stopped, workingSpan.GetStatus().ComponentStatus)
		require.Equal(t, node.ID(""), workingSpan.GetNodeID())
	})

	// When RemoveMaintainer has started, the same late Stopped heartbeat must only update runtime
	// status bookkeeping. Re-marking the span absent here would let the scheduler recreate a
	// dispatcher while the changefeed is shutting down.
	t.Run("removing maintainer skips self healing", func(t *testing.T) {
		m, workingSpan, captureID := buildMaintainer(t)
		m.removing.Store(true)

		m.onHeartbeatRequest(makeHeartbeat(workingSpan.ID, captureID))

		// Status was recorded, but the span stays bound to the node, is not
		// marked absent, and no scheduling operator was created.
		require.Equal(t, 0, m.controller.spanController.GetAbsentSize())
		require.Equal(t, heartbeatpb.ComponentState_Stopped, workingSpan.GetStatus().ComponentStatus)
		require.Equal(t, captureID, workingSpan.GetNodeID())
		require.Zero(t, m.controller.operatorController.OperatorSize())
	})
}

// TestMaintainerRemovingSuppressesLegacyControlPlaneActions verifies that once
// removal has started, handleResendMessage stops resending bootstrap/post-bootstrap
// and barrier traffic, onBlockStateRequest becomes a no-op, and — after
// cascadeRemoving is set — the only remaining control-plane action is one
// MaintainerCloseRequest per alive node.
func TestMaintainerRemovingSuppressesLegacyControlPlaneActions(t *testing.T) {
	mockMC := messaging.NewMockMessageCenter()
	nodeManager := watcher.NewNodeManager(nil, nil)
	nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"}
	nodeManager.GetAliveNodes()["node2"] = &node.Info{ID: "node2"}

	cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
	// Minimal maintainer: initialized, with a pending post-bootstrap message that
	// would normally be resent, and removal already started.
	m := &Maintainer{
		changefeedID:  cfID,
		mc:            mockMC,
		nodeManager:   nodeManager,
		closedNodes:   make(map[node.ID]struct{}),
		statusChanged: atomic.NewBool(false),
		postBootstrapMsg: &heartbeatpb.MaintainerPostBootstrapRequest{
			ChangefeedID: cfID.ToPB(),
		},
	}
	m.watermark.Watermark = &heartbeatpb.Watermark{CheckpointTs: 100, ResolvedTs: 100}
	m.runningErrors.m = make(map[node.ID]*heartbeatpb.RunningError)
	m.initialized.Store(true)
	m.removing.Store(true)

	// Removing maintainer must not keep resending bootstrap/post-bootstrap or barrier traffic.
	// The only remaining control-plane action should be cascade close requests.
	m.handleResendMessage()
	require.Len(t, mockMC.GetMessageChannel(), 0)

	// Block status handling must also stop once removal starts, otherwise the old maintainer
	// can still schedule DDL-driven add/remove operations after handoff begins.
	m.onBlockStateRequest(&messaging.TargetMessage{
		From: "node1",
		Type: messaging.TypeBlockStatusRequest,
		Message: []messaging.IOTypeT{&heartbeatpb.BlockStatusRequest{
			ChangefeedID: cfID.ToPB(),
			Mode:         common.DefaultMode,
		}},
	})
	require.Len(t, mockMC.GetMessageChannel(), 0)

	// With cascadeRemoving set, the resend loop emits exactly one close request
	// per alive node (two nodes registered above), and nothing else.
	m.cascadeRemoving.Store(true)
	m.handleResendMessage()
	require.Len(t, mockMC.GetMessageChannel(), 2)
	for i := 0; i < 2; i++ {
		msg := <-mockMC.GetMessageChannel()
		require.Equal(t, messaging.TypeMaintainerCloseRequest, msg.Type)
		req := msg.Message[0].(*heartbeatpb.MaintainerCloseRequest)
		require.Equal(t, cfID.ToPB(), req.ChangefeedID)
	}
}
Loading
Loading