diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 148e927430..dfeae26952 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -118,6 +118,11 @@ type EventServiceConfig struct { ScanTaskQueueSize int `toml:"scan-task-queue-size" json:"scan_task_queue_size"` ScanLimitInBytes int `toml:"scan-limit-in-bytes" json:"scan_limit_in_bytes"` + // SyncPointLagSuppressThreshold controls when to suppress syncpoint emission for lagging dispatchers. + SyncPointLagSuppressThreshold time.Duration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"` + // SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression. + SyncPointLagResumeThreshold time.Duration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"` + // DMLEventMaxRows is the maximum number of rows in a DML event when split txn is enabled. DMLEventMaxRows int32 `toml:"dml-event-max-rows" json:"dml_event_max_rows"` // DMLEventMaxBytes is the maximum size of a DML event in bytes when split txn is enabled. @@ -133,10 +138,12 @@ type EventServiceConfig struct { // NewDefaultEventServiceConfig return the default event service configuration func NewDefaultEventServiceConfig() *EventServiceConfig { return &EventServiceConfig{ - ScanTaskQueueSize: 1024 * 8, - ScanLimitInBytes: 1024 * 1024 * 256, // 256MB - DMLEventMaxRows: 256, - DMLEventMaxBytes: 1024 * 1024 * 1, // 1MB - EnableRemoteEventService: true, + ScanTaskQueueSize: 1024 * 8, + ScanLimitInBytes: 1024 * 1024 * 256, // 256MB + SyncPointLagSuppressThreshold: 20 * time.Minute, + SyncPointLagResumeThreshold: 15 * time.Minute, + DMLEventMaxRows: 256, + DMLEventMaxBytes: 1024 * 1024 * 1, // 1MB + EnableRemoteEventService: true, } } diff --git a/pkg/eventservice/dispatcher_stat.go b/pkg/eventservice/dispatcher_stat.go index 552a684ea2..41bf08bc10 100644 --- a/pkg/eventservice/dispatcher_stat.go +++ b/pkg/eventservice/dispatcher_stat.go @@ -70,7 +70,9 @@ type dispatcherStat struct { enableSyncPoint bool nextSyncPoint atomic.Uint64 syncPointInterval time.Duration - txnAtomicity config.AtomicityLevel + // syncPointSendSuppressed tracks whether syncpoint emission is temporarily suppressed due to lag. + syncPointSendSuppressed atomic.Bool + txnAtomicity config.AtomicityLevel // ============================================================================= // ================== below are fields need copied when reset ================== @@ -429,6 +431,7 @@ type changefeedStatus struct { availableMemoryQuota sync.Map // nodeID -> atomic.Uint64 (memory quota in bytes) minSentTs atomic.Uint64 + minCheckpointTs atomic.Uint64 scanInterval atomic.Int64 lastAdjustTime atomic.Time @@ -437,6 +440,8 @@ type changefeedStatus struct { syncPointInterval time.Duration } +const invalidMinCheckpointTs = ^uint64(0) + func newChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval time.Duration) *changefeedStatus { status := &changefeedStatus{ changefeedID: changefeedID, @@ -444,6 +449,7 @@ func newChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval tim syncPointInterval: syncPointInterval, } status.scanInterval.Store(int64(defaultScanInterval)) + status.minCheckpointTs.Store(invalidMinCheckpointTs) status.lastAdjustTime.Store(time.Now()) status.lastTrendAdjustTime.Store(time.Now()) @@ -470,3 +476,11 @@ func (c *changefeedStatus) isEmpty() bool { func (c *changefeedStatus) isSyncpointEnabled() bool { return c.syncPointInterval > 0 } + +func (c *changefeedStatus) getMinCheckpointTs() (uint64, bool) { + minCheckpointTs := c.minCheckpointTs.Load() + if minCheckpointTs == invalidMinCheckpointTs { + return 0, false + } + return minCheckpointTs, true +} diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 21bdb93641..52b9daee32 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -53,6 +53,8 @@ const ( // defaultSendResolvedTsInterval use to control whether to send a resolvedTs event to the dispatcher when its scan is skipped. defaultSendResolvedTsInterval = time.Second * 2 defaultRefreshMinSentResolvedTsInterval = time.Second * 1 + defaultSyncPointLagSuppressThreshold = 20 * time.Minute + defaultSyncPointLagResumeThreshold = 15 * time.Minute ) // eventBroker get event from the eventStore, and send the event to the dispatchers. @@ -94,8 +96,10 @@ type eventBroker struct { // metricsCollector handles all metrics collection and reporting metricsCollector *metricsCollector - scanRateLimiter *rate.Limiter - scanLimitInBytes uint64 + scanRateLimiter *rate.Limiter + scanLimitInBytes uint64 + syncPointLagSuppressThreshold time.Duration + syncPointLagResumeThreshold time.Duration } func newEventBroker( @@ -118,7 +122,19 @@ func newEventBroker( scanTaskQueueSize := config.GetGlobalServerConfig().Debug.EventService.ScanTaskQueueSize / scanWorkerCount sendMessageQueueSize := basicChannelSize * 4 - scanLimitInBytes := config.GetGlobalServerConfig().Debug.EventService.ScanLimitInBytes + eventServiceConfig := config.GetGlobalServerConfig().Debug.EventService + scanLimitInBytes := eventServiceConfig.ScanLimitInBytes + syncPointLagSuppressThreshold := eventServiceConfig.SyncPointLagSuppressThreshold + if syncPointLagSuppressThreshold <= 0 { + syncPointLagSuppressThreshold = defaultSyncPointLagSuppressThreshold + } + syncPointLagResumeThreshold := eventServiceConfig.SyncPointLagResumeThreshold + if syncPointLagResumeThreshold <= 0 { + syncPointLagResumeThreshold = defaultSyncPointLagResumeThreshold + } + if syncPointLagResumeThreshold > syncPointLagSuppressThreshold { + syncPointLagResumeThreshold = syncPointLagSuppressThreshold + } g, ctx := errgroup.WithContext(ctx) ctx, cancel := context.WithCancel(ctx) @@ -127,23 +143,25 @@ func newEventBroker( // For now, since there is only one upstream, using the default pdClock is sufficient. pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock) c := &eventBroker{ - tidbClusterID: id, - eventStore: eventStore, - pdClock: pdClock, - mounter: event.NewMounter(tz, integrity), - timezone: tz.String(), - schemaStore: schemaStore, - changefeedMap: sync.Map{}, - dispatchers: sync.Map{}, - tableTriggerDispatchers: sync.Map{}, - msgSender: mc, - taskChan: make([]chan scanTask, scanWorkerCount), - messageCh: make([]chan *wrapEvent, sendMessageWorkerCount), - redoMessageCh: make([]chan *wrapEvent, sendMessageWorkerCount), - cancel: cancel, - g: g, - scanRateLimiter: rate.NewLimiter(rate.Limit(scanLimitInBytes), scanLimitInBytes), - scanLimitInBytes: uint64(scanLimitInBytes), + tidbClusterID: id, + eventStore: eventStore, + pdClock: pdClock, + mounter: event.NewMounter(tz, integrity), + timezone: tz.String(), + schemaStore: schemaStore, + changefeedMap: sync.Map{}, + dispatchers: sync.Map{}, + tableTriggerDispatchers: sync.Map{}, + msgSender: mc, + taskChan: make([]chan scanTask, scanWorkerCount), + messageCh: make([]chan *wrapEvent, sendMessageWorkerCount), + redoMessageCh: make([]chan *wrapEvent, sendMessageWorkerCount), + cancel: cancel, + g: g, + scanRateLimiter: rate.NewLimiter(rate.Limit(scanLimitInBytes), scanLimitInBytes), + scanLimitInBytes: uint64(scanLimitInBytes), + syncPointLagSuppressThreshold: syncPointLagSuppressThreshold, + syncPointLagResumeThreshold: syncPointLagResumeThreshold, } // Initialize metrics collector @@ -188,7 +206,11 @@ func newEventBroker( return c.refreshMinSentResolvedTs(ctx) }) - log.Info("new event broker created", zap.Uint64("id", id), zap.Uint64("scanLimitInBytes", c.scanLimitInBytes)) + log.Info("new event broker created", + zap.Uint64("id", id), + zap.Uint64("scanLimitInBytes", c.scanLimitInBytes), + zap.Duration("syncPointLagSuppressThreshold", c.syncPointLagSuppressThreshold), + zap.Duration("syncPointLagResumeThreshold", c.syncPointLagResumeThreshold)) return c } @@ -586,6 +608,68 @@ func (c *eventBroker) hasSyncPointEventsBeforeTs(ts uint64, d *dispatcherStat) b return d.enableSyncPoint && ts > d.nextSyncPoint.Load() } +func syncPointLagDuration(sentResolvedTs, checkpointTs uint64) time.Duration { + if sentResolvedTs <= checkpointTs { + return 0 + } + return oracle.GetTimeFromTS(sentResolvedTs).Sub(oracle.GetTimeFromTS(checkpointTs)) +} + +func (c *eventBroker) shouldSuppressSyncPointEmission(d *dispatcherStat) bool { + if d == nil || c.syncPointLagSuppressThreshold <= 0 { + return false + } + + receivedResolvedTs := d.receivedResolvedTs.Load() + checkpointTs, ok := d.changefeedStat.getMinCheckpointTs() + if !ok { + metrics.EventServiceSyncPointLagGaugeVec.WithLabelValues(d.changefeedStat.changefeedID.String()).Set(0) + if d.syncPointSendSuppressed.Load() { + if d.syncPointSendSuppressed.CompareAndSwap(true, false) { + log.Info("syncpoint emission resumed", + zap.Stringer("changefeedID", d.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", d.id), + zap.Uint64("receivedResolvedTs", receivedResolvedTs), + zap.Duration("resumeThreshold", c.syncPointLagResumeThreshold)) + } + } + return false + } + lag := syncPointLagDuration(receivedResolvedTs, checkpointTs) + metrics.EventServiceSyncPointLagGaugeVec.WithLabelValues(d.changefeedStat.changefeedID.String()).Set(lag.Seconds()) + + if d.syncPointSendSuppressed.Load() { + if lag <= c.syncPointLagResumeThreshold { + if d.syncPointSendSuppressed.CompareAndSwap(true, false) { + log.Info("syncpoint emission resumed", + zap.Stringer("changefeedID", d.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", d.id), + zap.Uint64("receivedResolvedTs", receivedResolvedTs), + zap.Uint64("minCheckpointTs", checkpointTs), + zap.Duration("lag", lag), + zap.Duration("resumeThreshold", c.syncPointLagResumeThreshold)) + } + return false + } + return true + } + + if lag > c.syncPointLagSuppressThreshold { + if d.syncPointSendSuppressed.CompareAndSwap(false, true) { + log.Info("syncpoint emission suppressed due to lag", + zap.Stringer("changefeedID", d.changefeedStat.changefeedID), + zap.Stringer("dispatcherID", d.id), + zap.Uint64("sentResolvedTs", receivedResolvedTs), + zap.Uint64("minCheckpointTs", checkpointTs), + zap.Duration("lag", lag), + zap.Duration("suppressThreshold", c.syncPointLagSuppressThreshold), + zap.Duration("resumeThreshold", c.syncPointLagResumeThreshold)) + } + return true + } + return false +} + // emitSyncPointEventIfNeeded emits a sync point event if the current ts is greater than the next sync point, and updates the next sync point. // We need call this function every time we send a event(whether dml/ddl/resolvedTs), // thus to ensure the sync point event is in correct order for each dispatcher. @@ -594,14 +678,18 @@ func (c *eventBroker) emitSyncPointEventIfNeeded(ts uint64, d *dispatcherStat, r commitTs := d.nextSyncPoint.Load() d.nextSyncPoint.Store(oracle.GoTimeToTS(oracle.GetTimeFromTS(commitTs).Add(d.syncPointInterval))) + if c.shouldSuppressSyncPointEmission(d) { + metrics.EventServiceSyncPointSuppressedCount.WithLabelValues(d.changefeedStat.changefeedID.String()).Inc() + continue + } + e := event.NewSyncPointEvent(d.id, commitTs, d.seq.Add(1), d.epoch) + syncPointEvent := newWrapSyncPointEvent(remoteID, e) + c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) <- syncPointEvent log.Debug("send syncpoint event to dispatcher", zap.Stringer("changefeedID", d.changefeedStat.changefeedID), zap.Stringer("dispatcherID", d.id), zap.Int64("tableID", d.info.GetTableSpan().GetTableID()), zap.Uint64("commitTs", e.GetCommitTs()), zap.Uint64("seq", e.GetSeq())) - - syncPointEvent := newWrapSyncPointEvent(remoteID, e) - c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) <- syncPointEvent } } @@ -1145,6 +1233,8 @@ func (c *eventBroker) removeChangefeedStatus(status *changefeedStatus) { metrics.EventServiceAvailableMemoryQuotaGaugeVec.DeleteLabelValues(changefeedID.String()) metrics.EventServiceScanWindowBaseTsGaugeVec.DeleteLabelValues(changefeedID.String()) metrics.EventServiceScanWindowIntervalGaugeVec.DeleteLabelValues(changefeedID.String()) + metrics.EventServiceSyncPointLagGaugeVec.DeleteLabelValues(changefeedID.String()) + metrics.EventServiceSyncPointSuppressedCount.DeleteLabelValues(changefeedID.String()) } func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error { @@ -1311,6 +1401,9 @@ func (c *eventBroker) handleDispatcherHeartbeat(heartbeat *DispatcherHeartBeatWi handleProgress(dp.DispatcherID, dp.CheckpointTs, 0, false) } } + for changefeed := range changedChangefeeds { + changefeed.refreshMinSentResolvedTs() + } c.sendDispatcherResponse(responseMap) } diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go index 4a274e0c12..4d9b80a46d 100644 --- a/pkg/eventservice/event_broker_test.go +++ b/pkg/eventservice/event_broker_test.go @@ -365,6 +365,227 @@ func TestGetScanTaskDataRangeRingWaitWithThreeDispatchersCanAdvancePendingDDL(t require.Equal(t, ts103, dataRange.CommitTsEnd) } +func TestEmitSyncPointEventIfNeededSuppressesWhenLagging(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + broker.syncPointLagSuppressThreshold = 20 * time.Second + broker.syncPointLagResumeThreshold = 10 * time.Second + + base := time.Unix(0, 0) + ts100 := oracle.GoTimeToTS(base.Add(100 * time.Second)) + ts110 := oracle.GoTimeToTS(base.Add(110 * time.Second)) + ts130 := oracle.GoTimeToTS(base.Add(130 * time.Second)) + ts180 := oracle.GoTimeToTS(base.Add(180 * time.Second)) + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = 10 * time.Second + info.nextSyncPoint = ts110 + + changefeedStatus := broker.getOrSetChangefeedStatus(info) + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.seq.Store(1) + disp.checkpointTs.Store(ts100) + disp.receivedResolvedTs.Store(ts180) + disp.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + + dispPtr := &atomic.Pointer[dispatcherStat]{} + dispPtr.Store(disp) + changefeedStatus.addDispatcher(disp.id, dispPtr) + changefeedStatus.refreshMinSentResolvedTs() + + broker.emitSyncPointEventIfNeeded(ts130, disp, node.ID(info.GetServerID())) + + require.True(t, disp.syncPointSendSuppressed.Load()) + require.Equal(t, ts130, disp.nextSyncPoint.Load()) + require.Equal(t, 0, len(broker.messageCh[disp.messageWorkerIndex])) +} + +func TestEmitSyncPointEventIfNeededSuppressesByMinChangefeedCheckpoint(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + broker.syncPointLagSuppressThreshold = 20 * time.Second + broker.syncPointLagResumeThreshold = 10 * time.Second + + base := time.Unix(0, 0) + ts100 := oracle.GoTimeToTS(base.Add(100 * time.Second)) + ts110 := oracle.GoTimeToTS(base.Add(110 * time.Second)) + ts115 := oracle.GoTimeToTS(base.Add(115 * time.Second)) + ts120 := oracle.GoTimeToTS(base.Add(120 * time.Second)) + ts125 := oracle.GoTimeToTS(base.Add(125 * time.Second)) + ts130 := oracle.GoTimeToTS(base.Add(130 * time.Second)) + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = 10 * time.Second + info.nextSyncPoint = ts110 + + changefeedStatus := broker.getOrSetChangefeedStatus(info) + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.seq.Store(1) + disp.receivedResolvedTs.Store(ts130) + disp.checkpointTs.Store(ts125) + disp.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + + dispPtr := &atomic.Pointer[dispatcherStat]{} + dispPtr.Store(disp) + changefeedStatus.addDispatcher(disp.id, dispPtr) + + lagging := newDispatcherStat(newMockDispatcherInfoForTest(t), 1, 1, nil, changefeedStatus) + lagging.seq.Store(1) + lagging.checkpointTs.Store(ts100) + lagging.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + + laggingPtr := &atomic.Pointer[dispatcherStat]{} + laggingPtr.Store(lagging) + changefeedStatus.addDispatcher(lagging.id, laggingPtr) + changefeedStatus.refreshMinSentResolvedTs() + + broker.emitSyncPointEventIfNeeded(ts115, disp, node.ID(info.GetServerID())) + + require.True(t, disp.syncPointSendSuppressed.Load()) + require.Equal(t, ts120, disp.nextSyncPoint.Load()) + require.Equal(t, 0, len(broker.messageCh[disp.messageWorkerIndex])) +} + +func TestEmitSyncPointEventIfNeededResumesAfterLagRecovers(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + broker.syncPointLagSuppressThreshold = 20 * time.Second + broker.syncPointLagResumeThreshold = 10 * time.Second + + base := time.Unix(0, 0) + ts110 := oracle.GoTimeToTS(base.Add(110 * time.Second)) + ts115 := oracle.GoTimeToTS(base.Add(115 * time.Second)) + ts120 := oracle.GoTimeToTS(base.Add(120 * time.Second)) + ts125 := oracle.GoTimeToTS(base.Add(125 * time.Second)) + ts130 := oracle.GoTimeToTS(base.Add(130 * time.Second)) + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = 10 * time.Second + info.nextSyncPoint = ts110 + + changefeedStatus := broker.getOrSetChangefeedStatus(info) + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.seq.Store(1) + disp.syncPointSendSuppressed.Store(true) + disp.receivedResolvedTs.Store(ts130) + disp.checkpointTs.Store(ts125) + disp.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + + dispPtr := &atomic.Pointer[dispatcherStat]{} + dispPtr.Store(disp) + changefeedStatus.addDispatcher(disp.id, dispPtr) + changefeedStatus.refreshMinSentResolvedTs() + + broker.emitSyncPointEventIfNeeded(ts115, disp, node.ID(info.GetServerID())) + + require.False(t, disp.syncPointSendSuppressed.Load()) + require.Equal(t, ts120, disp.nextSyncPoint.Load()) + require.Equal(t, 1, len(broker.messageCh[disp.messageWorkerIndex])) + + msg := <-broker.messageCh[disp.messageWorkerIndex] + require.Equal(t, event.TypeSyncPointEvent, msg.msgType) + syncPointEvent, ok := msg.e.(*event.SyncPointEvent) + require.True(t, ok) + require.Equal(t, ts110, syncPointEvent.GetCommitTs()) +} + +func TestEmitSyncPointEventIfNeededResumesWhenStaleCheckpointIsIgnored(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + broker.syncPointLagSuppressThreshold = 20 * time.Second + broker.syncPointLagResumeThreshold = 10 * time.Second + + base := time.Unix(0, 0) + ts100 := oracle.GoTimeToTS(base.Add(100 * time.Second)) + ts110 := oracle.GoTimeToTS(base.Add(110 * time.Second)) + ts115 := oracle.GoTimeToTS(base.Add(115 * time.Second)) + ts120 := oracle.GoTimeToTS(base.Add(120 * time.Second)) + ts125 := oracle.GoTimeToTS(base.Add(125 * time.Second)) + ts130 := oracle.GoTimeToTS(base.Add(130 * time.Second)) + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = 10 * time.Second + info.nextSyncPoint = ts110 + + changefeedStatus := broker.getOrSetChangefeedStatus(info) + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.seq.Store(1) + disp.syncPointSendSuppressed.Store(true) + disp.receivedResolvedTs.Store(ts130) + disp.checkpointTs.Store(ts125) + disp.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + + dispPtr := &atomic.Pointer[dispatcherStat]{} + dispPtr.Store(disp) + changefeedStatus.addDispatcher(disp.id, dispPtr) + + stale := newDispatcherStat(newMockDispatcherInfoForTest(t), 1, 1, nil, changefeedStatus) + stale.seq.Store(1) + stale.checkpointTs.Store(ts100) + stale.lastReceivedHeartbeatTime.Store(time.Now().Add(-scanWindowStaleDispatcherHeartbeatThreshold - time.Second).Unix()) + + stalePtr := &atomic.Pointer[dispatcherStat]{} + stalePtr.Store(stale) + changefeedStatus.addDispatcher(stale.id, stalePtr) + changefeedStatus.refreshMinSentResolvedTs() + + broker.emitSyncPointEventIfNeeded(ts115, disp, node.ID(info.GetServerID())) + + require.False(t, disp.syncPointSendSuppressed.Load()) + require.Equal(t, ts120, disp.nextSyncPoint.Load()) + require.Equal(t, 1, len(broker.messageCh[disp.messageWorkerIndex])) + + msg := <-broker.messageCh[disp.messageWorkerIndex] + require.Equal(t, event.TypeSyncPointEvent, msg.msgType) + syncPointEvent, ok := msg.e.(*event.SyncPointEvent) + require.True(t, ok) + require.Equal(t, ts110, syncPointEvent.GetCommitTs()) +} + +func TestEmitSyncPointEventIfNeededNoSuppressWhenCheckpointAhead(t *testing.T) { + broker, _, _, _ := newEventBrokerForTest() + broker.close() + broker.syncPointLagSuppressThreshold = time.Second + broker.syncPointLagResumeThreshold = time.Second + + base := time.Unix(0, 0) + ts100 := oracle.GoTimeToTS(base.Add(100 * time.Second)) + ts110 := oracle.GoTimeToTS(base.Add(110 * time.Second)) + ts115 := oracle.GoTimeToTS(base.Add(115 * time.Second)) + ts120 := oracle.GoTimeToTS(base.Add(120 * time.Second)) + + info := newMockDispatcherInfoForTest(t) + info.epoch = 1 + info.enableSyncPoint = true + info.syncPointInterval = 10 * time.Second + info.nextSyncPoint = ts110 + + changefeedStatus := broker.getOrSetChangefeedStatus(info) + disp := newDispatcherStat(info, 1, 1, nil, changefeedStatus) + disp.seq.Store(1) + disp.receivedResolvedTs.Store(ts100) + disp.checkpointTs.Store(ts120) + disp.lastReceivedHeartbeatTime.Store(time.Now().Unix()) + + dispPtr := &atomic.Pointer[dispatcherStat]{} + dispPtr.Store(disp) + changefeedStatus.addDispatcher(disp.id, dispPtr) + changefeedStatus.refreshMinSentResolvedTs() + + broker.emitSyncPointEventIfNeeded(ts115, disp, node.ID(info.GetServerID())) + + require.False(t, disp.syncPointSendSuppressed.Load()) + require.Equal(t, ts120, disp.nextSyncPoint.Load()) + require.Equal(t, 1, len(broker.messageCh[disp.messageWorkerIndex])) +} func TestHandleCongestionControlV2AdjustsScanInterval(t *testing.T) { broker, _, _, _ := newEventBrokerForTest() defer broker.close() diff --git a/pkg/eventservice/scan_window.go b/pkg/eventservice/scan_window.go index 46cb9cffb3..a5d1a8a3e4 100644 --- a/pkg/eventservice/scan_window.go +++ b/pkg/eventservice/scan_window.go @@ -82,6 +82,11 @@ const ( scanWindowStaleDispatcherHeartbeatThreshold = 1 * time.Minute ) +func isDispatcherStale(lastHeartbeatTime int64, now time.Time) bool { + return lastHeartbeatTime > 0 && + now.Sub(time.Unix(lastHeartbeatTime, 0)) > scanWindowStaleDispatcherHeartbeatThreshold +} + type memoryUsageSample struct { ts time.Time ratio float64 @@ -332,8 +337,10 @@ func (c *changefeedStatus) refreshMinSentResolvedTs() { now := time.Now() minSentResolvedTs := ^uint64(0) minSentResolvedTsWithStale := ^uint64(0) + minCheckpointTs := uint64(0) hasEligible := false hasNonStale := false + hasMinCheckpointTs := false c.dispatchers.Range(func(_ any, value any) bool { dispatcher := value.(*atomic.Pointer[dispatcherStat]).Load() if dispatcher == nil || dispatcher.isRemoved.Load() || dispatcher.seq.Load() == 0 { @@ -346,9 +353,7 @@ func (c *changefeedStatus) refreshMinSentResolvedTs() { minSentResolvedTsWithStale = sentResolvedTs } - lastHeartbeatTime := dispatcher.lastReceivedHeartbeatTime.Load() - if lastHeartbeatTime > 0 && - now.Sub(time.Unix(lastHeartbeatTime, 0)) > scanWindowStaleDispatcherHeartbeatThreshold { + if isDispatcherStale(dispatcher.lastReceivedHeartbeatTime.Load(), now) { log.Info("dispatcher is stale, skip it's sent resolved ts", zap.Stringer("changefeedID", c.changefeedID), zap.Stringer("dispatcherID", dispatcher.id)) return true } @@ -357,18 +362,30 @@ func (c *changefeedStatus) refreshMinSentResolvedTs() { if sentResolvedTs < minSentResolvedTs { minSentResolvedTs = sentResolvedTs } + checkpointTs := dispatcher.checkpointTs.Load() + if !hasMinCheckpointTs || checkpointTs < minCheckpointTs { + minCheckpointTs = checkpointTs + hasMinCheckpointTs = true + } return true }) if !hasEligible { c.storeMinSentTs(0) + c.storeMinCheckpointTs(invalidMinCheckpointTs) return } if !hasNonStale { c.storeMinSentTs(minSentResolvedTsWithStale) + c.storeMinCheckpointTs(invalidMinCheckpointTs) return } c.storeMinSentTs(minSentResolvedTs) + if hasMinCheckpointTs { + c.storeMinCheckpointTs(minCheckpointTs) + } else { + c.storeMinCheckpointTs(invalidMinCheckpointTs) + } } func (c *changefeedStatus) getScanMaxTs() uint64 { @@ -393,6 +410,14 @@ func (c *changefeedStatus) storeMinSentTs(value uint64) { metrics.EventServiceScanWindowBaseTsGaugeVec.WithLabelValues(c.changefeedID.String()).Set(float64(value)) } +func (c *changefeedStatus) storeMinCheckpointTs(value uint64) { + prev := c.minCheckpointTs.Load() + if prev == value { + return + } + c.minCheckpointTs.Store(value) +} + func scaleDuration(d time.Duration, numerator int64, denominator int64) time.Duration { if numerator <= 0 || denominator <= 0 { return d diff --git a/pkg/eventservice/scan_window_test.go b/pkg/eventservice/scan_window_test.go index 01ee458e70..e6400b69a0 100644 --- a/pkg/eventservice/scan_window_test.go +++ b/pkg/eventservice/scan_window_test.go @@ -153,6 +153,7 @@ func TestRefreshMinSentResolvedTsMinAndSkipRules(t *testing.T) { stale := &dispatcherStat{} stale.seq.Store(1) stale.sentResolvedTs.Store(10) + stale.checkpointTs.Store(10) stale.lastReceivedHeartbeatTime.Store(time.Now().Add(-scanWindowStaleDispatcherHeartbeatThreshold - time.Second).Unix()) removed := &dispatcherStat{} @@ -167,10 +168,12 @@ func TestRefreshMinSentResolvedTsMinAndSkipRules(t *testing.T) { first := &dispatcherStat{} first.seq.Store(1) first.sentResolvedTs.Store(200) + first.checkpointTs.Store(200) second := &dispatcherStat{} second.seq.Store(1) second.sentResolvedTs.Store(50) + second.checkpointTs.Store(50) stalePtr := &atomic.Pointer[dispatcherStat]{} stalePtr.Store(stale) @@ -194,15 +197,23 @@ func TestRefreshMinSentResolvedTsMinAndSkipRules(t *testing.T) { status.refreshMinSentResolvedTs() require.Equal(t, uint64(50), status.minSentTs.Load()) + minCheckpointTs, ok := status.getMinCheckpointTs() + require.True(t, ok) + require.Equal(t, uint64(50), minCheckpointTs) second.isRemoved.Store(true) status.refreshMinSentResolvedTs() require.Equal(t, uint64(200), status.minSentTs.Load()) + minCheckpointTs, ok = status.getMinCheckpointTs() + require.True(t, ok) + require.Equal(t, uint64(200), minCheckpointTs) stale.isRemoved.Store(true) first.seq.Store(0) status.refreshMinSentResolvedTs() require.Equal(t, uint64(0), status.minSentTs.Load()) + _, ok = status.getMinCheckpointTs() + require.False(t, ok) } func TestRefreshMinSentResolvedTsStaleFallback(t *testing.T) { @@ -213,6 +224,7 @@ func TestRefreshMinSentResolvedTsStaleFallback(t *testing.T) { stale := &dispatcherStat{} stale.seq.Store(1) stale.sentResolvedTs.Store(123) + stale.checkpointTs.Store(123) stale.lastReceivedHeartbeatTime.Store(time.Now().Add(-scanWindowStaleDispatcherHeartbeatThreshold - time.Second).Unix()) stalePtr := &atomic.Pointer[dispatcherStat]{} @@ -221,6 +233,8 @@ func TestRefreshMinSentResolvedTsStaleFallback(t *testing.T) { status.refreshMinSentResolvedTs() require.Equal(t, uint64(123), status.minSentTs.Load()) + _, ok := status.getMinCheckpointTs() + require.False(t, ok) } func TestGetScanMaxTsFallbackInterval(t *testing.T) { diff --git a/pkg/metrics/event_service.go b/pkg/metrics/event_service.go index e6ed10cc90..5cd5991c8d 100644 --- a/pkg/metrics/event_service.go +++ b/pkg/metrics/event_service.go @@ -69,6 +69,20 @@ var ( Name: "scan_window_interval", Help: "The scan window interval in seconds for each changefeed", }, []string{"changefeed"}) + EventServiceSyncPointLagGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "syncpoint_lag_seconds", + Help: "The lag between received resolved ts and checkpoint ts in seconds for each changefeed", + }, []string{"changefeed"}) + EventServiceSyncPointSuppressedCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_service", + Name: "syncpoint_suppressed_count", + Help: "The number of syncpoint events suppressed due to lagging checkpoint", + }, []string{"changefeed"}) EventServiceScanDuration = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: "ticdc", @@ -201,6 +215,8 @@ func initEventServiceMetrics(registry *prometheus.Registry) { registry.MustRegister(EventServiceResolvedTsLagGauge) registry.MustRegister(EventServiceScanWindowBaseTsGaugeVec) registry.MustRegister(EventServiceScanWindowIntervalGaugeVec) + registry.MustRegister(EventServiceSyncPointLagGaugeVec) + registry.MustRegister(EventServiceSyncPointSuppressedCount) registry.MustRegister(EventServiceScanDuration) registry.MustRegister(EventServiceScannedCount) registry.MustRegister(EventServiceDispatcherGauge)