eventBroker: skip syncpoint while lag incresing#4868

Open
asddongmen wants to merge 4 commits into pingcap:master from asddongmen:0420-skip-syncpoint-while-lag-incresing

Collaborator

@asddongmen asddongmen commented Apr 20, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

This PR adds syncpoint suppression to eventBroker based on the changefeed-wide minimum checkpoint.
Syncpoint emission is paused when resolved-ts lag exceeds a configurable suppress threshold, and resumes only after the lag falls back to a lower resume threshold, which avoids flapping. The changefeed status now tracks the minimum checkpoint while excluding stale dispatchers, and heartbeat updates refresh that state.
This PR also wires the needed debug config and metrics, adds unit tests for suppression/resume and stale-dispatcher handling, and cleans up syncpoint metric labels when a changefeed is removed.
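
The suppress/resume behavior described above is a two-threshold hysteresis. A minimal sketch follows, assuming the 20-minute and 15-minute defaults mentioned in this PR; `lagGate` and `update` are hypothetical names for illustration, not identifiers from the diff.

```go
package main

import (
	"fmt"
	"time"
)

// lagGate is a hypothetical two-threshold (hysteresis) gate.
// The type and field names are illustrative, not taken from the PR.
type lagGate struct {
	suppressAt time.Duration // start suppressing when lag exceeds this
	resumeAt   time.Duration // stop suppressing once lag is at or below this
	suppressed bool
}

// update records the latest lag and reports whether emission is suppressed.
// Between the two thresholds the previous state is kept, which prevents flapping.
func (g *lagGate) update(lag time.Duration) bool {
	switch {
	case !g.suppressed && lag > g.suppressAt:
		g.suppressed = true
	case g.suppressed && lag <= g.resumeAt:
		g.suppressed = false
	}
	return g.suppressed
}

func main() {
	g := &lagGate{suppressAt: 20 * time.Minute, resumeAt: 15 * time.Minute}
	lags := []time.Duration{10 * time.Minute, 21 * time.Minute, 17 * time.Minute, 15 * time.Minute}
	for _, lag := range lags {
		fmt.Printf("lag=%v suppressed=%v\n", lag, g.update(lag))
	}
}
```

A lag of 17 minutes keeps the gate closed because it sits between the two thresholds; only dropping to 15 minutes or below reopens it.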

Check List

Tests

  • Unit test
  • Manual test (add detailed scripts or steps below)

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added adaptive sync-point event suppression to reduce overhead during high-lag conditions, with configurable thresholds controlling when suppression activates and resumes (defaults: 20-minute suppress, 15-minute resume).
    • Introduced new Prometheus metrics to monitor sync-point lag duration and event suppression count per changefeed.
  • Tests

    • Expanded test coverage for sync-point suppression and resumption behavior under various lag and dispatcher conditions.

Signed-off-by: dongmen <414110582@qq.com>
(cherry picked from commit 330eda5)
Signed-off-by: dongmen <414110582@qq.com>
(cherry picked from commit 531f764)

ti-chi-bot Bot commented Apr 20, 2026

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@ti-chi-bot ti-chi-bot Bot added the following labels Apr 20, 2026: do-not-merge/needs-linked-issue; release-note (denotes a PR that will be considered when it comes time to generate release notes); do-not-merge/work-in-progress (indicates that a PR should not merge because it is a work in progress).

ti-chi-bot Bot commented Apr 20, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign flowbehappy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

Contributor

coderabbitai Bot commented Apr 20, 2026

Walkthrough

This PR introduces sync-point lag suppression to the event service. It adds configurable thresholds, tracks dispatcher suppression state and minimum checkpoint timestamps, computes lag from timestamp differences, and conditionally suppresses sync-point emission when lag exceeds thresholds. Recovery occurs when lag subsides, with new Prometheus metrics tracking lag duration and suppressed counts.
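
As an illustration of "computes lag from timestamp differences": TiDB-style TSO timestamps carry a physical millisecond clock in the high bits and an 18-bit logical counter in the low bits, so a wall-clock lag can be derived from the physical parts. The sketch below is an assumption about the shape of that computation; `composeTS`, `physicalTime`, and `lagBetween` are hypothetical helpers, not the functions used in the PR.

```go
package main

import (
	"fmt"
	"time"
)

// logicalBits is the width of the logical counter in a TSO timestamp.
const logicalBits = 18

// composeTS builds a TSO-style timestamp from a physical time in
// milliseconds and a logical counter (hypothetical helper for the demo).
func composeTS(physicalMs int64, logical uint64) uint64 {
	return uint64(physicalMs)<<logicalBits | logical
}

// physicalTime extracts the wall-clock part of a TSO-style timestamp.
func physicalTime(ts uint64) time.Time {
	return time.UnixMilli(int64(ts >> logicalBits))
}

// lagBetween returns how far checkpointTs trails resolvedTs in wall-clock
// time. It returns 0 when the checkpoint is not behind.
func lagBetween(resolvedTs, checkpointTs uint64) time.Duration {
	if checkpointTs >= resolvedTs {
		return 0
	}
	return physicalTime(resolvedTs).Sub(physicalTime(checkpointTs))
}

func main() {
	now := time.Now().UnixMilli()
	resolved := composeTS(now, 0)
	checkpoint := composeTS(now-int64(25*time.Minute/time.Millisecond), 0)
	fmt.Println("lag:", lagBetween(resolved, checkpoint))
}
```

Guarding against a checkpoint that is ahead of the resolved timestamp avoids reporting a negative lag.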

Changes

| Cohort / File(s) | Summary |
|---|---|
| Configuration (pkg/config/debug.go) | Added SyncPointLagSuppressThreshold and SyncPointLagResumeThreshold duration fields to EventServiceConfig with default values of 20 and 15 minutes respectively. |
| Dispatcher State Tracking (pkg/eventservice/dispatcher_stat.go) | Added syncPointSendSuppressed atomic.Bool to track suppression state and minCheckpointTs atomic.Uint64 to changefeedStatus with getter method for sentinel value handling. |
| Core Suppression Logic (pkg/eventservice/event_broker.go) | Implemented lag computation from resolved/checkpoint timestamps, configurable suppression thresholds with bounds enforcement, and conditional sync-point emission gating via new shouldSuppressSyncPointEmission helper. Updates metrics and manages dispatcher heartbeat handling to refresh min-sent resolved timestamps. |
| Checkpoint Tracking (pkg/eventservice/scan_window.go) | Extracted dispatcher staleness check into isDispatcherStale helper and extended refreshMinSentResolvedTs to compute and persist minimum checkpoint timestamp across dispatchers, with stale dispatcher filtering. |
| Metrics (pkg/metrics/event_service.go) | Added EventServiceSyncPointLagGaugeVec and EventServiceSyncPointSuppressedCount Prometheus metrics labeled by changefeed. |
| Test Coverage (pkg/eventservice/event_broker_test.go, pkg/eventservice/scan_window_test.go) | Added 6 new test cases validating suppression/resumption behavior under various lag and checkpoint conditions; updated existing tests to initialize checkpoint timestamps and assert min-checkpoint-ts retrieval. |
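
The "minimum checkpoint timestamp across dispatchers, with stale dispatcher filtering" idea can be sketched as follows. This is a simplified illustration: the staleness criterion used here (time since the last heartbeat exceeding a limit) is an assumption, and `dispatcherView` and `minActiveCheckpointTs` are hypothetical names rather than the PR's `isDispatcherStale` or `refreshMinSentResolvedTs`.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// dispatcherView is a hypothetical snapshot of one dispatcher.
type dispatcherView struct {
	checkpointTs uint64
	idle         time.Duration // time since the last heartbeat (assumed staleness signal)
}

// minActiveCheckpointTs returns the smallest checkpoint among dispatchers
// that are not stale. The boolean is false when no active dispatcher
// exists, so callers do not mistake a sentinel for a real timestamp.
func minActiveCheckpointTs(views []dispatcherView, staleAfter time.Duration) (uint64, bool) {
	lowest := uint64(math.MaxUint64)
	found := false
	for _, v := range views {
		if v.idle > staleAfter {
			continue // a stale dispatcher must not hold the minimum back
		}
		if v.checkpointTs < lowest {
			lowest = v.checkpointTs
		}
		found = true
	}
	if !found {
		return 0, false
	}
	return lowest, true
}

func main() {
	views := []dispatcherView{
		{checkpointTs: 300, idle: time.Second},
		{checkpointTs: 100, idle: 2 * time.Minute}, // stale, ignored
		{checkpointTs: 200, idle: 2 * time.Second},
	}
	ts, ok := minActiveCheckpointTs(views, time.Minute)
	fmt.Println(ts, ok)
}
```

Without the filter, a dispatcher that stopped reporting would pin the minimum at an old value and keep the computed lag high indefinitely.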

Sequence Diagram

sequenceDiagram
    participant Heartbeat as Dispatcher Heartbeat
    participant Broker as EventBroker
    participant ScanWindow as ScanWindow
    participant Status as ChangefeedStatus
    participant Metrics as Prometheus Metrics

    Heartbeat->>Broker: Process heartbeat
    Broker->>ScanWindow: refreshMinSentResolvedTs()
    ScanWindow->>Status: Update minCheckpointTs
    Broker->>Broker: shouldSuppressSyncPointEmission()
    Broker->>Status: Load resolved TS vs minCheckpointTs
    Broker->>Broker: Compute lag duration
    alt Lag exceeds suppress threshold
        Broker->>Status: Set syncPointSendSuppressed = true
        Broker->>Metrics: Increment suppressed counter
    else Lag <= resume threshold
        Broker->>Status: Set syncPointSendSuppressed = false
        Broker->>Broker: Emit SyncPointEvent
        Broker->>Metrics: Update lag gauge
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Suggested labels

lgtm, approved

Suggested reviewers

  • 3AceShowHand

Poem

🐰 A rabbit bounces through the event stream,
Checking lags and thresholds like a dream,
When lag runs high, we pause the sync,
Resume when lag recedes—quick as a wink!
Metrics track each hop and bound,
The cleanest sync-points ever found! 🌟

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Description check | ⚠️ Warning | The description is missing critical information required by the template: no Issue Number is provided (shows placeholder 'close #xxx'), and the release note section is incomplete with only a template comment and no actual content. | Add the actual issue number being closed and provide a substantive release note following the style guide, or mark as 'None' if no release note is needed. |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 11.76% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title mentions the key feature (skipping syncpoint) but contains a typo ('incresing' instead of 'increasing') and is somewhat vague about the lag condition triggering suppression. |

@ti-chi-bot ti-chi-bot Bot added the size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. label Apr 20, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces a lag-based suppression mechanism for syncpoint emission in the EventService, including new configuration thresholds, tracking of minimum checkpoint timestamps across dispatchers, and monitoring metrics. Reviewer feedback suggests using TomlDuration for configuration fields to ensure correct TOML parsing, renaming parameters for clarity, and correcting a misleading log key.

Comment thread pkg/config/debug.go
Comment on lines +122 to +124
SyncPointLagSuppressThreshold time.Duration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
// SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression.
SyncPointLagResumeThreshold time.Duration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`

medium

In TiCDC, durations in configuration structs should use TomlDuration instead of time.Duration to ensure they are correctly parsed from human-readable strings (e.g., "20m") in the TOML configuration file. This maintains consistency with other duration fields like ResolvedTsStuckInterval.

	SyncPointLagSuppressThreshold TomlDuration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
	// SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression.
	SyncPointLagResumeThreshold   TomlDuration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`
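
For context on why a wrapper type helps with strings such as "20m": a duration type can implement `encoding.TextUnmarshaler`, which decoders that honor that interface call for string values. The sketch below uses a hypothetical `textDuration` type and demonstrates it with `encoding/json` from the standard library; it is not TiCDC's actual `TomlDuration` implementation, whose details are not shown in this PR.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// textDuration is a hypothetical wrapper that parses human-readable
// durations such as "20m". It only illustrates the general pattern.
type textDuration time.Duration

// UnmarshalText implements encoding.TextUnmarshaler.
func (d *textDuration) UnmarshalText(text []byte) error {
	parsed, err := time.ParseDuration(string(text))
	if err != nil {
		return err
	}
	*d = textDuration(parsed)
	return nil
}

// MarshalText implements encoding.TextMarshaler so values round-trip.
func (d textDuration) MarshalText() ([]byte, error) {
	return []byte(time.Duration(d).String()), nil
}

func main() {
	var cfg struct {
		SuppressThreshold textDuration `json:"sync_point_lag_suppress_threshold"`
	}
	input := []byte(`{"sync_point_lag_suppress_threshold":"20m"}`)
	if err := json.Unmarshal(input, &cfg); err != nil {
		panic(err)
	}
	// Convert back to time.Duration at the consumer boundary.
	fmt.Println(time.Duration(cfg.SuppressThreshold))
}
```

The explicit `time.Duration(...)` conversion mirrors the cast at the consumer boundary that the reviewers ask for.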

Comment thread pkg/config/debug.go
Comment on lines +143 to +144
SyncPointLagSuppressThreshold: 20 * time.Minute,
SyncPointLagResumeThreshold: 15 * time.Minute,

medium

Update the default values to use TomlDuration to match the field type change.

		SyncPointLagSuppressThreshold: TomlDuration(20 * time.Minute),
		SyncPointLagResumeThreshold:   TomlDuration(15 * time.Minute),

Comment on lines +127 to +133
syncPointLagSuppressThreshold := eventServiceConfig.SyncPointLagSuppressThreshold
if syncPointLagSuppressThreshold <= 0 {
syncPointLagSuppressThreshold = defaultSyncPointLagSuppressThreshold
}
syncPointLagResumeThreshold := eventServiceConfig.SyncPointLagResumeThreshold
if syncPointLagResumeThreshold <= 0 {
syncPointLagResumeThreshold = defaultSyncPointLagResumeThreshold

medium

Since SyncPointLagSuppressThreshold and SyncPointLagResumeThreshold are now TomlDuration in the config, they should be explicitly cast to time.Duration when assigned to local variables or struct fields that expect time.Duration.

	syncPointLagSuppressThreshold := time.Duration(eventServiceConfig.SyncPointLagSuppressThreshold)
	if syncPointLagSuppressThreshold <= 0 {
		syncPointLagSuppressThreshold = defaultSyncPointLagSuppressThreshold
	}
	syncPointLagResumeThreshold := time.Duration(eventServiceConfig.SyncPointLagResumeThreshold)
	if syncPointLagResumeThreshold <= 0 {
		syncPointLagResumeThreshold = defaultSyncPointLagResumeThreshold
	}
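
The fallback-to-default checks above can be paired with a guard that keeps the resume threshold from exceeding the suppress threshold, since a resume value above the suppress value would remove the hysteresis band. The sketch below is one possible form of such bounds enforcement; the clamping rule and the name `normalizeThresholds` are assumptions, and the PR's actual handling is not visible in this thread.

```go
package main

import (
	"fmt"
	"time"
)

const (
	// Defaults as stated in this PR: 20-minute suppress, 15-minute resume.
	defaultSuppress = 20 * time.Minute
	defaultResume   = 15 * time.Minute
)

// normalizeThresholds is a hypothetical helper: non-positive values fall
// back to the defaults, and resume is clamped so it never exceeds suppress.
func normalizeThresholds(suppress, resume time.Duration) (time.Duration, time.Duration) {
	if suppress <= 0 {
		suppress = defaultSuppress
	}
	if resume <= 0 {
		resume = defaultResume
	}
	if resume > suppress {
		resume = suppress // assumed rule: keep resume <= suppress
	}
	return suppress, resume
}

func main() {
	fmt.Println(normalizeThresholds(0, 0))
	fmt.Println(normalizeThresholds(10*time.Minute, 0))
}
```

The second call shows why the clamp matters: a user who sets only a 10-minute suppress threshold would otherwise inherit a 15-minute default resume threshold that is larger than it.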

Comment thread pkg/eventservice/event_broker.go
Comment thread pkg/eventservice/event_broker.go
@asddongmen asddongmen marked this pull request as ready for review April 20, 2026 05:50
@ti-chi-bot ti-chi-bot Bot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Apr 20, 2026
@asddongmen
Collaborator Author

/test all


ti-chi-bot Bot commented Apr 20, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/config/debug.go`:
- Around line 121-124: The TOML fields SyncPointLagSuppressThreshold and
SyncPointLagResumeThreshold in EventServiceConfig are declared as time.Duration
but must use TomlDuration for correct TOML unmarshaling; change their types to
TomlDuration in pkg/config/debug.go, update their default values accordingly,
and at the consumer boundary in event_broker.go convert them to time.Duration
(e.g., assign time.Duration(eventServiceConfig.SyncPointLagSuppressThreshold)
and time.Duration(eventServiceConfig.SyncPointLagResumeThreshold)) so downstream
code continues to use time.Duration.

In `@pkg/eventservice/event_broker.go`:
- Around line 657-666: The log field for the resolved timestamp is using the
wrong variable: change the zap field in the suppression branch so the structured
key matches the actual value (either replace zap.Uint64("sentResolvedTs",
receivedResolvedTs) with zap.Uint64("sentResolvedTs", sentResolvedTs) or rename
the key to "receivedResolvedTs" if you intend to log receivedResolvedTs) in the
block that calls d.syncPointSendSuppressed.CompareAndSwap; ensure the logged zap
field accurately reflects the variable used for the timestamp to avoid
misleading diagnostics.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 84a17ad5-2c81-48a8-9749-5c2d9f3c1a12

📥 Commits

Reviewing files that changed from the base of the PR and between 0213a79 and c7eb02c.

📒 Files selected for processing (7)
  • pkg/config/debug.go
  • pkg/eventservice/dispatcher_stat.go
  • pkg/eventservice/event_broker.go
  • pkg/eventservice/event_broker_test.go
  • pkg/eventservice/scan_window.go
  • pkg/eventservice/scan_window_test.go
  • pkg/metrics/event_service.go

Comment thread pkg/config/debug.go
Comment on lines +121 to +124
// SyncPointLagSuppressThreshold controls when to suppress syncpoint emission for lagging dispatchers.
SyncPointLagSuppressThreshold time.Duration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
// SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression.
SyncPointLagResumeThreshold time.Duration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`
Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Inspect duration-typed TOML config fields and the new sync-point threshold consumers.

rg -n -C2 'TomlDuration|time\.Duration `toml:' --type=go
rg -n -C3 'SyncPointLag(Suppress|Resume)Threshold' --type=go

Repository: pingcap/ticdc

Length of output: 17769


Use TomlDuration for TOML duration fields in EventServiceConfig.

These fields are inconsistent with other duration fields in the debug config (e.g., ResolvedTsStuckInterval). All TOML-facing durations in the config should use TomlDuration to ensure consistent unmarshaling behavior across the codebase.

Proposed fix
-	SyncPointLagSuppressThreshold time.Duration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
+	SyncPointLagSuppressThreshold TomlDuration `toml:"sync-point-lag-suppress-threshold" json:"sync_point_lag_suppress_threshold"`
 	// SyncPointLagResumeThreshold controls when to resume syncpoint emission after suppression.
-	SyncPointLagResumeThreshold time.Duration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`
+	SyncPointLagResumeThreshold TomlDuration `toml:"sync-point-lag-resume-threshold" json:"sync_point_lag_resume_threshold"`

And update the default values:

-		SyncPointLagSuppressThreshold: 20 * time.Minute,
-		SyncPointLagResumeThreshold:   15 * time.Minute,
+		SyncPointLagSuppressThreshold: TomlDuration(20 * time.Minute),
+		SyncPointLagResumeThreshold:   TomlDuration(15 * time.Minute),

At the consumer boundary in event_broker.go, cast to time.Duration:

syncPointLagSuppressThreshold := time.Duration(eventServiceConfig.SyncPointLagSuppressThreshold)
syncPointLagResumeThreshold := time.Duration(eventServiceConfig.SyncPointLagResumeThreshold)

Comment thread pkg/eventservice/event_broker.go

ti-chi-bot Bot commented Apr 20, 2026

@asddongmen: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

| Test name | Commit | Details | Required | Rerun command |
|---|---|---|---|---|
| pull-cdc-storage-integration-light | c7eb02c | link | true | /test pull-cdc-storage-integration-light |


Labels

do-not-merge/needs-linked-issue
release-note: Denotes a PR that will be considered when it comes time to generate release notes.
size/XL: Denotes a PR that changes 500-999 lines, ignoring generated files.
