eventBroker: skip syncpoint while lag increasing #4868
asddongmen wants to merge 4 commits into pingcap:master from
Conversation
Skipping CI for Draft Pull Request.

[APPROVALNOTIFIER] This PR is NOT APPROVED. Needs approval from an approver in each of these files. The full list of commands accepted by this bot can be found here.
📝 Walkthrough

This PR introduces sync-point lag suppression to the event service. It adds configurable thresholds, tracks dispatcher suppression state and minimum checkpoint timestamps, computes lag from timestamp differences, and conditionally suppresses sync-point emission when lag exceeds thresholds. Recovery occurs when lag subsides, with new Prometheus metrics tracking lag duration and suppressed counts.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Heartbeat as Dispatcher Heartbeat
    participant Broker as EventBroker
    participant ScanWindow as ScanWindow
    participant Status as ChangefeedStatus
    participant Metrics as Prometheus Metrics
    Heartbeat->>Broker: Process heartbeat
    Broker->>ScanWindow: refreshMinSentResolvedTs()
    ScanWindow->>Status: Update minCheckpointTs
    Broker->>Broker: shouldSuppressSyncPointEmission()
    Broker->>Status: Load resolved TS vs minCheckpointTs
    Broker->>Broker: Compute lag duration
    alt Lag exceeds suppress threshold
        Broker->>Status: Set syncPointSendSuppressed = true
        Broker->>Metrics: Increment suppressed counter
    else Lag <= resume threshold
        Broker->>Status: Set syncPointSendSuppressed = false
        Broker->>Broker: Emit SyncPointEvent
        Broker->>Metrics: Update lag gauge
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
Code Review
This pull request introduces a lag-based suppression mechanism for syncpoint emission in the EventService, including new configuration thresholds, tracking of minimum checkpoint timestamps across dispatchers, and monitoring metrics. Reviewer feedback suggests using TomlDuration for configuration fields to ensure correct TOML parsing, renaming parameters for clarity, and correcting a misleading log key.
```go
SyncPointLagSuppressThreshold time.Duration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
// SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression.
SyncPointLagResumeThreshold time.Duration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`
```
In TiCDC, durations in configuration structs should use TomlDuration instead of time.Duration to ensure they are correctly parsed from human-readable strings (e.g., "20m") in the TOML configuration file. This maintains consistency with other duration fields like ResolvedTsStuckInterval.
```go
SyncPointLagSuppressThreshold TomlDuration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
// SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression.
SyncPointLagResumeThreshold TomlDuration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`
```

```go
SyncPointLagSuppressThreshold: 20 * time.Minute,
SyncPointLagResumeThreshold:   15 * time.Minute,
```
```go
syncPointLagSuppressThreshold := eventServiceConfig.SyncPointLagSuppressThreshold
if syncPointLagSuppressThreshold <= 0 {
	syncPointLagSuppressThreshold = defaultSyncPointLagSuppressThreshold
}
syncPointLagResumeThreshold := eventServiceConfig.SyncPointLagResumeThreshold
if syncPointLagResumeThreshold <= 0 {
	syncPointLagResumeThreshold = defaultSyncPointLagResumeThreshold
}
```
Since SyncPointLagSuppressThreshold and SyncPointLagResumeThreshold are now TomlDuration in the config, they should be explicitly cast to time.Duration when assigned to local variables or struct fields that expect time.Duration.
```go
syncPointLagSuppressThreshold := time.Duration(eventServiceConfig.SyncPointLagSuppressThreshold)
if syncPointLagSuppressThreshold <= 0 {
	syncPointLagSuppressThreshold = defaultSyncPointLagSuppressThreshold
}
syncPointLagResumeThreshold := time.Duration(eventServiceConfig.SyncPointLagResumeThreshold)
if syncPointLagResumeThreshold <= 0 {
	syncPointLagResumeThreshold = defaultSyncPointLagResumeThreshold
}
```
/test all
[FORMAT CHECKER NOTIFICATION] 📖 For more info, you can check the "Contribute Code" section in the development guide.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/config/debug.go`:
- Around line 121-124: The TOML fields SyncPointLagSuppressThreshold and
SyncPointLagResumeThreshold in EventServiceConfig are declared as time.Duration
but must use TomlDuration for correct TOML unmarshaling; change their types to
TomlDuration in pkg/config/debug.go, update their default values accordingly,
and at the consumer boundary in event_broker.go convert them to time.Duration
(e.g., assign time.Duration(eventServiceConfig.SyncPointLagSuppressThreshold)
and time.Duration(eventServiceConfig.SyncPointLagResumeThreshold)) so downstream
code continues to use time.Duration.
In `@pkg/eventservice/event_broker.go`:
- Around line 657-666: The log field for the resolved timestamp is using the
wrong variable: change the zap field in the suppression branch so the structured
key matches the actual value (either replace zap.Uint64("sentResolvedTs",
receivedResolvedTs) with zap.Uint64("sentResolvedTs", sentResolvedTs) or rename
the key to "receivedResolvedTs" if you intend to log receivedResolvedTs) in the
block that calls d.syncPointSendSuppressed.CompareAndSwap; ensure the logged zap
field accurately reflects the variable used for the timestamp to avoid
misleading diagnostics.
📒 Files selected for processing (7)
- pkg/config/debug.go
- pkg/eventservice/dispatcher_stat.go
- pkg/eventservice/event_broker.go
- pkg/eventservice/event_broker_test.go
- pkg/eventservice/scan_window.go
- pkg/eventservice/scan_window_test.go
- pkg/metrics/event_service.go
```go
// SyncPointLagSuppressThreshold controls when to suppress syncpoint emission for lagging dispatchers.
SyncPointLagSuppressThreshold time.Duration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
// SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression.
SyncPointLagResumeThreshold time.Duration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`
```
🧩 Analysis chain
🏁 Script executed:
```shell
#!/bin/bash
# Description: Inspect duration-typed TOML config fields and the new sync-point threshold consumers.
rg -n -C2 'TomlDuration|time\.Duration `toml:' --type=go
rg -n -C3 'SyncPointLag(Suppress|Resume)Threshold' --type=go
```

Repository: pingcap/ticdc
Length of output: 17769
Use TomlDuration for TOML duration fields in EventServiceConfig.
These fields are inconsistent with other duration fields in the debug config (e.g., ResolvedTsStuckInterval). All TOML-facing durations in the config should use TomlDuration to ensure consistent unmarshaling behavior across the codebase.
Proposed fix

```diff
-SyncPointLagSuppressThreshold time.Duration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
+SyncPointLagSuppressThreshold TomlDuration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
 // SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression.
-SyncPointLagResumeThreshold time.Duration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`
+SyncPointLagResumeThreshold TomlDuration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`
```

And update the default values:

```diff
-SyncPointLagSuppressThreshold: 20 * time.Minute,
-SyncPointLagResumeThreshold:   15 * time.Minute,
+SyncPointLagSuppressThreshold: TomlDuration(20 * time.Minute),
+SyncPointLagResumeThreshold:   TomlDuration(15 * time.Minute),
```

At the consumer boundary in event_broker.go, cast to time.Duration:

```go
syncPointLagSuppressThreshold := time.Duration(eventServiceConfig.SyncPointLagSuppressThreshold)
syncPointLagResumeThreshold := time.Duration(eventServiceConfig.SyncPointLagResumeThreshold)
```
@asddongmen: The following test failed.
Full PR test history. Your PR dashboard.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
This PR adds syncpoint suppression to eventBroker based on the changefeed-wide minimum checkpoint.
Syncpoint emission is paused when resolved-ts lag exceeds a configurable threshold and resumes with a lower threshold to avoid flapping. The changefeed status now tracks the minimum checkpoint while excluding stale dispatchers, and heartbeat updates refresh that state.
This PR also wires the needed debug config and metrics, adds unit tests for suppression/resume and stale-dispatcher handling, and cleans up syncpoint metric labels when a changefeed is removed.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note