Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b00d689
revert tow stage syncpoint
asddongmen Apr 13, 2026
1600a5c
eventBroker: sent signal resolvedTs for table trigger dispatcher
asddongmen Apr 13, 2026
e38bc9d
eventservice: simplify syncpoint handling with checkpoint bounds
asddongmen Apr 13, 2026
89a702d
dispatcher,dispatchermanager: deduplicate pending done statuses
hongyunyan Apr 13, 2026
f17b2c3
remove verbose log
asddongmen Apr 14, 2026
75bae6f
Merge remote-tracking branch 'hyy/codex/done-dedupe-v1' into 0413-rem…
asddongmen Apr 14, 2026
0a9d4e8
dispatahcer: fix stuck
asddongmen Apr 14, 2026
afcf43f
dispatahcer: fix stuck 2
asddongmen Apr 14, 2026
956d0fe
eventBroker: use received resolvedTs
asddongmen Apr 14, 2026
cd4c09f
eventBroker: add debug log
asddongmen Apr 14, 2026
55d6b26
eventBroker: allow dispatcher advence when there is a ddl
asddongmen Apr 15, 2026
d5f8123
eventBroker: add log
asddongmen Apr 15, 2026
e4432d9
dedupe waitting
asddongmen Apr 16, 2026
1dcab73
Merge branch '0416-maintainer-dedupe-waiting' into 0413-remove-two-sg…
asddongmen Apr 16, 2026
baabb30
maintainer: refine
asddongmen Apr 17, 2026
73c97af
maintainer: refine
asddongmen Apr 17, 2026
4eb056b
Merge remote-tracking branch 'upstream/master' into 0413-remove-two-s…
asddongmen Apr 17, 2026
59d19d7
Merge branch '0416-maintainer-dedupe-waiting' into 0413-remove-two-sg…
asddongmen Apr 17, 2026
6d02d5b
fix make
asddongmen Apr 17, 2026
330eda5
eventBroker: refine syncpoint produce logic
asddongmen Apr 17, 2026
531f764
eventBroker: refine syncpoint produce logic 2
asddongmen Apr 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ tools/bin
tools/include
tools/workload/bin

.design
.codex
.issue
.vscode
.idea
Expand Down
28 changes: 5 additions & 23 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Dispatcher interface {
GetHeartBeatInfo(h *HeartBeatInfo)
GetComponentStatus() heartbeatpb.ComponentState
GetBlockEventStatus() *heartbeatpb.State
GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus
OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus)
GetEventSizePerSecond() float32
IsTableTriggerDispatcher() bool
DealWithBlockEvent(event commonEvent.BlockEvent)
Expand Down Expand Up @@ -838,16 +838,7 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
}

// Step3: whether or not the message is outdated, we need to return a message showing that we have finished the event.
d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
IsBlocked: true,
BlockTs: action.CommitTs,
IsSyncPoint: action.IsSyncPoint,
Stage: heartbeatpb.BlockStage_DONE,
},
Mode: d.GetMode(),
}
d.OfferDoneBlockStatus(action.CommitTs, action.IsSyncPoint)
}
return false
}
Expand Down Expand Up @@ -883,16 +874,7 @@ func (d *BasicDispatcher) reportBlockedEventDone(
actionCommitTs uint64,
actionIsSyncPoint bool,
) {
d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
IsBlocked: true,
BlockTs: actionCommitTs,
IsSyncPoint: actionIsSyncPoint,
Stage: heartbeatpb.BlockStage_DONE,
},
Mode: d.GetMode(),
}
d.OfferDoneBlockStatus(actionCommitTs, actionIsSyncPoint)
GetDispatcherStatusDynamicStream().Wake(d.id)
}

Expand Down Expand Up @@ -1044,7 +1026,7 @@ func (d *BasicDispatcher) DealWithBlockEvent(event commonEvent.BlockEvent) {
} else {
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
}
d.sharedInfo.blockStatusesChan <- message
d.OfferBlockStatus(message)
})

// dealing with events which update schema ids
Expand Down Expand Up @@ -1168,7 +1150,7 @@ func (d *BasicDispatcher) reportBlockedEventToMaintainer(event commonEvent.Block
IsSyncPoint: event.GetType() == commonEvent.TypeSyncPointEvent,
}
d.resendTaskMap.Set(identifier, newResendTask(message, d, nil))
d.sharedInfo.blockStatusesChan <- message
d.OfferBlockStatus(message)
}

func (d *BasicDispatcher) flushBlockedEventAndReportToMaintainer(event commonEvent.BlockEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ func TestDDLEventsAlwaysValidateActiveActive(t *testing.T) {
func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActiveActive bool) *BasicDispatcher {
t.Helper()
statuses := make(chan TableSpanStatusWithSeq, 2)
blockStatuses := make(chan *heartbeatpb.TableSpanBlockStatus, 1)
errCh := make(chan error, 1)
sharedInfo := NewSharedInfo(
common.NewChangefeedID("test"),
Expand All @@ -143,7 +142,7 @@ func newTestBasicDispatcher(t *testing.T, sinkType common.SinkType, enableActive
nil,
false,
statuses,
blockStatuses,
1,
errCh,
)
dispatcherSink := newDispatcherTestSink(t, sinkType)
Expand Down
57 changes: 48 additions & 9 deletions downstreamadapter/dispatcher/basic_dispatcher_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package dispatcher

import (
"context"
"sync/atomic"
"time"

Expand Down Expand Up @@ -58,9 +59,9 @@ type SharedInfo struct {
// statusesChan is used to store the status of dispatchers when status changed
// and push to heartbeatRequestQueue
statusesChan chan TableSpanStatusWithSeq
// blockStatusesChan use to collector block status of ddl/sync point event to Maintainer
// shared by the event dispatcher manager
blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus
// blockStatusBuffer keeps block statuses for the dispatcher manager.
// Identical WAITING and DONE statuses are coalesced while pending to reduce local memory amplification.
blockStatusBuffer *BlockStatusBuffer

// blockExecutor is used to execute block events such as DDL and sync point events asynchronously
// to avoid callback() called in handleEvents, causing deadlock in ds
Expand Down Expand Up @@ -88,7 +89,7 @@ func NewSharedInfo(
txnAtomicity *config.AtomicityLevel,
enableSplittableCheck bool,
statusesChan chan TableSpanStatusWithSeq,
blockStatusesChan chan *heartbeatpb.TableSpanBlockStatus,
blockStatusBufferSize int,
errCh chan error,
) *SharedInfo {
sharedInfo := &SharedInfo{
Expand All @@ -102,7 +103,7 @@ func NewSharedInfo(
syncPointConfig: syncPointConfig,
enableSplittableCheck: enableSplittableCheck,
statusesChan: statusesChan,
blockStatusesChan: blockStatusesChan,
blockStatusBuffer: NewBlockStatusBuffer(blockStatusBufferSize),
blockExecutor: newBlockEventExecutor(),
errCh: errCh,
metricHandleDDLHis: metrics.HandleDDLHistogram.WithLabelValues(changefeedID.Keyspace(), changefeedID.Name()),
Expand Down Expand Up @@ -219,8 +220,20 @@ func (d *BasicDispatcher) GetTxnAtomicity() config.AtomicityLevel {
return d.sharedInfo.txnAtomicity
}

func (d *BasicDispatcher) GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus {
return d.sharedInfo.blockStatusesChan
// OfferBlockStatus hands status to the SharedInfo held by this dispatcher by
// calling its OfferBlockStatus method. It returns nothing.
func (d *BasicDispatcher) OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) {
d.sharedInfo.OfferBlockStatus(status)
}

// OfferDoneBlockStatus reports a DONE block status for this dispatcher. It
// forwards blockTs and isSyncPoint to SharedInfo.OfferDoneBlockStatus together
// with this dispatcher's own id (d.id) and mode (d.GetMode()).
func (d *BasicDispatcher) OfferDoneBlockStatus(blockTs uint64, isSyncPoint bool) {
d.sharedInfo.OfferDoneBlockStatus(d.id, blockTs, isSyncPoint, d.GetMode())
}

// TakeBlockStatus returns whatever SharedInfo.TakeBlockStatus returns for ctx.
func (d *BasicDispatcher) TakeBlockStatus(ctx context.Context) *heartbeatpb.TableSpanBlockStatus {
return d.sharedInfo.TakeBlockStatus(ctx)
}

// TakeBlockStatusWithTimeout returns whatever
// SharedInfo.TakeBlockStatusWithTimeout returns for the given timeout.
func (d *BasicDispatcher) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) {
return d.sharedInfo.TakeBlockStatusWithTimeout(timeout)
}

func (d *BasicDispatcher) GetEventSizePerSecond() float32 {
Expand Down Expand Up @@ -254,8 +267,34 @@ func (s *SharedInfo) EnableActiveActive() bool {
return s.enableActiveActive
}

func (s *SharedInfo) GetBlockStatusesChan() chan *heartbeatpb.TableSpanBlockStatus {
return s.blockStatusesChan
// OfferBlockStatus passes status to the block status buffer via its Offer
// method. It returns nothing.
func (s *SharedInfo) OfferBlockStatus(status *heartbeatpb.TableSpanBlockStatus) {
s.blockStatusBuffer.Offer(status)
}

// OfferDoneBlockStatus passes the given dispatcher id, block timestamp,
// sync-point flag and mode to the block status buffer via its OfferDone method.
func (s *SharedInfo) OfferDoneBlockStatus(dispatcherID common.DispatcherID, blockTs uint64, isSyncPoint bool, mode int64) {
s.blockStatusBuffer.OfferDone(dispatcherID, blockTs, isSyncPoint, mode)
}

// TakeBlockStatus returns the result of the block status buffer's Take method
// called with ctx.
func (s *SharedInfo) TakeBlockStatus(ctx context.Context) *heartbeatpb.TableSpanBlockStatus {
return s.blockStatusBuffer.Take(ctx)
}

// TakeBlockStatusWithTimeout calls TakeBlockStatus with a context that expires
// after timeout. It returns the status and true when TakeBlockStatus yields a
// non-nil status, and (nil, false) otherwise.
func (s *SharedInfo) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) {
	timeoutCtx, stop := context.WithTimeout(context.Background(), timeout)
	// Always release the timer held by the derived context.
	defer stop()

	if taken := s.TakeBlockStatus(timeoutCtx); taken != nil {
		return taken, true
	}
	return nil, false
}

// TryTakeBlockStatus returns the result of the block status buffer's TryTake
// method.
func (s *SharedInfo) TryTakeBlockStatus() (*heartbeatpb.TableSpanBlockStatus, bool) {
return s.blockStatusBuffer.TryTake()
}

// BlockStatusLen returns the value reported by the block status buffer's Len
// method.
func (s *SharedInfo) BlockStatusLen() int {
return s.blockStatusBuffer.Len()
}

func (s *SharedInfo) GetErrCh() chan error {
Expand Down
194 changes: 194 additions & 0 deletions downstreamadapter/dispatcher/block_status_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dispatcher

import (
"context"
"sync"

"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
)

// blockStatusKey is the map key used by BlockStatusBuffer to detect block
// statuses that are already pending. Two statuses with equal keys are treated
// as identical for coalescing purposes.
type blockStatusKey struct {
// dispatcherID is the dispatcher the status belongs to (taken from status.ID).
dispatcherID common.DispatcherID
// blockTs is the block timestamp (taken from status.State.BlockTs).
blockTs uint64
// mode is the status mode (taken from status.Mode).
mode int64
// isSyncPoint is the sync-point flag (taken from status.State.IsSyncPoint).
isSyncPoint bool
}

// blockStatusQueueEntry is one element of the BlockStatusBuffer queue.
// The enqueue paths in this file set exactly one of the two fields.
type blockStatusQueueEntry struct {
// status is the original protobuf object; set for WAITING statuses and for
// statuses that are neither WAITING nor DONE.
status *heartbeatpb.TableSpanBlockStatus
// doneKey is set for DONE statuses; the protobuf is rebuilt from it when the
// entry is taken from the queue.
doneKey *blockStatusKey
}

// BlockStatusBuffer keeps block statuses ordered while coalescing identical
// WAITING and DONE statuses that are still pending locally. Other statuses keep
// the original protobuf object and ordering.
type BlockStatusBuffer struct {
// queue is the buffered channel holding entries in enqueue order.
queue chan blockStatusQueueEntry

// mu guards pendingDone and pendingWaiting.
mu sync.Mutex
// pendingDone holds the keys of DONE statuses that have been reserved and not
// yet taken from the queue.
pendingDone map[blockStatusKey]struct{}
// pendingWaiting holds the keys of WAITING statuses that have been reserved
// and not yet taken from the queue.
pendingWaiting map[blockStatusKey]struct{}
}

// NewBlockStatusBuffer builds a BlockStatusBuffer whose queue can hold size
// entries. A size of zero or less is replaced by 1. Both pending-key sets
// start empty.
func NewBlockStatusBuffer(size int) *BlockStatusBuffer {
	capacity := size
	if capacity < 1 {
		capacity = 1
	}

	buf := new(BlockStatusBuffer)
	buf.queue = make(chan blockStatusQueueEntry, capacity)
	buf.pendingDone = map[blockStatusKey]struct{}{}
	buf.pendingWaiting = map[blockStatusKey]struct{}{}
	return buf
}

// Offer enqueues status, blocking while the queue is full.
//
//   - A nil status is ignored.
//   - A WAITING status is enqueued as-is unless an identical key is already
//     pending, in which case it is dropped.
//   - A DONE status is enqueued as a key only (the protobuf is rebuilt when it
//     is taken) unless an identical key is already pending.
//   - Any other status is enqueued as-is without deduplication.
func (b *BlockStatusBuffer) Offer(status *heartbeatpb.TableSpanBlockStatus) {
	switch {
	case status == nil:
		return

	case isWaitingBlockStatus(status):
		if b.reserveWaiting(newBlockStatusKey(status)) {
			b.queue <- blockStatusQueueEntry{status: status}
		}

	case isDoneBlockStatus(status):
		doneKey := newBlockStatusKey(status)
		if b.reserveDone(doneKey) {
			b.queue <- blockStatusQueueEntry{doneKey: &doneKey}
		}

	default:
		b.queue <- blockStatusQueueEntry{status: status}
	}
}

// OfferDone enqueues a DONE status described by its key fields, without
// building the protobuf up front. If the same key is already pending the call
// is a no-op; otherwise it blocks while the queue is full.
func (b *BlockStatusBuffer) OfferDone(
	dispatcherID common.DispatcherID,
	blockTs uint64,
	isSyncPoint bool,
	mode int64,
) {
	var doneKey blockStatusKey
	doneKey.dispatcherID = dispatcherID
	doneKey.blockTs = blockTs
	doneKey.mode = mode
	doneKey.isSyncPoint = isSyncPoint

	if reserved := b.reserveDone(doneKey); reserved {
		b.queue <- blockStatusQueueEntry{doneKey: &doneKey}
	}
}

// Take waits until either an entry is available in the queue or ctx is done.
// It returns the materialized status for a received entry, or nil when ctx is
// done.
func (b *BlockStatusBuffer) Take(ctx context.Context) *heartbeatpb.TableSpanBlockStatus {
	select {
	case next := <-b.queue:
		return b.materialize(next)
	case <-ctx.Done():
		return nil
	}
}

// TryTake is the non-blocking variant of Take. It returns the materialized
// status and true when an entry is immediately available, and (nil, false)
// when the queue is empty.
func (b *BlockStatusBuffer) TryTake() (*heartbeatpb.TableSpanBlockStatus, bool) {
	select {
	case next := <-b.queue:
		taken := b.materialize(next)
		return taken, true
	default:
	}
	return nil, false
}

// Len returns the number of entries currently sitting in the queue channel.
func (b *BlockStatusBuffer) Len() int {
return len(b.queue)
}

// reserveWaiting records key as a pending WAITING status. It returns true when
// the key was newly recorded and false when it was already pending.
func (b *BlockStatusBuffer) reserveWaiting(key blockStatusKey) bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	_, alreadyPending := b.pendingWaiting[key]
	if !alreadyPending {
		b.pendingWaiting[key] = struct{}{}
	}
	return !alreadyPending
}

// reserveDone records key as a pending DONE status. It returns true when the
// key was newly recorded and false when it was already pending.
func (b *BlockStatusBuffer) reserveDone(key blockStatusKey) bool {
	b.mu.Lock()
	defer b.mu.Unlock()

	_, alreadyPending := b.pendingDone[key]
	if !alreadyPending {
		b.pendingDone[key] = struct{}{}
	}
	return !alreadyPending
}

// materialize turns a dequeued entry into the protobuf status handed to the
// consumer and releases the entry's pending-key reservation.
//
// Entries carrying a status return that same object; if it is a WAITING status
// its key is removed from pendingWaiting first. Entries carrying only a done
// key have that key removed from pendingDone and a fresh DONE status built from
// the key's fields.
func (b *BlockStatusBuffer) materialize(entry blockStatusQueueEntry) *heartbeatpb.TableSpanBlockStatus {
	if passthrough := entry.status; passthrough != nil {
		if !isWaitingBlockStatus(passthrough) {
			return passthrough
		}
		waitingKey := newBlockStatusKey(passthrough)
		b.mu.Lock()
		delete(b.pendingWaiting, waitingKey)
		b.mu.Unlock()
		return passthrough
	}

	doneKey := *entry.doneKey
	b.mu.Lock()
	delete(b.pendingDone, doneKey)
	b.mu.Unlock()

	id := doneKey.dispatcherID.ToPB()
	state := &heartbeatpb.State{
		IsBlocked:   true,
		BlockTs:     doneKey.blockTs,
		IsSyncPoint: doneKey.isSyncPoint,
		Stage:       heartbeatpb.BlockStage_DONE,
	}
	return &heartbeatpb.TableSpanBlockStatus{
		ID:    id,
		State: state,
		Mode:  doneKey.mode,
	}
}

// newBlockStatusKey derives the deduplication key of status from its ID, Mode
// and the BlockTs / IsSyncPoint fields of its State. It dereferences status
// and status.State without nil checks, so callers must pass a status whose
// State is set.
func newBlockStatusKey(status *heartbeatpb.TableSpanBlockStatus) blockStatusKey {
	var key blockStatusKey
	key.dispatcherID = common.NewDispatcherIDFromPB(status.ID)
	key.blockTs = status.State.BlockTs
	key.mode = status.Mode
	key.isSyncPoint = status.State.IsSyncPoint
	return key
}

// isWaitingBlockStatus reports whether status is a blocked status in the
// WAITING stage. A nil status or a nil State yields false.
func isWaitingBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool {
	if status == nil || status.State == nil {
		return false
	}
	state := status.State
	return state.IsBlocked && state.Stage == heartbeatpb.BlockStage_WAITING
}

// isDoneBlockStatus reports whether status is a blocked status in the DONE
// stage. A nil status or a nil State yields false.
func isDoneBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool {
	if status == nil || status.State == nil {
		return false
	}
	state := status.State
	return state.IsBlocked && state.Stage == heartbeatpb.BlockStage_DONE
}
Loading
Loading