diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go index 0c31275a4c..8f9c8c73ea 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher.go +++ b/downstreamadapter/dispatcher/basic_dispatcher.go @@ -32,7 +32,6 @@ import ( tidbTypes "github.com/pingcap/tidb/pkg/types" "go.uber.org/atomic" "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) // DispatcherService defines the interface for providing dispatcher information and basic event handling. @@ -76,7 +75,7 @@ type Dispatcher interface { GetHeartBeatInfo(h *HeartBeatInfo) GetComponentStatus() heartbeatpb.ComponentState GetBlockEventStatus() *heartbeatpb.State - GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus + OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) GetEventSizePerSecond() float32 IsTableTriggerDispatcher() bool DealWithBlockEvent(event commonEvent.BlockEvent) @@ -576,12 +575,12 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC latestResolvedTs := uint64(0) // Dispatcher is ready, handle the events for _, dispatcherEvent := range dispatcherEvents { - if log.GetLevel() == zapcore.DebugLevel { - log.Debug("dispatcher receive all event", - zap.Stringer("dispatcher", d.id), zap.Int64("mode", d.mode), - zap.String("eventType", commonEvent.TypeToString(dispatcherEvent.Event.GetType())), - zap.Any("event", dispatcherEvent.Event)) - } + // if log.GetLevel() == zapcore.DebugLevel { + log.Debug("dispatcher receive all event", + zap.Stringer("dispatcher", d.id), zap.Int64("mode", d.mode), + zap.String("eventType", commonEvent.TypeToString(dispatcherEvent.Event.GetType())), + zap.Any("event", dispatcherEvent.Event)) + //} failpoint.Inject("HandleEventsSlowly", func() { lag := time.Duration(rand.Intn(5000)) * time.Millisecond @@ -836,16 +835,7 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D } // Step3: whether the outdate message or not, we need to return message show 
we have finished the event. - d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{ - ID: d.id.ToPB(), - State: &heartbeatpb.State{ - IsBlocked: true, - BlockTs: action.CommitTs, - IsSyncPoint: action.IsSyncPoint, - Stage: heartbeatpb.BlockStage_DONE, - }, - Mode: d.GetMode(), - } + d.OfferDoneBlockStatus(action.CommitTs, action.IsSyncPoint) } return false } @@ -881,16 +871,7 @@ func (d *BasicDispatcher) reportBlockedEventDone( actionCommitTs uint64, actionIsSyncPoint bool, ) { - d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{ - ID: d.id.ToPB(), - State: &heartbeatpb.State{ - IsBlocked: true, - BlockTs: actionCommitTs, - IsSyncPoint: actionIsSyncPoint, - Stage: heartbeatpb.BlockStage_DONE, - }, - Mode: d.GetMode(), - } + d.OfferDoneBlockStatus(actionCommitTs, actionIsSyncPoint) GetDispatcherStatusDynamicStream().Wake(d.id) } @@ -1042,7 +1023,7 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) { } else { d.resendTaskMap.Set(identifier, newResendTask(message, d, nil)) } - d.sharedInfo.blockStatusesChan <- message + d.OfferBlockStatus(message) }) // dealing with events which update schema ids @@ -1166,7 +1147,7 @@ func (d *BasicDispatcher) reportBlockedEventToMaintainer(event commonEvent.Block IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent, } d.resendTaskMap.Set(identifier, newResendTask(message, d, nil)) - d.sharedInfo.blockStatusesChan <- message + d.OfferBlockStatus(message) } func (d *BasicDispatcher) flushBlockedEventAndReportToMaintainer(event commonEvent.BlockEvent) { diff --git a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go index 8f95a93b5b..882da1fe1b 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go +++ b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go @@ -129,7 +129,6 @@ func TestDDLEventsAlwaysValidateActiveActive(t *testing.T) { func 
newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActiveActive bool) *BasicDispatcher { t.Helper() statuses := make(chan TableSpanStatusWithSeq, 2) - blockStatuses := make(chan *heartbeatpb.TableSpanBlockStatus, 1) errCh := make(chan error, 1) sharedInfo := NewSharedInfo( common.NewChangefeedID("test"), @@ -143,7 +142,7 @@ func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActive nil, false, statuses, - blockStatuses, + 1, errCh, ) dispatcherSink := newDispatcherTestSink(t, sinkType) diff --git a/downstreamadapter/dispatcher/basic_dispatcher_info.go b/downstreamadapter/dispatcher/basic_dispatcher_info.go index 19a0980461..161d3eb58c 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher_info.go +++ b/downstreamadapter/dispatcher/basic_dispatcher_info.go @@ -14,6 +14,7 @@ package dispatcher import ( + "context" "sync/atomic" "time" @@ -58,9 +59,9 @@ type SharedInfo struct { // statusesChan is used to store the status of dispatchers when status changed // and push to heartbeatRequestQueue statusesChan chan TableSpanStatusWithSeq - // blockStatusesChan use to collector block status of ddl/sync point event to Maintainer - // shared by the event dispatcher manager - blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus + // blockStatusBuffer keeps block statuses for the dispatcher manager. + // Identical DONE statuses are coalesced while pending to reduce local memory amplification. 
+ blockStatusBuffer *BlockStatusBuffer // blockExecutor is used to execute block events such as DDL and sync point events asynchronously // to avoid callback() called in handleEvents, causing deadlock in ds @@ -88,7 +89,7 @@ func NewSharedInfo( txnAtomicity *config.AtomicityLevel, enableSplittableCheck bool, statusesChan chan TableSpanStatusWithSeq, - blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus, + blockStatusBufferSize int, errCh chan error, ) *SharedInfo { sharedInfo := &SharedInfo{ @@ -102,7 +103,7 @@ func NewSharedInfo( syncPointConfig: syncPointConfig, enableSplittableCheck: enableSplittableCheck, statusesChan: statusesChan, - blockStatusesChan: blockStatusesChan, + blockStatusBuffer: NewBlockStatusBuffer(blockStatusBufferSize), blockExecutor: newBlockEventExecutor(), errCh: errCh, metricHandleDDLHis: metrics.HandleDDLHistogram.WithLabelValues(changefeedID.Keyspace(), changefeedID.Name()), @@ -219,8 +220,20 @@ func (d *BasicDispatcher) GetTxnAtomicity() config.AtomicityLevel { return d.sharedInfo.txnAtomicity } -func (d *BasicDispatcher) GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus { - return d.sharedInfo.blockStatusesChan +func (d *BasicDispatcher) OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) { + d.sharedInfo.OfferBlockStatus(status) +} + +func (d *BasicDispatcher) OfferDoneBlockStatus(blockTs uint64, isSyncPoint bool) { + d.sharedInfo.OfferDoneBlockStatus(d.id, blockTs, isSyncPoint, d.GetMode()) +} + +func (d *BasicDispatcher) TakeBlockStatus(ctx context.Context) *heartbeatpb.TableSpanBlockStatus { + return d.sharedInfo.TakeBlockStatus(ctx) +} + +func (d *BasicDispatcher) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) { + return d.sharedInfo.TakeBlockStatusWithTimeout(timeout) } func (d *BasicDispatcher) GetEventSizePerSecond() float32 { @@ -254,8 +267,34 @@ func (s *SharedInfo) EnableActiveActive() bool { return s.enableActiveActive } -func (s *SharedInfo) 
GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus { - return s.blockStatusesChan +func (s *SharedInfo) OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) { + s.blockStatusBuffer.Offer(status) +} + +func (s *SharedInfo) OfferDoneBlockStatus(dispatcherID common.DispatcherID, blockTs uint64, isSyncPoint bool, mode int64) { + s.blockStatusBuffer.OfferDone(dispatcherID, blockTs, isSyncPoint, mode) +} + +func (s *SharedInfo) TakeBlockStatus(ctx context.Context) *heartbeatpb.TableSpanBlockStatus { + return s.blockStatusBuffer.Take(ctx) +} + +func (s *SharedInfo) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + status := s.TakeBlockStatus(ctx) + if status == nil { + return nil, false + } + return status, true +} + +func (s *SharedInfo) TryTakeBlockStatus() (*heartbeatpb.TableSpanBlockStatus, bool) { + return s.blockStatusBuffer.TryTake() +} + +func (s *SharedInfo) BlockStatusLen() int { + return s.blockStatusBuffer.Len() } func (s *SharedInfo) GetErrCh() chan error { diff --git a/downstreamadapter/dispatcher/block_status_buffer.go b/downstreamadapter/dispatcher/block_status_buffer.go new file mode 100644 index 0000000000..894e4dc193 --- /dev/null +++ b/downstreamadapter/dispatcher/block_status_buffer.go @@ -0,0 +1,157 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package dispatcher + +import ( + "context" + "sync" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" +) + +type doneBlockStatusKey struct { + dispatcherID common.DispatcherID + blockTs uint64 + mode int64 + isSyncPoint bool +} + +type blockStatusQueueEntry struct { + status *heartbeatpb.TableSpanBlockStatus + doneKey *doneBlockStatusKey +} + +// BlockStatusBuffer keeps block statuses ordered while coalescing identical DONE +// statuses that are still pending locally. Non-DONE statuses keep the original +// protobuf object and ordering. +type BlockStatusBuffer struct { + queue chan blockStatusQueueEntry + + mu sync.Mutex + pendingDone map[doneBlockStatusKey]struct{} +} + +// NewBlockStatusBuffer creates a bounded local mailbox for dispatcher block +// statuses. The buffer keeps enqueue order while coalescing identical pending +// DONE statuses before protobuf materialization. +func NewBlockStatusBuffer(size int) *BlockStatusBuffer { + if size <= 0 { + size = 1 + } + return &BlockStatusBuffer{ + queue: make(chan blockStatusQueueEntry, size), + pendingDone: make(map[doneBlockStatusKey]struct{}), + } +} + +func (b *BlockStatusBuffer) Offer(status *heartbeatpb.TableSpanBlockStatus) { + if status == nil { + return + } + if !isDoneBlockStatus(status) { + b.queue <- blockStatusQueueEntry{status: status} + return + } + + key := doneBlockStatusKey{ + dispatcherID: common.NewDispatcherIDFromPB(status.ID), + blockTs: status.State.BlockTs, + mode: status.Mode, + isSyncPoint: status.State.IsSyncPoint, + } + if !b.reserveDone(key) { + return + } + b.queue <- blockStatusQueueEntry{doneKey: &key} +} + +func (b *BlockStatusBuffer) OfferDone( + dispatcherID common.DispatcherID, + blockTs uint64, + isSyncPoint bool, + mode int64, +) { + key := doneBlockStatusKey{ + dispatcherID: dispatcherID, + blockTs: blockTs, + mode: mode, + isSyncPoint: isSyncPoint, + } + if !b.reserveDone(key) { + return + } + b.queue <- blockStatusQueueEntry{doneKey: &key} +} 
+ +func (b *BlockStatusBuffer) Take(ctx context.Context) *heartbeatpb.TableSpanBlockStatus { + select { + case <-ctx.Done(): + return nil + case entry := <-b.queue: + return b.materialize(entry) + } +} + +func (b *BlockStatusBuffer) TryTake() (*heartbeatpb.TableSpanBlockStatus, bool) { + select { + case entry := <-b.queue: + return b.materialize(entry), true + default: + return nil, false + } +} + +func (b *BlockStatusBuffer) Len() int { + return len(b.queue) +} + +func (b *BlockStatusBuffer) reserveDone(key doneBlockStatusKey) bool { + b.mu.Lock() + defer b.mu.Unlock() + if _, ok := b.pendingDone[key]; ok { + return false + } + b.pendingDone[key] = struct{}{} + return true +} + +func (b *BlockStatusBuffer) materialize(entry blockStatusQueueEntry) *heartbeatpb.TableSpanBlockStatus { + if entry.status != nil { + return entry.status + } + + key := *entry.doneKey + b.mu.Lock() + delete(b.pendingDone, key) + b.mu.Unlock() + + return &heartbeatpb.TableSpanBlockStatus{ + ID: key.dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: key.blockTs, + IsSyncPoint: key.isSyncPoint, + Stage: heartbeatpb.BlockStage_DONE, + }, + Mode: key.mode, + } +} + +func isDoneBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool { + return status != nil && + status.State != nil && + status.State.IsBlocked && + status.State.Stage == heartbeatpb.BlockStage_DONE +} diff --git a/downstreamadapter/dispatcher/block_status_buffer_test.go b/downstreamadapter/dispatcher/block_status_buffer_test.go new file mode 100644 index 0000000000..4dcce37d2e --- /dev/null +++ b/downstreamadapter/dispatcher/block_status_buffer_test.go @@ -0,0 +1,107 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatcher + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + "github.com/stretchr/testify/require" +) + +func TestBlockStatusBufferDeduplicatesPendingDone(t *testing.T) { + buffer := NewBlockStatusBuffer(4) + dispatcherID := common.NewDispatcherID() + + buffer.OfferDone(dispatcherID, 100, false, common.DefaultMode) + buffer.OfferDone(dispatcherID, 100, false, common.DefaultMode) + + require.Equal(t, 1, buffer.Len()) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + msg := buffer.Take(ctx) + require.NotNil(t, msg) + require.Equal(t, dispatcherID, common.NewDispatcherIDFromPB(msg.ID)) + require.Equal(t, uint64(100), msg.State.BlockTs) + require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage) + require.False(t, msg.State.IsSyncPoint) + require.Equal(t, common.DefaultMode, msg.Mode) + + _, ok := buffer.TryTake() + require.False(t, ok) +} + +func TestBlockStatusBufferKeepsWaitingBeforeDone(t *testing.T) { + buffer := NewBlockStatusBuffer(4) + dispatcherID := common.NewDispatcherID() + + buffer.Offer(&heartbeatpb.TableSpanBlockStatus{ + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 200, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }) + buffer.OfferDone(dispatcherID, 200, false, common.DefaultMode) + buffer.OfferDone(dispatcherID, 200, false, common.DefaultMode) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + first := buffer.Take(ctx) + require.NotNil(t, first) + require.Equal(t, 
heartbeatpb.BlockStage_WAITING, first.State.Stage) + + second := buffer.Take(ctx) + require.NotNil(t, second) + require.Equal(t, heartbeatpb.BlockStage_DONE, second.State.Stage) + + _, ok := buffer.TryTake() + require.False(t, ok) +} + +func TestBlockStatusBufferKeepsDistinctDoneKeys(t *testing.T) { + buffer := NewBlockStatusBuffer(4) + dispatcherID := common.NewDispatcherID() + + buffer.OfferDone(dispatcherID, 300, false, common.DefaultMode) + buffer.OfferDone(dispatcherID, 300, true, common.DefaultMode) + buffer.OfferDone(dispatcherID, 300, false, common.RedoMode) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + first := buffer.Take(ctx) + require.NotNil(t, first) + require.False(t, first.State.IsSyncPoint) + require.Equal(t, common.DefaultMode, first.Mode) + + second := buffer.Take(ctx) + require.NotNil(t, second) + require.True(t, second.State.IsSyncPoint) + require.Equal(t, common.DefaultMode, second.Mode) + + third := buffer.Take(ctx) + require.NotNil(t, third) + require.False(t, third.State.IsSyncPoint) + require.Equal(t, common.RedoMode, third.Mode) + + _, ok := buffer.TryTake() + require.False(t, ok) +} diff --git a/downstreamadapter/dispatcher/event_dispatcher.go b/downstreamadapter/dispatcher/event_dispatcher.go index 65464b919b..93bc60b02c 100644 --- a/downstreamadapter/dispatcher/event_dispatcher.go +++ b/downstreamadapter/dispatcher/event_dispatcher.go @@ -133,9 +133,15 @@ func (d *EventDispatcher) cache(dispatcherEvents []DispatcherEvent, wakeCallback } func (d *EventDispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) bool { + log.Info("handle events", zap.Stringer("dispatcher", d.id), zap.Int("eventCount", len(dispatcherEvents)), zap.Uint64("redoGlobalTs", d.redoGlobalTs.Load())) // if the commit-ts of last event of dispatcherEvents is greater than redoGlobalTs, // the dispatcherEvents will be cached util the redoGlobalTs is updated. 
if d.redoEnable && len(dispatcherEvents) > 0 && d.redoGlobalTs.Load() < dispatcherEvents[len(dispatcherEvents)-1].Event.GetCommitTs() { + log.Info("cache events because commitTs is greater than redoGlobalTs", + zap.Stringer("dispatcher", d.id), + zap.Uint64("dispatcherResolvedTs", d.GetResolvedTs()), + zap.Int("length", len(dispatcherEvents)), + zap.Int("eventType", dispatcherEvents[len(dispatcherEvents)-1].Event.GetType())) d.cache(dispatcherEvents, wakeCallback) return true } diff --git a/downstreamadapter/dispatcher/event_dispatcher_test.go b/downstreamadapter/dispatcher/event_dispatcher_test.go index 29b8ace3e2..0ce694d02e 100644 --- a/downstreamadapter/dispatcher/event_dispatcher_test.go +++ b/downstreamadapter/dispatcher/event_dispatcher_test.go @@ -83,7 +83,7 @@ func newDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan) *Eve &defaultAtomicity, false, // enableSplittableCheck make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) return NewEventDispatcher( @@ -476,22 +476,16 @@ func TestBlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain(t *testing.T) { require.Nil(t, pendingEvent) require.Equal(t, heartbeatpb.BlockStage_NONE, blockStage) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.FailNow(t, "unexpected block status before local flush finishes", "received=%v", msg) - case <-time.After(200 * time.Millisecond): - } + msg, ok := dispatcher.TakeBlockStatusWithTimeout(200 * time.Millisecond) + require.False(t, ok, "unexpected block status before local flush finishes: %v", msg) close(flushRelease) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.True(t, msg.State.IsBlocked) - require.Equal(t, uint64(10), msg.State.BlockTs) - require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage) - case <-time.After(time.Second): - require.FailNow(t, "expected blocking DDL to enter WAITING after local flush") - } + msg, ok = 
dispatcher.TakeBlockStatusWithTimeout(time.Second) + require.True(t, ok, "expected blocking DDL to enter WAITING after local flush") + require.True(t, msg.State.IsBlocked) + require.Equal(t, uint64(10), msg.State.BlockTs) + require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage) pendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage() require.Same(t, ddlEvent, pendingEvent) @@ -507,14 +501,11 @@ func TestBlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain(t *testing.T) { }) require.True(t, await) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.True(t, msg.State.IsBlocked) - require.Equal(t, uint64(10), msg.State.BlockTs) - require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage) - case <-time.After(time.Second): - require.FailNow(t, "expected DONE after write action") - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(time.Second) + require.True(t, ok, "expected DONE after write action") + require.True(t, msg.State.IsBlocked) + require.Equal(t, uint64(10), msg.State.BlockTs) + require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage) require.Eventually(t, func() bool { pendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage() @@ -1004,7 +995,7 @@ func TestDispatcherSplittableCheck(t *testing.T) { &defaultAtomicity, true, // enableSplittableCheck = true make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) @@ -1114,7 +1105,7 @@ func TestDispatcher_SkipDMLAsStartTs_FilterCorrectly(t *testing.T) { &defaultAtomicity, false, make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) @@ -1194,7 +1185,7 @@ func TestDispatcher_SkipDMLAsStartTs_Disabled(t *testing.T) { &defaultAtomicity, false, make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) @@ -1258,14 +1249,11 @@ func 
TestHoldBlockEventUntilNoResendTasks(t *testing.T) { block := dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, createTableDDL)}, func() {}) require.True(t, block) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.False(t, msg.State.IsBlocked) - require.False(t, msg.State.IsSyncPoint) - require.Equal(t, uint64(10), msg.State.BlockTs) - case <-time.After(time.Second): - require.FailNow(t, "expected add-table block status") - } + msg, ok := dispatcher.TakeBlockStatusWithTimeout(time.Second) + require.True(t, ok, "expected add-table block status") + require.False(t, msg.State.IsBlocked) + require.False(t, msg.State.IsSyncPoint) + require.Equal(t, uint64(10), msg.State.BlockTs) require.Equal(t, 1, dispatcher.resendTaskMap.Len()) // A DB/All block event must be deferred until resendTaskMap becomes empty, @@ -1281,11 +1269,8 @@ func TestHoldBlockEventUntilNoResendTasks(t *testing.T) { block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, dropDBDDL)}, func() {}) require.True(t, block) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.FailNow(t, "unexpected block status", "received=%v", msg) - case <-time.After(200 * time.Millisecond): - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(200 * time.Millisecond) + require.False(t, ok, "unexpected block status: %v", msg) // Simulate maintainer ACK for the create table scheduling message. 
dispatcher.HandleDispatcherStatus(&heartbeatpb.DispatcherStatus{ @@ -1301,23 +1286,17 @@ func TestHoldBlockEventUntilNoResendTasks(t *testing.T) { require.FailNow(t, "expected deferred DB-level flush to start") } - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.FailNow(t, "unexpected block status before local flush finishes", "received=%v", msg) - case <-time.After(200 * time.Millisecond): - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(200 * time.Millisecond) + require.False(t, ok, "unexpected block status before local flush finishes: %v", msg) close(flushRelease) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.True(t, msg.State.IsBlocked) - require.False(t, msg.State.IsSyncPoint) - require.Equal(t, uint64(20), msg.State.BlockTs) - require.Equal(t, heartbeatpb.InfluenceType_DB, msg.State.BlockTables.InfluenceType) - require.Equal(t, int64(1), msg.State.BlockTables.SchemaID) - require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage) - case <-time.After(time.Second): - require.FailNow(t, "expected deferred DB-level block status") - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(time.Second) + require.True(t, ok, "expected deferred DB-level block status") + require.True(t, msg.State.IsBlocked) + require.False(t, msg.State.IsSyncPoint) + require.Equal(t, uint64(20), msg.State.BlockTs) + require.Equal(t, heartbeatpb.InfluenceType_DB, msg.State.BlockTables.InfluenceType) + require.Equal(t, int64(1), msg.State.BlockTables.SchemaID) + require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage) } diff --git a/downstreamadapter/dispatcher/helper.go b/downstreamadapter/dispatcher/helper.go index d50137a95b..30983c1d50 100644 --- a/downstreamadapter/dispatcher/helper.go +++ b/downstreamadapter/dispatcher/helper.go @@ -270,7 +270,7 @@ func newResendTask(message *heartbeatpb.TableSpanBlockStatus, dispatcher Dispatc func (t *ResendTask) Execute() time.Time { log.Debug("resend task", zap.Any("message", 
t.message), zap.Any("dispatcherID", t.dispatcher.GetId())) - t.dispatcher.GetBlockStatusesChan() <- t.message + t.dispatcher.OfferBlockStatus(t.message) executeCount := atomic.AddUint64(&t.executeCount, 1) if executeCount%10 == 0 { diff --git a/downstreamadapter/dispatcher/redo_dispatcher_test.go b/downstreamadapter/dispatcher/redo_dispatcher_test.go index 68f97016b7..bdd906eebc 100644 --- a/downstreamadapter/dispatcher/redo_dispatcher_test.go +++ b/downstreamadapter/dispatcher/redo_dispatcher_test.go @@ -48,7 +48,7 @@ func newRedoDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan) &defaultAtomicity, false, // enableSplittableCheck make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) return NewRedoDispatcher( diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager.go b/downstreamadapter/dispatchermanager/dispatcher_manager.go index 128f8cbe1e..ef4668bb29 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager.go @@ -45,6 +45,8 @@ import ( "go.uber.org/zap" ) +const maxBlockStatusesPerRequest = 2048 + /* DispatcherManager manages dispatchers for a changefeed instance with responsibilities including: @@ -258,7 +260,7 @@ func NewDispatcherManager( manager.config.SinkConfig.TxnAtomicity, manager.config.EnableSplittableCheck, make(chan dispatcher.TableSpanStatusWithSeq, 8192), - make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024), + 1024*1024, make(chan error, 1), ) @@ -565,49 +567,57 @@ func (e *DispatcherManager) collectErrors(ctx context.Context) { // collectBlockStatusRequest collect the block status from the block status channel and report to the maintainer. 
func (e *DispatcherManager) collectBlockStatusRequest(ctx context.Context) { - delay := time.NewTimer(0) - defer delay.Stop() enqueueBlockStatus := func(blockStatusMessage []*heartbeatpb.TableSpanBlockStatus, mode int64) { - var message heartbeatpb.BlockStatusRequest - message.ChangefeedID = e.changefeedID.ToPB() - message.BlockStatuses = blockStatusMessage - message.Mode = mode - e.blockStatusRequestQueue.Enqueue(&BlockStatusRequestWithTargetID{TargetID: e.GetMaintainerID(), Request: &message}) + // Split oversized batches so one protobuf message does not monopolize + // serialization, transport, and maintainer-side processing. + for start := 0; start < len(blockStatusMessage); start += maxBlockStatusesPerRequest { + end := start + maxBlockStatusesPerRequest + if end > len(blockStatusMessage) { + end = len(blockStatusMessage) + } + var message heartbeatpb.BlockStatusRequest + message.ChangefeedID = e.changefeedID.ToPB() + message.BlockStatuses = blockStatusMessage[start:end] + message.Mode = mode + e.blockStatusRequestQueue.Enqueue(&BlockStatusRequestWithTargetID{TargetID: e.GetMaintainerID(), Request: &message}) + } } for { blockStatusMessage := make([]*heartbeatpb.TableSpanBlockStatus, 0) redoBlockStatusMessage := make([]*heartbeatpb.TableSpanBlockStatus, 0) - select { - case <-ctx.Done(): + blockStatus := e.sharedInfo.TakeBlockStatus(ctx) + if blockStatus == nil { return - case blockStatus := <-e.sharedInfo.GetBlockStatusesChan(): + } + if common.IsDefaultMode(blockStatus.Mode) { + blockStatusMessage = append(blockStatusMessage, blockStatus) + } else { + redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus) + } + + batchCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + for { + blockStatus = e.sharedInfo.TakeBlockStatus(batchCtx) + if blockStatus == nil { + break + } if common.IsDefaultMode(blockStatus.Mode) { blockStatusMessage = append(blockStatusMessage, blockStatus) } else { redoBlockStatusMessage = append(redoBlockStatusMessage, 
blockStatus) } - delay.Reset(10 * time.Millisecond) - loop: - for { - select { - case blockStatus := <-e.sharedInfo.GetBlockStatusesChan(): - if common.IsDefaultMode(blockStatus.Mode) { - blockStatusMessage = append(blockStatusMessage, blockStatus) - } else { - redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus) - } - case <-delay.C: - break loop - } - } + } + cancel() + if ctx.Err() != nil { + return + } - e.metricBlockStatusesChanLen.Set(float64(len(e.sharedInfo.GetBlockStatusesChan()))) - if len(blockStatusMessage) != 0 { - enqueueBlockStatus(blockStatusMessage, common.DefaultMode) - } - if len(redoBlockStatusMessage) != 0 { - enqueueBlockStatus(redoBlockStatusMessage, common.RedoMode) - } + e.metricBlockStatusesChanLen.Set(float64(e.sharedInfo.BlockStatusLen())) + if len(blockStatusMessage) != 0 { + enqueueBlockStatus(blockStatusMessage, common.DefaultMode) + } + if len(redoBlockStatusMessage) != 0 { + enqueueBlockStatus(redoBlockStatusMessage, common.RedoMode) } } } diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager_test.go b/downstreamadapter/dispatchermanager/dispatcher_manager_test.go index d26a6193d8..c47622a396 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager_test.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager_test.go @@ -80,7 +80,7 @@ func createTestDispatcher(t *testing.T, manager *DispatcherManager, id common.Di &defaultAtomicity, false, make(chan dispatcher.TableSpanStatusWithSeq, 1), - make(chan *heartbeatpb.TableSpanBlockStatus, 1), + 1, make(chan error, 1), ) d := dispatcher.NewEventDispatcher( @@ -125,6 +125,7 @@ func createTestManager(t *testing.T) *DispatcherManager { metricResolvedTs: metrics.DispatcherManagerResolvedTsGauge.WithLabelValues(changefeedID.Keyspace(), changefeedID.Name()), metricCheckpointTsLag: metrics.DispatcherManagerCheckpointTsLagGauge.WithLabelValues(changefeedID.Keyspace(), changefeedID.Name()), metricResolvedTsLag: 
metrics.DispatcherManagerResolvedTsLagGauge.WithLabelValues(changefeedID.Keyspace(), changefeedID.Name()), + metricBlockStatusesChanLen: metrics.DispatcherManagerBlockStatusesChanLenGauge.WithLabelValues(changefeedID.Keyspace(), changefeedID.Name()), } // Create shared info for the test manager @@ -141,7 +142,7 @@ func createTestManager(t *testing.T) *DispatcherManager { &defaultAtomicity, false, make(chan dispatcher.TableSpanStatusWithSeq, 8192), - make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024), + 1024*1024, make(chan error, 1), ) nodeID := node.NewID() @@ -217,6 +218,66 @@ func TestCollectComponentStatusWhenChangedWatermarkSeqNoFallback(t *testing.T) { require.Equal(t, uint64(200), req.Request.RedoWatermark.Seq) } +func TestCollectBlockStatusRequestSplitsOversizedMessages(t *testing.T) { + manager := createTestManager(t) + + for i := 0; i < maxBlockStatusesPerRequest+2; i++ { + manager.sharedInfo.OfferBlockStatus(newWaitingBlockStatus(common.DefaultMode, uint64(i+1))) + } + for i := 0; i < maxBlockStatusesPerRequest+1; i++ { + manager.sharedInfo.OfferBlockStatus(newWaitingBlockStatus(common.RedoMode, uint64(i+10000))) + } + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + manager.collectBlockStatusRequest(ctx) + close(done) + }() + defer func() { + cancel() + <-done + }() + + dequeueRequest := func() *BlockStatusRequestWithTargetID { + t.Helper() + dequeueCtx, cancelDequeue := context.WithTimeout(context.Background(), time.Second) + defer cancelDequeue() + req := manager.blockStatusRequestQueue.Dequeue(dequeueCtx) + require.NotNil(t, req) + require.NotNil(t, req.Request) + return req + } + + defaultFirst := dequeueRequest() + defaultSecond := dequeueRequest() + redoFirst := dequeueRequest() + redoSecond := dequeueRequest() + + require.Equal(t, common.DefaultMode, defaultFirst.Request.Mode) + require.Len(t, defaultFirst.Request.BlockStatuses, maxBlockStatusesPerRequest) + require.Equal(t, uint64(1), 
defaultFirst.Request.BlockStatuses[0].State.BlockTs) + require.Equal(t, uint64(maxBlockStatusesPerRequest), defaultFirst.Request.BlockStatuses[maxBlockStatusesPerRequest-1].State.BlockTs) + + require.Equal(t, common.DefaultMode, defaultSecond.Request.Mode) + require.Len(t, defaultSecond.Request.BlockStatuses, 2) + require.Equal(t, uint64(maxBlockStatusesPerRequest+1), defaultSecond.Request.BlockStatuses[0].State.BlockTs) + require.Equal(t, uint64(maxBlockStatusesPerRequest+2), defaultSecond.Request.BlockStatuses[1].State.BlockTs) + + require.Equal(t, common.RedoMode, redoFirst.Request.Mode) + require.Len(t, redoFirst.Request.BlockStatuses, maxBlockStatusesPerRequest) + require.Equal(t, uint64(10000), redoFirst.Request.BlockStatuses[0].State.BlockTs) + require.Equal(t, uint64(10000+maxBlockStatusesPerRequest-1), redoFirst.Request.BlockStatuses[maxBlockStatusesPerRequest-1].State.BlockTs) + + require.Equal(t, common.RedoMode, redoSecond.Request.Mode) + require.Len(t, redoSecond.Request.BlockStatuses, 1) + require.Equal(t, uint64(10000+maxBlockStatusesPerRequest), redoSecond.Request.BlockStatuses[0].State.BlockTs) + + shortCtx, shortCancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer shortCancel() + require.Nil(t, manager.blockStatusRequestQueue.Dequeue(shortCtx)) +} + func TestMergeDispatcherNormal(t *testing.T) { manager := createTestManager(t) @@ -250,6 +311,18 @@ func TestMergeDispatcherNormal(t *testing.T) { require.Equal(t, []byte("z"), mergedDispatcher.GetTableSpan().EndKey) } +func newWaitingBlockStatus(mode int64, blockTs uint64) *heartbeatpb.TableSpanBlockStatus { + return &heartbeatpb.TableSpanBlockStatus{ + ID: common.NewDispatcherID().ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: blockTs, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: mode, + } +} + func TestMergeDispatcherInvalidIDs(t *testing.T) { manager := createTestManager(t) diff --git 
a/downstreamadapter/dispatchermanager/heartbeat_collector.go b/downstreamadapter/dispatchermanager/heartbeat_collector.go index 99ef678b14..67417face8 100644 --- a/downstreamadapter/dispatchermanager/heartbeat_collector.go +++ b/downstreamadapter/dispatchermanager/heartbeat_collector.go @@ -248,6 +248,7 @@ func (c *HeartBeatCollector) sendBlockStatusMessages(ctx context.Context) error messaging.MaintainerManagerTopic, blockStatusRequestWithTargetID.Request, )) + c.blockStatusReqQueue.OnSendComplete(blockStatusRequestWithTargetID) if err != nil { log.Error("failed to send block status request message", zap.Error(err)) } diff --git a/downstreamadapter/dispatchermanager/heartbeat_queue.go b/downstreamadapter/dispatchermanager/heartbeat_queue.go index e93b59f401..6ceae27f48 100644 --- a/downstreamadapter/dispatchermanager/heartbeat_queue.go +++ b/downstreamadapter/dispatchermanager/heartbeat_queue.go @@ -15,8 +15,10 @@ package dispatchermanager import ( "context" + "sync" "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" ) @@ -62,15 +64,30 @@ type BlockStatusRequestWithTargetID struct { // BlockStatusRequestQueue is a channel for all event dispatcher managers to send block status requests to HeartBeatCollector type BlockStatusRequestQueue struct { queue chan *BlockStatusRequestWithTargetID + + mu sync.Mutex + requestDoneKeys map[*BlockStatusRequestWithTargetID][]blockStatusRequestDedupeKey + queuedDone map[blockStatusRequestDedupeKey]struct{} + inFlightDone map[blockStatusRequestDedupeKey]struct{} } func NewBlockStatusRequestQueue() *BlockStatusRequestQueue { return &BlockStatusRequestQueue{ - queue: make(chan *BlockStatusRequestWithTargetID, 10000), + queue: make(chan *BlockStatusRequestWithTargetID, 10000), + requestDoneKeys: make(map[*BlockStatusRequestWithTargetID][]blockStatusRequestDedupeKey), + queuedDone: make(map[blockStatusRequestDedupeKey]struct{}), + 
inFlightDone: make(map[blockStatusRequestDedupeKey]struct{}), } } func (q *BlockStatusRequestQueue) Enqueue(request *BlockStatusRequestWithTargetID) { + if request == nil || request.Request == nil { + return + } + if !q.trackPendingDone(request) { + metrics.HeartbeatCollectorBlockStatusRequestQueueLenGauge.Set(float64(len(q.queue))) + return + } q.queue <- request metrics.HeartbeatCollectorBlockStatusRequestQueueLenGauge.Set(float64(len(q.queue))) } @@ -80,11 +97,89 @@ func (q *BlockStatusRequestQueue) Dequeue(ctx context.Context) *BlockStatusReque case <-ctx.Done(): return nil case request := <-q.queue: + q.markInFlight(request) metrics.HeartbeatCollectorBlockStatusRequestQueueLenGauge.Set(float64(len(q.queue))) return request } } +func (q *BlockStatusRequestQueue) OnSendComplete(request *BlockStatusRequestWithTargetID) { + if request == nil { + return + } + + q.mu.Lock() + defer q.mu.Unlock() + for _, key := range q.requestDoneKeys[request] { + delete(q.inFlightDone, key) + } + delete(q.requestDoneKeys, request) +} + func (q *BlockStatusRequestQueue) Close() { close(q.queue) } + +type blockStatusRequestDedupeKey struct { + targetID node.ID + dispatcherID common.DispatcherID + blockTs uint64 + mode int64 + isSyncPoint bool +} + +func (q *BlockStatusRequestQueue) trackPendingDone(request *BlockStatusRequestWithTargetID) bool { + statuses := request.Request.BlockStatuses + filtered := statuses[:0] + doneKeys := make([]blockStatusRequestDedupeKey, 0) + + q.mu.Lock() + defer q.mu.Unlock() + + for _, status := range statuses { + if !isDoneRequestStatus(status) { + filtered = append(filtered, status) + continue + } + + key := blockStatusRequestDedupeKey{ + targetID: request.TargetID, + dispatcherID: common.NewDispatcherIDFromPB(status.ID), + blockTs: status.State.BlockTs, + mode: status.Mode, + isSyncPoint: status.State.IsSyncPoint, + } + if _, ok := q.queuedDone[key]; ok { + continue + } + if _, ok := q.inFlightDone[key]; ok { + continue + } + + filtered = 
append(filtered, status) + q.queuedDone[key] = struct{}{} + doneKeys = append(doneKeys, key) + } + + request.Request.BlockStatuses = filtered + if len(doneKeys) > 0 { + q.requestDoneKeys[request] = doneKeys + } + return len(filtered) > 0 +} + +func (q *BlockStatusRequestQueue) markInFlight(request *BlockStatusRequestWithTargetID) { + q.mu.Lock() + defer q.mu.Unlock() + for _, key := range q.requestDoneKeys[request] { + delete(q.queuedDone, key) + q.inFlightDone[key] = struct{}{} + } +} + +func isDoneRequestStatus(status *heartbeatpb.TableSpanBlockStatus) bool { + return status != nil && + status.State != nil && + status.State.IsBlocked && + status.State.Stage == heartbeatpb.BlockStage_DONE +} diff --git a/downstreamadapter/dispatchermanager/heartbeat_queue_test.go b/downstreamadapter/dispatchermanager/heartbeat_queue_test.go new file mode 100644 index 0000000000..942ceb1c33 --- /dev/null +++ b/downstreamadapter/dispatchermanager/heartbeat_queue_test.go @@ -0,0 +1,165 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package dispatchermanager + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/node" + "github.com/stretchr/testify/require" +) + +func TestBlockStatusRequestQueueDeduplicatesQueuedAndInFlightDone(t *testing.T) { + queue := NewBlockStatusRequestQueue() + targetID := node.NewID() + dispatcherID := common.NewDispatcherID() + + first := newDoneBlockStatusRequest(targetID, dispatcherID, 100) + second := newDoneBlockStatusRequest(targetID, dispatcherID, 100) + + queue.Enqueue(first) + queue.Enqueue(second) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + dequeued := queue.Dequeue(ctx) + require.Same(t, first, dequeued) + + shortCtx, shortCancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer shortCancel() + require.Nil(t, queue.Dequeue(shortCtx)) + + third := newDoneBlockStatusRequest(targetID, dispatcherID, 100) + queue.Enqueue(third) + + shortCtx2, shortCancel2 := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer shortCancel2() + require.Nil(t, queue.Dequeue(shortCtx2)) + + queue.OnSendComplete(dequeued) + + fourth := newDoneBlockStatusRequest(targetID, dispatcherID, 100) + queue.Enqueue(fourth) + + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + require.Same(t, fourth, queue.Dequeue(ctx2)) +} + +func TestBlockStatusRequestQueueDoesNotDeduplicateWaiting(t *testing.T) { + queue := NewBlockStatusRequestQueue() + targetID := node.NewID() + dispatcherID := common.NewDispatcherID() + + first := &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 101, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + Mode: 
common.DefaultMode, + }, + } + second := &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 101, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + Mode: common.DefaultMode, + }, + } + + queue.Enqueue(first) + queue.Enqueue(second) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + require.Same(t, first, queue.Dequeue(ctx)) + + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + require.Same(t, second, queue.Dequeue(ctx2)) +} + +func TestBlockStatusRequestQueueKeepsDistinctDoneKeys(t *testing.T) { + queue := NewBlockStatusRequestQueue() + targetID := node.NewID() + dispatcherID := common.NewDispatcherID() + + defaultDone := newDoneBlockStatusRequest(targetID, dispatcherID, 200) + syncPointDone := newDoneBlockStatusRequest(targetID, dispatcherID, 200) + syncPointDone.Request.BlockStatuses[0].State.IsSyncPoint = true + redoDone := newDoneBlockStatusRequest(targetID, dispatcherID, 200) + redoDone.Request.BlockStatuses[0].Mode = common.RedoMode + redoDone.Request.Mode = common.RedoMode + + queue.Enqueue(defaultDone) + queue.Enqueue(syncPointDone) + queue.Enqueue(redoDone) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + require.Same(t, defaultDone, queue.Dequeue(ctx)) + queue.OnSendComplete(defaultDone) + + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + require.Same(t, syncPointDone, queue.Dequeue(ctx2)) + queue.OnSendComplete(syncPointDone) + + ctx3, cancel3 := context.WithTimeout(context.Background(), time.Second) + defer cancel3() + require.Same(t, redoDone, queue.Dequeue(ctx3)) +} + +func newDoneBlockStatusRequest(targetID node.ID, dispatcherID common.DispatcherID, blockTs 
uint64) *BlockStatusRequestWithTargetID { + return &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: blockTs, + Stage: heartbeatpb.BlockStage_DONE, + }, + Mode: common.DefaultMode, + }, + }, + Mode: common.DefaultMode, + }, + } +} diff --git a/downstreamadapter/dispatchermanager/helper.go b/downstreamadapter/dispatchermanager/helper.go index 35e07a4765..55ecae3648 100644 --- a/downstreamadapter/dispatchermanager/helper.go +++ b/downstreamadapter/dispatchermanager/helper.go @@ -664,6 +664,11 @@ func (h *RedoResolvedTsForwardMessageHandler) Handle(dispatcherManager *Dispatch ok := dispatcherManager.SetRedoResolvedTs(msg.ResolvedTs) if ok { dispatcherManager.dispatcherMap.ForEach(func(id common.DispatcherID, dispatcher *dispatcher.EventDispatcher) { + log.Debug("forward redo resolved ts to event dispatcher", + zap.String("changefeedID", dispatcherManager.changefeedID.String()), + zap.String("dispatcherID", id.String()), + zap.Uint64("resolvedTs", msg.ResolvedTs), + ) dispatcher.HandleCacheEvents() }) } diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go index 0b1862279e..4dbfcf95a8 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat.go +++ b/downstreamadapter/eventcollector/dispatcher_stat.go @@ -441,6 +441,7 @@ func (d *dispatcherStat) isFromCurrentEpoch(event dispatcher.DispatcherEvent, st // // 3. 
Finally: Forward valid events to target with wake callback func (d *dispatcherStat) handleBatchDataEvents(events []dispatcher.DispatcherEvent) bool { + log.Debug("handle batch data events", zap.Int("eventCount", len(events)), zap.Stringer("changefeedID", d.target.GetChangefeedID()), zap.Stringer("dispatcher", d.getDispatcherID())) var validEvents []dispatcher.DispatcherEvent state := d.loadCurrentEpochState() for _, event := range events { diff --git a/downstreamadapter/eventcollector/helper.go b/downstreamadapter/eventcollector/helper.go index 7c839fd695..43ab115eea 100644 --- a/downstreamadapter/eventcollector/helper.go +++ b/downstreamadapter/eventcollector/helper.go @@ -71,7 +71,7 @@ func (h *EventsHandler) Path(event dispatcher.DispatcherEvent) common.Dispatcher // Invariant: at any times, we can receive events from at most two event service, and one of them must be local event service. func (h *EventsHandler) Handle(stat *dispatcherStat, events ...dispatcher.DispatcherEvent) bool { // add this log for debug some strange bug. - log.Debug("handle events", zap.Any("dispatcher", stat.target.GetId()), zap.Any("eventLen", len(events))) + log.Debug("handle events", zap.Any("dispatcher", stat.target.GetId()), zap.Any("eventLen", len(events))) if len(events) == 0 { return false } diff --git a/maintainer/barrier_event.go b/maintainer/barrier_event.go index cf81e05a5e..6535884539 100644 --- a/maintainer/barrier_event.go +++ b/maintainer/barrier_event.go @@ -32,6 +32,8 @@ import ( "go.uber.org/zap" ) +const fanoutPassResendQuietInterval = time.Second + // BarrierEvent is a barrier event that reported by dispatchers, note is a block multiple dispatchers // all of these dispatchers should report the same event type BarrierEvent struct { @@ -70,8 +72,10 @@ type BarrierEvent struct { // so only we use table trigger to create rangeChecker can ensure the coverage is correct. 
reportedDispatchers map[common.DispatcherID]struct{} // rangeChecker is used to check if all the dispatchers reported the block events - rangeChecker range_checker.RangeChecker - lastResendTime time.Time + rangeChecker range_checker.RangeChecker + lastResendTime time.Time + lastStatusReceivedTime time.Time + passActionSent bool lastWarningLogTime time.Time } @@ -106,7 +110,8 @@ func NewBlockEvent(cfID common.ChangeFeedID, reportedDispatchers: make(map[common.DispatcherID]struct{}), lastResendTime: time.Time{}, - lastWarningLogTime: time.Now(), + lastStatusReceivedTime: time.Now(), + lastWarningLogTime: time.Now(), } if status.BlockTables != nil { @@ -215,6 +220,7 @@ func (be *BarrierEvent) onAllDispatcherReportedBlockEvent(dispatcherID common.Di be.selected.Store(true) be.writerDispatcher = dispatcher be.lastResendTime = time.Now() + be.passActionSent = false log.Info("all dispatcher reported heartbeat, schedule it, and select one to write", zap.String("changefeed", be.cfID.Name()), zap.String("dispatcher", be.writerDispatcher.String()), @@ -289,6 +295,10 @@ func (be *BarrierEvent) markTableDone(tableID int64) { be.rangeChecker.AddSubRange(tableID, nil, nil) } +func (be *BarrierEvent) markStatusReceived() { + be.lastStatusReceivedTime = time.Now() +} + func (be *BarrierEvent) addDispatchersToRangeChecker() { for dispatcher := range be.reportedDispatchers { replicaSpan := be.spanController.GetTaskByID(dispatcher) @@ -304,6 +314,9 @@ func (be *BarrierEvent) addDispatchersToRangeChecker() { } func (be *BarrierEvent) markDispatcherEventDone(dispatcherID common.DispatcherID) { + if be.selected.Load() { + be.markStatusReceived() + } replicaSpan := be.spanController.GetTaskByID(dispatcherID) if replicaSpan == nil { log.Warn("dispatcher not found, ignore", @@ -580,7 +593,8 @@ func forwardBarrierEvent(replication *replica.SpanReplication, event *BarrierEve } func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage { - if time.Since(be.lastResendTime) < 
time.Second { + now := time.Now() + if now.Sub(be.lastResendTime) < time.Second { return nil } var msgs []*messaging.TargetMessage @@ -629,9 +643,9 @@ func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage { be.checkBlockedDispatchers() return nil } - be.lastResendTime = time.Now() // we select a dispatcher as the writer, still waiting for that dispatcher advance its checkpoint ts if !be.writerDispatcherAdvanced { + be.lastResendTime = now // resend write action stm := be.spanController.GetTaskByID(be.writerDispatcher) if stm == nil || stm.GetNodeID() == "" { @@ -676,11 +690,25 @@ func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage { msgs = []*messaging.TargetMessage{be.newWriterActionMessage(stm.GetNodeID(), mode)} } else { // the writer dispatcher is advanced, resend pass action + if be.passActionSent && be.isFanoutPassAction() && + now.Sub(be.lastStatusReceivedTime) < fanoutPassResendQuietInterval { + return nil + } + be.passActionSent = true + be.lastResendTime = now return be.sendPassAction(mode) } return msgs } +func (be *BarrierEvent) isFanoutPassAction() bool { + if be.blockedDispatchers == nil { + return false + } + return be.blockedDispatchers.InfluenceType == heartbeatpb.InfluenceType_All || + be.blockedDispatchers.InfluenceType == heartbeatpb.InfluenceType_DB +} + func (be *BarrierEvent) newWriterActionMessage(capture node.ID, mode int64) *messaging.TargetMessage { msg := messaging.NewSingleTargetMessage(capture, messaging.HeartbeatCollectorTopic, &heartbeatpb.HeartBeatResponse{ diff --git a/maintainer/barrier_event_test.go b/maintainer/barrier_event_test.go index 79d3faae62..77e7465515 100644 --- a/maintainer/barrier_event_test.go +++ b/maintainer/barrier_event_test.go @@ -188,6 +188,101 @@ func TestResendAction(t *testing.T) { require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10)) } +func TestFanoutPassResendWaitsForQuietStatus(t *testing.T) { + testutil.SetUpTestServices(t) + nodeManager := 
appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"} + + tableTriggerEventDispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: tableTriggerEventDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node1", false) + spanController := span.NewController(cfID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode) + operatorController := operator.NewOperatorController(cfID, spanController, 1000, common.DefaultMode) + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1) + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1) + for _, stm := range spanController.GetAbsentForTest(100) { + spanController.BindSpanToNode("", "node1", stm) + spanController.MarkSpanReplicating(stm) + } + + for _, influenceType := range []heartbeatpb.InfluenceType{ + heartbeatpb.InfluenceType_All, + heartbeatpb.InfluenceType_DB, + } { + event := NewBlockEvent(cfID, tableTriggerEventDispatcherID, spanController, operatorController, &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: influenceType, + SchemaID: 1, + }, + }, false, common.DefaultMode) + event.selected.Store(true) + event.writerDispatcherAdvanced = true + event.lastResendTime = time.Now().Add(-2 * time.Second) + + msgs := event.resend(common.DefaultMode) + require.Len(t, msgs, 1) + + event.lastResendTime = time.Now().Add(-2 * time.Second) + event.markStatusReceived() + msgs = event.resend(common.DefaultMode) + require.Len(t, msgs, 0) + + event.lastResendTime = time.Now().Add(-2 * time.Second) + event.lastStatusReceivedTime = 
time.Now().Add(-2 * time.Second) + msgs = event.resend(common.DefaultMode) + require.Len(t, msgs, 1) + } +} + +func TestNormalPassResendDoesNotWaitForQuietStatus(t *testing.T) { + testutil.SetUpTestServices(t) + nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) + nodeManager.GetAliveNodes()["node1"] = &node.Info{ID: "node1"} + + tableTriggerEventDispatcherID := common.NewDispatcherID() + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + ddlSpan := replica.NewWorkingSpanReplication(cfID, tableTriggerEventDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: tableTriggerEventDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node1", false) + spanController := span.NewController(cfID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode) + operatorController := operator.NewOperatorController(cfID, spanController, 1000, common.DefaultMode) + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1) + spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1) + for _, stm := range spanController.GetAbsentForTest(100) { + spanController.BindSpanToNode("", "node1", stm) + spanController.MarkSpanReplicating(stm) + } + + event := NewBlockEvent(cfID, tableTriggerEventDispatcherID, spanController, operatorController, &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_Normal, + TableIDs: []int64{1, 2}, + }, + }, false, common.DefaultMode) + event.selected.Store(true) + event.writerDispatcherAdvanced = true + event.passActionSent = true + event.lastResendTime = time.Now().Add(-2 * time.Second) + event.markStatusReceived() + + msgs := event.resend(common.DefaultMode) + require.Len(t, msgs, 1) +} + func TestSendPassActionTypeDBIncludesWriterNode(t *testing.T) { 
testutil.SetUpTestServices(t) nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)