Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ import (
"go.uber.org/zap/zapcore"
)

const (
	// outdatedActionDoneReportInterval is the minimum gap between two DONE
	// reports for the same outdated block action; reports arriving sooner
	// than this are suppressed.
	outdatedActionDoneReportInterval = time.Second
	// outdatedActionDoneReportMaxEntries caps the size of the per-dispatcher
	// outdatedActionDoneAt throttle map; once reached, expired entries are
	// evicted (or the map is cleared) before a new entry is recorded.
	outdatedActionDoneReportMaxEntries = 256
)

// DispatcherService defines the interface for providing dispatcher information and basic event handling.
type DispatcherService interface {
GetId() common.DispatcherID
Expand Down Expand Up @@ -217,6 +222,11 @@ type BasicDispatcher struct {

BootstrapState bootstrapState

// outdatedActionDoneAt throttles DONE reports for the same outdated action to avoid
// flooding blockStatusesChan when maintainer resends stale actions repeatedly.
outdatedActionDoneMu sync.Mutex
outdatedActionDoneAt map[BlockEventIdentifier]time.Time

// tableModeCompatibilityChecked indicates whether we have already validated the newest
// table schema is compatible with the current replication mode configuration.
// Only when the initial case or a ddl event is received, we will reset tableModeCompatibilityChecked to check the compatibility.
Expand Down Expand Up @@ -256,12 +266,39 @@ func NewBasicDispatcher(
creationPDTs: currentPDTs,
mode: mode,
BootstrapState: BootstrapFinished,
outdatedActionDoneAt: make(map[BlockEventIdentifier]time.Time),
}
dispatcher.resolvedTs.Store(startTs)

return dispatcher
}

// shouldReportOutdatedActionDone reports whether a DONE status for the given
// outdated block action should be sent now. It returns false when the same
// action was already reported within outdatedActionDoneReportInterval, which
// keeps repeatedly-resent stale actions from the maintainer from flooding
// blockStatusesChan. The per-action timestamps live in a bounded map: once it
// holds outdatedActionDoneReportMaxEntries entries, entries older than twice
// the report interval are evicted, and if that frees nothing the whole map is
// dropped (losing throttle state only permits one extra report per action).
func (d *BasicDispatcher) shouldReportOutdatedActionDone(identifier BlockEventIdentifier, now time.Time) bool {
	d.outdatedActionDoneMu.Lock()
	defer d.outdatedActionDoneMu.Unlock()

	last, seen := d.outdatedActionDoneAt[identifier]
	if seen && now.Sub(last) < outdatedActionDoneReportInterval {
		// Reported recently; suppress this duplicate.
		return false
	}

	// Keep the throttle map bounded so a long-lived dispatcher cannot
	// accumulate entries without limit.
	if len(d.outdatedActionDoneAt) >= outdatedActionDoneReportMaxEntries {
		cutoff := now.Add(-2 * outdatedActionDoneReportInterval)
		for key, reportedAt := range d.outdatedActionDoneAt {
			if reportedAt.Before(cutoff) {
				delete(d.outdatedActionDoneAt, key)
			}
		}
		// Every entry is still fresh: clear the map rather than grow past
		// the cap.
		if len(d.outdatedActionDoneAt) >= outdatedActionDoneReportMaxEntries {
			for key := range d.outdatedActionDoneAt {
				delete(d.outdatedActionDoneAt, key)
			}
		}
	}

	d.outdatedActionDoneAt[identifier] = now
	return true
}

// AddDMLEventsToSink filters events for special tables, registers batch wake
// callbacks, and returns true when at least one event remains to be written to
// the downstream sink.
Expand Down Expand Up @@ -836,6 +873,13 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D
}

// Step3: whether the outdate message or not, we need to return message show we have finished the event.
identifier := BlockEventIdentifier{
CommitTs: action.CommitTs,
IsSyncPoint: action.IsSyncPoint,
}
if !d.shouldReportOutdatedActionDone(identifier, time.Now()) {
return false
}
d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{
ID: d.id.ToPB(),
State: &heartbeatpb.State{
Expand Down
52 changes: 52 additions & 0 deletions downstreamadapter/dispatcher/event_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,58 @@ func TestDispatcherHandleEvents(t *testing.T) {
t.Run("cloud storage wake callback after batch enqueue", verifyDMLWakeCallbackStorageAfterBatchEnqueue)
}

// TestOutdatedActionDoneIsThrottled verifies that DONE reports for the same
// stale (outdated) dispatcher action are emitted at most once per
// outdatedActionDoneReportInterval.
func TestOutdatedActionDoneIsThrottled(t *testing.T) {
	span, err := getCompleteTableSpan(getTestingKeyspaceID())
	require.NoError(t, err)
	sink := newDispatcherTestSink(t, common.MysqlSinkType)
	d := newDispatcherForTest(sink.Sink(), span)
	server := node.NewID()

	// Advance the dispatcher's resolved ts past the action's commit ts so
	// the action below is treated as outdated.
	blocked := d.HandleEvents([]DispatcherEvent{
		NewDispatcherEvent(&server, commonEvent.ResolvedEvent{ResolvedTs: 10}),
	}, func() {})
	require.False(t, blocked)
	require.Equal(t, uint64(10), d.GetResolvedTs())

	stale := &heartbeatpb.DispatcherStatus{
		Action: &heartbeatpb.DispatcherAction{
			Action:      heartbeatpb.Action_Pass,
			CommitTs:    5,
			IsSyncPoint: false,
		},
	}

	// expectDone waits for exactly one DONE report for the stale action.
	expectDone := func(failMsg string) {
		select {
		case msg := <-d.GetBlockStatusesChan():
			require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage)
			require.Equal(t, uint64(5), msg.State.BlockTs)
		case <-time.After(time.Second):
			require.FailNow(t, failMsg)
		}
	}

	// The first stale action always produces a DONE report.
	require.False(t, d.HandleDispatcherStatus(stale))
	expectDone("expected stale action DONE")

	// A repeat within the throttle interval must be suppressed.
	require.False(t, d.HandleDispatcherStatus(stale))
	select {
	case msg := <-d.GetBlockStatusesChan():
		require.FailNow(t, "unexpected duplicate stale action DONE", "msg=%v", msg)
	case <-time.After(100 * time.Millisecond):
	}

	// Once the interval elapses, the DONE report is emitted again.
	time.Sleep(outdatedActionDoneReportInterval + 50*time.Millisecond)
	require.False(t, d.HandleDispatcherStatus(stale))
	expectDone("expected stale action DONE after throttle interval")
}

func TestBlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain(t *testing.T) {
keyspaceID := getTestingKeyspaceID()
tableSpan := getUncompleteTableSpan()
Expand Down
15 changes: 15 additions & 0 deletions downstreamadapter/eventcollector/dispatcher_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,21 @@ func (d *dispatcherStat) doReset(serverID node.ID, resetTs uint64) {
}
// remove the dispatcher from the dynamic stream
resetRequest := d.newDispatcherResetRequest(d.eventCollector.getLocalServerID().String(), resetTs, epoch)
if d.target.EnableSyncPoint() {
req := resetRequest.DispatcherRequest
log.Info("send reset dispatcher syncpoint request",
zap.Stringer("changefeedID", d.target.GetChangefeedID()),
zap.Stringer("dispatcher", d.getDispatcherID()),
zap.Stringer("eventServiceID", serverID),
zap.Uint64("epoch", epoch),
zap.Uint64("resetTs", resetTs),
zap.Uint64("checkpointTs", d.target.GetCheckpointTs()),
zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()),
zap.Uint64("syncPointTs", req.SyncPointTs),
zap.Uint64("syncPointIntervalSeconds", req.SyncPointInterval),
zap.Bool("skipSyncpointAtStartTs", d.target.GetSkipSyncpointAtStartTs()),
)
}
msg := messaging.NewSingleTargetMessage(serverID, messaging.EventServiceTopic, resetRequest)
d.eventCollector.enqueueMessageForSend(msg)
log.Info("send reset dispatcher request to event service",
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ type EventServiceConfig struct {
// TODO: Remove this config after we find a proper way to fix the OOM issue.
// Ref: https://github.com/pingcap/ticdc/issues/1784
EnableRemoteEventService bool `toml:"enable-remote-event-service" json:"enable_remote_event_service"`
// EnableTwoStageSyncPoint controls whether event broker uses global two-stage
// syncpoint coordination. Disable this only for troubleshooting.
EnableTwoStageSyncPoint bool `toml:"enable-two-stage-syncpoint" json:"enable_two_stage_syncpoint"`
}

// NewDefaultEventServiceConfig return the default event service configuration
Expand All @@ -138,5 +141,6 @@ func NewDefaultEventServiceConfig() *EventServiceConfig {
DMLEventMaxRows: 256,
DMLEventMaxBytes: 1024 * 1024 * 1, // 1MB
EnableRemoteEventService: true,
EnableTwoStageSyncPoint: true,
}
}
132 changes: 124 additions & 8 deletions pkg/eventservice/dispatcher_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,14 @@ func (c *changefeedStatus) getSyncPointPreparingTs() uint64 {
return c.syncPointPreparingTs.Load()
}

func (c *changefeedStatus) isDispatcherStaleForSyncpoint(dispatcher *dispatcherStat, now time.Time) bool {
lastHeartbeatTime := dispatcher.lastReceivedHeartbeatTime.Load()
if lastHeartbeatTime <= 0 {
return false
}
return now.Sub(time.Unix(lastHeartbeatTime, 0)) > scanWindowStaleDispatcherHeartbeatThreshold
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The constant scanWindowStaleDispatcherHeartbeatThreshold is used here but does not appear to be defined in the provided code. This will cause a compilation error. Please ensure it is defined in an appropriate location (e.g., in the const block at the top of this file).

}

// tryEnterSyncPointPrepare tries to enter syncpoint prepare stage for candidateTs.
// If a prepare ts already exists, the same ts is accepted, and a smaller ts can
// replace it before commit stage starts.
Expand All @@ -504,6 +512,11 @@ func (c *changefeedStatus) tryEnterSyncPointPrepare(candidateTs uint64) bool {
case c.syncPointInFlightTs.Load() != 0:
return false
case candidateTs < preparingTs:
log.Info("syncpoint prepare ts moved backward",
zap.Stringer("changefeedID", c.changefeedID),
zap.Uint64("oldPreparingTs", preparingTs),
zap.Uint64("newPreparingTs", candidateTs),
zap.Uint64("inFlightTs", c.syncPointInFlightTs.Load()))
c.syncPointPreparingTs.Store(candidateTs)
return true
default:
Expand All @@ -529,21 +542,62 @@ func (c *changefeedStatus) tryPromoteSyncPointToCommitIfReady() {
return
}

now := time.Now()
hasEligible := false
ready := true
blockingFound := false
var (
blockingDispatcherID common.DispatcherID
blockingSentResolvedTs uint64
blockingCheckpointTs uint64
blockingNextSyncPoint uint64
blockingSeq uint64
blockingEpoch uint64
)
c.dispatchers.Range(func(_ any, value any) bool {
dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load()
if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.seq.Load() == 0 {
return true
}
if c.isDispatcherStaleForSyncpoint(dispatcher, now) {
return true
}
hasEligible = true
if dispatcher.sentResolvedTs.Load() < preparingTs {
sentResolvedTs := dispatcher.sentResolvedTs.Load()
if sentResolvedTs < preparingTs {
ready = false
if !blockingFound {
blockingFound = true
blockingDispatcherID = dispatcher.id
blockingSentResolvedTs = sentResolvedTs
blockingCheckpointTs = dispatcher.checkpointTs.Load()
blockingNextSyncPoint = dispatcher.nextSyncPoint.Load()
blockingSeq = dispatcher.seq.Load()
blockingEpoch = dispatcher.epoch
}
return false
}
return true
})
if !hasEligible || !ready {
if !hasEligible {
return
}
if !ready {
fields := []zap.Field{
zap.Stringer("changefeedID", c.changefeedID),
zap.Uint64("preparingTs", preparingTs),
}
if blockingFound {
fields = append(fields,
zap.Stringer("blockingDispatcherID", blockingDispatcherID),
zap.Uint64("blockingSentResolvedTs", blockingSentResolvedTs),
zap.Uint64("blockingCheckpointTs", blockingCheckpointTs),
zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPoint),
zap.Uint64("blockingSeq", blockingSeq),
zap.Uint64("blockingEpoch", blockingEpoch),
)
}
log.Debug("syncpoint prepare stage blocked by dispatcher", fields...)
return
}

Expand All @@ -563,27 +617,89 @@ func (c *changefeedStatus) tryFinishSyncPointCommitIfAllEmitted() {
return
}

now := time.Now()
hasEligible := false
canAdvance := true
blockingFound := false
blockingReason := ""
var (
blockingDispatcherID common.DispatcherID
blockingNextSyncPointTs uint64
blockingCheckpointTs uint64
blockingSentResolvedTs uint64
blockingDispatcherSeq uint64
blockingDispatcherEpoch uint64
)
c.dispatchers.Range(func(_ any, value any) bool {
dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load()
if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.seq.Load() == 0 {
return true
}
if dispatcher.nextSyncPoint.Load() <= inFlightTs {
if c.isDispatcherStaleForSyncpoint(dispatcher, now) {
return true
}
hasEligible = true
nextSyncPointTs := dispatcher.nextSyncPoint.Load()
checkpointTs := dispatcher.checkpointTs.Load()
sentResolvedTs := dispatcher.sentResolvedTs.Load()
if nextSyncPointTs <= inFlightTs {
canAdvance = false
if !blockingFound {
blockingFound = true
blockingReason = "nextSyncPointNotAdvanced"
blockingDispatcherID = dispatcher.id
blockingNextSyncPointTs = nextSyncPointTs
blockingCheckpointTs = checkpointTs
blockingSentResolvedTs = dispatcher.sentResolvedTs.Load()
blockingDispatcherSeq = dispatcher.seq.Load()
blockingDispatcherEpoch = dispatcher.epoch
}
return false
}
if dispatcher.checkpointTs.Load() <= inFlightTs {
if sentResolvedTs <= inFlightTs {
canAdvance = false
if !blockingFound {
blockingFound = true
blockingReason = "checkpointNotAdvanced"
blockingDispatcherID = dispatcher.id
blockingNextSyncPointTs = nextSyncPointTs
blockingCheckpointTs = checkpointTs
blockingSentResolvedTs = dispatcher.sentResolvedTs.Load()
blockingDispatcherSeq = dispatcher.seq.Load()
blockingDispatcherEpoch = dispatcher.epoch
}
return false
}
return true
})

if canAdvance {
c.syncPointInFlightTs.Store(0)
if c.syncPointPreparingTs.Load() == inFlightTs {
c.syncPointPreparingTs.Store(0)
if !hasEligible {
return
}

if !canAdvance {
fields := []zap.Field{
zap.Stringer("changefeedID", c.changefeedID),
zap.Uint64("inFlightTs", inFlightTs),
zap.Uint64("preparingTs", c.syncPointPreparingTs.Load()),
}
if blockingFound {
fields = append(fields,
zap.String("blockingReason", blockingReason),
zap.Stringer("blockingDispatcherID", blockingDispatcherID),
zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPointTs),
zap.Uint64("blockingCheckpointTs", blockingCheckpointTs),
zap.Uint64("blockingSentResolvedTs", blockingSentResolvedTs),
zap.Uint64("blockingSeq", blockingDispatcherSeq),
zap.Uint64("blockingEpoch", blockingDispatcherEpoch),
)
}
log.Debug("syncpoint commit stage blocked by dispatcher", fields...)
return
}

c.syncPointInFlightTs.Store(0)
if c.syncPointPreparingTs.Load() == inFlightTs {
c.syncPointPreparingTs.Store(0)
}
}
Loading
Loading