-
Notifications
You must be signed in to change notification settings - Fork 50
eventBroker: skip syncpoint while lag increasing #4868
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
915058d
9962260
0acf811
c7eb02c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -118,6 +118,11 @@ type EventServiceConfig struct { | |
| ScanTaskQueueSize int `toml:"scan-task-queue-size" json:"scan_task_queue_size"` | ||
| ScanLimitInBytes int `toml:"scan-limit-in-bytes" json:"scan_limit_in_bytes"` | ||
|
|
||
| // SyncPointLagSuppressThreshold controls when to suppress syncpoint emission for lagging dispatchers. | ||
| SyncPointLagSuppressThreshold time.Duration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"` | ||
| // SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression. | ||
| SyncPointLagResumeThreshold time.Duration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"` | ||
|
Comment on lines
+121
to
+124
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain 🏁 Script executed: #!/bin/bash
# Description: Inspect duration-typed TOML config fields and the new sync-point threshold consumers.
rg -n -C2 'TomlDuration|time\.Duration `toml:' --type=go
rg -n -C3 'SyncPointLag(Suppress|Resume)Threshold' --type=goRepository: pingcap/ticdc Length of output: 17769 Use These fields are inconsistent with other duration fields in the debug config (e.g., Proposed fix- SyncPointLagSuppressThreshold time.Duration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
+ SyncPointLagSuppressThreshold TomlDuration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
// SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression.
- SyncPointLagResumeThreshold time.Duration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`
+ SyncPointLagResumeThreshold TomlDuration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`And update the default values: - SyncPointLagSuppressThreshold: 20 * time.Minute,
- SyncPointLagResumeThreshold: 15 * time.Minute,
+ SyncPointLagSuppressThreshold: TomlDuration(20 * time.Minute),
+ SyncPointLagResumeThreshold: TomlDuration(15 * time.Minute),At the consumer boundary in syncPointLagSuppressThreshold := time.Duration(eventServiceConfig.SyncPointLagSuppressThreshold)
syncPointLagResumeThreshold := time.Duration(eventServiceConfig.SyncPointLagResumeThreshold)🤖 Prompt for AI Agents |
||
|
|
||
| // DMLEventMaxRows is the maximum number of rows in a DML event when split txn is enabled. | ||
| DMLEventMaxRows int32 `toml:"dml-event-max-rows" json:"dml_event_max_rows"` | ||
| // DMLEventMaxBytes is the maximum size of a DML event in bytes when split txn is enabled. | ||
|
|
@@ -133,10 +138,12 @@ type EventServiceConfig struct { | |
| // NewDefaultEventServiceConfig return the default event service configuration | ||
| func NewDefaultEventServiceConfig() *EventServiceConfig { | ||
| return &EventServiceConfig{ | ||
| ScanTaskQueueSize: 1024 * 8, | ||
| ScanLimitInBytes: 1024 * 1024 * 256, // 256MB | ||
| DMLEventMaxRows: 256, | ||
| DMLEventMaxBytes: 1024 * 1024 * 1, // 1MB | ||
| EnableRemoteEventService: true, | ||
| ScanTaskQueueSize: 1024 * 8, | ||
| ScanLimitInBytes: 1024 * 1024 * 256, // 256MB | ||
| SyncPointLagSuppressThreshold: 20 * time.Minute, | ||
| SyncPointLagResumeThreshold: 15 * time.Minute, | ||
|
Comment on lines
+143
to
+144
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| DMLEventMaxRows: 256, | ||
| DMLEventMaxBytes: 1024 * 1024 * 1, // 1MB | ||
| EnableRemoteEventService: true, | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,6 +53,8 @@ const ( | |
| // defaultSendResolvedTsInterval use to control whether to send a resolvedTs event to the dispatcher when its scan is skipped. | ||
| defaultSendResolvedTsInterval = time.Second * 2 | ||
| defaultRefreshMinSentResolvedTsInterval = time.Second * 1 | ||
| defaultSyncPointLagSuppressThreshold = 20 * time.Minute | ||
| defaultSyncPointLagResumeThreshold = 15 * time.Minute | ||
| ) | ||
|
|
||
| // eventBroker get event from the eventStore, and send the event to the dispatchers. | ||
|
|
@@ -94,8 +96,10 @@ type eventBroker struct { | |
| // metricsCollector handles all metrics collection and reporting | ||
| metricsCollector *metricsCollector | ||
|
|
||
| scanRateLimiter *rate.Limiter | ||
| scanLimitInBytes uint64 | ||
| scanRateLimiter *rate.Limiter | ||
| scanLimitInBytes uint64 | ||
| syncPointLagSuppressThreshold time.Duration | ||
| syncPointLagResumeThreshold time.Duration | ||
| } | ||
|
|
||
| func newEventBroker( | ||
|
|
@@ -118,7 +122,19 @@ func newEventBroker( | |
| scanTaskQueueSize := config.GetGlobalServerConfig().Debug.EventService.ScanTaskQueueSize / scanWorkerCount | ||
| sendMessageQueueSize := basicChannelSize * 4 | ||
|
|
||
| scanLimitInBytes := config.GetGlobalServerConfig().Debug.EventService.ScanLimitInBytes | ||
| eventServiceConfig := config.GetGlobalServerConfig().Debug.EventService | ||
| scanLimitInBytes := eventServiceConfig.ScanLimitInBytes | ||
| syncPointLagSuppressThreshold := eventServiceConfig.SyncPointLagSuppressThreshold | ||
| if syncPointLagSuppressThreshold <= 0 { | ||
| syncPointLagSuppressThreshold = defaultSyncPointLagSuppressThreshold | ||
| } | ||
| syncPointLagResumeThreshold := eventServiceConfig.SyncPointLagResumeThreshold | ||
| if syncPointLagResumeThreshold <= 0 { | ||
| syncPointLagResumeThreshold = defaultSyncPointLagResumeThreshold | ||
|
Comment on lines
+127
to
+133
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since syncPointLagSuppressThreshold := time.Duration(eventServiceConfig.SyncPointLagSuppressThreshold)\n if syncPointLagSuppressThreshold <= 0 {\n syncPointLagSuppressThreshold = defaultSyncPointLagSuppressThreshold\n }\n syncPointLagResumeThreshold := time.Duration(eventServiceConfig.SyncPointLagResumeThreshold)\n if syncPointLagResumeThreshold <= 0 {\n syncPointLagResumeThreshold = defaultSyncPointLagResumeThreshold\n } |
||
| } | ||
| if syncPointLagResumeThreshold > syncPointLagSuppressThreshold { | ||
| syncPointLagResumeThreshold = syncPointLagSuppressThreshold | ||
| } | ||
|
|
||
| g, ctx := errgroup.WithContext(ctx) | ||
| ctx, cancel := context.WithCancel(ctx) | ||
|
|
@@ -127,23 +143,25 @@ func newEventBroker( | |
| // For now, since there is only one upstream, using the default pdClock is sufficient. | ||
| pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock) | ||
| c := &eventBroker{ | ||
| tidbClusterID: id, | ||
| eventStore: eventStore, | ||
| pdClock: pdClock, | ||
| mounter: event.NewMounter(tz, integrity), | ||
| timezone: tz.String(), | ||
| schemaStore: schemaStore, | ||
| changefeedMap: sync.Map{}, | ||
| dispatchers: sync.Map{}, | ||
| tableTriggerDispatchers: sync.Map{}, | ||
| msgSender: mc, | ||
| taskChan: make([]chan scanTask, scanWorkerCount), | ||
| messageCh: make([]chan *wrapEvent, sendMessageWorkerCount), | ||
| redoMessageCh: make([]chan *wrapEvent, sendMessageWorkerCount), | ||
| cancel: cancel, | ||
| g: g, | ||
| scanRateLimiter: rate.NewLimiter(rate.Limit(scanLimitInBytes), scanLimitInBytes), | ||
| scanLimitInBytes: uint64(scanLimitInBytes), | ||
| tidbClusterID: id, | ||
| eventStore: eventStore, | ||
| pdClock: pdClock, | ||
| mounter: event.NewMounter(tz, integrity), | ||
| timezone: tz.String(), | ||
| schemaStore: schemaStore, | ||
| changefeedMap: sync.Map{}, | ||
| dispatchers: sync.Map{}, | ||
| tableTriggerDispatchers: sync.Map{}, | ||
| msgSender: mc, | ||
| taskChan: make([]chan scanTask, scanWorkerCount), | ||
| messageCh: make([]chan *wrapEvent, sendMessageWorkerCount), | ||
| redoMessageCh: make([]chan *wrapEvent, sendMessageWorkerCount), | ||
| cancel: cancel, | ||
| g: g, | ||
| scanRateLimiter: rate.NewLimiter(rate.Limit(scanLimitInBytes), scanLimitInBytes), | ||
| scanLimitInBytes: uint64(scanLimitInBytes), | ||
| syncPointLagSuppressThreshold: syncPointLagSuppressThreshold, | ||
| syncPointLagResumeThreshold: syncPointLagResumeThreshold, | ||
| } | ||
|
|
||
| // Initialize metrics collector | ||
|
|
@@ -188,7 +206,11 @@ func newEventBroker( | |
| return c.refreshMinSentResolvedTs(ctx) | ||
| }) | ||
|
|
||
| log.Info("new event broker created", zap.Uint64("id", id), zap.Uint64("scanLimitInBytes", c.scanLimitInBytes)) | ||
| log.Info("new event broker created", | ||
| zap.Uint64("id", id), | ||
| zap.Uint64("scanLimitInBytes", c.scanLimitInBytes), | ||
| zap.Duration("syncPointLagSuppressThreshold", c.syncPointLagSuppressThreshold), | ||
| zap.Duration("syncPointLagResumeThreshold", c.syncPointLagResumeThreshold)) | ||
| return c | ||
| } | ||
|
|
||
|
|
@@ -586,6 +608,68 @@ func (c *eventBroker) hasSyncPointEventsBeforeTs(ts uint64, d *dispatcherStat) b | |
| return d.enableSyncPoint && ts > d.nextSyncPoint.Load() | ||
| } | ||
|
|
||
| func syncPointLagDuration(sentResolvedTs, checkpointTs uint64) time.Duration { | ||
| if sentResolvedTs <= checkpointTs { | ||
| return 0 | ||
| } | ||
| return oracle.GetTimeFromTS(sentResolvedTs).Sub(oracle.GetTimeFromTS(checkpointTs)) | ||
| } | ||
|
asddongmen marked this conversation as resolved.
|
||
|
|
||
| func (c *eventBroker) shouldSuppressSyncPointEmission(d *dispatcherStat) bool { | ||
| if d == nil || c.syncPointLagSuppressThreshold <= 0 { | ||
| return false | ||
| } | ||
|
|
||
| receivedResolvedTs := d.receivedResolvedTs.Load() | ||
| checkpointTs, ok := d.changefeedStat.getMinCheckpointTs() | ||
| if !ok { | ||
| metrics.EventServiceSyncPointLagGaugeVec.WithLabelValues(d.changefeedStat.changefeedID.String()).Set(0) | ||
| if d.syncPointSendSuppressed.Load() { | ||
| if d.syncPointSendSuppressed.CompareAndSwap(true, false) { | ||
| log.Info("syncpoint emission resumed", | ||
| zap.Stringer("changefeedID", d.changefeedStat.changefeedID), | ||
| zap.Stringer("dispatcherID", d.id), | ||
| zap.Uint64("receivedResolvedTs", receivedResolvedTs), | ||
| zap.Duration("resumeThreshold", c.syncPointLagResumeThreshold)) | ||
| } | ||
| } | ||
| return false | ||
| } | ||
| lag := syncPointLagDuration(receivedResolvedTs, checkpointTs) | ||
| metrics.EventServiceSyncPointLagGaugeVec.WithLabelValues(d.changefeedStat.changefeedID.String()).Set(lag.Seconds()) | ||
|
|
||
| if d.syncPointSendSuppressed.Load() { | ||
| if lag <= c.syncPointLagResumeThreshold { | ||
| if d.syncPointSendSuppressed.CompareAndSwap(true, false) { | ||
| log.Info("syncpoint emission resumed", | ||
| zap.Stringer("changefeedID", d.changefeedStat.changefeedID), | ||
| zap.Stringer("dispatcherID", d.id), | ||
| zap.Uint64("receivedResolvedTs", receivedResolvedTs), | ||
| zap.Uint64("minCheckpointTs", checkpointTs), | ||
| zap.Duration("lag", lag), | ||
| zap.Duration("resumeThreshold", c.syncPointLagResumeThreshold)) | ||
| } | ||
| return false | ||
| } | ||
| return true | ||
| } | ||
|
|
||
| if lag > c.syncPointLagSuppressThreshold { | ||
| if d.syncPointSendSuppressed.CompareAndSwap(false, true) { | ||
| log.Info("syncpoint emission suppressed due to lag", | ||
| zap.Stringer("changefeedID", d.changefeedStat.changefeedID), | ||
| zap.Stringer("dispatcherID", d.id), | ||
| zap.Uint64("sentResolvedTs", receivedResolvedTs), | ||
|
asddongmen marked this conversation as resolved.
|
||
| zap.Uint64("minCheckpointTs", checkpointTs), | ||
| zap.Duration("lag", lag), | ||
| zap.Duration("suppressThreshold", c.syncPointLagSuppressThreshold), | ||
| zap.Duration("resumeThreshold", c.syncPointLagResumeThreshold)) | ||
|
asddongmen marked this conversation as resolved.
|
||
| } | ||
| return true | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| // emitSyncPointEventIfNeeded emits a sync point event if the current ts is greater than the next sync point, and updates the next sync point. | ||
| // We need call this function every time we send a event(whether dml/ddl/resolvedTs), | ||
| // thus to ensure the sync point event is in correct order for each dispatcher. | ||
|
|
@@ -594,14 +678,18 @@ func (c *eventBroker) emitSyncPointEventIfNeeded(ts uint64, d *dispatcherStat, r | |
| commitTs := d.nextSyncPoint.Load() | ||
| d.nextSyncPoint.Store(oracle.GoTimeToTS(oracle.GetTimeFromTS(commitTs).Add(d.syncPointInterval))) | ||
|
|
||
| if c.shouldSuppressSyncPointEmission(d) { | ||
| metrics.EventServiceSyncPointSuppressedCount.WithLabelValues(d.changefeedStat.changefeedID.String()).Inc() | ||
| continue | ||
| } | ||
|
|
||
| e := event.NewSyncPointEvent(d.id, commitTs, d.seq.Add(1), d.epoch) | ||
| syncPointEvent := newWrapSyncPointEvent(remoteID, e) | ||
| c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) <- syncPointEvent | ||
| log.Debug("send syncpoint event to dispatcher", | ||
| zap.Stringer("changefeedID", d.changefeedStat.changefeedID), | ||
| zap.Stringer("dispatcherID", d.id), zap.Int64("tableID", d.info.GetTableSpan().GetTableID()), | ||
| zap.Uint64("commitTs", e.GetCommitTs()), zap.Uint64("seq", e.GetSeq())) | ||
|
|
||
| syncPointEvent := newWrapSyncPointEvent(remoteID, e) | ||
| c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) <- syncPointEvent | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1145,6 +1233,8 @@ func (c *eventBroker) removeChangefeedStatus(status *changefeedStatus) { | |
| metrics.EventServiceAvailableMemoryQuotaGaugeVec.DeleteLabelValues(changefeedID.String()) | ||
| metrics.EventServiceScanWindowBaseTsGaugeVec.DeleteLabelValues(changefeedID.String()) | ||
| metrics.EventServiceScanWindowIntervalGaugeVec.DeleteLabelValues(changefeedID.String()) | ||
| metrics.EventServiceSyncPointLagGaugeVec.DeleteLabelValues(changefeedID.String()) | ||
| metrics.EventServiceSyncPointSuppressedCount.DeleteLabelValues(changefeedID.String()) | ||
| } | ||
|
|
||
| func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error { | ||
|
|
@@ -1311,6 +1401,9 @@ func (c *eventBroker) handleDispatcherHeartbeat(heartbeat *DispatcherHeartBeatWi | |
| handleProgress(dp.DispatcherID, dp.CheckpointTs, 0, false) | ||
| } | ||
| } | ||
| for changefeed := range changedChangefeeds { | ||
| changefeed.refreshMinSentResolvedTs() | ||
| } | ||
| c.sendDispatcherResponse(responseMap) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In TiCDC, durations in configuration structs should use
`TomlDuration` instead of `time.Duration` to ensure they are correctly parsed from human-readable strings (e.g., "20m") in the TOML configuration file. This maintains consistency with other duration fields like `ResolvedTsStuckInterval`.