Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 5 additions & 23 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Dispatcher interface {
GetHeartBeatInfo(h *HeartBeatInfo)
GetComponentStatus() heartbeatpb.ComponentState
GetBlockEventStatus() *heartbeatpb.State
GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus
OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus)
GetEventSizePerSecond() float32
IsTableTriggerDispatcher() bool
DealWithBlockEvent(event commonEvent.BlockEvent)
Expand Down Expand Up @@ -836,16 +836,7 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
}

// Step3: whether the outdate message or not, we need to return message show we have finished the event.
d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
IsBlocked: true,
BlockTs: action.CommitTs,
IsSyncPoint: action.IsSyncPoint,
Stage: heartbeatpb.BlockStage_DONE,
},
Mode: d.GetMode(),
}
d.OfferDoneBlockStatus(action.CommitTs, action.IsSyncPoint)
}
return false
}
Expand Down Expand Up @@ -881,16 +872,7 @@ func (d *BasicDispatcher) reportBlockedEventDone(
actionCommitTs uint64,
actionIsSyncPoint bool,
) {
d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
IsBlocked: true,
BlockTs: actionCommitTs,
IsSyncPoint: actionIsSyncPoint,
Stage: heartbeatpb.BlockStage_DONE,
},
Mode: d.GetMode(),
}
d.OfferDoneBlockStatus(actionCommitTs, actionIsSyncPoint)
GetDispatcherStatusDynamicStream().Wake(d.id)
}

Expand Down Expand Up @@ -1042,7 +1024,7 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) {
} else {
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
}
d.sharedInfo.blockStatusesChan <- message
d.OfferBlockStatus(message)
})

// dealing with events which update schema ids
Expand Down Expand Up @@ -1166,7 +1148,7 @@ func (d *BasicDispatcher) reportBlockedEventToMaintainer(event commonEvent.Block
IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent,
}
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
d.sharedInfo.blockStatusesChan <- message
d.OfferBlockStatus(message)
}

func (d *BasicDispatcher) flushBlockedEventAndReportToMaintainer(event commonEvent.BlockEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ func TestDDLEventsAlwaysValidateActiveActive(t *testing.T) {
func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActiveActive bool) *BasicDispatcher {
t.Helper()
statuses := make(chan TableSpanStatusWithSeq, 2)
blockStatuses := make(chan *heartbeatpb.TableSpanBlockStatus, 1)
errCh := make(chan error, 1)
sharedInfo := NewSharedInfo(
common.NewChangefeedID("test"),
Expand All @@ -143,7 +142,7 @@ func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActive
nil,
false,
statuses,
blockStatuses,
1,
errCh,
)
dispatcherSink := newDispatcherTestSink(t, sinkType)
Expand Down
57 changes: 48 additions & 9 deletions downstreamadapter/dispatcher/basic_dispatcher_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package dispatcher

import (
"context"
"sync/atomic"
"time"

Expand Down Expand Up @@ -58,9 +59,9 @@ type SharedInfo struct {
// statusesChan is used to store the status of dispatchers when status changed
// and push to heartbeatRequestQueue
statusesChan chan TableSpanStatusWithSeq
// blockStatusesChan use to collector block status of ddl/sync point event to Maintainer
// shared by the event dispatcher manager
blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus
// blockStatusBuffer keeps block statuses for the dispatcher manager.
// Identical WAITING and DONE statuses are coalesced while pending to reduce local memory amplification.
blockStatusBuffer *BlockStatusBuffer

// blockExecutor is used to execute block events such as DDL and sync point events asynchronously
// to avoid callback() called in handleEvents, causing deadlock in ds
Expand Down Expand Up @@ -88,7 +89,7 @@ func NewSharedInfo(
txnAtomicity *config.AtomicityLevel,
enableSplittableCheck bool,
statusesChan chan TableSpanStatusWithSeq,
blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus,
blockStatusBufferSize int,
errCh chan error,
) *SharedInfo {
sharedInfo := &SharedInfo{
Expand All @@ -102,7 +103,7 @@ func NewSharedInfo(
syncPointConfig: syncPointConfig,
enableSplittableCheck: enableSplittableCheck,
statusesChan: statusesChan,
blockStatusesChan: blockStatusesChan,
blockStatusBuffer: NewBlockStatusBuffer(blockStatusBufferSize),
blockExecutor: newBlockEventExecutor(),
errCh: errCh,
metricHandleDDLHis: metrics.HandleDDLHistogram.WithLabelValues(changefeedID.Keyspace(), changefeedID.Name()),
Expand Down Expand Up @@ -219,8 +220,20 @@ func (d *BasicDispatcher) GetTxnAtomicity() config.AtomicityLevel {
return d.sharedInfo.txnAtomicity
}

func (d *BasicDispatcher) GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus {
return d.sharedInfo.blockStatusesChan
func (d *BasicDispatcher) OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) {
d.sharedInfo.OfferBlockStatus(status)
}

func (d *BasicDispatcher) OfferDoneBlockStatus(blockTs uint64, isSyncPoint bool) {
d.sharedInfo.OfferDoneBlockStatus(d.id, blockTs, isSyncPoint, d.GetMode())
}

func (d *BasicDispatcher) TakeBlockStatus(ctx context.Context) *heartbeatpb.TableSpanBlockStatus {
return d.sharedInfo.TakeBlockStatus(ctx)
}

func (d *BasicDispatcher) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) {
return d.sharedInfo.TakeBlockStatusWithTimeout(timeout)
}

func (d *BasicDispatcher) GetEventSizePerSecond() float32 {
Expand Down Expand Up @@ -254,8 +267,34 @@ func (s *SharedInfo) EnableActiveActive() bool {
return s.enableActiveActive
}

func (s *SharedInfo) GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus {
return s.blockStatusesChan
func (s *SharedInfo) OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) {
s.blockStatusBuffer.Offer(status)
}

func (s *SharedInfo) OfferDoneBlockStatus(dispatcherID common.DispatcherID, blockTs uint64, isSyncPoint bool, mode int64) {
s.blockStatusBuffer.OfferDone(dispatcherID, blockTs, isSyncPoint, mode)
}

func (s *SharedInfo) TakeBlockStatus(ctx context.Context) *heartbeatpb.TableSpanBlockStatus {
return s.blockStatusBuffer.Take(ctx)
}

func (s *SharedInfo) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
status := s.TakeBlockStatus(ctx)
if status == nil {
return nil, false
}
return status, true
}

func (s *SharedInfo) TryTakeBlockStatus() (*heartbeatpb.TableSpanBlockStatus, bool) {
return s.blockStatusBuffer.TryTake()
}

func (s *SharedInfo) BlockStatusLen() int {
return s.blockStatusBuffer.Len()
}

func (s *SharedInfo) GetErrCh() chan error {
Expand Down
194 changes: 194 additions & 0 deletions downstreamadapter/dispatcher/block_status_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dispatcher

import (
"context"
"sync"

"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
)

type blockStatusKey struct {
dispatcherID common.DispatcherID
blockTs uint64
mode int64
isSyncPoint bool
}

type blockStatusQueueEntry struct {
status *heartbeatpb.TableSpanBlockStatus
doneKey *blockStatusKey
}

// BlockStatusBuffer keeps block statuses ordered while coalescing identical
// WAITING and DONE statuses that are still pending locally. Other statuses keep
// the original protobuf object and ordering.
type BlockStatusBuffer struct {
queue chan blockStatusQueueEntry

mu sync.Mutex
pendingDone map[blockStatusKey]struct{}
pendingWaiting map[blockStatusKey]struct{}
}

// NewBlockStatusBuffer creates a bounded local mailbox for dispatcher block
// statuses. The buffer keeps enqueue order while coalescing identical pending
// WAITING and DONE statuses before protobuf materialization.
func NewBlockStatusBuffer(size int) *BlockStatusBuffer {
if size <= 0 {
size = 1
}
return &BlockStatusBuffer{
queue: make(chan blockStatusQueueEntry, size),
pendingDone: make(map[blockStatusKey]struct{}),
pendingWaiting: make(map[blockStatusKey]struct{}),
}
}

func (b *BlockStatusBuffer) Offer(status *heartbeatpb.TableSpanBlockStatus) {
if status == nil {
return
}
if isWaitingBlockStatus(status) {
key := newBlockStatusKey(status)
if !b.reserveWaiting(key) {
return
}
b.queue <- blockStatusQueueEntry{status: status}
return
}
if !isDoneBlockStatus(status) {
b.queue <- blockStatusQueueEntry{status: status}
return
}

key := newBlockStatusKey(status)
if !b.reserveDone(key) {
return
}
b.queue <- blockStatusQueueEntry{doneKey: &key}
}
Comment on lines +73 to +83
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Minor: Offer()'s DONE path silently drops all State fields except key components.

When Offer() receives a DONE-staged status (Line 78-82), only the key (dispatcherID, blockTs, mode, isSyncPoint) is preserved via the doneKey placeholder; materialize() (Line 161-170) then reconstructs a minimal TableSpanBlockStatus that loses any additional State fields (BlockTables, NeedDroppedTables, NeedAddedTables, UpdatedSchemas) the caller may have populated.

In the current PR this path is unreachable — all DONE messages flow through OfferDoneBlockStatus / OfferDone which by design only carry the key fields — so behavior is preserved. However, the public Offer signature accepts any *TableSpanBlockStatus, so a future caller that hands a DONE-staged status with extras into Offer would experience silent field loss.

Consider one of:

  • Reject (or panic with a clear message) if Offer receives a DONE-staged status with non-zero auxiliary fields.
  • Store the original status pointer for DONE too (skip the placeholder optimization for the Offer entry point) and only use the placeholder form from OfferDone.
  • Add a doc comment on Offer explicitly stating that DONE statuses are reconstructed from key-only and must not carry extras.

Also applies to: 145-171


func (b *BlockStatusBuffer) OfferDone(
dispatcherID common.DispatcherID,
blockTs uint64,
isSyncPoint bool,
mode int64,
) {
key := blockStatusKey{
dispatcherID: dispatcherID,
blockTs: blockTs,
mode: mode,
isSyncPoint: isSyncPoint,
}
if !b.reserveDone(key) {
return
}
b.queue <- blockStatusQueueEntry{doneKey: &key}
}

func (b *BlockStatusBuffer) Take(ctx context.Context) *heartbeatpb.TableSpanBlockStatus {
select {
case <-ctx.Done():
return nil
case entry := <-b.queue:
return b.materialize(entry)
}
}

func (b *BlockStatusBuffer) TryTake() (*heartbeatpb.TableSpanBlockStatus, bool) {
select {
case entry := <-b.queue:
return b.materialize(entry), true
default:
return nil, false
}
}

func (b *BlockStatusBuffer) Len() int {
return len(b.queue)
}

func (b *BlockStatusBuffer) reserveWaiting(key blockStatusKey) bool {
b.mu.Lock()
defer b.mu.Unlock()
if _, ok := b.pendingWaiting[key]; ok {
return false
}
b.pendingWaiting[key] = struct{}{}
return true
}

func (b *BlockStatusBuffer) reserveDone(key blockStatusKey) bool {
b.mu.Lock()
defer b.mu.Unlock()
if _, ok := b.pendingDone[key]; ok {
return false
}
b.pendingDone[key] = struct{}{}
return true
}

func (b *BlockStatusBuffer) materialize(entry blockStatusQueueEntry) *heartbeatpb.TableSpanBlockStatus {
if entry.status != nil {
if isWaitingBlockStatus(entry.status) {
key := newBlockStatusKey(entry.status)
b.mu.Lock()
delete(b.pendingWaiting, key)
b.mu.Unlock()
}
return entry.status
}

key := *entry.doneKey
b.mu.Lock()
delete(b.pendingDone, key)
b.mu.Unlock()

return &heartbeatpb.TableSpanBlockStatus{
ID: key.dispatcherID.ToPB(),
State: &heartbeatpb.State{
IsBlocked: true,
BlockTs: key.blockTs,
IsSyncPoint: key.isSyncPoint,
Stage: heartbeatpb.BlockStage_DONE,
},
Mode: key.mode,
}
}

func newBlockStatusKey(status *heartbeatpb.TableSpanBlockStatus) blockStatusKey {
return blockStatusKey{
dispatcherID: common.NewDispatcherIDFromPB(status.ID),
blockTs: status.State.BlockTs,
mode: status.Mode,
isSyncPoint: status.State.IsSyncPoint,
}
}

func isWaitingBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool {
return status != nil &&
status.State != nil &&
status.State.IsBlocked &&
status.State.Stage == heartbeatpb.BlockStage_WAITING
}

func isDoneBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool {
return status != nil &&
status.State != nil &&
status.State.IsBlocked &&
status.State.Stage == heartbeatpb.BlockStage_DONE
}
Comment on lines +182 to +194
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The functions isWaitingBlockStatus and isDoneBlockStatus do not check if status.ID is non-nil. Since newBlockStatusKey (called in Offer and materialize) relies on status.ID to create a key, a nil status.ID will cause a panic. Although BasicDispatcher currently always sets this field, adding a defensive check here is safer for the buffer's integrity.

Loading
Loading