17 changes: 12 additions & 5 deletions pkg/config/debug.go
@@ -118,6 +118,11 @@ type EventServiceConfig struct {
ScanTaskQueueSize int `toml:"scan-task-queue-size" json:"scan_task_queue_size"`
ScanLimitInBytes int `toml:"scan-limit-in-bytes" json:"scan_limit_in_bytes"`

// SyncPointLagSuppressThreshold controls when to suppress syncpoint emission for lagging dispatchers.
SyncPointLagSuppressThreshold time.Duration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
// SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression.
SyncPointLagResumeThreshold time.Duration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`
Comment on lines +122 to +124

medium

In TiCDC, durations in configuration structs should use TomlDuration instead of time.Duration to ensure they are correctly parsed from human-readable strings (e.g., "20m") in the TOML configuration file. This maintains consistency with other duration fields like ResolvedTsStuckInterval.

	SyncPointLagSuppressThreshold TomlDuration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
	// SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression.
	SyncPointLagResumeThreshold   TomlDuration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`

Comment on lines +121 to +124
Contributor


⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Inspect duration-typed TOML config fields and the new sync-point threshold consumers.

rg -n -C2 'TomlDuration|time\.Duration `toml:' --type=go
rg -n -C3 'SyncPointLag(Suppress|Resume)Threshold' --type=go

Repository: pingcap/ticdc



Use TomlDuration for TOML duration fields in EventServiceConfig.

These fields are inconsistent with other duration fields in the debug config (e.g., ResolvedTsStuckInterval). All TOML-facing durations in the config should use TomlDuration to ensure consistent unmarshaling behavior across the codebase.

Proposed fix
-	SyncPointLagSuppressThreshold time.Duration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
+	SyncPointLagSuppressThreshold TomlDuration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
 	// SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression.
-	SyncPointLagResumeThreshold time.Duration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`
+	SyncPointLagResumeThreshold TomlDuration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`

And update the default values:

-		SyncPointLagSuppressThreshold: 20 * time.Minute,
-		SyncPointLagResumeThreshold:   15 * time.Minute,
+		SyncPointLagSuppressThreshold: TomlDuration(20 * time.Minute),
+		SyncPointLagResumeThreshold:   TomlDuration(15 * time.Minute),

At the consumer boundary in event_broker.go, cast to time.Duration:

syncPointLagSuppressThreshold := time.Duration(eventServiceConfig.SyncPointLagSuppressThreshold)
syncPointLagResumeThreshold := time.Duration(eventServiceConfig.SyncPointLagResumeThreshold)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/config/debug.go` around lines 121 - 124, The TOML fields
SyncPointLagSuppressThreshold and SyncPointLagResumeThreshold in
EventServiceConfig are declared as time.Duration but must use TomlDuration for
correct TOML unmarshaling; change their types to TomlDuration in
pkg/config/debug.go, update their default values accordingly, and at the
consumer boundary in event_broker.go convert them to time.Duration (e.g., assign
time.Duration(eventServiceConfig.SyncPointLagSuppressThreshold) and
time.Duration(eventServiceConfig.SyncPointLagResumeThreshold)) so downstream
code continues to use time.Duration.
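With the `TomlDuration` change applied, the thresholds become settable from the server config file as duration strings. The TOML section path below is inferred from the `Debug.EventService` field chain and should be treated as an assumption, not confirmed by this diff:

```toml
[debug.event-service]
sync-point-lag-suppress-threshold = "20m"
sync-point-lag-resume-threshold = "15m"
```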


// DMLEventMaxRows is the maximum number of rows in a DML event when split txn is enabled.
DMLEventMaxRows int32 `toml:"dml-event-max-rows" json:"dml_event_max_rows"`
// DMLEventMaxBytes is the maximum size of a DML event in bytes when split txn is enabled.
@@ -133,10 +138,12 @@ type EventServiceConfig struct {
// NewDefaultEventServiceConfig return the default event service configuration
func NewDefaultEventServiceConfig() *EventServiceConfig {
return &EventServiceConfig{
ScanTaskQueueSize: 1024 * 8,
ScanLimitInBytes: 1024 * 1024 * 256, // 256MB
DMLEventMaxRows: 256,
DMLEventMaxBytes: 1024 * 1024 * 1, // 1MB
EnableRemoteEventService: true,
ScanTaskQueueSize: 1024 * 8,
ScanLimitInBytes: 1024 * 1024 * 256, // 256MB
SyncPointLagSuppressThreshold: 20 * time.Minute,
SyncPointLagResumeThreshold: 15 * time.Minute,
Comment on lines +143 to +144

medium

Update the default values to use TomlDuration to match the field type change.

		SyncPointLagSuppressThreshold: TomlDuration(20 * time.Minute),
		SyncPointLagResumeThreshold:   TomlDuration(15 * time.Minute),

DMLEventMaxRows: 256,
DMLEventMaxBytes: 1024 * 1024 * 1, // 1MB
EnableRemoteEventService: true,
}
}
16 changes: 15 additions & 1 deletion pkg/eventservice/dispatcher_stat.go
Original file line number Diff line number Diff line change
@@ -70,7 +70,9 @@ type dispatcherStat struct {
enableSyncPoint bool
nextSyncPoint atomic.Uint64
syncPointInterval time.Duration
txnAtomicity config.AtomicityLevel
// syncPointSendSuppressed tracks whether syncpoint emission is temporarily suppressed due to lag.
syncPointSendSuppressed atomic.Bool
txnAtomicity config.AtomicityLevel

// =============================================================================
// ================== below are fields need copied when reset ==================
@@ -429,6 +431,7 @@ type changefeedStatus struct {

availableMemoryQuota sync.Map // nodeID -> atomic.Uint64 (memory quota in bytes)
minSentTs atomic.Uint64
minCheckpointTs atomic.Uint64
scanInterval atomic.Int64

lastAdjustTime atomic.Time
@@ -437,13 +440,16 @@ type changefeedStatus struct {
syncPointInterval time.Duration
}

const invalidMinCheckpointTs = ^uint64(0)

func newChangefeedStatus(changefeedID common.ChangeFeedID, syncPointInterval time.Duration) *changefeedStatus {
status := &changefeedStatus{
changefeedID: changefeedID,
usageWindow: newMemoryUsageWindow(memoryUsageWindowDuration),
syncPointInterval: syncPointInterval,
}
status.scanInterval.Store(int64(defaultScanInterval))
status.minCheckpointTs.Store(invalidMinCheckpointTs)
status.lastAdjustTime.Store(time.Now())
status.lastTrendAdjustTime.Store(time.Now())

@@ -470,3 +476,11 @@ func (c *changefeedStatus) isEmpty() bool {
func (c *changefeedStatus) isSyncpointEnabled() bool {
return c.syncPointInterval > 0
}

func (c *changefeedStatus) getMinCheckpointTs() (uint64, bool) {
minCheckpointTs := c.minCheckpointTs.Load()
if minCheckpointTs == invalidMinCheckpointTs {
return 0, false
}
return minCheckpointTs, true
}
141 changes: 117 additions & 24 deletions pkg/eventservice/event_broker.go
@@ -53,6 +53,8 @@ const (
// defaultSendResolvedTsInterval use to control whether to send a resolvedTs event to the dispatcher when its scan is skipped.
defaultSendResolvedTsInterval = time.Second * 2
defaultRefreshMinSentResolvedTsInterval = time.Second * 1
defaultSyncPointLagSuppressThreshold = 20 * time.Minute
defaultSyncPointLagResumeThreshold = 15 * time.Minute
)

// eventBroker get event from the eventStore, and send the event to the dispatchers.
@@ -94,8 +96,10 @@ type eventBroker struct {
// metricsCollector handles all metrics collection and reporting
metricsCollector *metricsCollector

scanRateLimiter *rate.Limiter
scanLimitInBytes uint64
scanRateLimiter *rate.Limiter
scanLimitInBytes uint64
syncPointLagSuppressThreshold time.Duration
syncPointLagResumeThreshold time.Duration
}

func newEventBroker(
Expand All @@ -118,7 +122,19 @@ func newEventBroker(
scanTaskQueueSize := config.GetGlobalServerConfig().Debug.EventService.ScanTaskQueueSize / scanWorkerCount
sendMessageQueueSize := basicChannelSize * 4

scanLimitInBytes := config.GetGlobalServerConfig().Debug.EventService.ScanLimitInBytes
eventServiceConfig := config.GetGlobalServerConfig().Debug.EventService
scanLimitInBytes := eventServiceConfig.ScanLimitInBytes
syncPointLagSuppressThreshold := eventServiceConfig.SyncPointLagSuppressThreshold
if syncPointLagSuppressThreshold <= 0 {
syncPointLagSuppressThreshold = defaultSyncPointLagSuppressThreshold
}
syncPointLagResumeThreshold := eventServiceConfig.SyncPointLagResumeThreshold
if syncPointLagResumeThreshold <= 0 {
syncPointLagResumeThreshold = defaultSyncPointLagResumeThreshold
Comment on lines +127 to +133

medium

Since SyncPointLagSuppressThreshold and SyncPointLagResumeThreshold are now TomlDuration in the config, they should be explicitly cast to time.Duration when assigned to local variables or struct fields that expect time.Duration.

	syncPointLagSuppressThreshold := time.Duration(eventServiceConfig.SyncPointLagSuppressThreshold)
	if syncPointLagSuppressThreshold <= 0 {
		syncPointLagSuppressThreshold = defaultSyncPointLagSuppressThreshold
	}
	syncPointLagResumeThreshold := time.Duration(eventServiceConfig.SyncPointLagResumeThreshold)
	if syncPointLagResumeThreshold <= 0 {
		syncPointLagResumeThreshold = defaultSyncPointLagResumeThreshold
	}

}
if syncPointLagResumeThreshold > syncPointLagSuppressThreshold {
syncPointLagResumeThreshold = syncPointLagSuppressThreshold
}

g, ctx := errgroup.WithContext(ctx)
ctx, cancel := context.WithCancel(ctx)
@@ -127,23 +143,25 @@
// For now, since there is only one upstream, using the default pdClock is sufficient.
pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock)
c := &eventBroker{
tidbClusterID: id,
eventStore: eventStore,
pdClock: pdClock,
mounter: event.NewMounter(tz, integrity),
timezone: tz.String(),
schemaStore: schemaStore,
changefeedMap: sync.Map{},
dispatchers: sync.Map{},
tableTriggerDispatchers: sync.Map{},
msgSender: mc,
taskChan: make([]chan scanTask, scanWorkerCount),
messageCh: make([]chan *wrapEvent, sendMessageWorkerCount),
redoMessageCh: make([]chan *wrapEvent, sendMessageWorkerCount),
cancel: cancel,
g: g,
scanRateLimiter: rate.NewLimiter(rate.Limit(scanLimitInBytes), scanLimitInBytes),
scanLimitInBytes: uint64(scanLimitInBytes),
tidbClusterID: id,
eventStore: eventStore,
pdClock: pdClock,
mounter: event.NewMounter(tz, integrity),
timezone: tz.String(),
schemaStore: schemaStore,
changefeedMap: sync.Map{},
dispatchers: sync.Map{},
tableTriggerDispatchers: sync.Map{},
msgSender: mc,
taskChan: make([]chan scanTask, scanWorkerCount),
messageCh: make([]chan *wrapEvent, sendMessageWorkerCount),
redoMessageCh: make([]chan *wrapEvent, sendMessageWorkerCount),
cancel: cancel,
g: g,
scanRateLimiter: rate.NewLimiter(rate.Limit(scanLimitInBytes), scanLimitInBytes),
scanLimitInBytes: uint64(scanLimitInBytes),
syncPointLagSuppressThreshold: syncPointLagSuppressThreshold,
syncPointLagResumeThreshold: syncPointLagResumeThreshold,
}

// Initialize metrics collector
@@ -188,7 +206,11 @@ func newEventBroker(
return c.refreshMinSentResolvedTs(ctx)
})

log.Info("new event broker created", zap.Uint64("id", id), zap.Uint64("scanLimitInBytes", c.scanLimitInBytes))
log.Info("new event broker created",
zap.Uint64("id", id),
zap.Uint64("scanLimitInBytes", c.scanLimitInBytes),
zap.Duration("syncPointLagSuppressThreshold", c.syncPointLagSuppressThreshold),
zap.Duration("syncPointLagResumeThreshold", c.syncPointLagResumeThreshold))
return c
}

@@ -586,6 +608,68 @@ func (c *eventBroker) hasSyncPointEventsBeforeTs(ts uint64, d *dispatcherStat) b
return d.enableSyncPoint && ts > d.nextSyncPoint.Load()
}

func syncPointLagDuration(sentResolvedTs, checkpointTs uint64) time.Duration {
if sentResolvedTs <= checkpointTs {
return 0
}
return oracle.GetTimeFromTS(sentResolvedTs).Sub(oracle.GetTimeFromTS(checkpointTs))
}

func (c *eventBroker) shouldSuppressSyncPointEmission(d *dispatcherStat) bool {
if d == nil || c.syncPointLagSuppressThreshold <= 0 {
return false
}

receivedResolvedTs := d.receivedResolvedTs.Load()
checkpointTs, ok := d.changefeedStat.getMinCheckpointTs()
if !ok {
metrics.EventServiceSyncPointLagGaugeVec.WithLabelValues(d.changefeedStat.changefeedID.String()).Set(0)
if d.syncPointSendSuppressed.Load() {
if d.syncPointSendSuppressed.CompareAndSwap(true, false) {
log.Info("syncpoint emission resumed",
zap.Stringer("changefeedID", d.changefeedStat.changefeedID),
zap.Stringer("dispatcherID", d.id),
zap.Uint64("receivedResolvedTs", receivedResolvedTs),
zap.Duration("resumeThreshold", c.syncPointLagResumeThreshold))
}
}
return false
}
lag := syncPointLagDuration(receivedResolvedTs, checkpointTs)
metrics.EventServiceSyncPointLagGaugeVec.WithLabelValues(d.changefeedStat.changefeedID.String()).Set(lag.Seconds())

if d.syncPointSendSuppressed.Load() {
if lag <= c.syncPointLagResumeThreshold {
if d.syncPointSendSuppressed.CompareAndSwap(true, false) {
log.Info("syncpoint emission resumed",
zap.Stringer("changefeedID", d.changefeedStat.changefeedID),
zap.Stringer("dispatcherID", d.id),
zap.Uint64("receivedResolvedTs", receivedResolvedTs),
zap.Uint64("minCheckpointTs", checkpointTs),
zap.Duration("lag", lag),
zap.Duration("resumeThreshold", c.syncPointLagResumeThreshold))
}
return false
}
return true
}

if lag > c.syncPointLagSuppressThreshold {
if d.syncPointSendSuppressed.CompareAndSwap(false, true) {
log.Info("syncpoint emission suppressed due to lag",
zap.Stringer("changefeedID", d.changefeedStat.changefeedID),
zap.Stringer("dispatcherID", d.id),
zap.Uint64("sentResolvedTs", receivedResolvedTs),
zap.Uint64("minCheckpointTs", checkpointTs),
zap.Duration("lag", lag),
zap.Duration("suppressThreshold", c.syncPointLagSuppressThreshold),
zap.Duration("resumeThreshold", c.syncPointLagResumeThreshold))
}
return true
}
return false
}

// emitSyncPointEventIfNeeded emits a sync point event if the current ts is greater than the next sync point, and updates the next sync point.
// We need call this function every time we send a event(whether dml/ddl/resolvedTs),
// thus to ensure the sync point event is in correct order for each dispatcher.
@@ -594,14 +678,18 @@ func (c *eventBroker) emitSyncPointEventIfNeeded(ts uint64, d *dispatcherStat, r
commitTs := d.nextSyncPoint.Load()
d.nextSyncPoint.Store(oracle.GoTimeToTS(oracle.GetTimeFromTS(commitTs).Add(d.syncPointInterval)))

if c.shouldSuppressSyncPointEmission(d) {
metrics.EventServiceSyncPointSuppressedCount.WithLabelValues(d.changefeedStat.changefeedID.String()).Inc()
continue
}

e := event.NewSyncPointEvent(d.id, commitTs, d.seq.Add(1), d.epoch)
syncPointEvent := newWrapSyncPointEvent(remoteID, e)
c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) <- syncPointEvent
log.Debug("send syncpoint event to dispatcher",
zap.Stringer("changefeedID", d.changefeedStat.changefeedID),
zap.Stringer("dispatcherID", d.id), zap.Int64("tableID", d.info.GetTableSpan().GetTableID()),
zap.Uint64("commitTs", e.GetCommitTs()), zap.Uint64("seq", e.GetSeq()))

syncPointEvent := newWrapSyncPointEvent(remoteID, e)
c.getMessageCh(d.messageWorkerIndex, common.IsRedoMode(d.info.GetMode())) <- syncPointEvent
}
}

@@ -1145,6 +1233,8 @@ func (c *eventBroker) removeChangefeedStatus(status *changefeedStatus) {
metrics.EventServiceAvailableMemoryQuotaGaugeVec.DeleteLabelValues(changefeedID.String())
metrics.EventServiceScanWindowBaseTsGaugeVec.DeleteLabelValues(changefeedID.String())
metrics.EventServiceScanWindowIntervalGaugeVec.DeleteLabelValues(changefeedID.String())
metrics.EventServiceSyncPointLagGaugeVec.DeleteLabelValues(changefeedID.String())
metrics.EventServiceSyncPointSuppressedCount.DeleteLabelValues(changefeedID.String())
}

func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) error {
@@ -1311,6 +1401,9 @@ func (c *eventBroker) handleDispatcherHeartbeat(heartbeat *DispatcherHeartBeatWi
handleProgress(dp.DispatcherID, dp.CheckpointTs, 0, false)
}
}
for changefeed := range changedChangefeeds {
changefeed.refreshMinSentResolvedTs()
}
c.sendDispatcherResponse(responseMap)
}
