diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go
index 63536fb7a4..b88a427a4d 100644
--- a/downstreamadapter/dispatcher/basic_dispatcher.go
+++ b/downstreamadapter/dispatcher/basic_dispatcher.go
@@ -468,8 +468,8 @@ func (d *BasicDispatcher) isFirstEvent(event commonEvent.Event) bool {
         if event.GetCommitTs() > d.startTs {
             return true
         }
-    // the first syncpoint event can be same as startTs
-    case commonEvent.TypeResolvedEvent, commonEvent.TypeSyncPointEvent:
+    // the first syncpoint / resolved / handshake event can be the same as startTs
+    case commonEvent.TypeResolvedEvent, commonEvent.TypeSyncPointEvent, commonEvent.TypeHandshakeEvent:
         if event.GetCommitTs() >= d.startTs {
             return true
         }
@@ -709,9 +709,14 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
             })
             d.DealWithBlockEvent(syncPoint)
         case commonEvent.TypeHandshakeEvent:
-            log.Warn("Receive handshake event unexpectedly",
+            // Handshake means the event collector has registered/reset to the requested startTs and can
+            // safely consume this span. Idle tables may not receive a later resolved/DML promptly, so we
+            // treat handshake as the first live event and let the earlier updateDispatcherStatusToWorking()
+            // unblock the maintainer add operator.
+            log.Debug("dispatcher receive handshake event",
                 zap.Stringer("dispatcher", d.id),
-                zap.Any("event", event))
+                zap.Uint64("commitTs", event.GetCommitTs()),
+                zap.Uint64("seq", event.GetSeq()))
         default:
             log.Panic("Unexpected event type",
                 zap.Int("eventType", event.GetType()),
diff --git a/downstreamadapter/dispatcher/event_dispatcher.go b/downstreamadapter/dispatcher/event_dispatcher.go
index 65464b919b..5afe6fc846 100644
--- a/downstreamadapter/dispatcher/event_dispatcher.go
+++ b/downstreamadapter/dispatcher/event_dispatcher.go
@@ -24,6 +24,7 @@ import (
     "github.com/pingcap/ticdc/logservice/schemastore"
     "github.com/pingcap/ticdc/pkg/common"
     appcontext "github.com/pingcap/ticdc/pkg/common/context"
+    commonEvent "github.com/pingcap/ticdc/pkg/common/event"
     "github.com/pingcap/ticdc/pkg/errors"
     "github.com/pingcap/ticdc/pkg/sink/codec"
     "go.uber.org/zap"
@@ -44,7 +45,7 @@ type EventDispatcher struct {
     // cacheEvents is used to store events with a commit-ts greater than redoGlobalTs
     cacheEvents struct {
         sync.Mutex
-        events chan cacheEvents
+        events []cacheEvents
     }
 }
@@ -85,22 +86,34 @@ func NewEventDispatcher(
         redoEnable:   redoEnable,
         redoGlobalTs: redoGlobalTs,
     }
-    dispatcher.cacheEvents.events = make(chan cacheEvents, 1)
+    dispatcher.cacheEvents.events = make([]cacheEvents, 0, 1)
     return dispatcher
 }
 
 // HandleCacheEvents is called when redoGlobalTs is updated
 func (d *EventDispatcher) HandleCacheEvents() {
-    select {
-    case cacheEvents, ok := <-d.cacheEvents.events:
-        if !ok {
-            return
-        }
-        block := d.HandleEvents(cacheEvents.events, cacheEvents.wakeCallback)
-        if !block {
-            cacheEvents.wakeCallback()
+    d.cacheEvents.Lock()
+    if len(d.cacheEvents.events) == 0 {
+        d.cacheEvents.Unlock()
+        return
+    }
+    cached := d.cacheEvents.events[0]
+    d.cacheEvents.events[0] = cacheEvents{}
+    d.cacheEvents.events = d.cacheEvents.events[1:]
+    d.cacheEvents.Unlock()
+
+    wakeAndContinue := func() {
+        // Drain cached redo-gated batches before waking the live stream again so
+        // already-buffered older events keep their ordering priority.
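+        // The drain stops as soon as the cache is empty or the dequeued batch blocks
+        // again in HandleEvents, so the recursion below is bounded by the cache length.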
+        d.HandleCacheEvents()
+        if cached.wakeCallback != nil {
+            cached.wakeCallback()
         }
-    default:
+    }
+
+    block := d.HandleEvents(cached.events, wakeAndContinue)
+    if !block {
+        wakeAndContinue()
     }
 }
@@ -117,22 +130,27 @@ func (d *EventDispatcher) cache(dispatcherEvents []DispatcherEvent, wakeCallback
         events:       append(make([]DispatcherEvent, 0, len(dispatcherEvents)), dispatcherEvents...),
         wakeCallback: wakeCallback,
     }
-    select {
-    case d.cacheEvents.events <- cacheEvents:
-        log.Debug("cache events",
-            zap.Stringer("dispatcher", d.id),
-            zap.Uint64("dispatcherResolvedTs", d.GetResolvedTs()),
-            zap.Int("length", len(dispatcherEvents)),
-            zap.Int("eventType", dispatcherEvents[len(dispatcherEvents)-1].Event.GetType()),
-            zap.Uint64("commitTs", dispatcherEvents[len(dispatcherEvents)-1].Event.GetCommitTs()),
-            zap.Uint64("redoGlobalTs", d.redoGlobalTs.Load()),
-        )
-    default:
-        log.Panic("dispatcher cache events is full", zap.Stringer("dispatcher", d.id), zap.Int("len", len(d.cacheEvents.events)))
-    }
+    d.cacheEvents.events = append(d.cacheEvents.events, cacheEvents)
+    log.Debug("cache events",
+        zap.Stringer("dispatcher", d.id),
+        zap.Uint64("dispatcherResolvedTs", d.GetResolvedTs()),
+        zap.Int("length", len(dispatcherEvents)),
+        zap.Int("eventType", dispatcherEvents[len(dispatcherEvents)-1].Event.GetType()),
+        zap.Uint64("commitTs", dispatcherEvents[len(dispatcherEvents)-1].Event.GetCommitTs()),
+        zap.Uint64("redoGlobalTs", d.redoGlobalTs.Load()),
+        zap.Int("cached", len(d.cacheEvents.events)),
+    )
 }
 
 func (d *EventDispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) bool {
+    // Handshake has no downstream side effect. If we cache it behind redoGlobalTs,
+    // a recreated normal dispatcher can stay Initializing forever and keep the add
+    // operator from finishing, which in turn pins the global checkpoint.
+    if len(dispatcherEvents) == 1 &&
+        dispatcherEvents[0].Event.GetType() == commonEvent.TypeHandshakeEvent {
+        return d.handleEvents(dispatcherEvents, wakeCallback)
+    }
+
     // if the commit-ts of the last event of dispatcherEvents is greater than redoGlobalTs,
     // the dispatcherEvents will be cached until the redoGlobalTs is updated.
     if d.redoEnable && len(dispatcherEvents) > 0 && d.redoGlobalTs.Load() < dispatcherEvents[len(dispatcherEvents)-1].Event.GetCommitTs() {
@@ -148,7 +166,7 @@ func (d *EventDispatcher) Remove() {
     if d.isRemoving.CompareAndSwap(false, true) {
         d.cacheEvents.Lock()
         defer d.cacheEvents.Unlock()
-        close(d.cacheEvents.events)
+        d.cacheEvents.events = nil
     }
     d.removeDispatcher()
 }
@@ -226,7 +244,7 @@ func (d *EventDispatcher) EmitBootstrap() bool {
     return true
 }
 
-// cacheEvents cache the events which commit-ts is less than or equal the redoGlobalTs
+// cacheEvents caches events whose commit-ts is greater than redoGlobalTs.
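+// Batches are appended in arrival order by cache() and drained FIFO by HandleCacheEvents.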
 // it will be used when redoEnable is true
 type cacheEvents struct {
     events       []DispatcherEvent
diff --git a/downstreamadapter/dispatcher/event_dispatcher_test.go b/downstreamadapter/dispatcher/event_dispatcher_test.go
index 1c91b79bb6..cd18e111a9 100644
--- a/downstreamadapter/dispatcher/event_dispatcher_test.go
+++ b/downstreamadapter/dispatcher/event_dispatcher_test.go
@@ -444,6 +444,69 @@ func TestDispatcherHandleEvents(t *testing.T) {
     t.Run("cloud storage wake callback after batch enqueue", verifyDMLWakeCallbackStorageAfterBatchEnqueue)
 }
 
+func TestDispatcherHandshakePromotesToWorking(t *testing.T) {
+    tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID())
+    require.NoError(t, err)
+
+    dispatcher := newDispatcherForTest(newDispatcherTestSink(t, common.MysqlSinkType).Sink(), tableSpan)
+    require.Equal(t, heartbeatpb.ComponentState_Initializing, dispatcher.GetComponentStatus())
+
+    nodeID := node.NewID()
+    handshake := commonEvent.NewHandshakeEvent(dispatcher.GetId(), dispatcher.GetStartTs(), 1, &common.TableInfo{})
+    block := dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, &handshake)}, func() {})
+    require.False(t, block)
+    require.Equal(t, heartbeatpb.ComponentState_Working, dispatcher.GetComponentStatus())
+
+    select {
+    case status := <-dispatcher.sharedInfo.statusesChan:
+        require.Equal(t, dispatcher.GetId().ToPB(), status.ID)
+        require.Equal(t, heartbeatpb.ComponentState_Working, status.ComponentStatus)
+        require.Equal(t, dispatcher.GetCheckpointTs(), status.CheckpointTs)
+    default:
+        t.Fatal("expected dispatcher working status after handshake")
+    }
+}
+
+func TestDispatcherHandshakeBypassesRedoCache(t *testing.T) {
+    tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID())
+    require.NoError(t, err)
+
+    var redoTs atomic.Uint64
+    redoTs.Store(0)
+    sharedInfo := newTestSharedInfo(false, false, newTestSyncPointConfig())
+    dispatcher := NewEventDispatcher(
+        common.NewDispatcherID(),
+        tableSpan,
+        common.Ts(100),
+        1,
+        NewSchemaIDToDispatchers(),
+        false,
+        false,
+        common.Ts(100),
+        newDispatcherTestSink(t, common.MysqlSinkType).Sink(),
+        sharedInfo,
+        true,
+        &redoTs,
+    )
+    require.Equal(t, heartbeatpb.ComponentState_Initializing, dispatcher.GetComponentStatus())
+
+    nodeID := node.NewID()
+    handshake := commonEvent.NewHandshakeEvent(dispatcher.GetId(), dispatcher.GetStartTs(), 1, &common.TableInfo{})
+    block := dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, &handshake)}, func() {})
+    require.False(t, block)
+    require.Equal(t, heartbeatpb.ComponentState_Working, dispatcher.GetComponentStatus())
+    require.Empty(t, dispatcher.cacheEvents.events)
+
+    select {
+    case status := <-dispatcher.sharedInfo.statusesChan:
+        require.Equal(t, dispatcher.GetId().ToPB(), status.ID)
+        require.Equal(t, heartbeatpb.ComponentState_Working, status.ComponentStatus)
+        require.Equal(t, dispatcher.GetCheckpointTs(), status.CheckpointTs)
+    default:
+        t.Fatal("expected dispatcher working status after handshake")
+    }
+}
+
 func TestDispatcherIgnoresStaleIgnoredBlockStatus(t *testing.T) {
     tableSpan := getUncompleteTableSpan()
     tableSpan.KeyspaceID = getTestingKeyspaceID()
@@ -1236,6 +1299,145 @@ func TestDispatcher_SkipDMLAsStartTs_Disabled(t *testing.T) {
     require.Equal(t, 1, len(mockSink.GetDMLs()), "DML at commitTs=100 should be processed when skipDMLAsStartTs=false")
 }
 
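+// Regression guard: the previous one-slot channel cache panicked with
+// "dispatcher cache events is full" when a second blocked batch arrived before
+// redoGlobalTs advanced; the slice-backed cache must absorb both batches.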
+func TestEventDispatcherRedoCachesMultipleBlockedDDLEvents(t *testing.T) {
+    helper := commonEvent.NewEventTestHelper(t)
+    defer helper.Close()
+
+    helper.Tk().MustExec("use test")
+    helper.DDL2Job("create table t(id int primary key, v int)")
+    dmlEvent := helper.DML2Event("test", "t", "insert into t values(1, 1)")
+    require.NotNil(t, dmlEvent)
+
+    testSink := newDispatcherTestSink(t, common.MysqlSinkType)
+    tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID())
+    require.NoError(t, err)
+
+    var redoTs atomic.Uint64
+    redoTs.Store(1)
+    sharedInfo := newTestSharedInfo(false, false, newTestSyncPointConfig())
+    dispatcher := NewEventDispatcher(
+        common.NewDispatcherID(),
+        tableSpan,
+        common.Ts(0),
+        1,
+        NewSchemaIDToDispatchers(),
+        false,
+        false,
+        common.Ts(0),
+        testSink.Sink(),
+        sharedInfo,
+        true,
+        &redoTs,
+    )
+
+    ddlEvent1 := &commonEvent.DDLEvent{
+        FinishedTs: 5,
+        BlockedTables: &commonEvent.InfluencedTables{
+            InfluenceType: commonEvent.InfluenceTypeNormal,
+            TableIDs:      []int64{0},
+        },
+        TableInfo: dmlEvent.TableInfo,
+        Query:     "alter table t add column c1 int",
+    }
+    ddlEvent2 := &commonEvent.DDLEvent{
+        FinishedTs: 6,
+        BlockedTables: &commonEvent.InfluencedTables{
+            InfluenceType: commonEvent.InfluenceTypeNormal,
+            TableIDs:      []int64{0},
+        },
+        TableInfo: dmlEvent.TableInfo,
+        Query:     "alter table t add column c2 int",
+    }
+
+    nodeID := node.NewID()
+    var wakeCount atomic.Int32
+    wake := func() {
+        wakeCount.Add(1)
+    }
+
+    block := dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, ddlEvent1)}, wake)
+    require.True(t, block)
+    require.Equal(t, 1, len(dispatcher.cacheEvents.events))
+
+    require.NotPanics(t, func() {
+        block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, ddlEvent2)}, wake)
+    })
+    require.True(t, block)
+    require.Equal(t, 2, len(dispatcher.cacheEvents.events))
+    require.Equal(t, int32(0), wakeCount.Load())
+
+    redoTs.Store(math.MaxUint64)
+    dispatcher.HandleCacheEvents()
+    dispatcher.HandleCacheEvents()
+
+    require.Eventually(t, func() bool {
+        return wakeCount.Load() == 2
+    }, 5*time.Second, 50*time.Millisecond)
+    require.Equal(t, 0, len(dispatcher.cacheEvents.events))
+}
+
+func TestEventDispatcherRedoDrainsAllReleasableCachedEvents(t *testing.T) {
+    helper := commonEvent.NewEventTestHelper(t)
+    defer helper.Close()
+
+    helper.Tk().MustExec("use test")
+    helper.DDL2Job("create table t(id int primary key, v int)")
+    dmlEvent := helper.DML2Event("test", "t", "insert into t values(1, 1)")
+    require.NotNil(t, dmlEvent)
+
+    testSink := newDispatcherTestSink(t, common.MysqlSinkType)
+    tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID())
+    require.NoError(t, err)
+
+    var redoTs atomic.Uint64
+    redoTs.Store(1)
+    sharedInfo := newTestSharedInfo(false, false, newTestSyncPointConfig())
+    dispatcher := NewEventDispatcher(
+        common.NewDispatcherID(),
+        tableSpan,
+        common.Ts(0),
+        1,
+        NewSchemaIDToDispatchers(),
+        false,
+        false,
+        common.Ts(0),
+        testSink.Sink(),
+        sharedInfo,
+        true,
+        &redoTs,
+    )
+
+    nodeID := node.NewID()
+    makeDDL := func(commitTs uint64, query string) *commonEvent.DDLEvent {
+        return &commonEvent.DDLEvent{
+            FinishedTs: commitTs,
+            BlockedTables: &commonEvent.InfluencedTables{
+                InfluenceType: commonEvent.InfluenceTypeNormal,
+                TableIDs:      []int64{0},
+            },
+            TableInfo: dmlEvent.TableInfo,
+            Query:     query,
+        }
+    }
+
+    var wakeCount atomic.Int32
+    wake := func() {
+        wakeCount.Add(1)
+    }
+
+    require.True(t, dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, makeDDL(5, "alter table t add column c1 int"))}, wake))
+    require.True(t, dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, makeDDL(6, "alter table t add column c2 int"))}, wake))
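+    // Both batches (commitTs 5 and 6) are above redoTs=1, so each HandleEvents call
+    // returns block=true and the events are cached instead of reaching the sink.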
+    require.Equal(t, 2, len(dispatcher.cacheEvents.events))
+
+    redoTs.Store(math.MaxUint64)
+    dispatcher.HandleCacheEvents()
+
+    require.Eventually(t, func() bool {
+        return wakeCount.Load() == 2
+    }, 5*time.Second, 50*time.Millisecond)
+    require.Equal(t, 0, len(dispatcher.cacheEvents.events))
+}
+
 func TestHoldBlockEventUntilNoResendTasks(t *testing.T) {
     keyspaceID := getTestingKeyspaceID()
     ddlTableSpan := common.KeyspaceDDLSpan(keyspaceID)
diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go
index 0b1862279e..1a8e77be1f 100644
--- a/downstreamadapter/eventcollector/dispatcher_stat.go
+++ b/downstreamadapter/eventcollector/dispatcher_stat.go
@@ -697,6 +697,12 @@ func (d *dispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent)
     }
     state.lastEventSeq.Store(handshakeEvent.Seq)
     d.observeCurrentEpochMaxEventTs(state, handshakeEvent.GetCommitTs())
+
+    // Forward the handshake to the dispatcher so an idle table can still move from
+    // Initializing to Working after register/reset/handshake, even before any later
+    // resolved or DML event arrives. This unblocks maintainer add operators without
+    // advancing the sink-side checkpoint beyond the collector-observed handshake ts.
+    _ = d.target.HandleEvents([]dispatcher.DispatcherEvent{event}, func() { d.wake() })
 }
 
 func (d *dispatcherStat) getHeartbeatProgressForEventService() (uint64, uint64) {
diff --git a/downstreamadapter/eventcollector/dispatcher_stat_test.go b/downstreamadapter/eventcollector/dispatcher_stat_test.go
index fdd8e21c34..1958a8a68f 100644
--- a/downstreamadapter/eventcollector/dispatcher_stat_test.go
+++ b/downstreamadapter/eventcollector/dispatcher_stat_test.go
@@ -1448,6 +1448,28 @@ func TestCheckpointTsForEventServiceUsesCollectorObservedMaxTs(t *testing.T) {
     require.Equal(t, uint64(210), getHeartbeatCheckpoint())
 }
 
+func TestHandleHandshakeEventForwardsToDispatcher(t *testing.T) {
+    t.Parallel()
+
+    dispatcherID := common.NewDispatcherID()
+    mockDisp := newMockDispatcher(dispatcherID, 100)
+    called := 0
+    mockDisp.handleEvents = func(events []dispatcher.DispatcherEvent, wakeCallback func()) (block bool) {
+        called++
+        require.Len(t, events, 1)
+        require.Equal(t, commonEvent.TypeHandshakeEvent, events[0].GetType())
+        return false
+    }
+    stat := newDispatcherStat(mockDisp, newTestEventCollector(node.ID("local")), nil)
+    stat.doReset(node.ID("event-service-1"), 100)
+
+    handshake := commonEvent.NewHandshakeEvent(dispatcherID, 100, 1, &common.TableInfo{})
+    stat.handleHandshakeEvent(dispatcher.DispatcherEvent{Event: &handshake})
+    require.Equal(t, 1, called)
+    require.Len(t, mockDisp.events, 1)
+    require.Equal(t, commonEvent.TypeHandshakeEvent, mockDisp.events[0].GetType())
+}
+
 func TestRegisterTo(t *testing.T) {
     localServerID := node.ID("local-server")
     remoteServerID := node.ID("remote-server")
diff --git a/maintainer/barrier.go b/maintainer/barrier.go
index 0c92fdeb87..b55313094c 100644
--- a/maintainer/barrier.go
+++ b/maintainer/barrier.go
@@ -77,6 +77,7 @@ func (b *Barrier) HandleStatus(from node.ID,
     eventDispatcherIDsMap := make(map[*BarrierEvent][]*heartbeatpb.DispatcherID)
     actions := map[node.ID][]*heartbeatpb.DispatcherStatus{}
     var dispatcherStatus []*heartbeatpb.DispatcherStatus
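+    // deferredStatus is set when a block report is deliberately left untracked and
+    // un-ACKed this round; the reporting dispatcher will resend it later.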
+    deferredStatus := false
     for _, status := range request.BlockStatuses {
         // only receive block status from the replicating dispatcher
         dispatcherID := common.NewDispatcherIDFromPB(status.ID)
@@ -121,6 +122,10 @@
         // e.g. TODO(truncate + create table)
         event, action, targetID, needACK := b.handleOneStatus(request.ChangefeedID, status)
         if event == nil {
+            if !needACK {
+                deferredStatus = true
+                continue
+            }
             // should not happen
             log.Error("handle block status failed, event is nil",
                 zap.String("from", from.String()),
@@ -147,7 +152,7 @@
         })
     }
     dispatcherStatus = append(dispatcherStatus, actions[from]...)
-    if len(dispatcherStatus) <= 0 {
+    if len(dispatcherStatus) <= 0 && !deferredStatus {
         log.Warn("no dispatcher status to send",
             zap.String("from", from.String()),
             zap.String("changefeed", request.ChangefeedID.String()),
@@ -361,6 +366,24 @@ func (b *Barrier) handleBlockState(changefeedID common.ChangeFeedID,
     blockState := status.State
     if blockState.IsBlocked {
         key := getEventKey(blockState.BlockTs, blockState.IsSyncPoint)
+        // Normal DDLs that also involve the DDL span need the table trigger dispatcher
+        // to participate. Under DDL-heavy workloads, table dispatchers can report many
+        // future barriers while the table trigger dispatcher is still executing earlier
+        // DDLs. If we tracked and ACKed those reports immediately, the maintainer would
+        // keep one pending barrier per future DDL and each one would wait on tableID 0,
+        // causing barrier status memory to grow with the DDL backlog. Let table dispatchers
+        // resend until the table trigger dispatcher reaches this commitTs, then track the
+        // barrier normally from that report.
+        if _, ok := b.blockedEvents.Get(key); !ok &&
+            dispatcherID != b.spanController.GetDDLDispatcherID() &&
+            normalBlockEventIncludesDDLSpan(blockState.BlockTables) {
+            log.Debug("wait ddl dispatcher report before tracking normal block event",
+                zap.String("changefeed", changefeedID.Name()),
+                zap.String("dispatcher", dispatcherID.String()),
+                zap.Uint64("commitTs", blockState.BlockTs),
+                zap.Int64("mode", b.mode))
+            return nil, nil, "", false
+        }
         // insert an event, or get the existing one, and check whether the event is already tracked
         event := b.getOrInsertNewEvent(changefeedID, dispatcherID, key, blockState)
         if dispatcherID == b.spanController.GetDDLDispatcherID() {
@@ -446,6 +469,18 @@ func (b *Barrier) handleBlockState(changefeedID common.ChangeFeedID,
     return event, nil, "", true
 }
 
+func normalBlockEventIncludesDDLSpan(blockedTables *heartbeatpb.InfluencedTables) bool {
+    if blockedTables == nil || blockedTables.InfluenceType != heartbeatpb.InfluenceType_Normal {
+        return false
+    }
+    for _, tableID := range blockedTables.TableIDs {
+        if tableID == common.DDLSpanTableID {
+            return true
+        }
+    }
+    return false
+}
+
 // getOrInsertNewEvent get the block event from the map, if not found, create a new one
 func (b *Barrier) getOrInsertNewEvent(changefeedID common.ChangeFeedID, dispatcherID common.DispatcherID,
     key eventKey, blockState *heartbeatpb.State,
diff --git a/maintainer/barrier_test.go b/maintainer/barrier_test.go
index 5e982762a5..724bdec537 100644
--- a/maintainer/barrier_test.go
+++ b/maintainer/barrier_test.go
@@ -384,10 +384,8 @@ func TestNormalBlockWithTableTrigger(t *testing.T) {
     require.NotNil(t, msgs)
     require.NotEmpty(t, msgs)
     resp := msgs[0].Message[0].(*heartbeatpb.HeartBeatResponse)
-    require.Len(t, resp.DispatcherStatuses, 1)
-    require.True(t, resp.DispatcherStatuses[0].Ack.CommitTs == 10)
-    require.Len(t, resp.DispatcherStatuses[0].InfluencedDispatchers.DispatcherIDs, 1)
-    require.False(t, barrier.blockedEvents.m[eventKey{blockTs: 10, isSyncPoint: false}].tableTriggerDispatcherRelated)
+    require.Empty(t, resp.DispatcherStatuses)
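+    // No ACK is sent and nothing is tracked yet: a table dispatcher's report that
+    // includes the DDL span is deferred until the table trigger dispatcher reports.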
+    require.Len(t, barrier.blockedEvents.m, 0)
 
     // table trigger block request
     msgs = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{
@@ -434,8 +432,33 @@
     }
     event := barrier.blockedEvents.m[key]
     require.Equal(t, uint64(10), event.commitTs)
-    require.True(t, event.writerDispatcher == tableTriggerEventDispatcherID)
-    // all dispatcher reported, the reported status is reset
+    require.False(t, event.rangeChecker.IsFullyCovered())
+    require.True(t, event.tableTriggerDispatcherRelated)
+
+    msgs = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{
+        ChangefeedID: cfID.ToPB(),
+        BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{
+            {
+                ID: blockedDispatcherIDS[0],
+                State: &heartbeatpb.State{
+                    IsBlocked: true,
+                    BlockTs:   10,
+                    BlockTables: &heartbeatpb.InfluencedTables{
+                        InfluenceType: heartbeatpb.InfluenceType_Normal,
+                        TableIDs:      []int64{0, 1, 2},
+                    },
+                    NeedDroppedTables: &heartbeatpb.InfluencedTables{
+                        InfluenceType: heartbeatpb.InfluenceType_Normal,
+                        TableIDs:      []int64{2},
+                    },
+                    NeedAddedTables: []*heartbeatpb.Table{newSpan},
+                },
+            },
+        },
+    })
+    require.NotNil(t, msgs)
+    // all dispatchers reported, the reported status is reset
+    require.Equal(t, tableTriggerEventDispatcherID, event.writerDispatcher)
     require.False(t, event.rangeChecker.IsFullyCovered())
     require.True(t, event.tableTriggerDispatcherRelated)
@@ -475,6 +498,96 @@
     require.Len(t, barrier.blockedEvents.m, 0)
 }
 
+func TestNormalBlockWithDDLSpanWaitsForTableTriggerReport(t *testing.T) {
+    testutil.SetUpTestServices(t)
+    tableTriggerEventDispatcherID := common.NewDispatcherID()
+    cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
+    ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID,
+        common.DDLSpanSchemaID,
+        common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{
+            ID:              tableTriggerEventDispatcherID.ToPB(),
+            ComponentStatus: heartbeatpb.ComponentState_Working,
+            CheckpointTs:    1,
+        }, "node1", false)
+    spanController := span.NewController(cfID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode)
+    operatorController := operator.NewOperatorController(cfID, spanController, 1000, common.DefaultMode)
+
+    var blockedDispatcherIDs []*heartbeatpb.DispatcherID
+    for id := 1; id < 3; id++ {
+        spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: int64(id)}, 10)
+        stm := spanController.GetTasksByTableID(int64(id))[0]
+        blockedDispatcherIDs = append(blockedDispatcherIDs, stm.ID.ToPB())
+        spanController.BindSpanToNode("", "node1", stm)
+        spanController.MarkSpanReplicating(stm)
+    }
+
+    barrier := NewBarrier(spanController, operatorController, false, nil, common.DefaultMode)
+    blockState := &heartbeatpb.State{
+        IsBlocked: true,
+        BlockTs:   10,
+        BlockTables: &heartbeatpb.InfluencedTables{
+            InfluenceType: heartbeatpb.InfluenceType_Normal,
+            TableIDs:      []int64{0, 1, 2},
+        },
+        NeedDroppedTables: &heartbeatpb.InfluencedTables{
+            InfluenceType: heartbeatpb.InfluenceType_Normal,
+            TableIDs:      []int64{2},
+        },
+        NeedAddedTables: []*heartbeatpb.Table{{TableID: 10, SchemaID: 1}},
+    }
+
+    msgs := barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{
+        ChangefeedID: cfID.ToPB(),
+        BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{
+            {
+                ID:    blockedDispatcherIDs[0],
+                State: blockState,
+            },
+        },
+    })
+    require.NotNil(t, msgs)
+    resp := msgs[0].Message[0].(*heartbeatpb.HeartBeatResponse)
+    require.Empty(t, resp.DispatcherStatuses)
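+    // Same deferral as above: the lone table-dispatcher report covers tableID 0
+    // (the DDL span), so the barrier is not yet tracked or ACKed.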
+    require.Len(t, barrier.blockedEvents.m, 0)
+
+    msgs = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{
+        ChangefeedID: cfID.ToPB(),
+        BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{
+            {
+                ID:    tableTriggerEventDispatcherID.ToPB(),
+                State: blockState,
+            },
+        },
+    })
+    require.NotNil(t, msgs)
+    resp = msgs[0].Message[0].(*heartbeatpb.HeartBeatResponse)
+    require.Len(t, resp.DispatcherStatuses, 1)
+    require.True(t, resp.DispatcherStatuses[0].Ack.CommitTs == 10)
+    event := barrier.blockedEvents.m[eventKey{blockTs: 10, isSyncPoint: false}]
+    require.NotNil(t, event)
+    require.True(t, event.tableTriggerDispatcherRelated)
+    require.False(t, event.selected.Load())
+
+    msgs = barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{
+        ChangefeedID: cfID.ToPB(),
+        BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{
+            {
+                ID:    blockedDispatcherIDs[0],
+                State: blockState,
+            },
+            {
+                ID:    blockedDispatcherIDs[1],
+                State: blockState,
+            },
+        },
+    })
+    require.NotNil(t, msgs)
+    resp = msgs[0].Message[0].(*heartbeatpb.HeartBeatResponse)
+    require.Len(t, resp.DispatcherStatuses, 2)
+    require.True(t, event.selected.Load())
+    require.Equal(t, tableTriggerEventDispatcherID, event.writerDispatcher)
+}
+
 func TestSchemaBlock(t *testing.T) {
     testutil.SetUpTestServices(t)
     nm := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
@@ -1443,12 +1556,10 @@ func TestBarrierEventWithDispatcherScheduling(t *testing.T) {
     require.NotNil(t, msgs)
 
-    // Verify the event is created but not selected for execution
+    // Verify the event is not tracked before the table trigger event dispatcher reports it.
     event, ok := barrier.blockedEvents.Get(getEventKey(ddlTs, false))
-    require.True(t, ok)
-    require.NotNil(t, event)
-    require.False(t, event.selected.Load())
-    require.Contains(t, event.reportedDispatchers, dispatcherA.ID)
+    require.False(t, ok)
+    require.Nil(t, event)
 
     // Phase 2: Dispatcher A enters scheduling state, table trigger event dispatcher reports DDL
     // Move dispatcher A to scheduling state