Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 5 additions & 23 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Dispatcher interface {
GetHeartBeatInfo(h *HeartBeatInfo)
GetComponentStatus() heartbeatpb.ComponentState
GetBlockEventStatus() *heartbeatpb.State
GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus
OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus)
GetEventSizePerSecond() float32
IsTableTriggerDispatcher() bool
DealWithBlockEvent(event commonEvent.BlockEvent)
Expand Down Expand Up @@ -836,16 +836,7 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
}

// Step3: whether the outdate message or not, we need to return message show we have finished the event.
d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
IsBlocked: true,
BlockTs: action.CommitTs,
IsSyncPoint: action.IsSyncPoint,
Stage: heartbeatpb.BlockStage_DONE,
},
Mode: d.GetMode(),
}
d.OfferDoneBlockStatus(action.CommitTs, action.IsSyncPoint)
}
return false
}
Expand Down Expand Up @@ -881,16 +872,7 @@ func (d *BasicDispatcher) reportBlockedEventDone(
actionCommitTs uint64,
actionIsSyncPoint bool,
) {
d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
IsBlocked: true,
BlockTs: actionCommitTs,
IsSyncPoint: actionIsSyncPoint,
Stage: heartbeatpb.BlockStage_DONE,
},
Mode: d.GetMode(),
}
d.OfferDoneBlockStatus(actionCommitTs, actionIsSyncPoint)
GetDispatcherStatusDynamicStream().Wake(d.id)
}

Expand Down Expand Up @@ -1042,7 +1024,7 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) {
} else {
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
}
d.sharedInfo.blockStatusesChan <- message
d.OfferBlockStatus(message)
})

// dealing with events which update schema ids
Expand Down Expand Up @@ -1166,7 +1148,7 @@ func (d *BasicDispatcher) reportBlockedEventToMaintainer(event commonEvent.Block
IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent,
}
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
d.sharedInfo.blockStatusesChan <- message
d.OfferBlockStatus(message)
}

func (d *BasicDispatcher) flushBlockedEventAndReportToMaintainer(event commonEvent.BlockEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ func TestDDLEventsAlwaysValidateActiveActive(t *testing.T) {
func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActiveActive bool) *BasicDispatcher {
t.Helper()
statuses := make(chan TableSpanStatusWithSeq, 2)
blockStatuses := make(chan *heartbeatpb.TableSpanBlockStatus, 1)
errCh := make(chan error, 1)
sharedInfo := NewSharedInfo(
common.NewChangefeedID("test"),
Expand All @@ -143,7 +142,7 @@ func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActive
nil,
false,
statuses,
blockStatuses,
1,
errCh,
)
dispatcherSink := newDispatcherTestSink(t, sinkType)
Expand Down
57 changes: 48 additions & 9 deletions downstreamadapter/dispatcher/basic_dispatcher_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package dispatcher

import (
"context"
"sync/atomic"
"time"

Expand Down Expand Up @@ -58,9 +59,9 @@ type SharedInfo struct {
// statusesChan is used to store the status of dispatchers when status changed
// and push to heartbeatRequestQueue
statusesChan chan TableSpanStatusWithSeq
// blockStatusesChan use to collector block status of ddl/sync point event to Maintainer
// shared by the event dispatcher manager
blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus
// blockStatusBuffer keeps block statuses for the dispatcher manager.
// Identical DONE statuses are coalesced while pending to reduce local memory amplification.
blockStatusBuffer *BlockStatusBuffer

// blockExecutor is used to execute block events such as DDL and sync point events asynchronously
// to avoid callback() called in handleEvents, causing deadlock in ds
Expand Down Expand Up @@ -88,7 +89,7 @@ func NewSharedInfo(
txnAtomicity *config.AtomicityLevel,
enableSplittableCheck bool,
statusesChan chan TableSpanStatusWithSeq,
blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus,
blockStatusBufferSize int,
errCh chan error,
) *SharedInfo {
sharedInfo := &SharedInfo{
Expand All @@ -102,7 +103,7 @@ func NewSharedInfo(
syncPointConfig: syncPointConfig,
enableSplittableCheck: enableSplittableCheck,
statusesChan: statusesChan,
blockStatusesChan: blockStatusesChan,
blockStatusBuffer: NewBlockStatusBuffer(blockStatusBufferSize),
blockExecutor: newBlockEventExecutor(),
errCh: errCh,
metricHandleDDLHis: metrics.HandleDDLHistogram.WithLabelValues(changefeedID.Keyspace(), changefeedID.Name()),
Expand Down Expand Up @@ -219,8 +220,20 @@ func (d *BasicDispatcher) GetTxnAtomicity() config.AtomicityLevel {
return d.sharedInfo.txnAtomicity
}

func (d *BasicDispatcher) GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus {
return d.sharedInfo.blockStatusesChan
func (d *BasicDispatcher) OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) {
d.sharedInfo.OfferBlockStatus(status)
}

func (d *BasicDispatcher) OfferDoneBlockStatus(blockTs uint64, isSyncPoint bool) {
d.sharedInfo.OfferDoneBlockStatus(d.id, blockTs, isSyncPoint, d.GetMode())
}

func (d *BasicDispatcher) TakeBlockStatus(ctx context.Context) *heartbeatpb.TableSpanBlockStatus {
return d.sharedInfo.TakeBlockStatus(ctx)
}

func (d *BasicDispatcher) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) {
return d.sharedInfo.TakeBlockStatusWithTimeout(timeout)
}

func (d *BasicDispatcher) GetEventSizePerSecond() float32 {
Expand Down Expand Up @@ -254,8 +267,34 @@ func (s *SharedInfo) EnableActiveActive() bool {
return s.enableActiveActive
}

func (s *SharedInfo) GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus {
return s.blockStatusesChan
func (s *SharedInfo) OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) {
s.blockStatusBuffer.Offer(status)
}

func (s *SharedInfo) OfferDoneBlockStatus(dispatcherID common.DispatcherID, blockTs uint64, isSyncPoint bool, mode int64) {
s.blockStatusBuffer.OfferDone(dispatcherID, blockTs, isSyncPoint, mode)
}

func (s *SharedInfo) TakeBlockStatus(ctx context.Context) *heartbeatpb.TableSpanBlockStatus {
return s.blockStatusBuffer.Take(ctx)
}

func (s *SharedInfo) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
status := s.TakeBlockStatus(ctx)
if status == nil {
return nil, false
}
return status, true
}

func (s *SharedInfo) TryTakeBlockStatus() (*heartbeatpb.TableSpanBlockStatus, bool) {
return s.blockStatusBuffer.TryTake()
}

func (s *SharedInfo) BlockStatusLen() int {
return s.blockStatusBuffer.Len()
}

func (s *SharedInfo) GetErrCh() chan error {
Expand Down
157 changes: 157 additions & 0 deletions downstreamadapter/dispatcher/block_status_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dispatcher

import (
"context"
"sync"

"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
)

type doneBlockStatusKey struct {
dispatcherID common.DispatcherID
blockTs uint64
mode int64
isSyncPoint bool
}

type blockStatusQueueEntry struct {
status *heartbeatpb.TableSpanBlockStatus
doneKey *doneBlockStatusKey
}

// BlockStatusBuffer keeps block statuses ordered while coalescing identical DONE
// statuses that are still pending locally. Non-DONE statuses keep the original
// protobuf object and ordering.
type BlockStatusBuffer struct {
queue chan blockStatusQueueEntry

mu sync.Mutex
pendingDone map[doneBlockStatusKey]struct{}
}

// NewBlockStatusBuffer creates a bounded local mailbox for dispatcher block
// statuses. The buffer keeps enqueue order while coalescing identical pending
// DONE statuses before protobuf materialization.
func NewBlockStatusBuffer(size int) *BlockStatusBuffer {
if size <= 0 {
size = 1
}
return &BlockStatusBuffer{
queue: make(chan blockStatusQueueEntry, size),
pendingDone: make(map[doneBlockStatusKey]struct{}),
}
}

func (b *BlockStatusBuffer) Offer(status *heartbeatpb.TableSpanBlockStatus) {
if status == nil {
return
}
if !isDoneBlockStatus(status) {
b.queue <- blockStatusQueueEntry{status: status}
return
}

key := doneBlockStatusKey{
dispatcherID: common.NewDispatcherIDFromPB(status.ID),
blockTs: status.State.BlockTs,
mode: status.Mode,
isSyncPoint: status.State.IsSyncPoint,
}
if !b.reserveDone(key) {
return
}
b.queue <- blockStatusQueueEntry{doneKey: &key}
}

func (b *BlockStatusBuffer) OfferDone(
dispatcherID common.DispatcherID,
blockTs uint64,
isSyncPoint bool,
mode int64,
) {
key := doneBlockStatusKey{
dispatcherID: dispatcherID,
blockTs: blockTs,
mode: mode,
isSyncPoint: isSyncPoint,
}
if !b.reserveDone(key) {
return
}
b.queue <- blockStatusQueueEntry{doneKey: &key}
}

func (b *BlockStatusBuffer) Take(ctx context.Context) *heartbeatpb.TableSpanBlockStatus {
select {
case <-ctx.Done():
return nil
case entry := <-b.queue:
return b.materialize(entry)
}
}

func (b *BlockStatusBuffer) TryTake() (*heartbeatpb.TableSpanBlockStatus, bool) {
select {
case entry := <-b.queue:
return b.materialize(entry), true
default:
return nil, false
}
}

func (b *BlockStatusBuffer) Len() int {
return len(b.queue)
}

func (b *BlockStatusBuffer) reserveDone(key doneBlockStatusKey) bool {
b.mu.Lock()
defer b.mu.Unlock()
if _, ok := b.pendingDone[key]; ok {
return false
}
b.pendingDone[key] = struct{}{}
return true
}

func (b *BlockStatusBuffer) materialize(entry blockStatusQueueEntry) *heartbeatpb.TableSpanBlockStatus {
if entry.status != nil {
return entry.status
}

key := *entry.doneKey
b.mu.Lock()
delete(b.pendingDone, key)
b.mu.Unlock()

return &heartbeatpb.TableSpanBlockStatus{
ID: key.dispatcherID.ToPB(),
State: &heartbeatpb.State{
IsBlocked: true,
BlockTs: key.blockTs,
IsSyncPoint: key.isSyncPoint,
Stage: heartbeatpb.BlockStage_DONE,
},
Mode: key.mode,
}
}

func isDoneBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool {
return status != nil &&
status.State != nil &&
status.State.IsBlocked &&
status.State.Stage == heartbeatpb.BlockStage_DONE
}
Loading