diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go index 0c31275a4c..53db1e85c8 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher.go +++ b/downstreamadapter/dispatcher/basic_dispatcher.go @@ -76,7 +76,7 @@ type Dispatcher interface { GetHeartBeatInfo(h *HeartBeatInfo) GetComponentStatus() heartbeatpb.ComponentState GetBlockEventStatus() *heartbeatpb.State - GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus + OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) GetEventSizePerSecond() float32 IsTableTriggerDispatcher() bool DealWithBlockEvent(event commonEvent.BlockEvent) @@ -836,16 +836,7 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D } // Step3: whether the outdate message or not, we need to return message show we have finished the event. - d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{ - ID: d.id.ToPB(), - State: &heartbeatpb.State{ - IsBlocked: true, - BlockTs: action.CommitTs, - IsSyncPoint: action.IsSyncPoint, - Stage: heartbeatpb.BlockStage_DONE, - }, - Mode: d.GetMode(), - } + d.OfferDoneBlockStatus(action.CommitTs, action.IsSyncPoint) } return false } @@ -881,16 +872,7 @@ func (d *BasicDispatcher) reportBlockedEventDone( actionCommitTs uint64, actionIsSyncPoint bool, ) { - d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{ - ID: d.id.ToPB(), - State: &heartbeatpb.State{ - IsBlocked: true, - BlockTs: actionCommitTs, - IsSyncPoint: actionIsSyncPoint, - Stage: heartbeatpb.BlockStage_DONE, - }, - Mode: d.GetMode(), - } + d.OfferDoneBlockStatus(actionCommitTs, actionIsSyncPoint) GetDispatcherStatusDynamicStream().Wake(d.id) } @@ -1042,7 +1024,7 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) { } else { d.resendTaskMap.Set(identifier, newResendTask(message, d, nil)) } - d.sharedInfo.blockStatusesChan <- message + d.OfferBlockStatus(message) }) // dealing with events 
which update schema ids @@ -1166,7 +1148,7 @@ func (d *BasicDispatcher) reportBlockedEventToMaintainer(event commonEvent.Block IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent, } d.resendTaskMap.Set(identifier, newResendTask(message, d, nil)) - d.sharedInfo.blockStatusesChan <- message + d.OfferBlockStatus(message) } func (d *BasicDispatcher) flushBlockedEventAndReportToMaintainer(event commonEvent.BlockEvent) { diff --git a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go index 8f95a93b5b..882da1fe1b 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go +++ b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go @@ -129,7 +129,6 @@ func TestDDLEventsAlwaysValidateActiveActive(t *testing.T) { func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActiveActive bool) *BasicDispatcher { t.Helper() statuses := make(chan TableSpanStatusWithSeq, 2) - blockStatuses := make(chan *heartbeatpb.TableSpanBlockStatus, 1) errCh := make(chan error, 1) sharedInfo := NewSharedInfo( common.NewChangefeedID("test"), @@ -143,7 +142,7 @@ func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActive nil, false, statuses, - blockStatuses, + 1, errCh, ) dispatcherSink := newDispatcherTestSink(t, sinkType) diff --git a/downstreamadapter/dispatcher/basic_dispatcher_info.go b/downstreamadapter/dispatcher/basic_dispatcher_info.go index 19a0980461..ee71078891 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher_info.go +++ b/downstreamadapter/dispatcher/basic_dispatcher_info.go @@ -14,6 +14,7 @@ package dispatcher import ( + "context" "sync/atomic" "time" @@ -58,9 +59,9 @@ type SharedInfo struct { // statusesChan is used to store the status of dispatchers when status changed // and push to heartbeatRequestQueue statusesChan chan TableSpanStatusWithSeq - // blockStatusesChan use to collector block status of 
ddl/sync point event to Maintainer - // shared by the event dispatcher manager - blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus + // blockStatusBuffer keeps block statuses for the dispatcher manager. + // Identical WAITING and DONE statuses are coalesced while pending to reduce local memory amplification. + blockStatusBuffer *BlockStatusBuffer // blockExecutor is used to execute block events such as DDL and sync point events asynchronously // to avoid callback() called in handleEvents, causing deadlock in ds @@ -88,7 +89,7 @@ func NewSharedInfo( txnAtomicity *config.AtomicityLevel, enableSplittableCheck bool, statusesChan chan TableSpanStatusWithSeq, - blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus, + blockStatusBufferSize int, errCh chan error, ) *SharedInfo { sharedInfo := &SharedInfo{ @@ -102,7 +103,7 @@ func NewSharedInfo( syncPointConfig: syncPointConfig, enableSplittableCheck: enableSplittableCheck, statusesChan: statusesChan, - blockStatusesChan: blockStatusesChan, + blockStatusBuffer: NewBlockStatusBuffer(blockStatusBufferSize), blockExecutor: newBlockEventExecutor(), errCh: errCh, metricHandleDDLHis: metrics.HandleDDLHistogram.WithLabelValues(changefeedID.Keyspace(), changefeedID.Name()), @@ -219,8 +220,20 @@ func (d *BasicDispatcher) GetTxnAtomicity() config.AtomicityLevel { return d.sharedInfo.txnAtomicity } -func (d *BasicDispatcher) GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus { - return d.sharedInfo.blockStatusesChan +func (d *BasicDispatcher) OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) { + d.sharedInfo.OfferBlockStatus(status) +} + +func (d *BasicDispatcher) OfferDoneBlockStatus(blockTs uint64, isSyncPoint bool) { + d.sharedInfo.OfferDoneBlockStatus(d.id, blockTs, isSyncPoint, d.GetMode()) +} + +func (d *BasicDispatcher) TakeBlockStatus(ctx context.Context) *heartbeatpb.TableSpanBlockStatus { + return d.sharedInfo.TakeBlockStatus(ctx) +} + +func (d *BasicDispatcher) 
TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) { + return d.sharedInfo.TakeBlockStatusWithTimeout(timeout) } func (d *BasicDispatcher) GetEventSizePerSecond() float32 { @@ -254,8 +267,34 @@ func (s *SharedInfo) EnableActiveActive() bool { return s.enableActiveActive } -func (s *SharedInfo) GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus { - return s.blockStatusesChan +func (s *SharedInfo) OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) { + s.blockStatusBuffer.Offer(status) +} + +func (s *SharedInfo) OfferDoneBlockStatus(dispatcherID common.DispatcherID, blockTs uint64, isSyncPoint bool, mode int64) { + s.blockStatusBuffer.OfferDone(dispatcherID, blockTs, isSyncPoint, mode) +} + +func (s *SharedInfo) TakeBlockStatus(ctx context.Context) *heartbeatpb.TableSpanBlockStatus { + return s.blockStatusBuffer.Take(ctx) +} + +func (s *SharedInfo) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + status := s.TakeBlockStatus(ctx) + if status == nil { + return nil, false + } + return status, true +} + +func (s *SharedInfo) TryTakeBlockStatus() (*heartbeatpb.TableSpanBlockStatus, bool) { + return s.blockStatusBuffer.TryTake() +} + +func (s *SharedInfo) BlockStatusLen() int { + return s.blockStatusBuffer.Len() } func (s *SharedInfo) GetErrCh() chan error { diff --git a/downstreamadapter/dispatcher/block_status_buffer.go b/downstreamadapter/dispatcher/block_status_buffer.go new file mode 100644 index 0000000000..434ada5484 --- /dev/null +++ b/downstreamadapter/dispatcher/block_status_buffer.go @@ -0,0 +1,195 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatcher + +import ( + "context" + "sync" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" +) + +type blockStatusKey struct { + dispatcherID common.DispatcherID + blockTs uint64 + mode int64 + isSyncPoint bool +} + +type blockStatusQueueEntry struct { + status *heartbeatpb.TableSpanBlockStatus + doneKey *blockStatusKey +} + +// BlockStatusBuffer keeps block statuses ordered while coalescing identical +// WAITING and DONE statuses that are still pending locally. Other statuses keep +// the original protobuf object and ordering. +type BlockStatusBuffer struct { + queue chan blockStatusQueueEntry + + mu sync.Mutex + pendingDone map[blockStatusKey]struct{} + pendingWaiting map[blockStatusKey]struct{} +} + +// NewBlockStatusBuffer creates a bounded local mailbox for dispatcher block +// statuses. The buffer keeps enqueue order while coalescing identical pending +// WAITING and DONE statuses before protobuf materialization. 
+func NewBlockStatusBuffer(size int) *BlockStatusBuffer { + if size <= 0 { + size = 1 + } + return &BlockStatusBuffer{ + queue: make(chan blockStatusQueueEntry, size), + pendingDone: make(map[blockStatusKey]struct{}), + pendingWaiting: make(map[blockStatusKey]struct{}), + } +} + +func (b *BlockStatusBuffer) Offer(status *heartbeatpb.TableSpanBlockStatus) { + if status == nil { + return + } + if isWaitingBlockStatus(status) { + key := newBlockStatusKey(status) + if !b.reserveWaiting(key) { + return + } + b.queue <- blockStatusQueueEntry{status: status} + return + } + if !isDoneBlockStatus(status) { + b.queue <- blockStatusQueueEntry{status: status} + return + } + + key := newBlockStatusKey(status) + if !b.reserveDone(key) { + return + } + b.queue <- blockStatusQueueEntry{doneKey: &key} +} + +func (b *BlockStatusBuffer) OfferDone( + dispatcherID common.DispatcherID, + blockTs uint64, + isSyncPoint bool, + mode int64, +) { + key := blockStatusKey{ + dispatcherID: dispatcherID, + blockTs: blockTs, + mode: mode, + isSyncPoint: isSyncPoint, + } + if !b.reserveDone(key) { + return + } + b.queue <- blockStatusQueueEntry{doneKey: &key} +} + +func (b *BlockStatusBuffer) Take(ctx context.Context) *heartbeatpb.TableSpanBlockStatus { + select { + case <-ctx.Done(): + return nil + case entry := <-b.queue: + return b.materialize(entry) + } +} + +func (b *BlockStatusBuffer) TryTake() (*heartbeatpb.TableSpanBlockStatus, bool) { + select { + case entry := <-b.queue: + return b.materialize(entry), true + default: + return nil, false + } +} + +func (b *BlockStatusBuffer) Len() int { + return len(b.queue) +} + +func (b *BlockStatusBuffer) reserveWaiting(key blockStatusKey) bool { + b.mu.Lock() + defer b.mu.Unlock() + if _, ok := b.pendingWaiting[key]; ok { + return false + } + b.pendingWaiting[key] = struct{}{} + return true +} + +func (b *BlockStatusBuffer) reserveDone(key blockStatusKey) bool { + b.mu.Lock() + defer b.mu.Unlock() + if _, ok := b.pendingDone[key]; ok { + return false 
+ } + b.pendingDone[key] = struct{}{} + return true +} + +func (b *BlockStatusBuffer) materialize(entry blockStatusQueueEntry) *heartbeatpb.TableSpanBlockStatus { + if entry.status != nil { + if isWaitingBlockStatus(entry.status) { + key := newBlockStatusKey(entry.status) + b.mu.Lock() + delete(b.pendingWaiting, key) + b.mu.Unlock() + } + return entry.status + } + + key := *entry.doneKey + b.mu.Lock() + delete(b.pendingDone, key) + b.mu.Unlock() + + return &heartbeatpb.TableSpanBlockStatus{ + ID: key.dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: key.blockTs, + IsSyncPoint: key.isSyncPoint, + Stage: heartbeatpb.BlockStage_DONE, + }, + Mode: key.mode, + } +} + +func newBlockStatusKey(status *heartbeatpb.TableSpanBlockStatus) blockStatusKey { + return blockStatusKey{ + dispatcherID: common.NewDispatcherIDFromPB(status.ID), + blockTs: status.State.BlockTs, + mode: status.Mode, + isSyncPoint: status.State.IsSyncPoint, + } +} + +func isWaitingBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool { + return status != nil && + status.State != nil && + status.State.IsBlocked && + status.State.Stage == heartbeatpb.BlockStage_WAITING +} + +func isDoneBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool { + return status != nil && + status.State != nil && + status.State.IsBlocked && + status.State.Stage == heartbeatpb.BlockStage_DONE +} diff --git a/downstreamadapter/dispatcher/block_status_buffer_test.go b/downstreamadapter/dispatcher/block_status_buffer_test.go new file mode 100644 index 0000000000..021039bb81 --- /dev/null +++ b/downstreamadapter/dispatcher/block_status_buffer_test.go @@ -0,0 +1,169 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatcher + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + "github.com/stretchr/testify/require" +) + +func TestBlockStatusBufferDeduplicatesPendingDone(t *testing.T) { + buffer := NewBlockStatusBuffer(4) + dispatcherID := common.NewDispatcherID() + + buffer.OfferDone(dispatcherID, 100, false, common.DefaultMode) + buffer.OfferDone(dispatcherID, 100, false, common.DefaultMode) + + require.Equal(t, 1, buffer.Len()) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + msg := buffer.Take(ctx) + require.NotNil(t, msg) + require.Equal(t, dispatcherID, common.NewDispatcherIDFromPB(msg.ID)) + require.Equal(t, uint64(100), msg.State.BlockTs) + require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage) + require.False(t, msg.State.IsSyncPoint) + require.Equal(t, common.DefaultMode, msg.Mode) + + _, ok := buffer.TryTake() + require.False(t, ok) +} + +func TestBlockStatusBufferDeduplicatesPendingWaiting(t *testing.T) { + buffer := NewBlockStatusBuffer(4) + dispatcherID := common.NewDispatcherID() + + waiting := &heartbeatpb.TableSpanBlockStatus{ + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 150, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + } + + buffer.Offer(waiting) + buffer.Offer(waiting) + + require.Equal(t, 1, buffer.Len()) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + msg := buffer.Take(ctx) + require.NotNil(t, msg) + require.Equal(t, heartbeatpb.BlockStage_WAITING, 
msg.State.Stage) + + _, ok := buffer.TryTake() + require.False(t, ok) +} + +func TestBlockStatusBufferAllowsWaitingAgainAfterTake(t *testing.T) { + buffer := NewBlockStatusBuffer(4) + dispatcherID := common.NewDispatcherID() + + waiting := &heartbeatpb.TableSpanBlockStatus{ + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 180, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + } + + buffer.Offer(waiting) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + first := buffer.Take(ctx) + require.NotNil(t, first) + require.Equal(t, heartbeatpb.BlockStage_WAITING, first.State.Stage) + + buffer.Offer(waiting) + + second := buffer.Take(ctx) + require.NotNil(t, second) + require.Equal(t, heartbeatpb.BlockStage_WAITING, second.State.Stage) + + _, ok := buffer.TryTake() + require.False(t, ok) +} + +func TestBlockStatusBufferKeepsWaitingBeforeDone(t *testing.T) { + buffer := NewBlockStatusBuffer(4) + dispatcherID := common.NewDispatcherID() + + buffer.Offer(&heartbeatpb.TableSpanBlockStatus{ + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 200, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }) + buffer.OfferDone(dispatcherID, 200, false, common.DefaultMode) + buffer.OfferDone(dispatcherID, 200, false, common.DefaultMode) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + first := buffer.Take(ctx) + require.NotNil(t, first) + require.Equal(t, heartbeatpb.BlockStage_WAITING, first.State.Stage) + + second := buffer.Take(ctx) + require.NotNil(t, second) + require.Equal(t, heartbeatpb.BlockStage_DONE, second.State.Stage) + + _, ok := buffer.TryTake() + require.False(t, ok) +} + +func TestBlockStatusBufferKeepsDistinctDoneKeys(t *testing.T) { + buffer := NewBlockStatusBuffer(4) + dispatcherID := common.NewDispatcherID() + + buffer.OfferDone(dispatcherID, 300, false, 
common.DefaultMode) + buffer.OfferDone(dispatcherID, 300, true, common.DefaultMode) + buffer.OfferDone(dispatcherID, 300, false, common.RedoMode) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + first := buffer.Take(ctx) + require.NotNil(t, first) + require.False(t, first.State.IsSyncPoint) + require.Equal(t, common.DefaultMode, first.Mode) + + second := buffer.Take(ctx) + require.NotNil(t, second) + require.True(t, second.State.IsSyncPoint) + require.Equal(t, common.DefaultMode, second.Mode) + + third := buffer.Take(ctx) + require.NotNil(t, third) + require.False(t, third.State.IsSyncPoint) + require.Equal(t, common.RedoMode, third.Mode) + + _, ok := buffer.TryTake() + require.False(t, ok) +} diff --git a/downstreamadapter/dispatcher/event_dispatcher_test.go b/downstreamadapter/dispatcher/event_dispatcher_test.go index 29b8ace3e2..0ce694d02e 100644 --- a/downstreamadapter/dispatcher/event_dispatcher_test.go +++ b/downstreamadapter/dispatcher/event_dispatcher_test.go @@ -83,7 +83,7 @@ func newDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan) *Eve &defaultAtomicity, false, // enableSplittableCheck make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) return NewEventDispatcher( @@ -476,22 +476,16 @@ func TestBlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain(t *testing.T) { require.Nil(t, pendingEvent) require.Equal(t, heartbeatpb.BlockStage_NONE, blockStage) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.FailNow(t, "unexpected block status before local flush finishes", "received=%v", msg) - case <-time.After(200 * time.Millisecond): - } + msg, ok := dispatcher.TakeBlockStatusWithTimeout(200 * time.Millisecond) + require.False(t, ok, "unexpected block status before local flush finishes: %v", msg) close(flushRelease) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.True(t, 
msg.State.IsBlocked) - require.Equal(t, uint64(10), msg.State.BlockTs) - require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage) - case <-time.After(time.Second): - require.FailNow(t, "expected blocking DDL to enter WAITING after local flush") - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(time.Second) + require.True(t, ok, "expected blocking DDL to enter WAITING after local flush") + require.True(t, msg.State.IsBlocked) + require.Equal(t, uint64(10), msg.State.BlockTs) + require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage) pendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage() require.Same(t, ddlEvent, pendingEvent) @@ -507,14 +501,11 @@ func TestBlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain(t *testing.T) { }) require.True(t, await) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.True(t, msg.State.IsBlocked) - require.Equal(t, uint64(10), msg.State.BlockTs) - require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage) - case <-time.After(time.Second): - require.FailNow(t, "expected DONE after write action") - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(time.Second) + require.True(t, ok, "expected DONE after write action") + require.True(t, msg.State.IsBlocked) + require.Equal(t, uint64(10), msg.State.BlockTs) + require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage) require.Eventually(t, func() bool { pendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage() @@ -1004,7 +995,7 @@ func TestDispatcherSplittableCheck(t *testing.T) { &defaultAtomicity, true, // enableSplittableCheck = true make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) @@ -1114,7 +1105,7 @@ func TestDispatcher_SkipDMLAsStartTs_FilterCorrectly(t *testing.T) { &defaultAtomicity, false, make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) @@ 
-1194,7 +1185,7 @@ func TestDispatcher_SkipDMLAsStartTs_Disabled(t *testing.T) { &defaultAtomicity, false, make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) @@ -1258,14 +1249,11 @@ func TestHoldBlockEventUntilNoResendTasks(t *testing.T) { block := dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, createTableDDL)}, func() {}) require.True(t, block) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.False(t, msg.State.IsBlocked) - require.False(t, msg.State.IsSyncPoint) - require.Equal(t, uint64(10), msg.State.BlockTs) - case <-time.After(time.Second): - require.FailNow(t, "expected add-table block status") - } + msg, ok := dispatcher.TakeBlockStatusWithTimeout(time.Second) + require.True(t, ok, "expected add-table block status") + require.False(t, msg.State.IsBlocked) + require.False(t, msg.State.IsSyncPoint) + require.Equal(t, uint64(10), msg.State.BlockTs) require.Equal(t, 1, dispatcher.resendTaskMap.Len()) // A DB/All block event must be deferred until resendTaskMap becomes empty, @@ -1281,11 +1269,8 @@ func TestHoldBlockEventUntilNoResendTasks(t *testing.T) { block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, dropDBDDL)}, func() {}) require.True(t, block) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.FailNow(t, "unexpected block status", "received=%v", msg) - case <-time.After(200 * time.Millisecond): - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(200 * time.Millisecond) + require.False(t, ok, "unexpected block status: %v", msg) // Simulate maintainer ACK for the create table scheduling message. 
dispatcher.HandleDispatcherStatus(&heartbeatpb.DispatcherStatus{ @@ -1301,23 +1286,17 @@ func TestHoldBlockEventUntilNoResendTasks(t *testing.T) { require.FailNow(t, "expected deferred DB-level flush to start") } - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.FailNow(t, "unexpected block status before local flush finishes", "received=%v", msg) - case <-time.After(200 * time.Millisecond): - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(200 * time.Millisecond) + require.False(t, ok, "unexpected block status before local flush finishes: %v", msg) close(flushRelease) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.True(t, msg.State.IsBlocked) - require.False(t, msg.State.IsSyncPoint) - require.Equal(t, uint64(20), msg.State.BlockTs) - require.Equal(t, heartbeatpb.InfluenceType_DB, msg.State.BlockTables.InfluenceType) - require.Equal(t, int64(1), msg.State.BlockTables.SchemaID) - require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage) - case <-time.After(time.Second): - require.FailNow(t, "expected deferred DB-level block status") - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(time.Second) + require.True(t, ok, "expected deferred DB-level block status") + require.True(t, msg.State.IsBlocked) + require.False(t, msg.State.IsSyncPoint) + require.Equal(t, uint64(20), msg.State.BlockTs) + require.Equal(t, heartbeatpb.InfluenceType_DB, msg.State.BlockTables.InfluenceType) + require.Equal(t, int64(1), msg.State.BlockTables.SchemaID) + require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage) } diff --git a/downstreamadapter/dispatcher/helper.go b/downstreamadapter/dispatcher/helper.go index d50137a95b..30983c1d50 100644 --- a/downstreamadapter/dispatcher/helper.go +++ b/downstreamadapter/dispatcher/helper.go @@ -270,7 +270,7 @@ func newResendTask(message *heartbeatpb.TableSpanBlockStatus, dispatcher Dispatc func (t *ResendTask) Execute() time.Time { log.Debug("resend task", zap.Any("message", 
t.message), zap.Any("dispatcherID", t.dispatcher.GetId())) - t.dispatcher.GetBlockStatusesChan() <- t.message + t.dispatcher.OfferBlockStatus(t.message) executeCount := atomic.AddUint64(&t.executeCount, 1) if executeCount%10 == 0 { diff --git a/downstreamadapter/dispatcher/redo_dispatcher_test.go b/downstreamadapter/dispatcher/redo_dispatcher_test.go index 68f97016b7..bdd906eebc 100644 --- a/downstreamadapter/dispatcher/redo_dispatcher_test.go +++ b/downstreamadapter/dispatcher/redo_dispatcher_test.go @@ -48,7 +48,7 @@ func newRedoDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan) &defaultAtomicity, false, // enableSplittableCheck make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) return NewRedoDispatcher( diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager.go b/downstreamadapter/dispatchermanager/dispatcher_manager.go index 128f8cbe1e..7446889b52 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager.go @@ -258,7 +258,7 @@ func NewDispatcherManager( manager.config.SinkConfig.TxnAtomicity, manager.config.EnableSplittableCheck, make(chan dispatcher.TableSpanStatusWithSeq, 8192), - make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024), + 1024*1024, make(chan error, 1), ) @@ -565,8 +565,6 @@ func (e *DispatcherManager) collectErrors(ctx context.Context) { // collectBlockStatusRequest collect the block status from the block status channel and report to the maintainer. 
func (e *DispatcherManager) collectBlockStatusRequest(ctx context.Context) { - delay := time.NewTimer(0) - defer delay.Stop() enqueueBlockStatus := func(blockStatusMessage []*heartbeatpb.TableSpanBlockStatus, mode int64) { var message heartbeatpb.BlockStatusRequest message.ChangefeedID = e.changefeedID.ToPB() @@ -577,37 +575,39 @@ func (e *DispatcherManager) collectBlockStatusRequest(ctx context.Context) { for { blockStatusMessage := make([]*heartbeatpb.TableSpanBlockStatus, 0) redoBlockStatusMessage := make([]*heartbeatpb.TableSpanBlockStatus, 0) - select { - case <-ctx.Done(): + blockStatus := e.sharedInfo.TakeBlockStatus(ctx) + if blockStatus == nil { return - case blockStatus := <-e.sharedInfo.GetBlockStatusesChan(): + } + if common.IsDefaultMode(blockStatus.Mode) { + blockStatusMessage = append(blockStatusMessage, blockStatus) + } else { + redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus) + } + + batchCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + for { + blockStatus = e.sharedInfo.TakeBlockStatus(batchCtx) + if blockStatus == nil { + break + } if common.IsDefaultMode(blockStatus.Mode) { blockStatusMessage = append(blockStatusMessage, blockStatus) } else { redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus) } - delay.Reset(10 * time.Millisecond) - loop: - for { - select { - case blockStatus := <-e.sharedInfo.GetBlockStatusesChan(): - if common.IsDefaultMode(blockStatus.Mode) { - blockStatusMessage = append(blockStatusMessage, blockStatus) - } else { - redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus) - } - case <-delay.C: - break loop - } - } + } + cancel() + if ctx.Err() != nil { + return + } - e.metricBlockStatusesChanLen.Set(float64(len(e.sharedInfo.GetBlockStatusesChan()))) - if len(blockStatusMessage) != 0 { - enqueueBlockStatus(blockStatusMessage, common.DefaultMode) - } - if len(redoBlockStatusMessage) != 0 { - enqueueBlockStatus(redoBlockStatusMessage, common.RedoMode) - } + 
e.metricBlockStatusesChanLen.Set(float64(e.sharedInfo.BlockStatusLen())) + if len(blockStatusMessage) != 0 { + enqueueBlockStatus(blockStatusMessage, common.DefaultMode) + } + if len(redoBlockStatusMessage) != 0 { + enqueueBlockStatus(redoBlockStatusMessage, common.RedoMode) } } } diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager_test.go b/downstreamadapter/dispatchermanager/dispatcher_manager_test.go index d26a6193d8..f48d52df21 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager_test.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager_test.go @@ -80,7 +80,7 @@ func createTestDispatcher(t *testing.T, manager *DispatcherManager, id common.Di &defaultAtomicity, false, make(chan dispatcher.TableSpanStatusWithSeq, 1), - make(chan *heartbeatpb.TableSpanBlockStatus, 1), + 1, make(chan error, 1), ) d := dispatcher.NewEventDispatcher( @@ -141,7 +141,7 @@ func createTestManager(t *testing.T) *DispatcherManager { &defaultAtomicity, false, make(chan dispatcher.TableSpanStatusWithSeq, 8192), - make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024), + 1024*1024, make(chan error, 1), ) nodeID := node.NewID() diff --git a/downstreamadapter/dispatchermanager/heartbeat_collector.go b/downstreamadapter/dispatchermanager/heartbeat_collector.go index 99ef678b14..67417face8 100644 --- a/downstreamadapter/dispatchermanager/heartbeat_collector.go +++ b/downstreamadapter/dispatchermanager/heartbeat_collector.go @@ -248,6 +248,7 @@ func (c *HeartBeatCollector) sendBlockStatusMessages(ctx context.Context) error messaging.MaintainerManagerTopic, blockStatusRequestWithTargetID.Request, )) + c.blockStatusReqQueue.OnSendComplete(blockStatusRequestWithTargetID) if err != nil { log.Error("failed to send block status request message", zap.Error(err)) } diff --git a/downstreamadapter/dispatchermanager/heartbeat_queue.go b/downstreamadapter/dispatchermanager/heartbeat_queue.go index e93b59f401..af4fe6b7c6 100644 --- 
a/downstreamadapter/dispatchermanager/heartbeat_queue.go +++ b/downstreamadapter/dispatchermanager/heartbeat_queue.go @@ -15,8 +15,10 @@ package dispatchermanager import ( "context" + "sync" "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" ) @@ -62,15 +64,30 @@ type BlockStatusRequestWithTargetID struct { // BlockStatusRequestQueue is a channel for all event dispatcher managers to send block status requests to HeartBeatCollector type BlockStatusRequestQueue struct { queue chan *BlockStatusRequestWithTargetID + + mu sync.Mutex + requestStatusKeys map[*BlockStatusRequestWithTargetID][]blockStatusRequestDedupeKey + queuedStatuses map[blockStatusRequestDedupeKey]struct{} + inFlightStatuses map[blockStatusRequestDedupeKey]struct{} } func NewBlockStatusRequestQueue() *BlockStatusRequestQueue { return &BlockStatusRequestQueue{ - queue: make(chan *BlockStatusRequestWithTargetID, 10000), + queue: make(chan *BlockStatusRequestWithTargetID, 10000), + requestStatusKeys: make(map[*BlockStatusRequestWithTargetID][]blockStatusRequestDedupeKey), + queuedStatuses: make(map[blockStatusRequestDedupeKey]struct{}), + inFlightStatuses: make(map[blockStatusRequestDedupeKey]struct{}), } } func (q *BlockStatusRequestQueue) Enqueue(request *BlockStatusRequestWithTargetID) { + if request == nil || request.Request == nil { + return + } + if !q.trackPendingStatuses(request) { + metrics.HeartbeatCollectorBlockStatusRequestQueueLenGauge.Set(float64(len(q.queue))) + return + } q.queue <- request metrics.HeartbeatCollectorBlockStatusRequestQueueLenGauge.Set(float64(len(q.queue))) } @@ -80,11 +97,102 @@ func (q *BlockStatusRequestQueue) Dequeue(ctx context.Context) *BlockStatusReque case <-ctx.Done(): return nil case request := <-q.queue: + q.markInFlight(request) metrics.HeartbeatCollectorBlockStatusRequestQueueLenGauge.Set(float64(len(q.queue))) return request } } +func (q 
*BlockStatusRequestQueue) OnSendComplete(request *BlockStatusRequestWithTargetID) { + if request == nil { + return + } + + q.mu.Lock() + defer q.mu.Unlock() + for _, key := range q.requestStatusKeys[request] { + delete(q.inFlightStatuses, key) + } + delete(q.requestStatusKeys, request) +} + func (q *BlockStatusRequestQueue) Close() { close(q.queue) } + +type blockStatusRequestDedupeKey struct { + targetID node.ID + dispatcherID common.DispatcherID + blockTs uint64 + mode int64 + isSyncPoint bool + stage heartbeatpb.BlockStage +} + +func (q *BlockStatusRequestQueue) trackPendingStatuses(request *BlockStatusRequestWithTargetID) bool { + statuses := request.Request.BlockStatuses + filtered := statuses[:0] + statusKeys := make([]blockStatusRequestDedupeKey, 0) + + q.mu.Lock() + defer q.mu.Unlock() + + for _, status := range statuses { + if !shouldDeduplicateRequestStatus(status) { + filtered = append(filtered, status) + continue + } + + key := blockStatusRequestDedupeKey{ + targetID: request.TargetID, + dispatcherID: common.NewDispatcherIDFromPB(status.ID), + blockTs: status.State.BlockTs, + mode: status.Mode, + isSyncPoint: status.State.IsSyncPoint, + stage: status.State.Stage, + } + if _, ok := q.queuedStatuses[key]; ok { + continue + } + if _, ok := q.inFlightStatuses[key]; ok { + continue + } + + filtered = append(filtered, status) + q.queuedStatuses[key] = struct{}{} + statusKeys = append(statusKeys, key) + } + + request.Request.BlockStatuses = filtered + if len(statusKeys) > 0 { + q.requestStatusKeys[request] = statusKeys + } + return len(filtered) > 0 +} + +func (q *BlockStatusRequestQueue) markInFlight(request *BlockStatusRequestWithTargetID) { + q.mu.Lock() + defer q.mu.Unlock() + for _, key := range q.requestStatusKeys[request] { + delete(q.queuedStatuses, key) + q.inFlightStatuses[key] = struct{}{} + } +} + +func isWaitingRequestStatus(status *heartbeatpb.TableSpanBlockStatus) bool { + return status != nil && + status.State != nil && + status.State.IsBlocked 
&& + status.State.Stage == heartbeatpb.BlockStage_WAITING +} + +func shouldDeduplicateRequestStatus(status *heartbeatpb.TableSpanBlockStatus) bool { + return isDoneRequestStatus(status) || isWaitingRequestStatus(status) +} + +func isDoneRequestStatus(status *heartbeatpb.TableSpanBlockStatus) bool { + return status != nil && + status.State != nil && + status.State.IsBlocked && + status.State.Stage == heartbeatpb.BlockStage_DONE +} diff --git a/downstreamadapter/dispatchermanager/heartbeat_queue_test.go b/downstreamadapter/dispatchermanager/heartbeat_queue_test.go new file mode 100644 index 0000000000..e639bb9216 --- /dev/null +++ b/downstreamadapter/dispatchermanager/heartbeat_queue_test.go @@ -0,0 +1,250 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package dispatchermanager + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/node" + "github.com/stretchr/testify/require" +) + +func TestBlockStatusRequestQueueDeduplicatesQueuedAndInFlightDone(t *testing.T) { + queue := NewBlockStatusRequestQueue() + targetID := node.NewID() + dispatcherID := common.NewDispatcherID() + + first := newDoneBlockStatusRequest(targetID, dispatcherID, 100) + second := newDoneBlockStatusRequest(targetID, dispatcherID, 100) + + queue.Enqueue(first) + queue.Enqueue(second) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + dequeued := queue.Dequeue(ctx) + require.Same(t, first, dequeued) + + shortCtx, shortCancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer shortCancel() + require.Nil(t, queue.Dequeue(shortCtx)) + + third := newDoneBlockStatusRequest(targetID, dispatcherID, 100) + queue.Enqueue(third) + + shortCtx2, shortCancel2 := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer shortCancel2() + require.Nil(t, queue.Dequeue(shortCtx2)) + + queue.OnSendComplete(dequeued) + + fourth := newDoneBlockStatusRequest(targetID, dispatcherID, 100) + queue.Enqueue(fourth) + + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + require.Same(t, fourth, queue.Dequeue(ctx2)) +} + +func TestBlockStatusRequestQueueDeduplicatesQueuedAndInFlightWaiting(t *testing.T) { + queue := NewBlockStatusRequestQueue() + targetID := node.NewID() + dispatcherID := common.NewDispatcherID() + + first := &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 101, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + 
Mode: common.DefaultMode, + }, + } + second := &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 101, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + Mode: common.DefaultMode, + }, + } + + queue.Enqueue(first) + queue.Enqueue(second) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + dequeued := queue.Dequeue(ctx) + require.Same(t, first, dequeued) + + shortCtx, shortCancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer shortCancel() + require.Nil(t, queue.Dequeue(shortCtx)) + + third := &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 101, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + Mode: common.DefaultMode, + }, + } + queue.Enqueue(third) + + shortCtx2, shortCancel2 := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer shortCancel2() + require.Nil(t, queue.Dequeue(shortCtx2)) + + queue.OnSendComplete(dequeued) + + fourth := &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 101, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + Mode: common.DefaultMode, + }, + } + queue.Enqueue(fourth) + + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + require.Same(t, fourth, queue.Dequeue(ctx2)) +} + +func TestBlockStatusRequestQueueKeepsDistinctDoneKeys(t 
*testing.T) { + queue := NewBlockStatusRequestQueue() + targetID := node.NewID() + dispatcherID := common.NewDispatcherID() + + defaultDone := newDoneBlockStatusRequest(targetID, dispatcherID, 200) + syncPointDone := newDoneBlockStatusRequest(targetID, dispatcherID, 200) + syncPointDone.Request.BlockStatuses[0].State.IsSyncPoint = true + redoDone := newDoneBlockStatusRequest(targetID, dispatcherID, 200) + redoDone.Request.BlockStatuses[0].Mode = common.RedoMode + redoDone.Request.Mode = common.RedoMode + + queue.Enqueue(defaultDone) + queue.Enqueue(syncPointDone) + queue.Enqueue(redoDone) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + require.Same(t, defaultDone, queue.Dequeue(ctx)) + queue.OnSendComplete(defaultDone) + + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + require.Same(t, syncPointDone, queue.Dequeue(ctx2)) + queue.OnSendComplete(syncPointDone) + + ctx3, cancel3 := context.WithTimeout(context.Background(), time.Second) + defer cancel3() + require.Same(t, redoDone, queue.Dequeue(ctx3)) +} + +func TestBlockStatusRequestQueueKeepsWaitingAndDoneDistinct(t *testing.T) { + queue := NewBlockStatusRequestQueue() + targetID := node.NewID() + dispatcherID := common.NewDispatcherID() + + waiting := &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 300, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + Mode: common.DefaultMode, + }, + } + done := newDoneBlockStatusRequest(targetID, dispatcherID, 300) + + queue.Enqueue(waiting) + queue.Enqueue(done) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + require.Same(t, waiting, queue.Dequeue(ctx)) + + ctx2, cancel2 := context.WithTimeout(context.Background(), 
time.Second) + defer cancel2() + require.Same(t, done, queue.Dequeue(ctx2)) +} + +func newDoneBlockStatusRequest(targetID node.ID, dispatcherID common.DispatcherID, blockTs uint64) *BlockStatusRequestWithTargetID { + return &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: blockTs, + Stage: heartbeatpb.BlockStage_DONE, + }, + Mode: common.DefaultMode, + }, + }, + Mode: common.DefaultMode, + }, + } +} diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 15a16cd984..e8cfff5d73 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -69,8 +69,9 @@ type Maintainer struct { selfNode *node.Info controller *Controller - pdClock pdutil.Clock - eventCh *chann.DrainableChann[*Event] + pdClock pdutil.Clock + eventCh *chann.DrainableChann[*Event] + statusRequestNotifyCh chan struct{} mc messaging.MessageCenter @@ -163,6 +164,14 @@ type Maintainer struct { redoScheduledTaskGauge prometheus.Gauge redoSpanCountGauge prometheus.Gauge redoTableCountGauge prometheus.Gauge + + bufferedStatusRequests bufferedStatusRequestBuffer + + droppedStatusRequests struct { + sync.Mutex + count uint64 + lastLogTime time.Time + } } // NewMaintainer create the maintainer for the changefeed @@ -196,10 +205,11 @@ func NewMaintainer(cfID common.ChangeFeedID, Name: keyspaceName, } m := &Maintainer{ - changefeedID: cfID, - selfNode: selfNode, - eventCh: chann.NewAutoDrainChann[*Event](), - startCheckpointTs: checkpointTs, + changefeedID: cfID, + selfNode: selfNode, + eventCh: chann.NewAutoDrainChann[*Event](), + statusRequestNotifyCh: make(chan struct{}, 1), + startCheckpointTs: checkpointTs, controller: NewController(cfID, checkpointTs, taskScheduler, info.Config, ddlSpan, redoDDLSpan, conf.AddTableBatchSize, time.Duration(conf.CheckBalanceInterval), refresher, keyspaceMeta, 
enableRedo), mc: mc, @@ -1157,6 +1167,8 @@ func (m *Maintainer) runHandleEvents(ctx context.Context) { return case event := <-m.eventCh.Out(): m.HandleEvent(event) + case <-m.statusRequestNotifyCh: + m.handleBufferedStatusRequests() case <-ticker.C: m.HandleEvent(&Event{ changefeedID: m.changefeedID, @@ -1168,8 +1180,31 @@ func (m *Maintainer) runHandleEvents(ctx context.Context) { // pushEvent is used to push event to maintainer's event channel // event will be handled by maintainer's main loop -func (m *Maintainer) pushEvent(event *Event) { +func (m *Maintainer) pushEvent(event *Event) bool { + if handled, accepted := m.tryBufferStatusRequestEvent(event); handled { + return accepted + } m.eventCh.In() <- event + return true +} + +func (m *Maintainer) recordDroppedStatusRequest(msgType messaging.IOType, reason string) { + m.droppedStatusRequests.Lock() + defer m.droppedStatusRequests.Unlock() + + m.droppedStatusRequests.count++ + now := time.Now() + if !m.droppedStatusRequests.lastLogTime.IsZero() && now.Sub(m.droppedStatusRequests.lastLogTime) < 5*time.Second { + return + } + m.droppedStatusRequests.lastLogTime = now + + log.Warn("drop maintainer status request", + zap.Stringer("changefeedID", m.changefeedID), + zap.String("type", msgType.String()), + zap.String("reason", reason), + zap.Int("eventChLen", m.eventCh.Len()), + zap.Uint64("droppedCount", m.droppedStatusRequests.count)) } func (m *Maintainer) getWatermark() heartbeatpb.Watermark { diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index 8c06121e47..84b452c1a1 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/ticdc/pkg/node" "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/server/watcher" + "github.com/pingcap/ticdc/utils/chann" "github.com/pingcap/ticdc/utils/threadpool" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -256,6 +257,189 @@ func (m *mockDispatcherManager) 
sendHeartbeat() { } } +func TestMaintainerPushEventDropsStatusRequestBeforeInitialized(t *testing.T) { + m := &Maintainer{ + changefeedID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), + eventCh: chann.NewAutoDrainChann[*Event](chann.Cap(8)), + statusRequestNotifyCh: make(chan struct{}, 1), + removed: atomic.NewBool(false), + } + + pushed := m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{Type: messaging.TypeBlockStatusRequest}, + }) + + require.False(t, pushed) + require.Equal(t, 0, m.eventCh.Len()) +} + +func TestMaintainerPushEventBuffersBlockStatusRequest(t *testing.T) { + dispatcherID := common.NewDispatcherID() + from := node.ID("node-1") + m := &Maintainer{ + changefeedID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), + eventCh: chann.NewAutoDrainChann[*Event](chann.Cap(8)), + statusRequestNotifyCh: make(chan struct{}, 1), + removed: atomic.NewBool(false), + } + m.initialized.Store(true) + + req := &heartbeatpb.BlockStatusRequest{ + ChangefeedID: m.changefeedID.ToPB(), + Mode: common.DefaultMode, + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 100, + IsSyncPoint: true, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + } + + require.True(t, m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{ + From: from, + Type: messaging.TypeBlockStatusRequest, + Message: []messaging.IOTypeT{req}, + }, + })) + require.True(t, m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{ + From: from, + Type: messaging.TypeBlockStatusRequest, + Message: []messaging.IOTypeT{req}, + }, + })) + + require.Equal(t, 0, m.eventCh.Len()) + require.Equal(t, 1, len(m.statusRequestNotifyCh)) + + events := 
m.takeBufferedStatusRequestEvents() + require.Len(t, events, 1) + bufferedReq := events[0].message.Message[0].(*heartbeatpb.BlockStatusRequest) + require.Len(t, bufferedReq.BlockStatuses, 1) + require.Equal(t, from, events[0].message.From) +} + +func TestMaintainerPushEventBuffersBlockStatusRequestByModeAndBlockTs(t *testing.T) { + from := node.ID("node-1") + changefeedID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + defaultDispatcherID := common.NewDispatcherID() + redoDispatcherID := common.NewDispatcherID() + + newBlockStatus := func(dispatcherID common.DispatcherID, blockTs uint64, mode int64) *heartbeatpb.TableSpanBlockStatus { + return &heartbeatpb.TableSpanBlockStatus{ + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: blockTs, + IsSyncPoint: true, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: mode, + } + } + + m := &Maintainer{ + changefeedID: changefeedID, + eventCh: chann.NewAutoDrainChann[*Event](chann.Cap(8)), + statusRequestNotifyCh: make(chan struct{}, 1), + removed: atomic.NewBool(false), + } + m.initialized.Store(true) + + newBlockStatusRequest := func(mode int64, statuses ...*heartbeatpb.TableSpanBlockStatus) *heartbeatpb.BlockStatusRequest { + return &heartbeatpb.BlockStatusRequest{ + ChangefeedID: changefeedID.ToPB(), + Mode: mode, + BlockStatuses: statuses, + } + } + + require.True(t, m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{ + From: from, + Type: messaging.TypeBlockStatusRequest, + Message: []messaging.IOTypeT{ + newBlockStatusRequest(common.DefaultMode, + newBlockStatus(defaultDispatcherID, 100, common.DefaultMode)), + }, + }, + })) + require.True(t, m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{ + From: from, + Type: messaging.TypeBlockStatusRequest, + Message: []messaging.IOTypeT{ + newBlockStatusRequest(common.DefaultMode, + 
newBlockStatus(defaultDispatcherID, 200, common.DefaultMode)), + }, + }, + })) + require.True(t, m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{ + From: from, + Type: messaging.TypeBlockStatusRequest, + Message: []messaging.IOTypeT{ + newBlockStatusRequest(common.RedoMode, + newBlockStatus(redoDispatcherID, 300, common.RedoMode)), + }, + }, + })) + + events := m.takeBufferedStatusRequestEvents() + require.Len(t, events, 2) + + blockTsByMode := make(map[int64][]uint64, 2) + for _, event := range events { + require.Equal(t, from, event.message.From) + req := event.message.Message[0].(*heartbeatpb.BlockStatusRequest) + for _, status := range req.BlockStatuses { + blockTsByMode[req.Mode] = append(blockTsByMode[req.Mode], status.State.BlockTs) + } + } + + require.ElementsMatch(t, []uint64{100, 200}, blockTsByMode[common.DefaultMode]) + require.ElementsMatch(t, []uint64{300}, blockTsByMode[common.RedoMode]) +} + +func TestMaintainerPushEventKeepsBootstrapResponseOutsideStatusBuffer(t *testing.T) { + m := &Maintainer{ + changefeedID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), + eventCh: chann.NewAutoDrainChann[*Event](chann.Cap(8)), + statusRequestNotifyCh: make(chan struct{}, 1), + removed: atomic.NewBool(false), + } + m.initialized.Store(true) + + pushed := m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{Type: messaging.TypeMaintainerBootstrapResponse}, + }) + + require.True(t, pushed) + require.Equal(t, 1, m.eventCh.Len()) + require.Equal(t, 0, len(m.statusRequestNotifyCh)) +} + func TestMaintainerSchedule(t *testing.T) { // This test exercises a single-node maintainer lifecycle: // 1) Bootstrap a changefeed via the dispatcher manager mock. 
diff --git a/maintainer/status_request_buffer.go b/maintainer/status_request_buffer.go new file mode 100644 index 0000000000..ab2849c670 --- /dev/null +++ b/maintainer/status_request_buffer.go @@ -0,0 +1,299 @@ +package maintainer + +import ( + "sync" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/messaging" + "github.com/pingcap/ticdc/pkg/node" +) + +type bufferedStatusRequestBuffer struct { + sync.Mutex + + heartbeats map[node.ID]*bufferedHeartbeatRequest + blockStatuses map[bufferedBlockStatusSourceKey]*bufferedBlockStatusRequest + redoResolvedTs map[node.ID]*heartbeatpb.RedoResolvedTsProgressMessage +} + +type bufferedHeartbeatRequest struct { + changefeedID *heartbeatpb.ChangefeedID + watermark *heartbeatpb.Watermark + redoWatermark *heartbeatpb.Watermark + statuses map[heartbeatStatusKey]*heartbeatpb.TableSpanStatus + order []heartbeatStatusKey + err *heartbeatpb.RunningError + completeStatus bool +} + +type heartbeatStatusKey struct { + dispatcherID common.DispatcherID + mode int64 +} + +type bufferedBlockStatusRequest struct { + changefeedID *heartbeatpb.ChangefeedID + from node.ID + mode int64 + statuses map[blockStatusKey]*heartbeatpb.TableSpanBlockStatus + order []blockStatusKey +} + +type bufferedBlockStatusSourceKey struct { + from node.ID + mode int64 +} + +type blockStatusKey struct { + dispatcherID common.DispatcherID + blockTs uint64 + mode int64 + isSyncPoint bool + stage heartbeatpb.BlockStage +} + +func isBufferableStatusRequest(msgType messaging.IOType) bool { + switch msgType { + case messaging.TypeHeartBeatRequest, + messaging.TypeBlockStatusRequest, + messaging.TypeRedoResolvedTsProgressMessage: + return true + default: + return false + } +} + +func (m *Maintainer) tryBufferStatusRequestEvent(event *Event) (bool, bool) { + if event == nil || event.eventType != EventMessage || event.message == nil { + return false, false + } + if !isBufferableStatusRequest(event.message.Type) 
{ + return false, false + } + if !m.initialized.Load() { + m.recordDroppedStatusRequest(event.message.Type, "maintainer not initialized") + return true, false + } + if m.removing.Load() || (m.removed != nil && m.removed.Load()) { + m.recordDroppedStatusRequest(event.message.Type, "maintainer is closing") + return true, false + } + + m.bufferStatusRequest(event.message) + select { + case m.statusRequestNotifyCh <- struct{}{}: + default: + } + return true, true +} + +func (m *Maintainer) bufferStatusRequest(msg *messaging.TargetMessage) { + switch msg.Type { + case messaging.TypeHeartBeatRequest: + req := msg.Message[0].(*heartbeatpb.HeartBeatRequest) + m.bufferedStatusRequests.mergeHeartbeat(msg.From, req) + case messaging.TypeBlockStatusRequest: + req := msg.Message[0].(*heartbeatpb.BlockStatusRequest) + m.bufferedStatusRequests.mergeBlockStatus(msg.From, req) + case messaging.TypeRedoResolvedTsProgressMessage: + req := msg.Message[0].(*heartbeatpb.RedoResolvedTsProgressMessage) + m.bufferedStatusRequests.mergeRedoResolvedTs(msg.From, req) + } +} + +func (m *Maintainer) handleBufferedStatusRequests() { + for _, event := range m.takeBufferedStatusRequestEvents() { + m.HandleEvent(event) + } +} + +func (m *Maintainer) takeBufferedStatusRequestEvents() []*Event { + return m.bufferedStatusRequests.drain(m.changefeedID) +} + +func (b *bufferedStatusRequestBuffer) mergeHeartbeat(from node.ID, req *heartbeatpb.HeartBeatRequest) { + if req == nil { + return + } + b.Lock() + defer b.Unlock() + if b.heartbeats == nil { + b.heartbeats = make(map[node.ID]*bufferedHeartbeatRequest) + } + entry, ok := b.heartbeats[from] + if !ok { + entry = &bufferedHeartbeatRequest{ + changefeedID: req.ChangefeedID, + statuses: make(map[heartbeatStatusKey]*heartbeatpb.TableSpanStatus), + } + b.heartbeats[from] = entry + } + entry.changefeedID = req.ChangefeedID + entry.completeStatus = entry.completeStatus || req.CompeleteStatus + entry.err = pickRunningError(entry.err, req.Err) + 
entry.watermark = pickWatermark(entry.watermark, req.Watermark) + entry.redoWatermark = pickWatermark(entry.redoWatermark, req.RedoWatermark) + for _, status := range req.Statuses { + if status == nil || status.ID == nil { + continue + } + key := heartbeatStatusKey{ + dispatcherID: common.NewDispatcherIDFromPB(status.ID), + mode: status.Mode, + } + if _, exists := entry.statuses[key]; !exists { + entry.order = append(entry.order, key) + } + entry.statuses[key] = status + } +} + +func (b *bufferedStatusRequestBuffer) mergeBlockStatus(from node.ID, req *heartbeatpb.BlockStatusRequest) { + if req == nil { + return + } + b.Lock() + defer b.Unlock() + if b.blockStatuses == nil { + b.blockStatuses = make(map[bufferedBlockStatusSourceKey]*bufferedBlockStatusRequest) + } + sourceKey := bufferedBlockStatusSourceKey{ + from: from, + mode: req.Mode, + } + entry, ok := b.blockStatuses[sourceKey] + if !ok { + entry = &bufferedBlockStatusRequest{ + changefeedID: req.ChangefeedID, + from: from, + mode: req.Mode, + statuses: make(map[blockStatusKey]*heartbeatpb.TableSpanBlockStatus), + } + b.blockStatuses[sourceKey] = entry + } + entry.changefeedID = req.ChangefeedID + entry.mode = req.Mode + for _, status := range req.BlockStatuses { + if status == nil || status.ID == nil || status.State == nil { + continue + } + key := blockStatusKey{ + dispatcherID: common.NewDispatcherIDFromPB(status.ID), + blockTs: status.State.BlockTs, + mode: status.Mode, + isSyncPoint: status.State.IsSyncPoint, + stage: status.State.Stage, + } + if _, exists := entry.statuses[key]; !exists { + entry.order = append(entry.order, key) + } + entry.statuses[key] = status + } +} + +func (b *bufferedStatusRequestBuffer) mergeRedoResolvedTs(from node.ID, req *heartbeatpb.RedoResolvedTsProgressMessage) { + if req == nil { + return + } + b.Lock() + defer b.Unlock() + if b.redoResolvedTs == nil { + b.redoResolvedTs = make(map[node.ID]*heartbeatpb.RedoResolvedTsProgressMessage) + } + current, ok := 
b.redoResolvedTs[from] + if !ok || req.ResolvedTs > current.ResolvedTs { + b.redoResolvedTs[from] = req + } +} + +func (b *bufferedStatusRequestBuffer) drain(changefeedID common.ChangeFeedID) []*Event { + var events []*Event + b.Lock() + defer b.Unlock() + + for _, entry := range b.blockStatuses { + req := &heartbeatpb.BlockStatusRequest{ + ChangefeedID: entry.changefeedID, + Mode: entry.mode, + BlockStatuses: make([]*heartbeatpb.TableSpanBlockStatus, 0, len(entry.order)), + } + for _, key := range entry.order { + if status, ok := entry.statuses[key]; ok { + req.BlockStatuses = append(req.BlockStatuses, status) + } + } + if len(req.BlockStatuses) > 0 { + events = append(events, newBufferedStatusEvent(changefeedID, entry.from, messaging.TypeBlockStatusRequest, req)) + } + } + b.blockStatuses = nil + + for from, entry := range b.heartbeats { + req := &heartbeatpb.HeartBeatRequest{ + ChangefeedID: entry.changefeedID, + Statuses: make([]*heartbeatpb.TableSpanStatus, 0, len(entry.order)), + Watermark: entry.watermark, + RedoWatermark: entry.redoWatermark, + Err: entry.err, + CompeleteStatus: entry.completeStatus, + } + for _, key := range entry.order { + if status, ok := entry.statuses[key]; ok { + req.Statuses = append(req.Statuses, status) + } + } + if len(req.Statuses) > 0 || req.Watermark != nil || req.RedoWatermark != nil || req.Err != nil { + events = append(events, newBufferedStatusEvent(changefeedID, from, messaging.TypeHeartBeatRequest, req)) + } + } + b.heartbeats = nil + + for from, req := range b.redoResolvedTs { + events = append(events, newBufferedStatusEvent(changefeedID, from, messaging.TypeRedoResolvedTsProgressMessage, req)) + } + b.redoResolvedTs = nil + + return events +} + +func newBufferedStatusEvent( + changefeedID common.ChangeFeedID, + from node.ID, + msgType messaging.IOType, + msg messaging.IOTypeT, +) *Event { + return &Event{ + changefeedID: changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{ + From: from, + To: "", 
+ Type: msgType, + Message: []messaging.IOTypeT{msg}, + }, + } +} + +func pickRunningError(current, candidate *heartbeatpb.RunningError) *heartbeatpb.RunningError { + if candidate != nil { + return candidate + } + return current +} + +func pickWatermark(current, candidate *heartbeatpb.Watermark) *heartbeatpb.Watermark { + if candidate == nil { + return current + } + if current == nil { + return candidate + } + if candidate.Seq > current.Seq || (candidate.Seq == current.Seq && candidate.CheckpointTs >= current.CheckpointTs) { + current = candidate + } + if candidate.LastSyncedTs > current.LastSyncedTs { + current.LastSyncedTs = candidate.LastSyncedTs + } + return current +}