diff --git a/.gitignore b/.gitignore index 5012978619..f0555def26 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,8 @@ tools/bin tools/include tools/workload/bin +.design +.codex .issue .vscode .idea diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go index 97138f6388..51cf5b32cb 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher.go +++ b/downstreamadapter/dispatcher/basic_dispatcher.go @@ -76,7 +76,7 @@ type Dispatcher interface { GetHeartBeatInfo(h *HeartBeatInfo) GetComponentStatus() heartbeatpb.ComponentState GetBlockEventStatus() *heartbeatpb.State - GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus + OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) GetEventSizePerSecond() float32 IsTableTriggerDispatcher() bool DealWithBlockEvent(event commonEvent.BlockEvent) @@ -838,16 +838,7 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D } // Step3: whether the outdate message or not, we need to return message show we have finished the event. - d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{ - ID: d.id.ToPB(), - State: &heartbeatpb.State{ - IsBlocked: true, - BlockTs: action.CommitTs, - IsSyncPoint: action.IsSyncPoint, - Stage: heartbeatpb.BlockStage_DONE, - }, - Mode: d.GetMode(), - } + d.OfferDoneBlockStatus(action.CommitTs, action.IsSyncPoint) } return false } @@ -883,16 +874,7 @@ func (d *BasicDispatcher) reportBlockedEventDone( actionCommitTs uint64, actionIsSyncPoint bool, ) { - d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{ - ID: d.id.ToPB(), - State: &heartbeatpb.State{ - IsBlocked: true, - BlockTs: actionCommitTs, - IsSyncPoint: actionIsSyncPoint, - Stage: heartbeatpb.BlockStage_DONE, - }, - Mode: d.GetMode(), - } + d.OfferDoneBlockStatus(actionCommitTs, actionIsSyncPoint) GetDispatcherStatusDynamicStream().Wake(d.id) } @@ -1044,7 +1026,7 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) { } else { d.resendTaskMap.Set(identifier, newResendTask(message, d, nil)) } - d.sharedInfo.blockStatusesChan <- message + d.OfferBlockStatus(message) }) // dealing with events which update schema ids @@ -1168,7 +1150,7 @@ func (d *BasicDispatcher) reportBlockedEventToMaintainer(event commonEvent.Block IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent, } d.resendTaskMap.Set(identifier, newResendTask(message, d, nil)) - d.sharedInfo.blockStatusesChan <- message + d.OfferBlockStatus(message) } func (d *BasicDispatcher) flushBlockedEventAndReportToMaintainer(event commonEvent.BlockEvent) { diff --git a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go index 8f95a93b5b..882da1fe1b 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go +++ b/downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go @@ -129,7 +129,6 @@ func TestDDLEventsAlwaysValidateActiveActive(t *testing.T) { func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActiveActive bool) *BasicDispatcher { t.Helper() statuses := make(chan TableSpanStatusWithSeq, 2) - blockStatuses := make(chan *heartbeatpb.TableSpanBlockStatus, 1) errCh := make(chan error, 1) sharedInfo := NewSharedInfo( common.NewChangefeedID("test"), @@ -143,7 +142,7 @@ func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActive nil, false, statuses, - blockStatuses, + 1, errCh, ) dispatcherSink := 
newDispatcherTestSink(t, sinkType) diff --git a/downstreamadapter/dispatcher/basic_dispatcher_info.go b/downstreamadapter/dispatcher/basic_dispatcher_info.go index 19a0980461..ee71078891 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher_info.go +++ b/downstreamadapter/dispatcher/basic_dispatcher_info.go @@ -14,6 +14,7 @@ package dispatcher import ( + "context" "sync/atomic" "time" @@ -58,9 +59,9 @@ type SharedInfo struct { // statusesChan is used to store the status of dispatchers when status changed // and push to heartbeatRequestQueue statusesChan chan TableSpanStatusWithSeq - // blockStatusesChan use to collector block status of ddl/sync point event to Maintainer - // shared by the event dispatcher manager - blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus + // blockStatusBuffer keeps block statuses for the dispatcher manager. + // Identical WAITING and DONE statuses are coalesced while pending to reduce local memory amplification. + blockStatusBuffer *BlockStatusBuffer // blockExecutor is used to execute block events such as DDL and sync point events asynchronously // to avoid callback() called in handleEvents, causing deadlock in ds @@ -88,7 +89,7 @@ func NewSharedInfo( txnAtomicity *config.AtomicityLevel, enableSplittableCheck bool, statusesChan chan TableSpanStatusWithSeq, - blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus, + blockStatusBufferSize int, errCh chan error, ) *SharedInfo { sharedInfo := &SharedInfo{ @@ -102,7 +103,7 @@ func NewSharedInfo( syncPointConfig: syncPointConfig, enableSplittableCheck: enableSplittableCheck, statusesChan: statusesChan, - blockStatusesChan: blockStatusesChan, + blockStatusBuffer: NewBlockStatusBuffer(blockStatusBufferSize), blockExecutor: newBlockEventExecutor(), errCh: errCh, metricHandleDDLHis: metrics.HandleDDLHistogram.WithLabelValues(changefeedID.Keyspace(), changefeedID.Name()), @@ -219,8 +220,20 @@ func (d *BasicDispatcher) GetTxnAtomicity() config.AtomicityLevel { return d.sharedInfo.txnAtomicity } -func (d *BasicDispatcher) GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus { - return d.sharedInfo.blockStatusesChan +func (d *BasicDispatcher) OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) { + d.sharedInfo.OfferBlockStatus(status) +} + +func (d *BasicDispatcher) OfferDoneBlockStatus(blockTs uint64, isSyncPoint bool) { + d.sharedInfo.OfferDoneBlockStatus(d.id, blockTs, isSyncPoint, d.GetMode()) +} + +func (d *BasicDispatcher) TakeBlockStatus(ctx context.Context) *heartbeatpb.TableSpanBlockStatus { + return d.sharedInfo.TakeBlockStatus(ctx) +} + +func (d *BasicDispatcher) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) { + return d.sharedInfo.TakeBlockStatusWithTimeout(timeout) } func (d *BasicDispatcher) GetEventSizePerSecond() float32 { @@ -254,8 +267,34 @@ func (s *SharedInfo) EnableActiveActive() bool { return s.enableActiveActive } -func (s *SharedInfo) GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus { - return s.blockStatusesChan +func (s *SharedInfo) OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) { + s.blockStatusBuffer.Offer(status) +} + +func (s *SharedInfo) OfferDoneBlockStatus(dispatcherID common.DispatcherID, blockTs uint64, isSyncPoint bool, mode int64) { + s.blockStatusBuffer.OfferDone(dispatcherID, blockTs, isSyncPoint, mode) +} + +func (s *SharedInfo) TakeBlockStatus(ctx context.Context) *heartbeatpb.TableSpanBlockStatus { + return s.blockStatusBuffer.Take(ctx) +} + +func (s *SharedInfo) 
TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + status := s.TakeBlockStatus(ctx) + if status == nil { + return nil, false + } + return status, true +} + +func (s *SharedInfo) TryTakeBlockStatus() (*heartbeatpb.TableSpanBlockStatus, bool) { + return s.blockStatusBuffer.TryTake() +} + +func (s *SharedInfo) BlockStatusLen() int { + return s.blockStatusBuffer.Len() } func (s *SharedInfo) GetErrCh() chan error { diff --git a/downstreamadapter/dispatcher/block_status_buffer.go b/downstreamadapter/dispatcher/block_status_buffer.go new file mode 100644 index 0000000000..434ada5484 --- /dev/null +++ b/downstreamadapter/dispatcher/block_status_buffer.go @@ -0,0 +1,194 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatcher + +import ( + "context" + "sync" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" +) + +type blockStatusKey struct { + dispatcherID common.DispatcherID + blockTs uint64 + mode int64 + isSyncPoint bool +} + +type blockStatusQueueEntry struct { + status *heartbeatpb.TableSpanBlockStatus + doneKey *blockStatusKey +} + +// BlockStatusBuffer keeps block statuses ordered while coalescing identical +// WAITING and DONE statuses that are still pending locally. Other statuses keep +// the original protobuf object and ordering. +type BlockStatusBuffer struct { + queue chan blockStatusQueueEntry + + mu sync.Mutex + pendingDone map[blockStatusKey]struct{} + pendingWaiting map[blockStatusKey]struct{} +} + +// NewBlockStatusBuffer creates a bounded local mailbox for dispatcher block +// statuses. The buffer keeps enqueue order while coalescing identical pending +// WAITING and DONE statuses before protobuf materialization. 
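// For illustration, a possible call pattern, assuming the dispatcher is the producer and the
// dispatcher manager loop is the consumer; the buffer size, dispatcher ID, and block timestamp
// below are made up and are not taken from this patch:
//
//	id := common.NewDispatcherID()
//	buf := NewBlockStatusBuffer(1024)
//	buf.OfferDone(id, 100, false, common.DefaultMode) // first DONE for (id, ts=100) is queued
//	buf.OfferDone(id, 100, false, common.DefaultMode) // identical pending DONE is coalesced, Len() stays 1
//	msg := buf.Take(context.Background())             // the DONE protobuf is materialized only here
//	buf.OfferDone(id, 100, false, common.DefaultMode) // accepted again, since the first one was taken
//	_ = msg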
+func NewBlockStatusBuffer(size int) *BlockStatusBuffer { + if size <= 0 { + size = 1 + } + return &BlockStatusBuffer{ + queue: make(chan blockStatusQueueEntry, size), + pendingDone: make(map[blockStatusKey]struct{}), + pendingWaiting: make(map[blockStatusKey]struct{}), + } +} + +func (b *BlockStatusBuffer) Offer(status *heartbeatpb.TableSpanBlockStatus) { + if status == nil { + return + } + if isWaitingBlockStatus(status) { + key := newBlockStatusKey(status) + if !b.reserveWaiting(key) { + return + } + b.queue <- blockStatusQueueEntry{status: status} + return + } + if !isDoneBlockStatus(status) { + b.queue <- blockStatusQueueEntry{status: status} + return + } + + key := newBlockStatusKey(status) + if !b.reserveDone(key) { + return + } + b.queue <- blockStatusQueueEntry{doneKey: &key} +} + +func (b *BlockStatusBuffer) OfferDone( + dispatcherID common.DispatcherID, + blockTs uint64, + isSyncPoint bool, + mode int64, +) { + key := blockStatusKey{ + dispatcherID: dispatcherID, + blockTs: blockTs, + mode: mode, + isSyncPoint: isSyncPoint, + } + if !b.reserveDone(key) { + return + } + b.queue <- blockStatusQueueEntry{doneKey: &key} +} + +func (b *BlockStatusBuffer) Take(ctx context.Context) *heartbeatpb.TableSpanBlockStatus { + select { + case <-ctx.Done(): + return nil + case entry := <-b.queue: + return b.materialize(entry) + } +} + +func (b *BlockStatusBuffer) TryTake() (*heartbeatpb.TableSpanBlockStatus, bool) { + select { + case entry := <-b.queue: + return b.materialize(entry), true + default: + return nil, false + } +} + +func (b *BlockStatusBuffer) Len() int { + return len(b.queue) +} + +func (b *BlockStatusBuffer) reserveWaiting(key blockStatusKey) bool { + b.mu.Lock() + defer b.mu.Unlock() + if _, ok := b.pendingWaiting[key]; ok { + return false + } + b.pendingWaiting[key] = struct{}{} + return true +} + +func (b *BlockStatusBuffer) reserveDone(key blockStatusKey) bool { + b.mu.Lock() + defer b.mu.Unlock() + if _, ok := b.pendingDone[key]; ok { + return false + } + b.pendingDone[key] = struct{}{} + return true +} + +func (b *BlockStatusBuffer) materialize(entry blockStatusQueueEntry) *heartbeatpb.TableSpanBlockStatus { + if entry.status != nil { + if isWaitingBlockStatus(entry.status) { + key := newBlockStatusKey(entry.status) + b.mu.Lock() + delete(b.pendingWaiting, key) + b.mu.Unlock() + } + return entry.status + } + + key := *entry.doneKey + b.mu.Lock() + delete(b.pendingDone, key) + b.mu.Unlock() + + return &heartbeatpb.TableSpanBlockStatus{ + ID: key.dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: key.blockTs, + IsSyncPoint: key.isSyncPoint, + Stage: heartbeatpb.BlockStage_DONE, + }, + Mode: key.mode, + } +} + +func newBlockStatusKey(status *heartbeatpb.TableSpanBlockStatus) blockStatusKey { + return blockStatusKey{ + dispatcherID: common.NewDispatcherIDFromPB(status.ID), + blockTs: status.State.BlockTs, + mode: status.Mode, + isSyncPoint: status.State.IsSyncPoint, + } +} + +func isWaitingBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool { + return status != nil && + status.State != nil && + status.State.IsBlocked && + status.State.Stage == heartbeatpb.BlockStage_WAITING +} + +func isDoneBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool { + return status != nil && + status.State != nil && + status.State.IsBlocked && + status.State.Stage == heartbeatpb.BlockStage_DONE +} diff --git a/downstreamadapter/dispatcher/block_status_buffer_test.go b/downstreamadapter/dispatcher/block_status_buffer_test.go new file mode 100644 index 
0000000000..021039bb81 --- /dev/null +++ b/downstreamadapter/dispatcher/block_status_buffer_test.go @@ -0,0 +1,168 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package dispatcher + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + "github.com/stretchr/testify/require" +) + +func TestBlockStatusBufferDeduplicatesPendingDone(t *testing.T) { + buffer := NewBlockStatusBuffer(4) + dispatcherID := common.NewDispatcherID() + + buffer.OfferDone(dispatcherID, 100, false, common.DefaultMode) + buffer.OfferDone(dispatcherID, 100, false, common.DefaultMode) + + require.Equal(t, 1, buffer.Len()) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + msg := buffer.Take(ctx) + require.NotNil(t, msg) + require.Equal(t, dispatcherID, common.NewDispatcherIDFromPB(msg.ID)) + require.Equal(t, uint64(100), msg.State.BlockTs) + require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage) + require.False(t, msg.State.IsSyncPoint) + require.Equal(t, common.DefaultMode, msg.Mode) + + _, ok := buffer.TryTake() + require.False(t, ok) +} + +func TestBlockStatusBufferDeduplicatesPendingWaiting(t *testing.T) { + buffer := NewBlockStatusBuffer(4) + dispatcherID := common.NewDispatcherID() + + waiting := &heartbeatpb.TableSpanBlockStatus{ + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 150, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + } + + buffer.Offer(waiting) + buffer.Offer(waiting) + + require.Equal(t, 1, buffer.Len()) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + msg := buffer.Take(ctx) + require.NotNil(t, msg) + require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage) + + _, ok := buffer.TryTake() + require.False(t, ok) +} + +func TestBlockStatusBufferAllowsWaitingAgainAfterTake(t *testing.T) { + buffer := NewBlockStatusBuffer(4) + dispatcherID := common.NewDispatcherID() + + waiting := &heartbeatpb.TableSpanBlockStatus{ + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 180, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + } + + buffer.Offer(waiting) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + first := buffer.Take(ctx) + require.NotNil(t, first) + require.Equal(t, heartbeatpb.BlockStage_WAITING, first.State.Stage) + + buffer.Offer(waiting) + + second := buffer.Take(ctx) + require.NotNil(t, second) + require.Equal(t, heartbeatpb.BlockStage_WAITING, second.State.Stage) + + _, ok := buffer.TryTake() + require.False(t, ok) +} + +func TestBlockStatusBufferKeepsWaitingBeforeDone(t *testing.T) { + buffer := NewBlockStatusBuffer(4) + dispatcherID := common.NewDispatcherID() + + buffer.Offer(&heartbeatpb.TableSpanBlockStatus{ + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 200, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }) + 
buffer.OfferDone(dispatcherID, 200, false, common.DefaultMode) + buffer.OfferDone(dispatcherID, 200, false, common.DefaultMode) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + first := buffer.Take(ctx) + require.NotNil(t, first) + require.Equal(t, heartbeatpb.BlockStage_WAITING, first.State.Stage) + + second := buffer.Take(ctx) + require.NotNil(t, second) + require.Equal(t, heartbeatpb.BlockStage_DONE, second.State.Stage) + + _, ok := buffer.TryTake() + require.False(t, ok) +} + +func TestBlockStatusBufferKeepsDistinctDoneKeys(t *testing.T) { + buffer := NewBlockStatusBuffer(4) + dispatcherID := common.NewDispatcherID() + + buffer.OfferDone(dispatcherID, 300, false, common.DefaultMode) + buffer.OfferDone(dispatcherID, 300, true, common.DefaultMode) + buffer.OfferDone(dispatcherID, 300, false, common.RedoMode) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + first := buffer.Take(ctx) + require.NotNil(t, first) + require.False(t, first.State.IsSyncPoint) + require.Equal(t, common.DefaultMode, first.Mode) + + second := buffer.Take(ctx) + require.NotNil(t, second) + require.True(t, second.State.IsSyncPoint) + require.Equal(t, common.DefaultMode, second.Mode) + + third := buffer.Take(ctx) + require.NotNil(t, third) + require.False(t, third.State.IsSyncPoint) + require.Equal(t, common.RedoMode, third.Mode) + + _, ok := buffer.TryTake() + require.False(t, ok) +} diff --git a/downstreamadapter/dispatcher/event_dispatcher_test.go b/downstreamadapter/dispatcher/event_dispatcher_test.go index 29b8ace3e2..0ce694d02e 100644 --- a/downstreamadapter/dispatcher/event_dispatcher_test.go +++ b/downstreamadapter/dispatcher/event_dispatcher_test.go @@ -83,7 +83,7 @@ func newDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan) *Eve &defaultAtomicity, false, // enableSplittableCheck make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) return NewEventDispatcher( @@ -476,22 +476,16 @@ func TestBlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain(t *testing.T) { require.Nil(t, pendingEvent) require.Equal(t, heartbeatpb.BlockStage_NONE, blockStage) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.FailNow(t, "unexpected block status before local flush finishes", "received=%v", msg) - case <-time.After(200 * time.Millisecond): - } + msg, ok := dispatcher.TakeBlockStatusWithTimeout(200 * time.Millisecond) + require.False(t, ok, "unexpected block status before local flush finishes: %v", msg) close(flushRelease) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.True(t, msg.State.IsBlocked) - require.Equal(t, uint64(10), msg.State.BlockTs) - require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage) - case <-time.After(time.Second): - require.FailNow(t, "expected blocking DDL to enter WAITING after local flush") - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(time.Second) + require.True(t, ok, "expected blocking DDL to enter WAITING after local flush") + require.True(t, msg.State.IsBlocked) + require.Equal(t, uint64(10), msg.State.BlockTs) + require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage) pendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage() require.Same(t, ddlEvent, pendingEvent) @@ -507,14 +501,11 @@ func TestBlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain(t *testing.T) { }) require.True(t, await) - select { - case msg := 
<-dispatcher.GetBlockStatusesChan(): - require.True(t, msg.State.IsBlocked) - require.Equal(t, uint64(10), msg.State.BlockTs) - require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage) - case <-time.After(time.Second): - require.FailNow(t, "expected DONE after write action") - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(time.Second) + require.True(t, ok, "expected DONE after write action") + require.True(t, msg.State.IsBlocked) + require.Equal(t, uint64(10), msg.State.BlockTs) + require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage) require.Eventually(t, func() bool { pendingEvent, blockStage = dispatcher.blockEventStatus.getEventAndStage() @@ -1004,7 +995,7 @@ func TestDispatcherSplittableCheck(t *testing.T) { &defaultAtomicity, true, // enableSplittableCheck = true make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) @@ -1114,7 +1105,7 @@ func TestDispatcher_SkipDMLAsStartTs_FilterCorrectly(t *testing.T) { &defaultAtomicity, false, make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) @@ -1194,7 +1185,7 @@ func TestDispatcher_SkipDMLAsStartTs_Disabled(t *testing.T) { &defaultAtomicity, false, make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) @@ -1258,14 +1249,11 @@ func TestHoldBlockEventUntilNoResendTasks(t *testing.T) { block := dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, createTableDDL)}, func() {}) require.True(t, block) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.False(t, msg.State.IsBlocked) - require.False(t, msg.State.IsSyncPoint) - require.Equal(t, uint64(10), msg.State.BlockTs) - case <-time.After(time.Second): - require.FailNow(t, "expected add-table block status") - } + msg, ok := dispatcher.TakeBlockStatusWithTimeout(time.Second) + require.True(t, ok, "expected add-table block status") + require.False(t, msg.State.IsBlocked) + require.False(t, msg.State.IsSyncPoint) + require.Equal(t, uint64(10), msg.State.BlockTs) require.Equal(t, 1, dispatcher.resendTaskMap.Len()) // A DB/All block event must be deferred until resendTaskMap becomes empty, @@ -1281,11 +1269,8 @@ func TestHoldBlockEventUntilNoResendTasks(t *testing.T) { block = dispatcher.HandleEvents([]DispatcherEvent{NewDispatcherEvent(&nodeID, dropDBDDL)}, func() {}) require.True(t, block) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.FailNow(t, "unexpected block status", "received=%v", msg) - case <-time.After(200 * time.Millisecond): - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(200 * time.Millisecond) + require.False(t, ok, "unexpected block status: %v", msg) // Simulate maintainer ACK for the create table scheduling message. 
dispatcher.HandleDispatcherStatus(&heartbeatpb.DispatcherStatus{ @@ -1301,23 +1286,17 @@ func TestHoldBlockEventUntilNoResendTasks(t *testing.T) { require.FailNow(t, "expected deferred DB-level flush to start") } - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.FailNow(t, "unexpected block status before local flush finishes", "received=%v", msg) - case <-time.After(200 * time.Millisecond): - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(200 * time.Millisecond) + require.False(t, ok, "unexpected block status before local flush finishes: %v", msg) close(flushRelease) - select { - case msg := <-dispatcher.GetBlockStatusesChan(): - require.True(t, msg.State.IsBlocked) - require.False(t, msg.State.IsSyncPoint) - require.Equal(t, uint64(20), msg.State.BlockTs) - require.Equal(t, heartbeatpb.InfluenceType_DB, msg.State.BlockTables.InfluenceType) - require.Equal(t, int64(1), msg.State.BlockTables.SchemaID) - require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage) - case <-time.After(time.Second): - require.FailNow(t, "expected deferred DB-level block status") - } + msg, ok = dispatcher.TakeBlockStatusWithTimeout(time.Second) + require.True(t, ok, "expected deferred DB-level block status") + require.True(t, msg.State.IsBlocked) + require.False(t, msg.State.IsSyncPoint) + require.Equal(t, uint64(20), msg.State.BlockTs) + require.Equal(t, heartbeatpb.InfluenceType_DB, msg.State.BlockTables.InfluenceType) + require.Equal(t, int64(1), msg.State.BlockTables.SchemaID) + require.Equal(t, heartbeatpb.BlockStage_WAITING, msg.State.Stage) } diff --git a/downstreamadapter/dispatcher/helper.go b/downstreamadapter/dispatcher/helper.go index 177eab4ffa..42625a7ff5 100644 --- a/downstreamadapter/dispatcher/helper.go +++ b/downstreamadapter/dispatcher/helper.go @@ -269,7 +269,7 @@ func newResendTask(message *heartbeatpb.TableSpanBlockStatus, dispatcher Dispatc func (t *ResendTask) Execute() time.Time { log.Debug("resend task", zap.Any("message", t.message), zap.Any("dispatcherID", t.dispatcher.GetId())) - t.dispatcher.GetBlockStatusesChan() <- t.message + t.dispatcher.OfferBlockStatus(t.message) executeCount := atomic.AddUint64(&t.executeCount, 1) if executeCount%10 == 0 { diff --git a/downstreamadapter/dispatcher/redo_dispatcher_test.go b/downstreamadapter/dispatcher/redo_dispatcher_test.go index 68f97016b7..bdd906eebc 100644 --- a/downstreamadapter/dispatcher/redo_dispatcher_test.go +++ b/downstreamadapter/dispatcher/redo_dispatcher_test.go @@ -48,7 +48,7 @@ func newRedoDispatcherForTest(sink sink.Sink, tableSpan *heartbeatpb.TableSpan) &defaultAtomicity, false, // enableSplittableCheck make(chan TableSpanStatusWithSeq, 128), - make(chan *heartbeatpb.TableSpanBlockStatus, 128), + 128, make(chan error, 1), ) return NewRedoDispatcher( diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager.go b/downstreamadapter/dispatchermanager/dispatcher_manager.go index 128f8cbe1e..7446889b52 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager.go @@ -258,7 +258,7 @@ func NewDispatcherManager( manager.config.SinkConfig.TxnAtomicity, manager.config.EnableSplittableCheck, make(chan dispatcher.TableSpanStatusWithSeq, 8192), - make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024), + 1024*1024, make(chan error, 1), ) @@ -565,8 +565,6 @@ func (e *DispatcherManager) collectErrors(ctx context.Context) { // collectBlockStatusRequest collect the block status from the block status channel and report to the 
maintainer. func (e *DispatcherManager) collectBlockStatusRequest(ctx context.Context) { - delay := time.NewTimer(0) - defer delay.Stop() enqueueBlockStatus := func(blockStatusMessage []*heartbeatpb.TableSpanBlockStatus, mode int64) { var message heartbeatpb.BlockStatusRequest message.ChangefeedID = e.changefeedID.ToPB() @@ -577,37 +575,39 @@ func (e *DispatcherManager) collectBlockStatusRequest(ctx context.Context) { for { blockStatusMessage := make([]*heartbeatpb.TableSpanBlockStatus, 0) redoBlockStatusMessage := make([]*heartbeatpb.TableSpanBlockStatus, 0) - select { - case <-ctx.Done(): + blockStatus := e.sharedInfo.TakeBlockStatus(ctx) + if blockStatus == nil { return - case blockStatus := <-e.sharedInfo.GetBlockStatusesChan(): + } + if common.IsDefaultMode(blockStatus.Mode) { + blockStatusMessage = append(blockStatusMessage, blockStatus) + } else { + redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus) + } + + batchCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + for { + blockStatus = e.sharedInfo.TakeBlockStatus(batchCtx) + if blockStatus == nil { + break + } if common.IsDefaultMode(blockStatus.Mode) { blockStatusMessage = append(blockStatusMessage, blockStatus) } else { redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus) } - delay.Reset(10 * time.Millisecond) - loop: - for { - select { - case blockStatus := <-e.sharedInfo.GetBlockStatusesChan(): - if common.IsDefaultMode(blockStatus.Mode) { - blockStatusMessage = append(blockStatusMessage, blockStatus) - } else { - redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus) - } - case <-delay.C: - break loop - } - } + } + cancel() + if ctx.Err() != nil { + return + } - e.metricBlockStatusesChanLen.Set(float64(len(e.sharedInfo.GetBlockStatusesChan()))) - if len(blockStatusMessage) != 0 { - enqueueBlockStatus(blockStatusMessage, common.DefaultMode) - } - if len(redoBlockStatusMessage) != 0 { - enqueueBlockStatus(redoBlockStatusMessage, common.RedoMode) - } + e.metricBlockStatusesChanLen.Set(float64(e.sharedInfo.BlockStatusLen())) + if len(blockStatusMessage) != 0 { + enqueueBlockStatus(blockStatusMessage, common.DefaultMode) + } + if len(redoBlockStatusMessage) != 0 { + enqueueBlockStatus(redoBlockStatusMessage, common.RedoMode) } } } diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager_test.go b/downstreamadapter/dispatchermanager/dispatcher_manager_test.go index d26a6193d8..f48d52df21 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager_test.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager_test.go @@ -80,7 +80,7 @@ func createTestDispatcher(t *testing.T, manager *DispatcherManager, id common.Di &defaultAtomicity, false, make(chan dispatcher.TableSpanStatusWithSeq, 1), - make(chan *heartbeatpb.TableSpanBlockStatus, 1), + 1, make(chan error, 1), ) d := dispatcher.NewEventDispatcher( @@ -141,7 +141,7 @@ func createTestManager(t *testing.T) *DispatcherManager { &defaultAtomicity, false, make(chan dispatcher.TableSpanStatusWithSeq, 8192), - make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024), + 1024*1024, make(chan error, 1), ) nodeID := node.NewID() diff --git a/downstreamadapter/dispatchermanager/heartbeat_collector.go b/downstreamadapter/dispatchermanager/heartbeat_collector.go index 99ef678b14..67417face8 100644 --- a/downstreamadapter/dispatchermanager/heartbeat_collector.go +++ b/downstreamadapter/dispatchermanager/heartbeat_collector.go @@ -248,6 +248,7 @@ func (c *HeartBeatCollector) sendBlockStatusMessages(ctx 
context.Context) error messaging.MaintainerManagerTopic, blockStatusRequestWithTargetID.Request, )) + c.blockStatusReqQueue.OnSendComplete(blockStatusRequestWithTargetID) if err != nil { log.Error("failed to send block status request message", zap.Error(err)) } diff --git a/downstreamadapter/dispatchermanager/heartbeat_queue.go b/downstreamadapter/dispatchermanager/heartbeat_queue.go index e93b59f401..af4fe6b7c6 100644 --- a/downstreamadapter/dispatchermanager/heartbeat_queue.go +++ b/downstreamadapter/dispatchermanager/heartbeat_queue.go @@ -15,8 +15,10 @@ package dispatchermanager import ( "context" + "sync" "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" ) @@ -62,15 +64,30 @@ type BlockStatusRequestWithTargetID struct { // BlockStatusRequestQueue is a channel for all event dispatcher managers to send block status requests to HeartBeatCollector type BlockStatusRequestQueue struct { queue chan *BlockStatusRequestWithTargetID + + mu sync.Mutex + requestStatusKeys map[*BlockStatusRequestWithTargetID][]blockStatusRequestDedupeKey + queuedStatuses map[blockStatusRequestDedupeKey]struct{} + inFlightStatuses map[blockStatusRequestDedupeKey]struct{} } func NewBlockStatusRequestQueue() *BlockStatusRequestQueue { return &BlockStatusRequestQueue{ - queue: make(chan *BlockStatusRequestWithTargetID, 10000), + queue: make(chan *BlockStatusRequestWithTargetID, 10000), + requestStatusKeys: make(map[*BlockStatusRequestWithTargetID][]blockStatusRequestDedupeKey), + queuedStatuses: make(map[blockStatusRequestDedupeKey]struct{}), + inFlightStatuses: make(map[blockStatusRequestDedupeKey]struct{}), } } func (q *BlockStatusRequestQueue) Enqueue(request *BlockStatusRequestWithTargetID) { + if request == nil || request.Request == nil { + return + } + if !q.trackPendingStatuses(request) { + metrics.HeartbeatCollectorBlockStatusRequestQueueLenGauge.Set(float64(len(q.queue))) + return + } q.queue <- request metrics.HeartbeatCollectorBlockStatusRequestQueueLenGauge.Set(float64(len(q.queue))) } @@ -80,11 +97,102 @@ func (q *BlockStatusRequestQueue) Dequeue(ctx context.Context) *BlockStatusReque case <-ctx.Done(): return nil case request := <-q.queue: + q.markInFlight(request) metrics.HeartbeatCollectorBlockStatusRequestQueueLenGauge.Set(float64(len(q.queue))) return request } } +func (q *BlockStatusRequestQueue) OnSendComplete(request *BlockStatusRequestWithTargetID) { + if request == nil { + return + } + + q.mu.Lock() + defer q.mu.Unlock() + for _, key := range q.requestStatusKeys[request] { + delete(q.inFlightStatuses, key) + } + delete(q.requestStatusKeys, request) +} + func (q *BlockStatusRequestQueue) Close() { close(q.queue) } + +type blockStatusRequestDedupeKey struct { + targetID node.ID + dispatcherID common.DispatcherID + blockTs uint64 + mode int64 + isSyncPoint bool + stage heartbeatpb.BlockStage +} + +func (q *BlockStatusRequestQueue) trackPendingStatuses(request *BlockStatusRequestWithTargetID) bool { + statuses := request.Request.BlockStatuses + filtered := statuses[:0] + statusKeys := make([]blockStatusRequestDedupeKey, 0) + + q.mu.Lock() + defer q.mu.Unlock() + + for _, status := range statuses { + if !shouldDeduplicateRequestStatus(status) { + filtered = append(filtered, status) + continue + } + + key := blockStatusRequestDedupeKey{ + targetID: request.TargetID, + dispatcherID: common.NewDispatcherIDFromPB(status.ID), + blockTs: status.State.BlockTs, + mode: status.Mode, + isSyncPoint: 
status.State.IsSyncPoint, + stage: status.State.Stage, + } + if _, ok := q.queuedStatuses[key]; ok { + continue + } + if _, ok := q.inFlightStatuses[key]; ok { + continue + } + + filtered = append(filtered, status) + q.queuedStatuses[key] = struct{}{} + statusKeys = append(statusKeys, key) + } + + request.Request.BlockStatuses = filtered + if len(statusKeys) > 0 { + q.requestStatusKeys[request] = statusKeys + } + return len(filtered) > 0 +} + +func (q *BlockStatusRequestQueue) markInFlight(request *BlockStatusRequestWithTargetID) { + q.mu.Lock() + defer q.mu.Unlock() + for _, key := range q.requestStatusKeys[request] { + delete(q.queuedStatuses, key) + q.inFlightStatuses[key] = struct{}{} + } +} + +func isWaitingRequestStatus(status *heartbeatpb.TableSpanBlockStatus) bool { + return status != nil && + status.State != nil && + status.State.IsBlocked && + status.State.Stage == heartbeatpb.BlockStage_WAITING +} + +func shouldDeduplicateRequestStatus(status *heartbeatpb.TableSpanBlockStatus) bool { + return isDoneRequestStatus(status) || isWaitingRequestStatus(status) +} + +func isDoneRequestStatus(status *heartbeatpb.TableSpanBlockStatus) bool { + return status != nil && + status.State != nil && + status.State.IsBlocked && + status.State.Stage == heartbeatpb.BlockStage_DONE +} diff --git a/downstreamadapter/dispatchermanager/heartbeat_queue_test.go b/downstreamadapter/dispatchermanager/heartbeat_queue_test.go new file mode 100644 index 0000000000..e639bb9216 --- /dev/null +++ b/downstreamadapter/dispatchermanager/heartbeat_queue_test.go @@ -0,0 +1,250 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package dispatchermanager + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/node" + "github.com/stretchr/testify/require" +) + +func TestBlockStatusRequestQueueDeduplicatesQueuedAndInFlightDone(t *testing.T) { + queue := NewBlockStatusRequestQueue() + targetID := node.NewID() + dispatcherID := common.NewDispatcherID() + + first := newDoneBlockStatusRequest(targetID, dispatcherID, 100) + second := newDoneBlockStatusRequest(targetID, dispatcherID, 100) + + queue.Enqueue(first) + queue.Enqueue(second) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + dequeued := queue.Dequeue(ctx) + require.Same(t, first, dequeued) + + shortCtx, shortCancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer shortCancel() + require.Nil(t, queue.Dequeue(shortCtx)) + + third := newDoneBlockStatusRequest(targetID, dispatcherID, 100) + queue.Enqueue(third) + + shortCtx2, shortCancel2 := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer shortCancel2() + require.Nil(t, queue.Dequeue(shortCtx2)) + + queue.OnSendComplete(dequeued) + + fourth := newDoneBlockStatusRequest(targetID, dispatcherID, 100) + queue.Enqueue(fourth) + + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + require.Same(t, fourth, queue.Dequeue(ctx2)) +} + +func TestBlockStatusRequestQueueDeduplicatesQueuedAndInFlightWaiting(t *testing.T) { + queue := NewBlockStatusRequestQueue() + targetID := node.NewID() + dispatcherID := common.NewDispatcherID() + + first := &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 101, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + Mode: common.DefaultMode, + }, + } + second := &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 101, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + Mode: common.DefaultMode, + }, + } + + queue.Enqueue(first) + queue.Enqueue(second) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + dequeued := queue.Dequeue(ctx) + require.Same(t, first, dequeued) + + shortCtx, shortCancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer shortCancel() + require.Nil(t, queue.Dequeue(shortCtx)) + + third := &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 101, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + Mode: common.DefaultMode, + }, + } + queue.Enqueue(third) + + shortCtx2, shortCancel2 := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer shortCancel2() + require.Nil(t, queue.Dequeue(shortCtx2)) + + queue.OnSendComplete(dequeued) + + fourth := &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + 
ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 101, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + Mode: common.DefaultMode, + }, + } + queue.Enqueue(fourth) + + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + require.Same(t, fourth, queue.Dequeue(ctx2)) +} + +func TestBlockStatusRequestQueueKeepsDistinctDoneKeys(t *testing.T) { + queue := NewBlockStatusRequestQueue() + targetID := node.NewID() + dispatcherID := common.NewDispatcherID() + + defaultDone := newDoneBlockStatusRequest(targetID, dispatcherID, 200) + syncPointDone := newDoneBlockStatusRequest(targetID, dispatcherID, 200) + syncPointDone.Request.BlockStatuses[0].State.IsSyncPoint = true + redoDone := newDoneBlockStatusRequest(targetID, dispatcherID, 200) + redoDone.Request.BlockStatuses[0].Mode = common.RedoMode + redoDone.Request.Mode = common.RedoMode + + queue.Enqueue(defaultDone) + queue.Enqueue(syncPointDone) + queue.Enqueue(redoDone) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + require.Same(t, defaultDone, queue.Dequeue(ctx)) + queue.OnSendComplete(defaultDone) + + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + require.Same(t, syncPointDone, queue.Dequeue(ctx2)) + queue.OnSendComplete(syncPointDone) + + ctx3, cancel3 := context.WithTimeout(context.Background(), time.Second) + defer cancel3() + require.Same(t, redoDone, queue.Dequeue(ctx3)) +} + +func TestBlockStatusRequestQueueKeepsWaitingAndDoneDistinct(t *testing.T) { + queue := NewBlockStatusRequestQueue() + targetID := node.NewID() + dispatcherID := common.NewDispatcherID() + + waiting := &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 300, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + Mode: common.DefaultMode, + }, + } + done := newDoneBlockStatusRequest(targetID, dispatcherID, 300) + + queue.Enqueue(waiting) + queue.Enqueue(done) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + require.Same(t, waiting, queue.Dequeue(ctx)) + + ctx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + require.Same(t, done, queue.Dequeue(ctx2)) +} + +func newDoneBlockStatusRequest(targetID node.ID, dispatcherID common.DispatcherID, blockTs uint64) *BlockStatusRequestWithTargetID { + return &BlockStatusRequestWithTargetID{ + TargetID: targetID, + Request: &heartbeatpb.BlockStatusRequest{ + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: blockTs, + Stage: heartbeatpb.BlockStage_DONE, + }, + Mode: common.DefaultMode, + }, + }, + Mode: common.DefaultMode, + }, + } +} diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 3b31642a6c..2dc215665c 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -69,8 +69,9 @@ type Maintainer struct { selfNode *node.Info controller *Controller - pdClock pdutil.Clock - eventCh *chann.DrainableChann[*Event] + pdClock pdutil.Clock + eventCh *chann.DrainableChann[*Event] + statusRequestNotifyCh chan struct{} mc messaging.MessageCenter @@ -163,6 +164,14 @@ type Maintainer struct { redoScheduledTaskGauge prometheus.Gauge 
redoSpanCountGauge prometheus.Gauge redoTableCountGauge prometheus.Gauge + + bufferedStatusRequests bufferedStatusRequestBuffer + + droppedStatusRequests struct { + sync.Mutex + count uint64 + lastLogTime time.Time + } } // NewMaintainer create the maintainer for the changefeed @@ -196,10 +205,11 @@ func NewMaintainer(cfID common.ChangeFeedID, Name: keyspaceName, } m := &Maintainer{ - changefeedID: cfID, - selfNode: selfNode, - eventCh: chann.NewAutoDrainChann[*Event](), - startCheckpointTs: checkpointTs, + changefeedID: cfID, + selfNode: selfNode, + eventCh: chann.NewAutoDrainChann[*Event](), + statusRequestNotifyCh: make(chan struct{}, 1), + startCheckpointTs: checkpointTs, controller: NewController(cfID, checkpointTs, taskScheduler, info.Config, ddlSpan, redoDDLSpan, conf.AddTableBatchSize, time.Duration(conf.CheckBalanceInterval), refresher, keyspaceMeta, enableRedo), mc: mc, @@ -1158,6 +1168,8 @@ func (m *Maintainer) runHandleEvents(ctx context.Context) { return case event := <-m.eventCh.Out(): m.HandleEvent(event) + case <-m.statusRequestNotifyCh: + m.handleBufferedStatusRequests() case <-ticker.C: m.HandleEvent(&Event{ changefeedID: m.changefeedID, @@ -1169,8 +1181,31 @@ func (m *Maintainer) runHandleEvents(ctx context.Context) { // pushEvent is used to push event to maintainer's event channel // event will be handled by maintainer's main loop -func (m *Maintainer) pushEvent(event *Event) { +func (m *Maintainer) pushEvent(event *Event) bool { + if handled, accepted := m.tryBufferStatusRequestEvent(event); handled { + return accepted + } m.eventCh.In() <- event + return true +} + +func (m *Maintainer) recordDroppedStatusRequest(msgType messaging.IOType, reason string) { + m.droppedStatusRequests.Lock() + defer m.droppedStatusRequests.Unlock() + + m.droppedStatusRequests.count++ + now := time.Now() + if !m.droppedStatusRequests.lastLogTime.IsZero() && now.Sub(m.droppedStatusRequests.lastLogTime) < 5*time.Second { + return + } + m.droppedStatusRequests.lastLogTime = now + + log.Warn("drop maintainer status request", + zap.Stringer("changefeedID", m.changefeedID), + zap.String("type", msgType.String()), + zap.String("reason", reason), + zap.Int("eventChLen", m.eventCh.Len()), + zap.Uint64("droppedCount", m.droppedStatusRequests.count)) } func (m *Maintainer) getWatermark() heartbeatpb.Watermark { diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index 8c06121e47..84b452c1a1 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/ticdc/pkg/node" "github.com/pingcap/ticdc/pkg/pdutil" "github.com/pingcap/ticdc/server/watcher" + "github.com/pingcap/ticdc/utils/chann" "github.com/pingcap/ticdc/utils/threadpool" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -256,6 +257,189 @@ func (m *mockDispatcherManager) sendHeartbeat() { } } +func TestMaintainerPushEventDropsStatusRequestBeforeInitialized(t *testing.T) { + m := &Maintainer{ + changefeedID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), + eventCh: chann.NewAutoDrainChann[*Event](chann.Cap(8)), + statusRequestNotifyCh: make(chan struct{}, 1), + removed: atomic.NewBool(false), + } + + pushed := m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{Type: messaging.TypeBlockStatusRequest}, + }) + + require.False(t, pushed) + require.Equal(t, 0, m.eventCh.Len()) +} + +func TestMaintainerPushEventBuffersBlockStatusRequest(t *testing.T) { + dispatcherID := 
common.NewDispatcherID() + from := node.ID("node-1") + m := &Maintainer{ + changefeedID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), + eventCh: chann.NewAutoDrainChann[*Event](chann.Cap(8)), + statusRequestNotifyCh: make(chan struct{}, 1), + removed: atomic.NewBool(false), + } + m.initialized.Store(true) + + req := &heartbeatpb.BlockStatusRequest{ + ChangefeedID: m.changefeedID.ToPB(), + Mode: common.DefaultMode, + BlockStatuses: []*heartbeatpb.TableSpanBlockStatus{ + { + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 100, + IsSyncPoint: true, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: common.DefaultMode, + }, + }, + } + + require.True(t, m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{ + From: from, + Type: messaging.TypeBlockStatusRequest, + Message: []messaging.IOTypeT{req}, + }, + })) + require.True(t, m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{ + From: from, + Type: messaging.TypeBlockStatusRequest, + Message: []messaging.IOTypeT{req}, + }, + })) + + require.Equal(t, 0, m.eventCh.Len()) + require.Equal(t, 1, len(m.statusRequestNotifyCh)) + + events := m.takeBufferedStatusRequestEvents() + require.Len(t, events, 1) + bufferedReq := events[0].message.Message[0].(*heartbeatpb.BlockStatusRequest) + require.Len(t, bufferedReq.BlockStatuses, 1) + require.Equal(t, from, events[0].message.From) +} + +func TestMaintainerPushEventBuffersBlockStatusRequestByModeAndBlockTs(t *testing.T) { + from := node.ID("node-1") + changefeedID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + defaultDispatcherID := common.NewDispatcherID() + redoDispatcherID := common.NewDispatcherID() + + newBlockStatus := func(dispatcherID common.DispatcherID, blockTs uint64, mode int64) *heartbeatpb.TableSpanBlockStatus { + return &heartbeatpb.TableSpanBlockStatus{ + ID: dispatcherID.ToPB(), + State: &heartbeatpb.State{ + IsBlocked: true, + BlockTs: blockTs, + IsSyncPoint: true, + Stage: heartbeatpb.BlockStage_WAITING, + }, + Mode: mode, + } + } + + m := &Maintainer{ + changefeedID: changefeedID, + eventCh: chann.NewAutoDrainChann[*Event](chann.Cap(8)), + statusRequestNotifyCh: make(chan struct{}, 1), + removed: atomic.NewBool(false), + } + m.initialized.Store(true) + + newBlockStatusRequest := func(mode int64, statuses ...*heartbeatpb.TableSpanBlockStatus) *heartbeatpb.BlockStatusRequest { + return &heartbeatpb.BlockStatusRequest{ + ChangefeedID: changefeedID.ToPB(), + Mode: mode, + BlockStatuses: statuses, + } + } + + require.True(t, m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{ + From: from, + Type: messaging.TypeBlockStatusRequest, + Message: []messaging.IOTypeT{ + newBlockStatusRequest(common.DefaultMode, + newBlockStatus(defaultDispatcherID, 100, common.DefaultMode)), + }, + }, + })) + require.True(t, m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{ + From: from, + Type: messaging.TypeBlockStatusRequest, + Message: []messaging.IOTypeT{ + newBlockStatusRequest(common.DefaultMode, + newBlockStatus(defaultDispatcherID, 200, common.DefaultMode)), + }, + }, + })) + require.True(t, m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{ + From: from, + Type: messaging.TypeBlockStatusRequest, + 
Message: []messaging.IOTypeT{ + newBlockStatusRequest(common.RedoMode, + newBlockStatus(redoDispatcherID, 300, common.RedoMode)), + }, + }, + })) + + events := m.takeBufferedStatusRequestEvents() + require.Len(t, events, 2) + + blockTsByMode := make(map[int64][]uint64, 2) + for _, event := range events { + require.Equal(t, from, event.message.From) + req := event.message.Message[0].(*heartbeatpb.BlockStatusRequest) + for _, status := range req.BlockStatuses { + blockTsByMode[req.Mode] = append(blockTsByMode[req.Mode], status.State.BlockTs) + } + } + + require.ElementsMatch(t, []uint64{100, 200}, blockTsByMode[common.DefaultMode]) + require.ElementsMatch(t, []uint64{300}, blockTsByMode[common.RedoMode]) +} + +func TestMaintainerPushEventKeepsBootstrapResponseOutsideStatusBuffer(t *testing.T) { + m := &Maintainer{ + changefeedID: common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName), + eventCh: chann.NewAutoDrainChann[*Event](chann.Cap(8)), + statusRequestNotifyCh: make(chan struct{}, 1), + removed: atomic.NewBool(false), + } + m.initialized.Store(true) + + pushed := m.pushEvent(&Event{ + changefeedID: m.changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{Type: messaging.TypeMaintainerBootstrapResponse}, + }) + + require.True(t, pushed) + require.Equal(t, 1, m.eventCh.Len()) + require.Equal(t, 0, len(m.statusRequestNotifyCh)) +} + func TestMaintainerSchedule(t *testing.T) { // This test exercises a single-node maintainer lifecycle: // 1) Bootstrap a changefeed via the dispatcher manager mock. diff --git a/maintainer/status_request_buffer.go b/maintainer/status_request_buffer.go new file mode 100644 index 0000000000..ab2849c670 --- /dev/null +++ b/maintainer/status_request_buffer.go @@ -0,0 +1,299 @@ +package maintainer + +import ( + "sync" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/messaging" + "github.com/pingcap/ticdc/pkg/node" +) + +type bufferedStatusRequestBuffer struct { + sync.Mutex + + heartbeats map[node.ID]*bufferedHeartbeatRequest + blockStatuses map[bufferedBlockStatusSourceKey]*bufferedBlockStatusRequest + redoResolvedTs map[node.ID]*heartbeatpb.RedoResolvedTsProgressMessage +} + +type bufferedHeartbeatRequest struct { + changefeedID *heartbeatpb.ChangefeedID + watermark *heartbeatpb.Watermark + redoWatermark *heartbeatpb.Watermark + statuses map[heartbeatStatusKey]*heartbeatpb.TableSpanStatus + order []heartbeatStatusKey + err *heartbeatpb.RunningError + completeStatus bool +} + +type heartbeatStatusKey struct { + dispatcherID common.DispatcherID + mode int64 +} + +type bufferedBlockStatusRequest struct { + changefeedID *heartbeatpb.ChangefeedID + from node.ID + mode int64 + statuses map[blockStatusKey]*heartbeatpb.TableSpanBlockStatus + order []blockStatusKey +} + +type bufferedBlockStatusSourceKey struct { + from node.ID + mode int64 +} + +type blockStatusKey struct { + dispatcherID common.DispatcherID + blockTs uint64 + mode int64 + isSyncPoint bool + stage heartbeatpb.BlockStage +} + +func isBufferableStatusRequest(msgType messaging.IOType) bool { + switch msgType { + case messaging.TypeHeartBeatRequest, + messaging.TypeBlockStatusRequest, + messaging.TypeRedoResolvedTsProgressMessage: + return true + default: + return false + } +} + +func (m *Maintainer) tryBufferStatusRequestEvent(event *Event) (bool, bool) { + if event == nil || event.eventType != EventMessage || event.message == nil { + return false, false + } + if !isBufferableStatusRequest(event.message.Type) 
{ + return false, false + } + if !m.initialized.Load() { + m.recordDroppedStatusRequest(event.message.Type, "maintainer not initialized") + return true, false + } + if m.removing.Load() || (m.removed != nil && m.removed.Load()) { + m.recordDroppedStatusRequest(event.message.Type, "maintainer is closing") + return true, false + } + + m.bufferStatusRequest(event.message) + select { + case m.statusRequestNotifyCh <- struct{}{}: + default: + } + return true, true +} + +func (m *Maintainer) bufferStatusRequest(msg *messaging.TargetMessage) { + switch msg.Type { + case messaging.TypeHeartBeatRequest: + req := msg.Message[0].(*heartbeatpb.HeartBeatRequest) + m.bufferedStatusRequests.mergeHeartbeat(msg.From, req) + case messaging.TypeBlockStatusRequest: + req := msg.Message[0].(*heartbeatpb.BlockStatusRequest) + m.bufferedStatusRequests.mergeBlockStatus(msg.From, req) + case messaging.TypeRedoResolvedTsProgressMessage: + req := msg.Message[0].(*heartbeatpb.RedoResolvedTsProgressMessage) + m.bufferedStatusRequests.mergeRedoResolvedTs(msg.From, req) + } +} + +func (m *Maintainer) handleBufferedStatusRequests() { + for _, event := range m.takeBufferedStatusRequestEvents() { + m.HandleEvent(event) + } +} + +func (m *Maintainer) takeBufferedStatusRequestEvents() []*Event { + return m.bufferedStatusRequests.drain(m.changefeedID) +} + +func (b *bufferedStatusRequestBuffer) mergeHeartbeat(from node.ID, req *heartbeatpb.HeartBeatRequest) { + if req == nil { + return + } + b.Lock() + defer b.Unlock() + if b.heartbeats == nil { + b.heartbeats = make(map[node.ID]*bufferedHeartbeatRequest) + } + entry, ok := b.heartbeats[from] + if !ok { + entry = &bufferedHeartbeatRequest{ + changefeedID: req.ChangefeedID, + statuses: make(map[heartbeatStatusKey]*heartbeatpb.TableSpanStatus), + } + b.heartbeats[from] = entry + } + entry.changefeedID = req.ChangefeedID + entry.completeStatus = entry.completeStatus || req.CompeleteStatus + entry.err = pickRunningError(entry.err, req.Err) + entry.watermark = pickWatermark(entry.watermark, req.Watermark) + entry.redoWatermark = pickWatermark(entry.redoWatermark, req.RedoWatermark) + for _, status := range req.Statuses { + if status == nil || status.ID == nil { + continue + } + key := heartbeatStatusKey{ + dispatcherID: common.NewDispatcherIDFromPB(status.ID), + mode: status.Mode, + } + if _, exists := entry.statuses[key]; !exists { + entry.order = append(entry.order, key) + } + entry.statuses[key] = status + } +} + +func (b *bufferedStatusRequestBuffer) mergeBlockStatus(from node.ID, req *heartbeatpb.BlockStatusRequest) { + if req == nil { + return + } + b.Lock() + defer b.Unlock() + if b.blockStatuses == nil { + b.blockStatuses = make(map[bufferedBlockStatusSourceKey]*bufferedBlockStatusRequest) + } + sourceKey := bufferedBlockStatusSourceKey{ + from: from, + mode: req.Mode, + } + entry, ok := b.blockStatuses[sourceKey] + if !ok { + entry = &bufferedBlockStatusRequest{ + changefeedID: req.ChangefeedID, + from: from, + mode: req.Mode, + statuses: make(map[blockStatusKey]*heartbeatpb.TableSpanBlockStatus), + } + b.blockStatuses[sourceKey] = entry + } + entry.changefeedID = req.ChangefeedID + entry.mode = req.Mode + for _, status := range req.BlockStatuses { + if status == nil || status.ID == nil || status.State == nil { + continue + } + key := blockStatusKey{ + dispatcherID: common.NewDispatcherIDFromPB(status.ID), + blockTs: status.State.BlockTs, + mode: status.Mode, + isSyncPoint: status.State.IsSyncPoint, + stage: status.State.Stage, + } + if _, exists := entry.statuses[key]; 
!exists { + entry.order = append(entry.order, key) + } + entry.statuses[key] = status + } +} + +func (b *bufferedStatusRequestBuffer) mergeRedoResolvedTs(from node.ID, req *heartbeatpb.RedoResolvedTsProgressMessage) { + if req == nil { + return + } + b.Lock() + defer b.Unlock() + if b.redoResolvedTs == nil { + b.redoResolvedTs = make(map[node.ID]*heartbeatpb.RedoResolvedTsProgressMessage) + } + current, ok := b.redoResolvedTs[from] + if !ok || req.ResolvedTs > current.ResolvedTs { + b.redoResolvedTs[from] = req + } +} + +func (b *bufferedStatusRequestBuffer) drain(changefeedID common.ChangeFeedID) []*Event { + var events []*Event + b.Lock() + defer b.Unlock() + + for _, entry := range b.blockStatuses { + req := &heartbeatpb.BlockStatusRequest{ + ChangefeedID: entry.changefeedID, + Mode: entry.mode, + BlockStatuses: make([]*heartbeatpb.TableSpanBlockStatus, 0, len(entry.order)), + } + for _, key := range entry.order { + if status, ok := entry.statuses[key]; ok { + req.BlockStatuses = append(req.BlockStatuses, status) + } + } + if len(req.BlockStatuses) > 0 { + events = append(events, newBufferedStatusEvent(changefeedID, entry.from, messaging.TypeBlockStatusRequest, req)) + } + } + b.blockStatuses = nil + + for from, entry := range b.heartbeats { + req := &heartbeatpb.HeartBeatRequest{ + ChangefeedID: entry.changefeedID, + Statuses: make([]*heartbeatpb.TableSpanStatus, 0, len(entry.order)), + Watermark: entry.watermark, + RedoWatermark: entry.redoWatermark, + Err: entry.err, + CompeleteStatus: entry.completeStatus, + } + for _, key := range entry.order { + if status, ok := entry.statuses[key]; ok { + req.Statuses = append(req.Statuses, status) + } + } + if len(req.Statuses) > 0 || req.Watermark != nil || req.RedoWatermark != nil || req.Err != nil { + events = append(events, newBufferedStatusEvent(changefeedID, from, messaging.TypeHeartBeatRequest, req)) + } + } + b.heartbeats = nil + + for from, req := range b.redoResolvedTs { + events = append(events, newBufferedStatusEvent(changefeedID, from, messaging.TypeRedoResolvedTsProgressMessage, req)) + } + b.redoResolvedTs = nil + + return events +} + +func newBufferedStatusEvent( + changefeedID common.ChangeFeedID, + from node.ID, + msgType messaging.IOType, + msg messaging.IOTypeT, +) *Event { + return &Event{ + changefeedID: changefeedID, + eventType: EventMessage, + message: &messaging.TargetMessage{ + From: from, + To: "", + Type: msgType, + Message: []messaging.IOTypeT{msg}, + }, + } +} + +func pickRunningError(current, candidate *heartbeatpb.RunningError) *heartbeatpb.RunningError { + if candidate != nil { + return candidate + } + return current +} + +func pickWatermark(current, candidate *heartbeatpb.Watermark) *heartbeatpb.Watermark { + if candidate == nil { + return current + } + if current == nil { + return candidate + } + if candidate.Seq > current.Seq || (candidate.Seq == current.Seq && candidate.CheckpointTs >= current.CheckpointTs) { + current = candidate + } + if candidate.LastSyncedTs > current.LastSyncedTs { + current.LastSyncedTs = candidate.LastSyncedTs + } + return current +} diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 148e927430..5f724d1a62 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -118,6 +118,14 @@ type EventServiceConfig struct { ScanTaskQueueSize int `toml:"scan-task-queue-size" json:"scan_task_queue_size"` ScanLimitInBytes int `toml:"scan-limit-in-bytes" json:"scan_limit_in_bytes"` + // SyncPointCheckpointCapMultiplier controls how far scan can lead checkpoint when syncpoint is 
enabled. + // The scan upper bound is capped by checkpointTs + multiplier*syncPointInterval. + SyncPointCheckpointCapMultiplier int `toml:"sync-point-checkpoint-cap-multiplier" json:"sync_point_checkpoint_cap_multiplier"` + // SyncPointLagSuppressThreshold controls when to suppress syncpoint emission for lagging dispatchers. + SyncPointLagSuppressThreshold time.Duration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"` + // SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression. + SyncPointLagResumeThreshold time.Duration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"` + // DMLEventMaxRows is the maximum number of rows in a DML event when split txn is enabled. DMLEventMaxRows int32 `toml:"dml-event-max-rows" json:"dml_event_max_rows"` // DMLEventMaxBytes is the maximum size of a DML event in bytes when split txn is enabled. @@ -133,10 +141,13 @@ type EventServiceConfig struct { // NewDefaultEventServiceConfig return the default event service configuration func NewDefaultEventServiceConfig() *EventServiceConfig { return &EventServiceConfig{ - ScanTaskQueueSize: 1024 * 8, - ScanLimitInBytes: 1024 * 1024 * 256, // 256MB - DMLEventMaxRows: 256, - DMLEventMaxBytes: 1024 * 1024 * 1, // 1MB - EnableRemoteEventService: true, + ScanTaskQueueSize: 1024 * 8, + ScanLimitInBytes: 1024 * 1024 * 256, // 256MB + SyncPointCheckpointCapMultiplier: 2, + SyncPointLagSuppressThreshold: 20 * time.Minute, + SyncPointLagResumeThreshold: 15 * time.Minute, + DMLEventMaxRows: 256, + DMLEventMaxBytes: 1024 * 1024 * 1, // 1MB + EnableRemoteEventService: true, } } diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index a99767a704..424caab604 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -70,7 +70,9 @@ type dispatcherStat struct { enableSyncPoint bool nextSyncPoint atomic.Uint64 syncPointInterval time.Duration - txnAtomicity config.AtomicityLevel + // syncPointSendSuppressed tracks whether syncpoint emission is temporarily suppressed due to lag. 
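+	// Suppression begins once the gap between the dispatcher's received resolved ts and the
+	// changefeed's minimum checkpoint ts exceeds SyncPointLagSuppressThreshold, and is lifted
+	// once that gap falls back below SyncPointLagResumeThreshold.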
+ syncPointSendSuppressed atomic.Bool + txnAtomicity config.AtomicityLevel // ============================================================================= // ================== below are fields need copied when reset ================== @@ -428,6 +430,7 @@ type changefeedStatus struct { availableMemoryQuota sync.Map // nodeID -> atomic.Uint64 (memory quota in bytes) minSentTs atomic.Uint64 + minCheckpointTs atomic.Uint64 scanInterval atomic.Int64 lastAdjustTime atomic.Time @@ -436,6 +439,8 @@ type changefeedStatus struct { syncPointInterval time.Duration } +const invalidMinCheckpointTs = ^uint64(0) + func newChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval time.Duration) *changefeedStatus { status := &changefeedStatus{ changefeedID: changefeedID, @@ -443,6 +448,7 @@ func newChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval tim syncPointInterval: syncPointInterval, } status.scanInterval.Store(int64(defaultScanInterval)) + status.minCheckpointTs.Store(invalidMinCheckpointTs) status.lastAdjustTime.Store(time.Now()) status.lastTrendAdjustTime.Store(time.Now()) @@ -469,3 +475,11 @@ func (c *changefeedStatus) isEmpty() bool { func (c *changefeedStatus) isSyncpointEnabled() bool { return c.syncPointInterval > 0 } + +func (c *changefeedStatus) getMinCheckpointTs() (uint64, bool) { + minCheckpointTs := c.minCheckpointTs.Load() + if minCheckpointTs == invalidMinCheckpointTs { + return 0, false + } + return minCheckpointTs, true +} diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 49e14d395f..a88e8976d5 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -52,6 +52,9 @@ const ( // defaultSendResolvedTsInterval use to control whether to send a resolvedTs event to the dispatcher when its scan is skipped. defaultSendResolvedTsInterval = time.Second * 2 defaultRefreshMinSentResolvedTsInterval = time.Second * 1 + defaultSyncPointCheckpointCapMultiplier = 2 + defaultSyncPointLagSuppressThreshold = 20 * time.Minute + defaultSyncPointLagResumeThreshold = 15 * time.Minute ) // eventBroker get event from the eventStore, and send the event to the dispatchers. 
@@ -92,8 +95,11 @@ type eventBroker struct { // metricsCollector handles all metrics collection and reporting metricsCollector *metricsCollector - scanRateLimiter *rate.Limiter - scanLimitInBytes uint64 + scanRateLimiter *rate.Limiter + scanLimitInBytes uint64 + syncPointCheckpointCapMultiplier uint64 + syncPointLagSuppressThreshold time.Duration + syncPointLagResumeThreshold time.Duration } func newEventBroker( @@ -116,7 +122,24 @@ func newEventBroker( scanTaskQueueSize := config.GetGlobalServerConfig().Debug.EventService.ScanTaskQueueSize / scanWorkerCount sendMessageQueueSize := basicChannelSize * 4 - scanLimitInBytes := config.GetGlobalServerConfig().Debug.EventService.ScanLimitInBytes + eventServiceConfig := config.GetGlobalServerConfig().Debug.EventService + + scanLimitInBytes := eventServiceConfig.ScanLimitInBytes + syncPointCheckpointCapMultiplier := eventServiceConfig.SyncPointCheckpointCapMultiplier + if syncPointCheckpointCapMultiplier <= 0 { + syncPointCheckpointCapMultiplier = defaultSyncPointCheckpointCapMultiplier + } + syncPointLagSuppressThreshold := eventServiceConfig.SyncPointLagSuppressThreshold + if syncPointLagSuppressThreshold <= 0 { + syncPointLagSuppressThreshold = defaultSyncPointLagSuppressThreshold + } + syncPointLagResumeThreshold := eventServiceConfig.SyncPointLagResumeThreshold + if syncPointLagResumeThreshold <= 0 { + syncPointLagResumeThreshold = defaultSyncPointLagResumeThreshold + } + if syncPointLagResumeThreshold > syncPointLagSuppressThreshold { + syncPointLagResumeThreshold = syncPointLagSuppressThreshold + } g, ctx := errgroup.WithContext(ctx) ctx, cancel := context.WithCancel(ctx) @@ -125,22 +148,25 @@ func newEventBroker( // For now, since there is only one upstream, using the default pdClock is sufficient. 
pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock) c := &eventBroker{ - tidbClusterID: id, - eventStore: eventStore, - pdClock: pdClock, - mounter: event.NewMounter(tz, integrity), - schemaStore: schemaStore, - changefeedMap: sync.Map{}, - dispatchers: sync.Map{}, - tableTriggerDispatchers: sync.Map{}, - msgSender: mc, - taskChan: make([]chan scanTask, scanWorkerCount), - messageCh: make([]chan *wrapEvent, sendMessageWorkerCount), - redoMessageCh: make([]chan *wrapEvent, sendMessageWorkerCount), - cancel: cancel, - g: g, - scanRateLimiter: rate.NewLimiter(rate.Limit(scanLimitInBytes), scanLimitInBytes), - scanLimitInBytes: uint64(scanLimitInBytes), + tidbClusterID: id, + eventStore: eventStore, + pdClock: pdClock, + mounter: event.NewMounter(tz, integrity), + schemaStore: schemaStore, + changefeedMap: sync.Map{}, + dispatchers: sync.Map{}, + tableTriggerDispatchers: sync.Map{}, + msgSender: mc, + taskChan: make([]chan scanTask, scanWorkerCount), + messageCh: make([]chan *wrapEvent, sendMessageWorkerCount), + redoMessageCh: make([]chan *wrapEvent, sendMessageWorkerCount), + cancel: cancel, + g: g, + scanRateLimiter: rate.NewLimiter(rate.Limit(scanLimitInBytes), scanLimitInBytes), + scanLimitInBytes: uint64(scanLimitInBytes), + syncPointCheckpointCapMultiplier: uint64(syncPointCheckpointCapMultiplier), + syncPointLagSuppressThreshold: syncPointLagSuppressThreshold, + syncPointLagResumeThreshold: syncPointLagResumeThreshold, } // Initialize metrics collector @@ -185,7 +211,12 @@ func newEventBroker( return c.refreshMinSentResolvedTs(ctx) }) - log.Info("new event broker created", zap.Uint64("id", id), zap.Uint64("scanLimitInBytes", c.scanLimitInBytes)) + log.Info("new event broker created", + zap.Uint64("id", id), + zap.Uint64("scanLimitInBytes", c.scanLimitInBytes), + zap.Uint64("syncPointCheckpointCapMultiplier", c.syncPointCheckpointCapMultiplier), + zap.Duration("syncPointLagSuppressThreshold", c.syncPointLagSuppressThreshold), + zap.Duration("syncPointLagResumeThreshold", c.syncPointLagResumeThreshold)) return c } @@ -294,6 +325,7 @@ func (c *eventBroker) sendResolvedTs(d *dispatcherStat, watermark uint64) { resolvedEvent := newWrapResolvedEvent(remoteID, re) c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) <- resolvedEvent d.updateSentResolvedTs(watermark) + log.Debug("send resolvedTs", zap.Uint64("resolvedTs", watermark), zap.String("dispatcher", d.id.String())) updateMetricEventServiceSendResolvedTsCount(d.info.GetMode()) } @@ -355,13 +387,19 @@ func (c *eventBroker) tickTableTriggerDispatchers(ctx context.Context) error { return true } stat.receivedResolvedTs.Store(endTs) + //boundedEndTs := c.capCommitTsEndBySyncPoint(stat, endTs) for _, e := range ddlEvents { ep := &e c.sendDDL(ctx, remoteID, ep, stat) } + if endTs > startTs { // After all the events are sent, we send the watermark to the dispatcher. c.sendResolvedTs(stat, endTs) + } else { + // If there is no new ddl event, we still need to send a signal resolved-ts event to keep downstream responsive, + // but do not advance the watermark here. + c.sendSignalResolvedTs(stat) } return true }) @@ -402,6 +440,37 @@ func (c *eventBroker) logUninitializedDispatchers(ctx context.Context) error { } } +// capCommitTsEndBySyncPoint caps the commitTsEnd by the checkpoint bound determined by the sync point configuration. 
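+// For example (illustrative values only): with a checkpointTs at 10:00:00, a syncPointInterval
+// of 30s and a multiplier of 2, commitTsEnd is capped at the TSO corresponding to 10:01:00.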
+func (c *eventBroker) capCommitTsEndBySyncPoint(task scanTask, commitTsEnd uint64) uint64 {
+	if !task.enableSyncPoint || task.syncPointInterval <= 0 || c.syncPointCheckpointCapMultiplier == 0 {
+		return commitTsEnd
+	}
+	checkpointTs := task.checkpointTs.Load()
+	if checkpointTs == 0 {
+		return commitTsEnd
+	}
+
+	capDuration := time.Duration(c.syncPointCheckpointCapMultiplier) * task.syncPointInterval
+	checkpointCapTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(checkpointTs).Add(capDuration))
+	if checkpointCapTs >= commitTsEnd {
+		return commitTsEnd
+	}
+
+	log.Debug("scan range commitTsEnd capped by checkpoint bound",
+		zap.Stringer("changefeedID", task.changefeedStat.changefeedID),
+		zap.Stringer("dispatcherID", task.id),
+		zap.Uint64("oldCommitTsEnd", commitTsEnd),
+		zap.Uint64("newCommitTsEnd", checkpointCapTs),
+		zap.Uint64("checkpointTs", checkpointTs),
+		zap.Duration("syncPointInterval", task.syncPointInterval),
+		zap.Uint64("multiplier", c.syncPointCheckpointCapMultiplier))
+	metrics.EventServiceScanCappedByCheckpointCount.WithLabelValues(task.changefeedStat.changefeedID.String()).Inc()
+	return checkpointCapTs
+}
+
 // getScanTaskDataRange determines the valid data range for scanning a given task.
 // It checks various conditions (dispatcher status, DDL state, max commit ts of dml event)
 // to decide whether scanning is needed and returns the appropriate time range.
@@ -457,6 +526,24 @@
 	}
 	localScanMaxTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(dataRange.CommitTsStart).Add(interval))
 	dataRange.CommitTsEnd = min(commitTsEndBeforeWindow, localScanMaxTs)
+
+	// Don't cap by syncpoint again when it comes to local advance.
+	// dataRange.CommitTsEnd = c.capCommitTsEndBySyncPoint(task, dataRange.CommitTsEnd)
+	// if dataRange.CommitTsEnd <= dataRange.CommitTsStart {
+	// 	bypassEndTs := min(commitTsEndBeforeWindow, localScanMaxTs)
+	// 	bypassEndTs = min(bypassEndTs, ddlState.MaxEventCommitTs)
+	// 	if bypassEndTs > dataRange.CommitTsStart {
+	// 		dataRange.CommitTsEnd = bypassEndTs
+	// 		log.Info("scan window local advance bypass checkpoint cap due to pending ddl",
+	// 			zap.Stringer("changefeedID", task.changefeedStat.changefeedID),
+	// 			zap.Stringer("dispatcherID", task.id),
+	// 			zap.Uint64("startTs", dataRange.CommitTsStart),
+	// 			zap.Uint64("checkpointTs", task.checkpointTs.Load()),
+	// 			zap.Uint64("ddlCommitTs", ddlState.MaxEventCommitTs),
+	// 			zap.Uint64("newEndTs", dataRange.CommitTsEnd))
+	// 	}
+	// }
+
 	if dataRange.CommitTsEnd > dataRange.CommitTsStart {
 		log.Info("scan window local advance due to pending ddl",
 			zap.Stringer("changefeedID", task.changefeedStat.changefeedID),
@@ -583,6 +670,68 @@ func (c *eventBroker) hasSyncPointEventsBeforeTs(ts uint64, d *dispatcherStat) b
 	return d.enableSyncPoint && ts > d.nextSyncPoint.Load()
 }
 
+// syncPointLagDuration returns how far sentResolvedTs is ahead of checkpointTs in wall-clock time.
+func syncPointLagDuration(sentResolvedTs, checkpointTs uint64) time.Duration {
+	if sentResolvedTs <= checkpointTs {
+		return 0
+	}
+	return oracle.GetTimeFromTS(sentResolvedTs).Sub(oracle.GetTimeFromTS(checkpointTs))
+}
+
+// shouldSuppressSyncPointEmission reports whether syncpoint emission for the dispatcher should be
+// suppressed because the changefeed's minimum checkpoint ts lags too far behind the received resolved ts.
+func (c *eventBroker) shouldSuppressSyncPointEmission(d *dispatcherStat) bool {
+	if d == nil || c.syncPointLagSuppressThreshold <= 0 {
+		return false
+	}
+
+	receivedResolvedTs := d.receivedResolvedTs.Load()
+	checkpointTs, ok := d.changefeedStat.getMinCheckpointTs()
+	if !ok {
+		metrics.EventServiceSyncPointLagGaugeVec.WithLabelValues(d.changefeedStat.changefeedID.String()).Set(0)
+		if d.syncPointSendSuppressed.Load() {
+			if
d.syncPointSendSuppressed.CompareAndSwap(true, false) { + log.Info("syncpoint emission resumed", + zap.Stringer("changefeedID", d.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", d.id), + zap.Uint64("receivedResolvedTs", receivedResolvedTs), + zap.Duration("resumeThreshold", c.syncPointLagResumeThreshold)) + } + } + return false + } + lag := syncPointLagDuration(receivedResolvedTs, checkpointTs) + metrics.EventServiceSyncPointLagGaugeVec.WithLabelValues(d.changefeedStat.changefeedID.String()).Set(lag.Seconds()) + + if d.syncPointSendSuppressed.Load() { + if lag <= c.syncPointLagResumeThreshold { + if d.syncPointSendSuppressed.CompareAndSwap(true, false) { + log.Info("syncpoint emission resumed", + zap.Stringer("changefeedID", d.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", d.id), + zap.Uint64("receivedResolvedTs", receivedResolvedTs), + zap.Uint64("minCheckpointTs", checkpointTs), + zap.Duration("lag", lag), + zap.Duration("resumeThreshold", c.syncPointLagResumeThreshold)) + } + return false + } + return true + } + + if lag > c.syncPointLagSuppressThreshold { + if d.syncPointSendSuppressed.CompareAndSwap(false, true) { + log.Info("syncpoint emission suppressed due to lag", + zap.Stringer("changefeedID", d.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", d.id), + zap.Uint64("sentResolvedTs", receivedResolvedTs), + zap.Uint64("minCheckpointTs", checkpointTs), + zap.Duration("lag", lag), + zap.Duration("suppressThreshold", c.syncPointLagSuppressThreshold), + zap.Duration("resumeThreshold", c.syncPointLagResumeThreshold)) + } + return true + } + return false +} + // emitSyncPointEventIfNeeded emits a sync point event if the current ts is greater than the next sync point, and updates the next sync point. // We need call this function every time we send a event(whether dml/ddl/resolvedTs), // thus to ensure the sync point event is in correct order for each dispatcher. 
@@ -590,15 +739,18 @@ func (c *eventBroker) emitSyncPointEventIfNeeded(ts uint64, d *dispatcherStat, r for d.enableSyncPoint && ts > d.nextSyncPoint.Load() { commitTs := d.nextSyncPoint.Load() d.nextSyncPoint.Store(oracle.GoTimeToTS(oracle.GetTimeFromTS(commitTs).Add(d.syncPointInterval))) + if c.shouldSuppressSyncPointEmission(d) { + metrics.EventServiceSyncPointSuppressedCount.WithLabelValues(d.changefeedStat.changefeedID.String()).Inc() + continue + } e := event.NewSyncPointEvent(d.id, commitTs, d.seq.Add(1), d.epoch) + syncPointEvent := newWrapSyncPointEvent(remoteID, e) + c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) <- syncPointEvent log.Debug("send syncpoint event to dispatcher", zap.Stringer("changefeedID", d.changefeedStat.changefeedID), zap.Stringer("dispatcherID", d.id), zap.Int64("tableID", d.info.GetTableSpan().GetTableID()), zap.Uint64("commitTs", e.GetCommitTs()), zap.Uint64("seq", e.GetSeq())) - - syncPointEvent := newWrapSyncPointEvent(remoteID, e) - c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) <- syncPointEvent } } @@ -1283,6 +1435,9 @@ func (c *eventBroker) handleDispatcherHeartbeat(heartbeat *DispatcherHeartBeatWi handleProgress(dp.DispatcherID, dp.CheckpointTs, 0, false) } } + for changefeed := range changedChangefeeds { + changefeed.refreshMinSentResolvedTs() + } c.sendDispatcherResponse(responseMap) } diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index bc32e1a9fc..fdc90edab9 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -349,6 +349,342 @@ func TestGetScanTaskDataRangeRingWaitWithThreeDispatchersCanAdvancePendingDDL(t require.Equal(t, ts103, dataRange.CommitTsEnd) } +func TestGetScanTaskDataRangeCappedByCheckpointWithSyncPoint(t *testing.T) { + broker, _, ss, _ := newEventBrokerForTest() + broker.close() + + base := time.Unix(0, 0) + ts100 := oracle.GoTimeToTS(base.Add(100 * time.Second)) + ts120 := oracle.GoTimeToTS(base.Add(120 * time.Second)) + ts200 := oracle.GoTimeToTS(base.Add(200 * time.Second)) + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = 10 * time.Second + info.nextSyncPoint = ts120 + + changefeedStatus := broker.getOrSetChangefeedStatus(info.GetChangefeedID(), info.GetSyncPointInterval()) + changefeedStatus.scanInterval.Store(int64(5 * time.Minute)) + changefeedStatus.minSentTs.Store(ts100) + + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.seq.Store(1) + disp.checkpointTs.Store(ts100) + disp.sentResolvedTs.Store(ts100) + disp.lastScannedCommitTs.Store(ts100) + disp.lastScannedStartTs.Store(0) + disp.receivedResolvedTs.Store(ts200) + disp.eventStoreCommitTs.Store(ts200) + + ss.resolvedTs = ts200 + ss.maxDDLCommitTs = ts200 + + needScan, dataRange := broker.getScanTaskDataRange(disp) + require.True(t, needScan) + require.Equal(t, ts100, dataRange.CommitTsStart) + require.Equal(t, ts120, dataRange.CommitTsEnd) +} + +func TestGetScanTaskDataRangePendingDDLWithCheckpointCapCanAdvance(t *testing.T) { + broker, _, ss, _ := newEventBrokerForTest() + broker.close() + + base := time.Unix(0, 0) + ts100 := oracle.GoTimeToTS(base.Add(100 * time.Second)) + ts101 := oracle.GoTimeToTS(base.Add(101 * time.Second)) + ts102 := oracle.GoTimeToTS(base.Add(102 * time.Second)) + ts103 := oracle.GoTimeToTS(base.Add(103 * time.Second)) + ts110 := oracle.GoTimeToTS(base.Add(110 * time.Second)) + + info := newMockDispatcherInfoForTest(t) + 
info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = time.Second + info.nextSyncPoint = ts110 + + changefeedStatus := broker.getOrSetChangefeedStatus(info.GetChangefeedID(), info.GetSyncPointInterval()) + changefeedStatus.scanInterval.Store(int64(time.Second)) + changefeedStatus.minSentTs.Store(ts100) + + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.seq.Store(1) + disp.checkpointTs.Store(ts100) + disp.sentResolvedTs.Store(ts101) + disp.lastScannedCommitTs.Store(ts101) + disp.lastScannedStartTs.Store(ts101 - 1) + disp.receivedResolvedTs.Store(ts110) + disp.eventStoreCommitTs.Store(ts103) + + ss.resolvedTs = ts110 + ss.maxDDLCommitTs = ts103 + + needScan, dataRange := broker.getScanTaskDataRange(disp) + require.True(t, needScan) + require.Equal(t, ts101, dataRange.CommitTsStart) + require.Equal(t, ts102, dataRange.CommitTsEnd) +} + +func TestGetScanTaskDataRangePendingDDLBypassesCheckpointCapLock(t *testing.T) { + broker, _, ss, _ := newEventBrokerForTest() + broker.close() + + base := time.Unix(0, 0) + ts100 := oracle.GoTimeToTS(base.Add(100 * time.Second)) + ts102 := oracle.GoTimeToTS(base.Add(102 * time.Second)) + ts103 := oracle.GoTimeToTS(base.Add(103 * time.Second)) + ts110 := oracle.GoTimeToTS(base.Add(110 * time.Second)) + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = time.Second + info.nextSyncPoint = ts110 + + changefeedStatus := broker.getOrSetChangefeedStatus(info.GetChangefeedID(), info.GetSyncPointInterval()) + changefeedStatus.scanInterval.Store(int64(time.Second)) + changefeedStatus.minSentTs.Store(ts100) + + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.seq.Store(1) + disp.checkpointTs.Store(ts100) + disp.sentResolvedTs.Store(ts102) + disp.lastScannedCommitTs.Store(ts102) + disp.lastScannedStartTs.Store(ts102 - 1) + disp.receivedResolvedTs.Store(ts110) + disp.eventStoreCommitTs.Store(ts103) + + ss.resolvedTs = ts110 + ss.maxDDLCommitTs = ts103 + + needScan, dataRange := broker.getScanTaskDataRange(disp) + require.True(t, needScan) + require.Equal(t, ts102, dataRange.CommitTsStart) + require.Equal(t, ts103, dataRange.CommitTsEnd) +} + +func TestEmitSyncPointEventIfNeededSuppressesWhenLagging(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + broker.syncPointLagSuppressThreshold = 20 * time.Second + broker.syncPointLagResumeThreshold = 10 * time.Second + + base := time.Unix(0, 0) + ts100 := oracle.GoTimeToTS(base.Add(100 * time.Second)) + ts110 := oracle.GoTimeToTS(base.Add(110 * time.Second)) + ts130 := oracle.GoTimeToTS(base.Add(130 * time.Second)) + ts180 := oracle.GoTimeToTS(base.Add(180 * time.Second)) + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = 10 * time.Second + info.nextSyncPoint = ts110 + + changefeedStatus := broker.getOrSetChangefeedStatus(info.GetChangefeedID(), info.GetSyncPointInterval()) + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.seq.Store(1) + disp.checkpointTs.Store(ts100) + disp.receivedResolvedTs.Store(ts180) + disp.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + + dispPtr := &atomic.Pointer[dispatcherStat]{} + dispPtr.Store(disp) + changefeedStatus.addDispatcher(disp.id, dispPtr) + changefeedStatus.refreshMinSentResolvedTs() + + broker.emitSyncPointEventIfNeeded(ts130, disp, node.ID(info.GetServerID())) + + require.True(t, disp.syncPointSendSuppressed.Load()) + require.Equal(t, ts130, 
disp.nextSyncPoint.Load()) + require.Equal(t, 0, len(broker.messageCh[disp.messageWorkerIndex])) +} + +func TestEmitSyncPointEventIfNeededSuppressesByMinChangefeedCheckpoint(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + broker.syncPointLagSuppressThreshold = 20 * time.Second + broker.syncPointLagResumeThreshold = 10 * time.Second + + base := time.Unix(0, 0) + ts100 := oracle.GoTimeToTS(base.Add(100 * time.Second)) + ts110 := oracle.GoTimeToTS(base.Add(110 * time.Second)) + ts115 := oracle.GoTimeToTS(base.Add(115 * time.Second)) + ts120 := oracle.GoTimeToTS(base.Add(120 * time.Second)) + ts125 := oracle.GoTimeToTS(base.Add(125 * time.Second)) + ts130 := oracle.GoTimeToTS(base.Add(130 * time.Second)) + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = 10 * time.Second + info.nextSyncPoint = ts110 + + changefeedStatus := broker.getOrSetChangefeedStatus(info.GetChangefeedID(), info.GetSyncPointInterval()) + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.seq.Store(1) + disp.receivedResolvedTs.Store(ts130) + disp.checkpointTs.Store(ts125) + disp.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + + dispPtr := &atomic.Pointer[dispatcherStat]{} + dispPtr.Store(disp) + changefeedStatus.addDispatcher(disp.id, dispPtr) + + lagging := newDispatcherStat(newMockDispatcherInfoForTest(t), 1, 1, nil, changefeedStatus) + lagging.seq.Store(1) + lagging.checkpointTs.Store(ts100) + lagging.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + + laggingPtr := &atomic.Pointer[dispatcherStat]{} + laggingPtr.Store(lagging) + changefeedStatus.addDispatcher(lagging.id, laggingPtr) + changefeedStatus.refreshMinSentResolvedTs() + + broker.emitSyncPointEventIfNeeded(ts115, disp, node.ID(info.GetServerID())) + + require.True(t, disp.syncPointSendSuppressed.Load()) + require.Equal(t, ts120, disp.nextSyncPoint.Load()) + require.Equal(t, 0, len(broker.messageCh[disp.messageWorkerIndex])) +} + +func TestEmitSyncPointEventIfNeededResumesAfterLagRecovers(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + broker.syncPointLagSuppressThreshold = 20 * time.Second + broker.syncPointLagResumeThreshold = 10 * time.Second + + base := time.Unix(0, 0) + ts110 := oracle.GoTimeToTS(base.Add(110 * time.Second)) + ts115 := oracle.GoTimeToTS(base.Add(115 * time.Second)) + ts120 := oracle.GoTimeToTS(base.Add(120 * time.Second)) + ts125 := oracle.GoTimeToTS(base.Add(125 * time.Second)) + ts130 := oracle.GoTimeToTS(base.Add(130 * time.Second)) + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = 10 * time.Second + info.nextSyncPoint = ts110 + + changefeedStatus := broker.getOrSetChangefeedStatus(info.GetChangefeedID(), info.GetSyncPointInterval()) + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.seq.Store(1) + disp.syncPointSendSuppressed.Store(true) + disp.receivedResolvedTs.Store(ts130) + disp.checkpointTs.Store(ts125) + disp.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + + dispPtr := &atomic.Pointer[dispatcherStat]{} + dispPtr.Store(disp) + changefeedStatus.addDispatcher(disp.id, dispPtr) + changefeedStatus.refreshMinSentResolvedTs() + + broker.emitSyncPointEventIfNeeded(ts115, disp, node.ID(info.GetServerID())) + + require.False(t, disp.syncPointSendSuppressed.Load()) + require.Equal(t, ts120, disp.nextSyncPoint.Load()) + require.Equal(t, 1, len(broker.messageCh[disp.messageWorkerIndex])) + + msg := 
<-broker.messageCh[disp.messageWorkerIndex] + require.Equal(t, event.TypeSyncPointEvent, msg.msgType) + syncPointEvent, ok := msg.e.(*event.SyncPointEvent) + require.True(t, ok) + require.Equal(t, ts110, syncPointEvent.GetCommitTs()) +} + +func TestEmitSyncPointEventIfNeededResumesWhenStaleCheckpointIsIgnored(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + broker.syncPointLagSuppressThreshold = 20 * time.Second + broker.syncPointLagResumeThreshold = 10 * time.Second + + base := time.Unix(0, 0) + ts100 := oracle.GoTimeToTS(base.Add(100 * time.Second)) + ts110 := oracle.GoTimeToTS(base.Add(110 * time.Second)) + ts115 := oracle.GoTimeToTS(base.Add(115 * time.Second)) + ts120 := oracle.GoTimeToTS(base.Add(120 * time.Second)) + ts125 := oracle.GoTimeToTS(base.Add(125 * time.Second)) + ts130 := oracle.GoTimeToTS(base.Add(130 * time.Second)) + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = 10 * time.Second + info.nextSyncPoint = ts110 + + changefeedStatus := broker.getOrSetChangefeedStatus(info.GetChangefeedID(), info.GetSyncPointInterval()) + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.seq.Store(1) + disp.syncPointSendSuppressed.Store(true) + disp.receivedResolvedTs.Store(ts130) + disp.checkpointTs.Store(ts125) + disp.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + + dispPtr := &atomic.Pointer[dispatcherStat]{} + dispPtr.Store(disp) + changefeedStatus.addDispatcher(disp.id, dispPtr) + + stale := newDispatcherStat(newMockDispatcherInfoForTest(t), 1, 1, nil, changefeedStatus) + stale.seq.Store(1) + stale.checkpointTs.Store(ts100) + stale.lastReceivedHeartbeatTime.Store(time.Now().Add(-scanWindowStaleDispatcherHeartbeatThreshold - time.Second).Unix()) + + stalePtr := &atomic.Pointer[dispatcherStat]{} + stalePtr.Store(stale) + changefeedStatus.addDispatcher(stale.id, stalePtr) + changefeedStatus.refreshMinSentResolvedTs() + + broker.emitSyncPointEventIfNeeded(ts115, disp, node.ID(info.GetServerID())) + + require.False(t, disp.syncPointSendSuppressed.Load()) + require.Equal(t, ts120, disp.nextSyncPoint.Load()) + require.Equal(t, 1, len(broker.messageCh[disp.messageWorkerIndex])) + + msg := <-broker.messageCh[disp.messageWorkerIndex] + require.Equal(t, event.TypeSyncPointEvent, msg.msgType) + syncPointEvent, ok := msg.e.(*event.SyncPointEvent) + require.True(t, ok) + require.Equal(t, ts110, syncPointEvent.GetCommitTs()) +} + +func TestEmitSyncPointEventIfNeededNoSuppressWhenCheckpointAhead(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + broker.syncPointLagSuppressThreshold = time.Second + broker.syncPointLagResumeThreshold = time.Second + + base := time.Unix(0, 0) + ts100 := oracle.GoTimeToTS(base.Add(100 * time.Second)) + ts110 := oracle.GoTimeToTS(base.Add(110 * time.Second)) + ts115 := oracle.GoTimeToTS(base.Add(115 * time.Second)) + ts120 := oracle.GoTimeToTS(base.Add(120 * time.Second)) + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = 10 * time.Second + info.nextSyncPoint = ts110 + + changefeedStatus := broker.getOrSetChangefeedStatus(info.GetChangefeedID(), info.GetSyncPointInterval()) + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.seq.Store(1) + disp.receivedResolvedTs.Store(ts100) + disp.checkpointTs.Store(ts120) + disp.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + + dispPtr := &atomic.Pointer[dispatcherStat]{} + dispPtr.Store(disp) + 
changefeedStatus.addDispatcher(disp.id, dispPtr) + changefeedStatus.refreshMinSentResolvedTs() + + broker.emitSyncPointEventIfNeeded(ts115, disp, node.ID(info.GetServerID())) + + require.False(t, disp.syncPointSendSuppressed.Load()) + require.Equal(t, ts120, disp.nextSyncPoint.Load()) + require.Equal(t, 1, len(broker.messageCh[disp.messageWorkerIndex])) +} + func TestHandleCongestionControlV2AdjustsScanInterval(t *testing.T) { broker, _, _, _ := newEventBrokerForTest() defer broker.close() diff --git a/pkg/eventservice/metrics_collector.go b/pkg/eventservice/metrics_collector.go index 0952ed9260..47d27215ad 100644 --- a/pkg/eventservice/metrics_collector.go +++ b/pkg/eventservice/metrics_collector.go @@ -132,6 +132,8 @@ func (d *dispatcherHeapItem) LessThan(other *dispatcherHeapItem) bool { type metricsSnapshot struct { receivedMinResolvedTs uint64 sentMinResolvedTs uint64 + receivedMinDispatcher *dispatcherStat + sentMinDispatcher *dispatcherStat dispatcherCount int pendingTaskCount int slowestDispatchers []*dispatcherStat // top 10 dispatchers with slowest checkpointTs @@ -222,11 +224,13 @@ func (mc *metricsCollector) collectDispatcherMetrics(snapshot *metricsSnapshot) resolvedTs := dispatcher.receivedResolvedTs.Load() if resolvedTs < snapshot.receivedMinResolvedTs { snapshot.receivedMinResolvedTs = resolvedTs + snapshot.receivedMinDispatcher = dispatcher } watermark := dispatcher.sentResolvedTs.Load() if watermark < snapshot.sentMinResolvedTs { snapshot.sentMinResolvedTs = watermark + snapshot.sentMinDispatcher = dispatcher } // Maintain a min-heap of size 10 for the slowest dispatchers @@ -301,7 +305,9 @@ func (mc *metricsCollector) updateMetricsFromSnapshot(snapshot *metricsSnapshot) // logSlowDispatchers logs warnings for dispatchers that are too slow func (mc *metricsCollector) logSlowDispatchers(snapshot *metricsSnapshot) { - if len(snapshot.slowestDispatchers) == 0 { + if len(snapshot.slowestDispatchers) == 0 && + snapshot.receivedMinDispatcher == nil && + snapshot.sentMinDispatcher == nil { return } @@ -311,6 +317,16 @@ func (mc *metricsCollector) logSlowDispatchers(snapshot *metricsSnapshot) { mc.lastLogSlowDispatchersTime = time.Now() + if snapshot.receivedMinDispatcher != nil && snapshot.sentMinDispatcher != nil { + log.Warn("event service min resolved ts snapshot", + zap.Stringer("receivedMinDispatcherID", snapshot.receivedMinDispatcher.id), + zap.Stringer("receivedMinChangefeedID", snapshot.receivedMinDispatcher.changefeedStat.changefeedID), + zap.Uint64("receivedMinResolvedTs", snapshot.receivedMinResolvedTs), + zap.Stringer("sentMinDispatcherID", snapshot.sentMinDispatcher.id), + zap.Stringer("sentMinChangefeedID", snapshot.sentMinDispatcher.changefeedStat.changefeedID), + zap.Uint64("sentMinResolvedTs", snapshot.sentMinResolvedTs)) + } + for _, dispatcher := range snapshot.slowestDispatchers { checkpointTs := dispatcher.checkpointTs.Load() lag := time.Since(oracle.GetTimeFromTS(checkpointTs)) diff --git a/pkg/eventservice/scan_window.go b/pkg/eventservice/scan_window.go index 46cb9cffb3..a5d1a8a3e4 100644 --- a/pkg/eventservice/scan_window.go +++ b/pkg/eventservice/scan_window.go @@ -82,6 +82,11 @@ const ( scanWindowStaleDispatcherHeartbeatThreshold = 1 * time.Minute ) +func isDispatcherStale(lastHeartbeatTime int64, now time.Time) bool { + return lastHeartbeatTime > 0 && + now.Sub(time.Unix(lastHeartbeatTime, 0)) > scanWindowStaleDispatcherHeartbeatThreshold +} + type memoryUsageSample struct { ts time.Time ratio float64 @@ -332,8 +337,10 @@ func (c *changefeedStatus) 
refreshMinSentResolvedTs() { now := time.Now() minSentResolvedTs := ^uint64(0) minSentResolvedTsWithStale := ^uint64(0) + minCheckpointTs := uint64(0) hasEligible := false hasNonStale := false + hasMinCheckpointTs := false c.dispatchers.Range(func(_ any, value any) bool { dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load() if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.seq.Load() == 0 { @@ -346,9 +353,7 @@ func (c *changefeedStatus) refreshMinSentResolvedTs() { minSentResolvedTsWithStale = sentResolvedTs } - lastHeartbeatTime := dispatcher.lastReceivedHeartbeatTime.Load() - if lastHeartbeatTime > 0 && - now.Sub(time.Unix(lastHeartbeatTime, 0)) > scanWindowStaleDispatcherHeartbeatThreshold { + if isDispatcherStale(dispatcher.lastReceivedHeartbeatTime.Load(), now) { log.Info("dispatcher is stale, skip it's sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id)) return true } @@ -357,18 +362,30 @@ func (c *changefeedStatus) refreshMinSentResolvedTs() { if sentResolvedTs < minSentResolvedTs { minSentResolvedTs = sentResolvedTs } + checkpointTs := dispatcher.checkpointTs.Load() + if !hasMinCheckpointTs || checkpointTs < minCheckpointTs { + minCheckpointTs = checkpointTs + hasMinCheckpointTs = true + } return true }) if !hasEligible { c.storeMinSentTs(0) + c.storeMinCheckpointTs(invalidMinCheckpointTs) return } if !hasNonStale { c.storeMinSentTs(minSentResolvedTsWithStale) + c.storeMinCheckpointTs(invalidMinCheckpointTs) return } c.storeMinSentTs(minSentResolvedTs) + if hasMinCheckpointTs { + c.storeMinCheckpointTs(minCheckpointTs) + } else { + c.storeMinCheckpointTs(invalidMinCheckpointTs) + } } func (c *changefeedStatus) getScanMaxTs() uint64 { @@ -393,6 +410,14 @@ func (c *changefeedStatus) storeMinSentTs(value uint64) { metrics.EventServiceScanWindowBaseTsGaugeVec.WithLabelValues(c.changefeedID.String()).Set(float64(value)) } +func (c *changefeedStatus) storeMinCheckpointTs(value uint64) { + prev := c.minCheckpointTs.Load() + if prev == value { + return + } + c.minCheckpointTs.Store(value) +} + func scaleDuration(d time.Duration, numerator int64, denominator int64) time.Duration { if numerator <= 0 || denominator <= 0 { return d diff --git a/pkg/eventservice/scan_window_test.go b/pkg/eventservice/scan_window_test.go index 01ee458e70..e6400b69a0 100644 --- a/pkg/eventservice/scan_window_test.go +++ b/pkg/eventservice/scan_window_test.go @@ -153,6 +153,7 @@ func TestRefreshMinSentResolvedTsMinAndSkipRules(t *testing.T) { stale := &dispatcherStat{} stale.seq.Store(1) stale.sentResolvedTs.Store(10) + stale.checkpointTs.Store(10) stale.lastReceivedHeartbeatTime.Store(time.Now().Add(-scanWindowStaleDispatcherHeartbeatThreshold - time.Second).Unix()) removed := &dispatcherStat{} @@ -167,10 +168,12 @@ func TestRefreshMinSentResolvedTsMinAndSkipRules(t *testing.T) { first := &dispatcherStat{} first.seq.Store(1) first.sentResolvedTs.Store(200) + first.checkpointTs.Store(200) second := &dispatcherStat{} second.seq.Store(1) second.sentResolvedTs.Store(50) + second.checkpointTs.Store(50) stalePtr := &atomic.Pointer[dispatcherStat]{} stalePtr.Store(stale) @@ -194,15 +197,23 @@ func TestRefreshMinSentResolvedTsMinAndSkipRules(t *testing.T) { status.refreshMinSentResolvedTs() require.Equal(t, uint64(50), status.minSentTs.Load()) + minCheckpointTs, ok := status.getMinCheckpointTs() + require.True(t, ok) + require.Equal(t, uint64(50), minCheckpointTs) second.isRemoved.Store(true) status.refreshMinSentResolvedTs() 
require.Equal(t, uint64(200), status.minSentTs.Load()) + minCheckpointTs, ok = status.getMinCheckpointTs() + require.True(t, ok) + require.Equal(t, uint64(200), minCheckpointTs) stale.isRemoved.Store(true) first.seq.Store(0) status.refreshMinSentResolvedTs() require.Equal(t, uint64(0), status.minSentTs.Load()) + _, ok = status.getMinCheckpointTs() + require.False(t, ok) } func TestRefreshMinSentResolvedTsStaleFallback(t *testing.T) { @@ -213,6 +224,7 @@ func TestRefreshMinSentResolvedTsStaleFallback(t *testing.T) { stale := &dispatcherStat{} stale.seq.Store(1) stale.sentResolvedTs.Store(123) + stale.checkpointTs.Store(123) stale.lastReceivedHeartbeatTime.Store(time.Now().Add(-scanWindowStaleDispatcherHeartbeatThreshold - time.Second).Unix()) stalePtr := &atomic.Pointer[dispatcherStat]{} @@ -221,6 +233,8 @@ func TestRefreshMinSentResolvedTsStaleFallback(t *testing.T) { status.refreshMinSentResolvedTs() require.Equal(t, uint64(123), status.minSentTs.Load()) + _, ok := status.getMinCheckpointTs() + require.False(t, ok) } func TestGetScanMaxTsFallbackInterval(t *testing.T) { diff --git a/pkg/metrics/event_service.go b/pkg/metrics/event_service.go index e6ed10cc90..45c10a298f 100644 --- a/pkg/metrics/event_service.go +++ b/pkg/metrics/event_service.go @@ -69,6 +69,27 @@ var ( Name: "scan_window_interval", Help: "The scan window interval in seconds for each changefeed", }, []string{"changefeed"}) + EventServiceSyncPointLagGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "syncpoint_lag_seconds", + Help: "The lag between sent resolved ts and checkpoint ts in seconds for each changefeed", + }, []string{"changefeed"}) + EventServiceSyncPointSuppressedCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "syncpoint_suppressed_count", + Help: "The number of syncpoint events suppressed due to lagging checkpoint", + }, []string{"changefeed"}) + EventServiceScanCappedByCheckpointCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "scan_capped_by_checkpoint_count", + Help: "The number of scan ranges capped by checkpoint based bound", + }, []string{"changefeed"}) EventServiceScanDuration = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: "ticdc", @@ -201,6 +222,9 @@ func initEventServiceMetrics(registry *prometheus.Registry) { registry.MustRegister(EventServiceResolvedTsLagGauge) registry.MustRegister(EventServiceScanWindowBaseTsGaugeVec) registry.MustRegister(EventServiceScanWindowIntervalGaugeVec) + registry.MustRegister(EventServiceSyncPointLagGaugeVec) + registry.MustRegister(EventServiceSyncPointSuppressedCount) + registry.MustRegister(EventServiceScanCappedByCheckpointCount) registry.MustRegister(EventServiceScanDuration) registry.MustRegister(EventServiceScannedCount) registry.MustRegister(EventServiceDispatcherGauge)