
[WIP] logpuller: improve retry backoff and range reload scheduling#4700

Open
lidezhu wants to merge 2 commits into pingcap:master from lidezhu:ldz/improve-puller0329

Conversation

@lidezhu
Collaborator

@lidezhu lidezhu commented Apr 3, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • New Features

    • Added a centralized retry scheduler with prioritized, delayed retries and range-reload aggregation.
    • Added store cooldown tracking to avoid repeatedly hitting failing stores.
  • Bug Fixes

    • Improved error-handling and recovery: automatic backoff reset on success and clearer store success/failure signaling.
    • Reduced redundant range reloads by merging adjacent/overlapping work.
  • Tests

    • Added unit tests covering retry scheduling, backoff behavior, range-merge logic, and store backoff.
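The "merging adjacent/overlapping work" mentioned above can be illustrated with a minimal Go sketch. The `span` type and `mergeSpans` function here are illustrative stand-ins, not the PR's actual identifiers; the real aggregator in `logservice/logpuller` operates on richer span types:

```go
package main

import (
	"fmt"
	"sort"
)

// span is a hypothetical stand-in for the key ranges the aggregator merges.
// It represents the half-open interval [start, end) over lexicographic keys.
type span struct {
	start, end string
}

// mergeSpans merges overlapping or adjacent spans so a batch of range
// reloads covers each key range at most once.
func mergeSpans(spans []span) []span {
	if len(spans) == 0 {
		return nil
	}
	sort.Slice(spans, func(i, j int) bool { return spans[i].start < spans[j].start })
	merged := []span{spans[0]}
	for _, s := range spans[1:] {
		last := &merged[len(merged)-1]
		if s.start <= last.end { // overlapping or adjacent: extend the previous span
			if s.end > last.end {
				last.end = s.end
			}
		} else {
			merged = append(merged, s)
		}
	}
	return merged
}

func main() {
	got := mergeSpans([]span{{"a", "c"}, {"b", "d"}, {"e", "f"}, {"d", "e"}})
	fmt.Println(got) // prints [{a f}]
}
```

Sorting first means one linear pass suffices: each span either extends the last merged span or starts a new one.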

@ti-chi-bot ti-chi-bot Bot added the do-not-merge/needs-linked-issue and release-note (Denotes a PR that will be considered when it comes time to generate release notes.) labels Apr 3, 2026
@ti-chi-bot

ti-chi-bot Bot commented Apr 3, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign wk989898 for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai
Contributor

coderabbitai Bot commented Apr 3, 2026

📝 Walkthrough

Walkthrough

Introduces a retry scheduler, per-region and per-store backoff managers, and a range-reload aggregator; integrates them into the subscription client and region worker flows to schedule delayed region/range retries and track store cooldowns.

Changes

Cohort / File(s) Summary
Range Reload Aggregator
logservice/logpuller/range_reload_aggregator.go
New unexported aggregator that batches per-subscription span reloads, lowers stored task priority, debounces flushes, merges overlapping/adjacent spans, and enqueues range reload tasks.
Retry Scheduling & Backoff
logservice/logpuller/retry_scheduler.go, logservice/logpuller/retry_scheduler_test.go
New scheduler with a min-heap and single timer, APIs to schedule delayed actions, per-region exponential backoff manager (by subscription/region/reason), and per-store cooldown manager; tests cover scheduling, span merging, and store backoff behavior.
Subscription Client Integration
logservice/logpuller/subscription_client.go
Wires retryScheduler, rangeReloadAggregator, retryBackoff, and storeBackoff into subscriptionClient; routes errors to scheduled retries or aggregated range reloads; checks store cooldown before dispatching region tasks.
Region Request Worker Signals
logservice/logpuller/region_request_worker.go
Adds markStoreFailure on detected store errors and markStoreSuccess on successful connect/receive/send; resets retry backoff after successful sends.
Region Event Handler
logservice/logpuller/region_event_handler.go
Resets worker client's retry backoff when region becomes initialized and after valid resolved-ts updates.

Sequence Diagram

sequenceDiagram
    participant Client as Subscription Client
    participant Worker as Region Request Worker
    participant Scheduler as Retry Scheduler
    participant Manager as Backoff Managers
    participant Queue as Region Task Queue

    Worker->>Client: report region error (not-leader/busy/store-fail/ctx)
    Client->>Manager: markStoreFailure(storeAddr) / computeRetryDelay(...)
    Manager-->>Client: delay
    Client->>Scheduler: scheduleRegionRequestAfter(delay, regionTask)
    Scheduler->>Scheduler: enqueue delayed task (min-heap)
    Note over Scheduler: wait until scheduled time
    Scheduler->>Queue: push region task
    Queue-->>Worker: dequeue (client checks store cooldown before RPC)
    Worker->>Worker: attempt region request
    Worker->>Client: success
    Client->>Manager: markStoreSuccess(storeAddr) / resetRetryBackoff(subID, regionID)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

lgtm, approved, size/XL

Suggested reviewers

  • wk989898
  • 3AceShowHand
  • asddongmen

Poem

🐰 I hopped through heaps of scheduled time,
Collected spans and stitched their rhyme,
Marked stores failed, then cheered "retry!" —
Backoffs shrink when failures fly,
A tiny rabbit queues resilience.

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

  • Description check ⚠️ Warning: The PR description is mostly unmodified template content with placeholder values. Required sections such as 'Issue Number', 'What is changed and how it works', answers to the template questions, and the release note are not completed. Resolution: replace 'close #xxx' with the actual linked issue(s), explain the changes and their implementation, confirm which tests were added (the Unit test checkbox is already marked), answer the questions about performance/compatibility/documentation impact, and fill in the release-note section.
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 7.69%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (1 passed)

  • Title check ✅ Passed: The title accurately describes the main changes: implementing retry backoff improvements and range reload scheduling enhancements across multiple logpuller components.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@ti-chi-bot ti-chi-bot Bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Apr 3, 2026

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a centralized retryScheduler and backoff management system for the logpuller component, including exponential backoff for region retries, store-level cooldowns, and a rangeReloadAggregator for merging overlapping span reload tasks. Feedback focuses on improving the scheduler's responsiveness by executing tasks in separate goroutines to prevent blocking the main loop, removing redundant backoff resets during frequent ResolvedTs events to optimize performance, and adding an early-return check in the backoff manager to avoid unnecessary map iterations.

```go
			break
		}
		heap.Pop(&taskHeap)
		task.run(ctx)
```

high

The task.run(ctx) call is executed synchronously within the scheduler's main loop. If a task blocks (e.g., scheduleRegionRequest waiting for a range lock), it will stall the entire scheduler, preventing other delayed tasks from being processed. It is recommended to execute tasks in a separate goroutine to ensure the scheduler remains responsive.

Suggested change

```diff
-task.run(ctx)
+go task.run(ctx)
```

Comment on lines +370 to +372

```go
if state.worker != nil {
	state.worker.client.resetRetryBackoff(SubscriptionID(state.requestID), regionID)
}
```

medium

Resetting the retry backoff on every ResolvedTs event is redundant and potentially expensive, as ResolvedTs events are received very frequently. The backoff state only needs to be reset when a region is successfully initialized or when a new request is sent. Since resetRetryBackoff is already called on Event_INITIALIZED and after doSend, this call should be removed to avoid unnecessary map iterations in retryBackoffManager.

Comment on lines +204 to +206

```go
m.mu.Lock()
defer m.mu.Unlock()
for key := range m.attempts {
```

medium

The resetRegion method iterates over the entire attempts map, which has $O(N)$ complexity where $N$ is the number of active backoffs. While $N$ might be small in most cases, this method is called frequently. Adding a check to return early if the map is empty can avoid unnecessary overhead.

```go
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.attempts) == 0 {
		return
	}
```

Contributor

@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (3)
logservice/logpuller/region_request_worker.go (1)

226-226: Consider adding a fast-path to avoid lock contention on the hot path.

markStoreSuccess acquires a mutex on every call (storeBackoffManager.markSuccess in retry_scheduler.go:251-258). Since this is called for every successfully received change event, it could cause lock contention under high throughput. The current implementation deletes from the map unconditionally, even when the store is already in success state (no entry exists).

Consider adding a fast-path check before acquiring the lock:

♻️ Proposed optimization in storeBackoffManager.markSuccess

```diff
 func (m *storeBackoffManager) markSuccess(storeAddr string) {
 	if storeAddr == "" {
 		return
 	}
+	// Fast-path: avoid lock if store is already healthy
+	m.mu.Lock()
+	_, exists := m.states[storeAddr]
+	if !exists {
+		m.mu.Unlock()
+		return
+	}
-	m.mu.Lock()
-	defer m.mu.Unlock()
 	delete(m.states, storeAddr)
+	m.mu.Unlock()
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/region_request_worker.go` at line 226, markStoreSuccess
(calling storeBackoffManager.markSuccess) is contended because it always takes
the write mutex and deletes the map entry even when no entry exists; add a
fast-path to avoid the write lock on the hot path by doing a read-only check
first (e.g., acquire read lock or check existence without lock if safe) and
early-return when the store key is not present or already marked success, then
only take the write lock to delete/modify when the key exists; update
storeBackoffManager.markSuccess (in retry_scheduler.go) to perform the
read-check → conditional write-lock → delete sequence to reduce lock contention.
logservice/logpuller/retry_scheduler.go (1)

251-258: Consider a fast-path check to reduce lock contention.

This markSuccess is called on every successfully received event (see region_request_worker.go:226). Adding a fast-path check could reduce lock contention:

♻️ Proposed optimization

```diff
 func (m *storeBackoffManager) markSuccess(storeAddr string) {
 	if storeAddr == "" {
 		return
 	}
 	m.mu.Lock()
+	if _, exists := m.states[storeAddr]; !exists {
+		m.mu.Unlock()
+		return
+	}
 	defer m.mu.Unlock()
 	delete(m.states, storeAddr)
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/retry_scheduler.go` around lines 251 - 258, markSuccess
on storeBackoffManager is called very frequently and always takes the write
lock; add a fast-path read to avoid unnecessary writes: first return on empty
storeAddr, then use m.mu.RLock() to check if the entry exists in m.states and if
not release RLock and return; only acquire the full m.mu.Lock() when the entry
does exist and needs deletion. Reference: markSuccess, storeBackoffManager,
m.states, m.mu (called from region_request_worker).
logservice/logpuller/range_reload_aggregator.go (1)

98-129: Minor: TOCTOU race on stopped check is benign but worth noting.

The stopped.Load() check at line 114 happens after releasing the mutex. A subscription could be stopped between this check and task processing. This is benign since downstream task handlers will also check for stopped subscriptions, but consider adding a brief comment explaining this is intentional.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/logpuller/range_reload_aggregator.go` around lines 98 - 129, The
TOCTOU check of subscribedSpan.stopped.Load() in rangeReloadAggregator.flush is
intentionally performed outside the mutex and can race with subscription
stopping; update the flush function (around the subscribedSpan.stopped.Load()
check) to add a brief clarifying comment stating that the race is benign because
downstream handlers also validate stopped state and tasks are safe to schedule
even if the subscription stops before processing. Ensure the comment references
rangeReloadAggregator.flush, subscribedSpan.stopped.Load, and the downstream
task checks so future readers understand this is deliberate.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6ff0c398-0d0d-4337-883b-ed9ef452327c

📥 Commits

Reviewing files that changed from the base of the PR and between 19400c6 and d0643ae.

📒 Files selected for processing (6)
  • logservice/logpuller/range_reload_aggregator.go
  • logservice/logpuller/region_event_handler.go
  • logservice/logpuller/region_request_worker.go
  • logservice/logpuller/retry_scheduler.go
  • logservice/logpuller/retry_scheduler_test.go
  • logservice/logpuller/subscription_client.go

@lidezhu
Collaborator Author

lidezhu commented Apr 3, 2026

/test all

@lidezhu lidezhu changed the title logpuller: improve retry backoff and range reload scheduling [WIP] logpuller: improve retry backoff and range reload scheduling Apr 5, 2026
@ti-chi-bot ti-chi-bot Bot added the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Apr 5, 2026
@ti-chi-bot

ti-chi-bot Bot commented Apr 5, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.


Labels

  • do-not-merge/needs-linked-issue
  • do-not-merge/work-in-progress: Indicates that a PR should not merge because it is a work in progress.
  • release-note: Denotes a PR that will be considered when it comes time to generate release notes.
  • size/XXL: Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant