From f1da3c7c5823f4aac3624ede0578381c78fc20f6 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 9 Apr 2026 10:18:08 +0800 Subject: [PATCH 1/5] add debug log Signed-off-by: dongmen <414110582@qq.com> --- .../eventcollector/dispatcher_stat.go | 15 +++ pkg/eventservice/dispatcher_stat.go | 109 +++++++++++++-- pkg/eventservice/event_broker.go | 87 +++++++++++- pkg/eventservice/event_broker_test.go | 127 ++++++++++++++++++ 4 files changed, 326 insertions(+), 12 deletions(-) diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go index 0b1862279e..24002c11f3 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat.go +++ b/downstreamadapter/eventcollector/dispatcher_stat.go @@ -224,6 +224,21 @@ func (d *dispatcherStat) doReset(serverID node.ID, resetTs uint64) { } // remove the dispatcher from the dynamic stream resetRequest := d.newDispatcherResetRequest(d.eventCollector.getLocalServerID().String(), resetTs, epoch) + if d.target.EnableSyncPoint() { + req := resetRequest.DispatcherRequest + log.Info("send reset dispatcher syncpoint request", + zap.Stringer("changefeedID", d.target.GetChangefeedID()), + zap.Stringer("dispatcher", d.getDispatcherID()), + zap.Stringer("eventServiceID", serverID), + zap.Uint64("epoch", epoch), + zap.Uint64("resetTs", resetTs), + zap.Uint64("checkpointTs", d.target.GetCheckpointTs()), + zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()), + zap.Uint64("syncPointTs", req.SyncPointTs), + zap.Uint64("syncPointIntervalSeconds", req.SyncPointInterval), + zap.Bool("skipSyncpointAtStartTs", d.target.GetSkipSyncpointAtStartTs()), + ) + } msg := messaging.NewSingleTargetMessage(serverID, messaging.EventServiceTopic, resetRequest) d.eventCollector.enqueueMessageForSend(msg) log.Info("send reset dispatcher request to event service", diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index 982fd9d916..124492f98b 100644 --- 
a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -504,6 +504,11 @@ func (c *changefeedStatus) tryEnterSyncPointPrepare(candidateTs uint64) bool { case c.syncPointInFlightTs.Load() != 0: return false case candidateTs < preparingTs: + log.Info("syncpoint prepare ts moved backward", + zap.Stringer("changefeedID", c.changefeedID), + zap.Uint64("oldPreparingTs", preparingTs), + zap.Uint64("newPreparingTs", candidateTs), + zap.Uint64("inFlightTs", c.syncPointInFlightTs.Load())) c.syncPointPreparingTs.Store(candidateTs) return true default: @@ -531,19 +536,56 @@ func (c *changefeedStatus) tryPromoteSyncPointToCommitIfReady() { hasEligible := false ready := true + blockingFound := false + var ( + blockingDispatcherID common.DispatcherID + blockingSentResolvedTs uint64 + blockingCheckpointTs uint64 + blockingNextSyncPoint uint64 + blockingSeq uint64 + blockingEpoch uint64 + ) c.dispatchers.Range(func(_ any, value any) bool { dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load() if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.seq.Load() == 0 { return true } hasEligible = true - if dispatcher.sentResolvedTs.Load() < preparingTs { + sentResolvedTs := dispatcher.sentResolvedTs.Load() + if sentResolvedTs < preparingTs { ready = false + if !blockingFound { + blockingFound = true + blockingDispatcherID = dispatcher.id + blockingSentResolvedTs = sentResolvedTs + blockingCheckpointTs = dispatcher.checkpointTs.Load() + blockingNextSyncPoint = dispatcher.nextSyncPoint.Load() + blockingSeq = dispatcher.seq.Load() + blockingEpoch = dispatcher.epoch + } return false } return true }) - if !hasEligible || !ready { + if !hasEligible { + return + } + if !ready { + fields := []zap.Field{ + zap.Stringer("changefeedID", c.changefeedID), + zap.Uint64("preparingTs", preparingTs), + } + if blockingFound { + fields = append(fields, + zap.Stringer("blockingDispatcherID", blockingDispatcherID), + zap.Uint64("blockingSentResolvedTs", 
blockingSentResolvedTs), + zap.Uint64("blockingCheckpointTs", blockingCheckpointTs), + zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPoint), + zap.Uint64("blockingSeq", blockingSeq), + zap.Uint64("blockingEpoch", blockingEpoch), + ) + } + log.Debug("syncpoint prepare stage blocked by dispatcher", fields...) return } @@ -564,26 +606,77 @@ func (c *changefeedStatus) tryFinishSyncPointCommitIfAllEmitted() { } canAdvance := true + blockingFound := false + blockingReason := "" + var ( + blockingDispatcherID common.DispatcherID + blockingNextSyncPointTs uint64 + blockingCheckpointTs uint64 + blockingSentResolvedTs uint64 + blockingDispatcherSeq uint64 + blockingDispatcherEpoch uint64 + ) c.dispatchers.Range(func(_ any, value any) bool { dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load() if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.seq.Load() == 0 { return true } - if dispatcher.nextSyncPoint.Load() <= inFlightTs { + nextSyncPointTs := dispatcher.nextSyncPoint.Load() + checkpointTs := dispatcher.checkpointTs.Load() + if nextSyncPointTs <= inFlightTs { canAdvance = false + if !blockingFound { + blockingFound = true + blockingReason = "nextSyncPointNotAdvanced" + blockingDispatcherID = dispatcher.id + blockingNextSyncPointTs = nextSyncPointTs + blockingCheckpointTs = checkpointTs + blockingSentResolvedTs = dispatcher.sentResolvedTs.Load() + blockingDispatcherSeq = dispatcher.seq.Load() + blockingDispatcherEpoch = dispatcher.epoch + } return false } - if dispatcher.checkpointTs.Load() <= inFlightTs { + if checkpointTs <= inFlightTs { canAdvance = false + if !blockingFound { + blockingFound = true + blockingReason = "checkpointNotAdvanced" + blockingDispatcherID = dispatcher.id + blockingNextSyncPointTs = nextSyncPointTs + blockingCheckpointTs = checkpointTs + blockingSentResolvedTs = dispatcher.sentResolvedTs.Load() + blockingDispatcherSeq = dispatcher.seq.Load() + blockingDispatcherEpoch = dispatcher.epoch + } return false } return 
true }) - if canAdvance { - c.syncPointInFlightTs.Store(0) - if c.syncPointPreparingTs.Load() == inFlightTs { - c.syncPointPreparingTs.Store(0) + if !canAdvance { + fields := []zap.Field{ + zap.Stringer("changefeedID", c.changefeedID), + zap.Uint64("inFlightTs", inFlightTs), + zap.Uint64("preparingTs", c.syncPointPreparingTs.Load()), } + if blockingFound { + fields = append(fields, + zap.String("blockingReason", blockingReason), + zap.Stringer("blockingDispatcherID", blockingDispatcherID), + zap.Uint64("blockingNextSyncPointTs", blockingNextSyncPointTs), + zap.Uint64("blockingCheckpointTs", blockingCheckpointTs), + zap.Uint64("blockingSentResolvedTs", blockingSentResolvedTs), + zap.Uint64("blockingSeq", blockingDispatcherSeq), + zap.Uint64("blockingEpoch", blockingDispatcherEpoch), + ) + } + log.Debug("syncpoint commit stage blocked by dispatcher", fields...) + return + } + + c.syncPointInFlightTs.Store(0) + if c.syncPointPreparingTs.Load() == inFlightTs { + c.syncPointPreparingTs.Store(0) } } diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index ecd03fb3bf..a6b2041bd5 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -497,9 +497,21 @@ func (c *eventBroker) nudgeSyncPointCommitIfNeeded(d *dispatcherStat) bool { if !c.shouldNudgeSyncPointCommit(d) { return false } + commitTs := d.nextSyncPoint.Load() + watermark := d.sentResolvedTs.Load() + preparingTs := d.changefeedStat.getSyncPointPreparingTs() + inFlightTs := d.changefeedStat.syncPointInFlightTs.Load() + log.Debug("nudge syncpoint commit with resolved event", + zap.Stringer("changefeedID", d.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", d.id), + zap.Uint64("watermark", watermark), + zap.Uint64("nextSyncPointTs", commitTs), + zap.Uint64("preparingTs", preparingTs), + zap.Uint64("inFlightTs", inFlightTs), + zap.Bool("inCommitStage", d.changefeedStat.isSyncPointInCommitStage(commitTs))) // Resend resolved-ts at current watermark 
to trigger syncpoint emission in commit stage, // even when there is no fresh upstream event to drive a new scan. - c.sendResolvedTs(d, d.sentResolvedTs.Load()) + c.sendResolvedTs(d, watermark) return true } @@ -722,7 +734,30 @@ func (c *eventBroker) getScanTaskDataRange(task scanTask) (bool, common.DataRang if dataRange.CommitTsEnd <= dataRange.CommitTsStart { updateMetricEventServiceSkipResolvedTsCount(task.info.GetMode()) - if c.nudgeSyncPointCommitIfNeeded(task) { + nextSyncPointTs := uint64(0) + preparingTs := uint64(0) + inFlightTs := uint64(0) + if task.enableSyncPoint { + nextSyncPointTs = task.nextSyncPoint.Load() + preparingTs = task.changefeedStat.getSyncPointPreparingTs() + inFlightTs = task.changefeedStat.syncPointInFlightTs.Load() + } + nudged := c.nudgeSyncPointCommitIfNeeded(task) + log.Debug("scan range empty after capping", + zap.Stringer("changefeedID", task.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", task.id), + zap.Uint64("startTs", dataRange.CommitTsStart), + zap.Uint64("endTs", dataRange.CommitTsEnd), + zap.Uint64("scanMaxTs", scanMaxTs), + zap.Uint64("ddlResolvedTs", ddlState.ResolvedTs), + zap.Uint64("ddlCommitTs", ddlState.MaxEventCommitTs), + zap.Bool("hasPendingDDLEventInCurrentRange", hasPendingDDLEventInCurrentRange), + zap.Uint64("nextSyncPointTs", nextSyncPointTs), + zap.Uint64("preparingTs", preparingTs), + zap.Uint64("inFlightTs", inFlightTs), + zap.Bool("nudgedSyncPointCommit", nudged), + ) + if nudged { return false, common.DataRange{} } // Scan range can become empty after applying capping (for example, scan window). 
@@ -756,15 +791,41 @@ func (c *eventBroker) capCommitTsEndBySyncPoint(task scanTask, commitTsEnd uint6 if !task.enableSyncPoint { return commitTsEnd } + originalCommitTsEnd := commitTsEnd c.fastForwardSyncPointIfNeeded(task) + + cappedByNextSyncPoint := false nextSyncPoint := task.nextSyncPoint.Load() if nextSyncPoint > 0 && commitTsEnd > nextSyncPoint { task.changefeedStat.tryEnterSyncPointPrepare(nextSyncPoint) commitTsEnd = nextSyncPoint + cappedByNextSyncPoint = true } + + cappedByPreparingTs := false preparingTs := task.changefeedStat.getSyncPointPreparingTs() - if preparingTs > 0 && !task.changefeedStat.isSyncPointInCommitStage(preparingTs) { - commitTsEnd = min(commitTsEnd, preparingTs) + inCommitStage := false + if preparingTs > 0 { + inCommitStage = task.changefeedStat.isSyncPointInCommitStage(preparingTs) + if !inCommitStage { + newCommitTsEnd := min(commitTsEnd, preparingTs) + cappedByPreparingTs = newCommitTsEnd < commitTsEnd + commitTsEnd = newCommitTsEnd + } + } + + if commitTsEnd < originalCommitTsEnd { + log.Debug("scan range commitTsEnd capped by syncpoint", + zap.Stringer("changefeedID", task.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", task.id), + zap.Uint64("oldCommitTsEnd", originalCommitTsEnd), + zap.Uint64("newCommitTsEnd", commitTsEnd), + zap.Bool("cappedByNextSyncPoint", cappedByNextSyncPoint), + zap.Bool("cappedByPreparingTs", cappedByPreparingTs), + zap.Uint64("nextSyncPointTs", nextSyncPoint), + zap.Uint64("preparingTs", preparingTs), + zap.Bool("inCommitStage", inCommitStage), + ) } return commitTsEnd } @@ -1536,6 +1597,24 @@ func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error { newStat := newDispatcherStat(dispatcherInfo, uint64(len(c.taskChan)), uint64(len(c.messageCh)), tableInfo, status) newStat.copyStatistics(oldStat) + if newStat.enableSyncPoint { + oldNextSyncPoint := oldStat.nextSyncPoint.Load() + newNextSyncPoint := newStat.nextSyncPoint.Load() + if newNextSyncPoint > 0 && oldNextSyncPoint 
> 0 && newNextSyncPoint < oldNextSyncPoint { + log.Warn("dispatcher syncpoint moved backward after reset", + zap.Stringer("changefeedID", changefeedID), + zap.Stringer("dispatcherID", dispatcherID), + zap.Uint64("oldEpoch", oldStat.epoch), + zap.Uint64("newEpoch", newStat.epoch), + zap.Uint64("oldStartTs", oldStat.info.GetStartTs()), + zap.Uint64("newStartTs", dispatcherInfo.GetStartTs()), + zap.Uint64("oldNextSyncPointTs", oldNextSyncPoint), + zap.Uint64("newNextSyncPointTs", newNextSyncPoint), + zap.Uint64("preparingTs", status.getSyncPointPreparingTs()), + zap.Uint64("inFlightTs", status.syncPointInFlightTs.Load()), + ) + } + } for { if statPtr.CompareAndSwap(oldStat, newStat) { diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index 637bee5ec2..1615a55795 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -16,6 +16,7 @@ package eventservice import ( "context" "errors" + "io" "sync" "testing" "time" @@ -35,6 +36,8 @@ import ( "github.com/tikv/client-go/v2/oracle" "go.uber.org/atomic" "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" ) const testTableTriggerKeyspaceID uint32 = 1 @@ -1662,3 +1665,127 @@ func TestResetDispatcherCopiesRuntimeStatistics(t *testing.T) { require.Equal(t, int64(8192), newStat.maxScanLimitInBytes.Load()) require.Equal(t, int64(2048), newStat.lastScanBytes.Load()) } + +func TestLogSyncPointStageDoesNotPrintWhenSyncPointDisabledEvenIfDispatcherLags(t *testing.T) { + observedLogs := installObservedDebugLogger(t) + + broker, _, _, _ := newEventBrokerForTest() + broker.close() + + info := newMockDispatcherInfoForTest(t) + info.enableSyncPoint = false + info.syncPointInterval = 0 + + changefeed := broker.getOrSetChangefeedStatus(info.GetChangefeedID(), info.GetSyncPointInterval()) + dispatcher := newDispatcherStat(info, 1, 1, nil, changefeed) + dispatcher.seq.Store(1) + dispatcher.sentResolvedTs.Store(100) + 
dispatcher.checkpointTs.Store(90) + + dispatcherPtr := &atomic.Pointer[dispatcherStat]{} + dispatcherPtr.Store(dispatcher) + changefeed.addDispatcher(info.GetID(), dispatcherPtr) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond) + defer cancel() + _ = broker.logSyncPointStage(ctx, 5*time.Millisecond) + + require.Equal(t, 0, observedLogs.FilterMessage("syncpoint stage snapshot").Len()) +} + +func TestLogSyncPointStagePrintsPrepareSnapshotForLaggingDispatcher(t *testing.T) { + observedLogs := installObservedDebugLogger(t) + + broker, _, _, _ := newEventBrokerForTest() + broker.close() + + base := time.Unix(0, 0) + ts5 := oracle.GoTimeToTS(base.Add(5 * time.Second)) + ts8 := oracle.GoTimeToTS(base.Add(8 * time.Second)) + ts10 := oracle.GoTimeToTS(base.Add(10 * time.Second)) + + fastInfo := newMockDispatcherInfoForTest(t) + fastInfo.enableSyncPoint = true + fastInfo.syncPointInterval = 10 * time.Second + fastInfo.startTs = ts5 + fastInfo.nextSyncPoint = ts10 + + slowInfo := newMockDispatcherInfoForTest(t) + slowInfo.enableSyncPoint = true + slowInfo.syncPointInterval = fastInfo.syncPointInterval + slowInfo.startTs = ts5 + slowInfo.nextSyncPoint = ts10 + slowInfo.changefeedID = fastInfo.changefeedID + + changefeed := broker.getOrSetChangefeedStatus(fastInfo.GetChangefeedID(), fastInfo.GetSyncPointInterval()) + changefeed.syncPointPreparingTs.Store(ts10) + + fastDispatcher := newDispatcherStat(fastInfo, 1, 1, nil, changefeed) + fastDispatcher.seq.Store(1) + fastDispatcher.sentResolvedTs.Store(ts10) + fastDispatcher.checkpointTs.Store(ts10) + + slowDispatcher := newDispatcherStat(slowInfo, 1, 1, nil, changefeed) + slowDispatcher.seq.Store(1) + slowDispatcher.sentResolvedTs.Store(ts8) + slowDispatcher.checkpointTs.Store(ts8) + + fastPtr := &atomic.Pointer[dispatcherStat]{} + fastPtr.Store(fastDispatcher) + changefeed.addDispatcher(fastInfo.GetID(), fastPtr) + + slowPtr := &atomic.Pointer[dispatcherStat]{} + slowPtr.Store(slowDispatcher) + 
changefeed.addDispatcher(slowInfo.GetID(), slowPtr) + + ctx, cancel := context.WithTimeout(context.Background(), 40*time.Millisecond) + defer cancel() + _ = broker.logSyncPointStage(ctx, 5*time.Millisecond) + + snapshotLogs := observedLogs.FilterMessage("syncpoint stage snapshot").All() + require.NotEmpty(t, snapshotLogs) + + last := snapshotLogs[len(snapshotLogs)-1] + require.Equal(t, "prepare", requireLogStringField(t, last.Context, "stage")) + require.Equal(t, int64(ts10), requireLogInt64Field(t, last.Context, "preparingTs")) + require.Equal(t, int64(2), requireLogInt64Field(t, last.Context, "dispatchersActive")) + require.Equal(t, int64(1), requireLogInt64Field(t, last.Context, "prepareReadyCount")) + require.Equal(t, int64(1), requireLogInt64Field(t, last.Context, "prepareWaitingCount")) + require.Equal(t, int64(ts8), requireLogInt64Field(t, last.Context, "prepareBlockingSentResolvedTs")) +} + +func installObservedDebugLogger(t *testing.T) *observer.ObservedLogs { + t.Helper() + core, observedLogs := observer.New(zap.DebugLevel) + logger := zap.New(core) + restore := log.ReplaceGlobals(logger, &log.ZapProperties{ + Core: core, + Syncer: zapcore.AddSync(io.Discard), + ErrSyncer: zapcore.AddSync(io.Discard), + Level: zap.NewAtomicLevelAt(zap.DebugLevel), + }) + t.Cleanup(restore) + return observedLogs +} + +func requireLogInt64Field(t *testing.T, fields []zapcore.Field, key string) int64 { + t.Helper() + for _, field := range fields { + if field.Key == key { + return field.Integer + } + } + require.FailNowf(t, "missing log field", "field %s not found", key) + return 0 +} + +func requireLogStringField(t *testing.T, fields []zapcore.Field, key string) string { + t.Helper() + for _, field := range fields { + if field.Key == key { + return field.String + } + } + require.FailNowf(t, "missing log field", "field %s not found", key) + return "" +} From dcf78d8c6600b3401eae1fff859f96d7dd8968b0 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Fri, 10 Apr 
2026 12:06:47 +0800 Subject: [PATCH 2/5] fix 1 Signed-off-by: dongmen <414110582@qq.com> --- pkg/eventservice/dispatcher_stat.go | 22 +++++++++ pkg/eventservice/dispatcher_stat_test.go | 60 ++++++++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index 124492f98b..ab4ad50050 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -484,6 +484,14 @@ func (c *changefeedStatus) getSyncPointPreparingTs() uint64 { return c.syncPointPreparingTs.Load() } +func (c *changefeedStatus) isDispatcherStaleForSyncpoint(dispatcher *dispatcherStat, now time.Time) bool { + lastHeartbeatTime := dispatcher.lastReceivedHeartbeatTime.Load() + if lastHeartbeatTime <= 0 { + return false + } + return now.Sub(time.Unix(lastHeartbeatTime, 0)) > scanWindowStaleDispatcherHeartbeatThreshold +} + // tryEnterSyncPointPrepare tries to enter syncpoint prepare stage for candidateTs. // If a prepare ts already exists, the same ts is accepted, and a smaller ts can // replace it before commit stage starts. 
@@ -534,6 +542,7 @@ func (c *changefeedStatus) tryPromoteSyncPointToCommitIfReady() { return } + now := time.Now() hasEligible := false ready := true blockingFound := false @@ -550,6 +559,9 @@ func (c *changefeedStatus) tryPromoteSyncPointToCommitIfReady() { if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.seq.Load() == 0 { return true } + if c.isDispatcherStaleForSyncpoint(dispatcher, now) { + return true + } hasEligible = true sentResolvedTs := dispatcher.sentResolvedTs.Load() if sentResolvedTs < preparingTs { @@ -605,6 +617,8 @@ func (c *changefeedStatus) tryFinishSyncPointCommitIfAllEmitted() { return } + now := time.Now() + hasEligible := false canAdvance := true blockingFound := false blockingReason := "" @@ -621,6 +635,10 @@ func (c *changefeedStatus) tryFinishSyncPointCommitIfAllEmitted() { if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.seq.Load() == 0 { return true } + if c.isDispatcherStaleForSyncpoint(dispatcher, now) { + return true + } + hasEligible = true nextSyncPointTs := dispatcher.nextSyncPoint.Load() checkpointTs := dispatcher.checkpointTs.Load() if nextSyncPointTs <= inFlightTs { @@ -654,6 +672,10 @@ func (c *changefeedStatus) tryFinishSyncPointCommitIfAllEmitted() { return true }) + if !hasEligible { + return + } + if !canAdvance { fields := []zap.Field{ zap.Stringer("changefeedID", c.changefeedID), diff --git a/pkg/eventservice/dispatcher_stat_test.go b/pkg/eventservice/dispatcher_stat_test.go index 0b9230fd5c..a42bd643c9 100644 --- a/pkg/eventservice/dispatcher_stat_test.go +++ b/pkg/eventservice/dispatcher_stat_test.go @@ -204,3 +204,63 @@ func TestSyncPointPrepareCannotLowerAfterPromote(t *testing.T) { require.Equal(t, preparingTs, status.syncPointPreparingTs.Load()) require.Equal(t, preparingTs, status.syncPointInFlightTs.Load()) } + +func TestSyncPointPreparePromotionSkipsStaleDispatcher(t *testing.T) { + t.Parallel() + + status := newChangefeedStatus(common.NewChangefeedID4Test("default", 
"syncpoint-stale-prepare"), 10*time.Second) + preparingTs := uint64(200) + status.syncPointPreparingTs.Store(preparingTs) + + fresh := &dispatcherStat{} + fresh.seq.Store(1) + fresh.sentResolvedTs.Store(preparingTs) + fresh.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + freshPtr := &atomic.Pointer[dispatcherStat]{} + freshPtr.Store(fresh) + status.addDispatcher(common.DispatcherID{Low: 1, High: 1}, freshPtr) + + stale := &dispatcherStat{} + stale.seq.Store(1) + stale.sentResolvedTs.Store(preparingTs - 1) + stale.lastReceivedHeartbeatTime.Store(time.Now().Add(-scanWindowStaleDispatcherHeartbeatThreshold - time.Second).Unix()) + stalePtr := &atomic.Pointer[dispatcherStat]{} + stalePtr.Store(stale) + status.addDispatcher(common.DispatcherID{Low: 2, High: 2}, stalePtr) + + status.tryPromoteSyncPointToCommitIfReady() + require.True(t, status.isSyncPointInCommitStage(preparingTs)) +} + +func TestSyncPointCommitFinishSkipsStaleDispatcher(t *testing.T) { + t.Parallel() + + status := newChangefeedStatus(common.NewChangefeedID4Test("default", "syncpoint-stale-commit"), 10*time.Second) + inFlightTs := uint64(300) + status.syncPointPreparingTs.Store(inFlightTs) + status.syncPointInFlightTs.Store(inFlightTs) + + fresh := &dispatcherStat{} + fresh.seq.Store(1) + fresh.nextSyncPoint.Store(inFlightTs + 100) + fresh.checkpointTs.Store(inFlightTs + 100) + fresh.sentResolvedTs.Store(inFlightTs + 100) + fresh.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + freshPtr := &atomic.Pointer[dispatcherStat]{} + freshPtr.Store(fresh) + status.addDispatcher(common.DispatcherID{Low: 3, High: 3}, freshPtr) + + stale := &dispatcherStat{} + stale.seq.Store(1) + stale.nextSyncPoint.Store(inFlightTs) + stale.checkpointTs.Store(inFlightTs) + stale.sentResolvedTs.Store(inFlightTs) + stale.lastReceivedHeartbeatTime.Store(time.Now().Add(-scanWindowStaleDispatcherHeartbeatThreshold - time.Second).Unix()) + stalePtr := &atomic.Pointer[dispatcherStat]{} + stalePtr.Store(stale) + 
status.addDispatcher(common.DispatcherID{Low: 4, High: 4}, stalePtr) + + status.tryFinishSyncPointCommitIfAllEmitted() + require.Equal(t, uint64(0), status.syncPointInFlightTs.Load()) + require.Equal(t, uint64(0), status.syncPointPreparingTs.Load()) +} From 7ff4a599da4803533a447fc2c1d233a11a900d01 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 13 Apr 2026 09:33:26 +0800 Subject: [PATCH 3/5] add two stage syncpoint switch Signed-off-by: dongmen <414110582@qq.com> --- pkg/config/debug.go | 4 ++ pkg/eventservice/dispatcher_stat.go | 12 ++--- pkg/eventservice/event_broker.go | 75 ++++++++++++++++++++++++--- pkg/eventservice/event_broker_test.go | 36 +++++++++++++ 4 files changed, 115 insertions(+), 12 deletions(-) diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 148e927430..be0d0379fa 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -128,6 +128,9 @@ type EventServiceConfig struct { // TODO: Remove this config after we find a proper way to fix the OOM issue. // Ref: https://github.com/pingcap/ticdc/issues/1784 EnableRemoteEventService bool `toml:"enable-remote-event-service" json:"enable_remote_event_service"` + // EnableTwoStageSyncPoint controls whether event broker uses global two-stage + // syncpoint coordination. Disable this only for troubleshooting. 
+ EnableTwoStageSyncPoint bool `toml:"enable-two-stage-syncpoint" json:"enable_two_stage_syncpoint"` } // NewDefaultEventServiceConfig return the default event service configuration @@ -138,5 +141,6 @@ func NewDefaultEventServiceConfig() *EventServiceConfig { DMLEventMaxRows: 256, DMLEventMaxBytes: 1024 * 1024 * 1, // 1MB EnableRemoteEventService: true, + EnableTwoStageSyncPoint: true, } } diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index ab4ad50050..0682561f51 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -623,12 +623,12 @@ func (c *changefeedStatus) tryFinishSyncPointCommitIfAllEmitted() { blockingFound := false blockingReason := "" var ( - blockingDispatcherID common.DispatcherID - blockingNextSyncPointTs uint64 - blockingCheckpointTs uint64 - blockingSentResolvedTs uint64 - blockingDispatcherSeq uint64 - blockingDispatcherEpoch uint64 + blockingDispatcherID common.DispatcherID + blockingNextSyncPointTs uint64 + blockingCheckpointTs uint64 + blockingSentResolvedTs uint64 + blockingDispatcherSeq uint64 + blockingDispatcherEpoch uint64 ) c.dispatchers.Range(func(_ any, value any) bool { dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load() diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index a6b2041bd5..2dde50e2bb 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -95,6 +95,9 @@ type eventBroker struct { scanRateLimiter *rate.Limiter scanLimitInBytes uint64 + // enableTwoStageSyncPoint controls whether event broker uses global + // prepare/commit syncpoint coordination. 
+ enableTwoStageSyncPoint bool } func newEventBroker( @@ -143,6 +146,7 @@ func newEventBroker( g: g, scanRateLimiter: rate.NewLimiter(rate.Limit(scanLimitInBytes), scanLimitInBytes), scanLimitInBytes: uint64(scanLimitInBytes), + enableTwoStageSyncPoint: eventServiceConfig.EnableTwoStageSyncPoint, } // Initialize metrics collector @@ -187,13 +191,16 @@ func newEventBroker( return c.refreshMinSentResolvedTs(ctx) }) - g.Go(func() error { - return c.logSyncPointStage(ctx, defaultLogSyncPointStageInterval) - }) + if c.enableTwoStageSyncPoint { + g.Go(func() error { + return c.logSyncPointStage(ctx, defaultLogSyncPointStageInterval) + }) + } log.Info("new event broker created", zap.Uint64("id", id), - zap.Uint64("scanLimitInBytes", c.scanLimitInBytes)) + zap.Uint64("scanLimitInBytes", c.scanLimitInBytes), + zap.Bool("enableTwoStageSyncPoint", c.enableTwoStageSyncPoint)) return c } @@ -456,11 +463,17 @@ func (c *eventBroker) refreshMinSentResolvedTs(ctx context.Context) error { } func (c *eventBroker) advanceSyncPointState(status *changefeedStatus) { + if !c.enableTwoStageSyncPoint { + return + } status.tryPromoteSyncPointToCommitIfReady() status.tryFinishSyncPointCommitIfAllEmitted() } func (c *eventBroker) nudgeSyncPointCommitDispatchers(status *changefeedStatus) { + if !c.enableTwoStageSyncPoint { + return + } inFlightTs := status.syncPointInFlightTs.Load() if inFlightTs == 0 { return @@ -480,7 +493,7 @@ func (c *eventBroker) nudgeSyncPointCommitDispatchers(status *changefeedStatus) } func (c *eventBroker) shouldNudgeSyncPointCommit(d *dispatcherStat) bool { - if d == nil || d.isRemoved.Load() || !d.enableSyncPoint || d.seq.Load() == 0 { + if !c.enableTwoStageSyncPoint || d == nil || d.isRemoved.Load() || !d.enableSyncPoint || d.seq.Load() == 0 { return false } c.fastForwardSyncPointIfNeeded(d) @@ -792,6 +805,24 @@ func (c *eventBroker) capCommitTsEndBySyncPoint(task scanTask, commitTsEnd uint6 return commitTsEnd } originalCommitTsEnd := commitTsEnd + if 
!c.enableTwoStageSyncPoint { + nextSyncPoint := task.nextSyncPoint.Load() + if nextSyncPoint > 0 && commitTsEnd > nextSyncPoint { + commitTsEnd = nextSyncPoint + log.Debug("scan range commitTsEnd capped by syncpoint", + zap.Stringer("changefeedID", task.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", task.id), + zap.Uint64("oldCommitTsEnd", originalCommitTsEnd), + zap.Uint64("newCommitTsEnd", commitTsEnd), + zap.Bool("cappedByNextSyncPoint", true), + zap.Bool("cappedByPreparingTs", false), + zap.Uint64("nextSyncPointTs", nextSyncPoint), + zap.Uint64("preparingTs", 0), + zap.Bool("inCommitStage", false), + ) + } + return commitTsEnd + } c.fastForwardSyncPointIfNeeded(task) cappedByNextSyncPoint := false @@ -915,10 +946,13 @@ func (c *eventBroker) sendHandshakeIfNeed(task scanTask) { // hasSyncPointEventBeforeTs checks if there is any sync point events before the given ts. func (c *eventBroker) hasSyncPointEventsBeforeTs(ts uint64, d *dispatcherStat) bool { - c.fastForwardSyncPointIfNeeded(d) if !d.enableSyncPoint { return false } + if !c.enableTwoStageSyncPoint { + return ts > d.nextSyncPoint.Load() + } + c.fastForwardSyncPointIfNeeded(d) nextSyncPoint := d.nextSyncPoint.Load() if ts <= nextSyncPoint { return false @@ -931,6 +965,32 @@ func (c *eventBroker) hasSyncPointEventsBeforeTs(ts uint64, d *dispatcherStat) b // We need call this function every time we send a event(whether dml/ddl/resolvedTs), // thus to ensure the sync point event is in correct order for each dispatcher. 
func (c *eventBroker) emitSyncPointEventIfNeeded(ts uint64, d *dispatcherStat, remoteID node.ID) { + if !d.enableSyncPoint { + return + } + if !c.enableTwoStageSyncPoint { + for { + commitTs := d.nextSyncPoint.Load() + if commitTs == 0 || ts < commitTs { + return + } + nextSyncPoint := oracle.GoTimeToTS(oracle.GetTimeFromTS(commitTs).Add(d.syncPointInterval)) + // Advance nextSyncPoint with CAS so concurrent send paths cannot emit the same + // syncpoint twice or move nextSyncPoint backward. + if !d.nextSyncPoint.CompareAndSwap(commitTs, nextSyncPoint) { + continue + } + + e := event.NewSyncPointEvent(d.id, commitTs, d.seq.Add(1), d.epoch) + log.Debug("send syncpoint event to dispatcher", + zap.Stringer("changefeedID", d.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", d.id), zap.Int64("tableID", d.info.GetTableSpan().GetTableID()), + zap.Uint64("commitTs", e.GetCommitTs()), zap.Uint64("seq", e.GetSeq())) + + syncPointEvent := newWrapSyncPointEvent(remoteID, e) + c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) <- syncPointEvent + } + } c.fastForwardSyncPointIfNeeded(d) for d.enableSyncPoint { commitTs := d.nextSyncPoint.Load() @@ -965,6 +1025,9 @@ func (c *eventBroker) emitSyncPointEventIfNeeded(ts uint64, d *dispatcherStat, r } func (c *eventBroker) fastForwardSyncPointIfNeeded(d *dispatcherStat) { + if !c.enableTwoStageSyncPoint { + return + } c.fastForwardSyncPointToInFlightIfNeeded(d) } diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index 1615a55795..3cd9af5a60 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -423,6 +423,42 @@ func TestNudgeSyncPointCommitIfNeededFastForwardsStaleSyncPoint(t *testing.T) { require.Equal(t, ts30, disp.nextSyncPoint.Load()) } +func TestEmitSyncPointEventIfNeededWithoutTwoStageDoesNotWaitCommitStage(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + // Close the broker, so we can catch all 
messages in the test. + broker.close() + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = 10 * time.Second + + ts5 := oracle.GoTimeToTS(time.Unix(0, 0).Add(5 * time.Second)) + ts10 := oracle.GoTimeToTS(time.Unix(0, 0).Add(10 * time.Second)) + ts20 := oracle.GoTimeToTS(time.Unix(0, 0).Add(20 * time.Second)) + + info.startTs = ts5 + info.nextSyncPoint = ts10 + + changefeedStatus := broker.getOrSetChangefeedStatus(info.GetChangefeedID(), info.GetSyncPointInterval()) + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.setHandshaked() + + // Turn off two-stage syncpoint and verify syncpoint can be emitted at boundary ts + // without entering commit stage. + broker.enableTwoStageSyncPoint = false + require.False(t, changefeedStatus.isSyncPointInCommitStage(ts10)) + + broker.emitSyncPointEventIfNeeded(ts10, disp, node.ID(info.GetServerID())) + + first := <-broker.messageCh[disp.messageWorkerIndex] + require.Equal(t, event.TypeSyncPointEvent, first.msgType) + syncPointEvent, ok := first.e.(*event.SyncPointEvent) + require.True(t, ok) + require.Equal(t, ts10, syncPointEvent.GetCommitTs()) + require.Equal(t, ts20, disp.nextSyncPoint.Load()) +} + func TestNudgeSyncPointCommitDispatchersPushesTask(t *testing.T) { broker, _, _, _ := newEventBrokerForTest() // Close the broker, so we can inspect task queue. 
From a5fce16f2a346c9416aa7aebb93d201572f79a37 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 13 Apr 2026 09:48:43 +0800 Subject: [PATCH 4/5] use sentResolvedTs as block point Signed-off-by: dongmen <414110582@qq.com> --- pkg/eventservice/dispatcher_stat.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index 0682561f51..7bfcd24499 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -641,6 +641,7 @@ func (c *changefeedStatus) tryFinishSyncPointCommitIfAllEmitted() { hasEligible = true nextSyncPointTs := dispatcher.nextSyncPoint.Load() checkpointTs := dispatcher.checkpointTs.Load() + sentResolvedTs := dispatcher.sentResolvedTs.Load() if nextSyncPointTs <= inFlightTs { canAdvance = false if !blockingFound { @@ -655,7 +656,7 @@ } return false } - if checkpointTs <= inFlightTs { + if sentResolvedTs <= inFlightTs { canAdvance = false if !blockingFound { blockingFound = true From 612b9ede9f9efa15fcf94d5f00b4f8b08b94e6e6 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Mon, 13 Apr 2026 10:52:58 +0800 Subject: [PATCH 5/5] dispatcher: limit resend message count Signed-off-by: dongmen <414110582@qq.com> --- .../dispatcher/basic_dispatcher.go | 44 ++++++++++++++++ .../dispatcher/event_dispatcher_test.go | 52 +++++++++++++++++++ 2 files changed, 96 insertions(+) diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go index 0c31275a4c..dc1402f8fe 100644 --- a/downstreamadapter/dispatcher/basic_dispatcher.go +++ b/downstreamadapter/dispatcher/basic_dispatcher.go @@ -35,6 +35,11 @@ import ( "go.uber.org/zap/zapcore" ) +const ( + outdatedActionDoneReportInterval = time.Second + outdatedActionDoneReportMaxEntries = 256 +) + // DispatcherService defines the interface for providing
dispatcher information and basic event handling. type DispatcherService interface { GetId() common.DispatcherID @@ -217,6 +222,11 @@ type BasicDispatcher struct { BootstrapState bootstrapState + // outdatedActionDoneAt throttles DONE reports for the same outdated action to avoid + // flooding blockStatusesChan when maintainer resends stale actions repeatedly. + outdatedActionDoneMu sync.Mutex + outdatedActionDoneAt map[BlockEventIdentifier]time.Time + // tableModeCompatibilityChecked indicates whether we have already validated the newest // table schema is compatible with the current replication mode configuration. // Only when the initial case or a ddl event is received, we will reset tableModeCompatibilityChecked to check the compatibility. @@ -256,12 +266,39 @@ func NewBasicDispatcher( creationPDTs: currentPDTs, mode: mode, BootstrapState: BootstrapFinished, + outdatedActionDoneAt: make(map[BlockEventIdentifier]time.Time), } dispatcher.resolvedTs.Store(startTs) return dispatcher } +func (d *BasicDispatcher) shouldReportOutdatedActionDone(identifier BlockEventIdentifier, now time.Time) bool { + d.outdatedActionDoneMu.Lock() + defer d.outdatedActionDoneMu.Unlock() + + if last, ok := d.outdatedActionDoneAt[identifier]; ok && now.Sub(last) < outdatedActionDoneReportInterval { + return false + } + + if len(d.outdatedActionDoneAt) >= outdatedActionDoneReportMaxEntries { + expireBefore := now.Add(-2 * outdatedActionDoneReportInterval) + for id, ts := range d.outdatedActionDoneAt { + if ts.Before(expireBefore) { + delete(d.outdatedActionDoneAt, id) + } + } + if len(d.outdatedActionDoneAt) >= outdatedActionDoneReportMaxEntries { + for id := range d.outdatedActionDoneAt { + delete(d.outdatedActionDoneAt, id) + } + } + } + + d.outdatedActionDoneAt[identifier] = now + return true +} + // AddDMLEventsToSink filters events for special tables, registers batch wake // callbacks, and returns true when at least one event remains to be written to // the downstream sink. 
@@ -836,6 +873,13 @@ func (d *BasicDispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.D } // Step3: whether the outdate message or not, we need to return message show we have finished the event. + identifier := BlockEventIdentifier{ + CommitTs: action.CommitTs, + IsSyncPoint: action.IsSyncPoint, + } + if !d.shouldReportOutdatedActionDone(identifier, time.Now()) { + return false + } d.sharedInfo.blockStatusesChan <- &heartbeatpb.TableSpanBlockStatus{ ID: d.id.ToPB(), State: &heartbeatpb.State{ diff --git a/downstreamadapter/dispatcher/event_dispatcher_test.go b/downstreamadapter/dispatcher/event_dispatcher_test.go index 29b8ace3e2..68bf18e7f4 100644 --- a/downstreamadapter/dispatcher/event_dispatcher_test.go +++ b/downstreamadapter/dispatcher/event_dispatcher_test.go @@ -429,6 +429,58 @@ func TestDispatcherHandleEvents(t *testing.T) { t.Run("cloud storage wake callback after batch enqueue", verifyDMLWakeCallbackStorageAfterBatchEnqueue) } +func TestOutdatedActionDoneIsThrottled(t *testing.T) { + tableSpan, err := getCompleteTableSpan(getTestingKeyspaceID()) + require.NoError(t, err) + testSink := newDispatcherTestSink(t, common.MysqlSinkType) + dispatcher := newDispatcherForTest(testSink.Sink(), tableSpan) + nodeID := node.NewID() + + block := dispatcher.HandleEvents([]DispatcherEvent{ + NewDispatcherEvent(&nodeID, commonEvent.ResolvedEvent{ResolvedTs: 10}), + }, func() {}) + require.False(t, block) + require.Equal(t, uint64(10), dispatcher.GetResolvedTs()) + + staleStatus := &heartbeatpb.DispatcherStatus{ + Action: &heartbeatpb.DispatcherAction{ + Action: heartbeatpb.Action_Pass, + CommitTs: 5, + IsSyncPoint: false, + }, + } + + await := dispatcher.HandleDispatcherStatus(staleStatus) + require.False(t, await) + select { + case msg := <-dispatcher.GetBlockStatusesChan(): + require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage) + require.Equal(t, uint64(5), msg.State.BlockTs) + case <-time.After(time.Second): + require.FailNow(t, "expected stale 
action DONE") + } + + await = dispatcher.HandleDispatcherStatus(staleStatus) + require.False(t, await) + select { + case msg := <-dispatcher.GetBlockStatusesChan(): + require.FailNow(t, "unexpected duplicate stale action DONE", "msg=%v", msg) + case <-time.After(100 * time.Millisecond): + } + + time.Sleep(outdatedActionDoneReportInterval + 50*time.Millisecond) + + await = dispatcher.HandleDispatcherStatus(staleStatus) + require.False(t, await) + select { + case msg := <-dispatcher.GetBlockStatusesChan(): + require.Equal(t, heartbeatpb.BlockStage_DONE, msg.State.Stage) + require.Equal(t, uint64(5), msg.State.BlockTs) + case <-time.After(time.Second): + require.FailNow(t, "expected stale action DONE after throttle interval") + } +} + func TestBlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain(t *testing.T) { keyspaceID := getTestingKeyspaceID() tableSpan := getUncompleteTableSpan()