[WIP] logpuller: improve retry backoff and range reload scheduling #4700
lidezhu wants to merge 2 commits into pingcap:master from
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has been approved by:
The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files. Approvers can indicate their approval by writing
📝 Walkthrough
Introduces a retry scheduler, per-region and per-store backoff managers, and a range-reload aggregator; integrates them into the subscription client and region worker flows to schedule delayed region/range retries and track store cooldowns.
Sequence Diagram
sequenceDiagram
participant Client as Subscription Client
participant Worker as Region Request Worker
participant Scheduler as Retry Scheduler
participant Manager as Backoff Managers
participant Queue as Region Task Queue
Worker->>Client: report region error (not-leader/busy/store-fail/ctx)
Client->>Manager: markStoreFailure(storeAddr) / computeRetryDelay(...)
Manager-->>Client: delay
Client->>Scheduler: scheduleRegionRequestAfter(delay, regionTask)
Scheduler->>Scheduler: enqueue delayed task (min-heap)
Note over Scheduler: wait until scheduled time
Scheduler->>Queue: push region task
Queue-->>Worker: dequeue (client checks store cooldown before RPC)
Worker->>Worker: attempt region request
Worker->>Client: success
Client->>Manager: markStoreSuccess(storeAddr) / resetRetryBackoff(subID, regionID)
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (2 warnings)
Code Review
This pull request introduces a centralized retryScheduler and backoff management system for the logpuller component, including exponential backoff for region retries, store-level cooldowns, and a rangeReloadAggregator for merging overlapping span reload tasks. Feedback focuses on improving the scheduler's responsiveness by executing tasks in separate goroutines to prevent blocking the main loop, removing redundant backoff resets during frequent ResolvedTs events to optimize performance, and adding an early-return check in the backoff manager to avoid unnecessary map iterations.
			break
		}
		heap.Pop(&taskHeap)
		task.run(ctx)
The task.run(ctx) call is executed synchronously within the scheduler's main loop. If a task blocks (e.g., scheduleRegionRequest waiting for a range lock), it will stall the entire scheduler, preventing other delayed tasks from being processed. It is recommended to execute tasks in a separate goroutine to ensure the scheduler remains responsive.
	-	task.run(ctx)
	+	go task.run(ctx)
	if state.worker != nil {
		state.worker.client.resetRetryBackoff(SubscriptionID(state.requestID), regionID)
	}
Resetting the retry backoff on every ResolvedTs event is redundant and potentially expensive, as ResolvedTs events are received very frequently. The backoff state only needs to be reset when a region is successfully initialized or when a new request is sent. Since resetRetryBackoff is already called on Event_INITIALIZED and after doSend, this call should be removed to avoid unnecessary map iterations in retryBackoffManager.
	m.mu.Lock()
	defer m.mu.Unlock()
	for key := range m.attempts {
The resetRegion method iterates over the entire attempts map, even when the map is empty. An early return avoids the unnecessary scan:

	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.attempts) == 0 {
		return
	}
🧹 Nitpick comments (3)
logservice/logpuller/region_request_worker.go (1)

226-226: Consider adding a fast-path to avoid lock contention on the hot path.

markStoreSuccess acquires a mutex on every call (storeBackoffManager.markSuccess in retry_scheduler.go:251-258). Since this is called for every successfully received change event, it could cause lock contention under high throughput. The current implementation deletes from the map unconditionally, even when the store is already in success state (no entry exists). Consider adding a fast-path check before acquiring the lock:

♻️ Proposed optimization in storeBackoffManager.markSuccess

	 func (m *storeBackoffManager) markSuccess(storeAddr string) {
	 	if storeAddr == "" {
	 		return
	 	}
	+	// Fast-path: return early when the store is already healthy
	+	m.mu.Lock()
	+	_, exists := m.states[storeAddr]
	+	if !exists {
	+		m.mu.Unlock()
	+		return
	+	}
	-	m.mu.Lock()
	-	defer m.mu.Unlock()
	 	delete(m.states, storeAddr)
	+	m.mu.Unlock()
	 }

logservice/logpuller/retry_scheduler.go (1)

251-258: Consider a fast-path check to reduce lock contention.

This markSuccess is called on every successfully received event (see region_request_worker.go:226). Adding a fast-path check could reduce lock contention:

♻️ Proposed optimization

	 func (m *storeBackoffManager) markSuccess(storeAddr string) {
	 	if storeAddr == "" {
	 		return
	 	}
	 	m.mu.Lock()
	+	if _, exists := m.states[storeAddr]; !exists {
	+		m.mu.Unlock()
	+		return
	+	}
	 	defer m.mu.Unlock()
	 	delete(m.states, storeAddr)
	 }

logservice/logpuller/range_reload_aggregator.go (1)

98-129: Minor: TOCTOU race on the stopped check is benign but worth noting.

The stopped.Load() check at line 114 happens after releasing the mutex. A subscription could be stopped between this check and task processing. This is benign since downstream task handlers will also check for stopped subscriptions, but consider adding a brief comment explaining this is intentional.
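The read-check-then-conditional-write-lock pattern the markSuccess suggestions converge on can be sketched with sync.RWMutex. This is a hedged illustration under assumed field names; the actual storeBackoffManager's state type may differ:

```go
package main

import (
	"fmt"
	"sync"
)

type storeBackoffManager struct {
	mu     sync.RWMutex
	states map[string]int // e.g. consecutive failure counts per store address
}

// markSuccess clears failure state for a store. The RLock fast-path avoids
// the write lock entirely on the hot path where the store is already healthy,
// letting concurrent readers proceed in parallel.
func (m *storeBackoffManager) markSuccess(storeAddr string) {
	if storeAddr == "" {
		return
	}
	m.mu.RLock()
	_, exists := m.states[storeAddr]
	m.mu.RUnlock()
	if !exists {
		return
	}
	// delete is a no-op if another goroutine already removed the entry
	// between the RUnlock above and the Lock below, so no re-check is needed.
	m.mu.Lock()
	delete(m.states, storeAddr)
	m.mu.Unlock()
}

func main() {
	m := &storeBackoffManager{states: map[string]int{"tikv-1:20160": 2}}
	m.markSuccess("tikv-1:20160") // entry exists: write lock taken, entry removed
	m.markSuccess("tikv-2:20160") // no entry: read-lock fast path, no write lock
	fmt.Println(len(m.states))
}
```

Whether RWMutex actually wins over a plain Mutex here depends on the read/write ratio; with mostly-absent keys (the hot path the reviews describe) the read lock dominates and contention drops.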
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@logservice/logpuller/range_reload_aggregator.go`:
- Around line 98-129: The TOCTOU check of subscribedSpan.stopped.Load() in
rangeReloadAggregator.flush is intentionally performed outside the mutex and can
race with subscription stopping; update the flush function (around the
subscribedSpan.stopped.Load() check) to add a brief clarifying comment stating
that the race is benign because downstream handlers also validate stopped state
and tasks are safe to schedule even if the subscription stops before processing.
Ensure the comment references rangeReloadAggregator.flush,
subscribedSpan.stopped.Load, and the downstream task checks so future readers
understand this is deliberate.
In `@logservice/logpuller/region_request_worker.go`:
- Line 226: markStoreSuccess (calling storeBackoffManager.markSuccess) is
contended because it always takes the write mutex and deletes the map entry even
when no entry exists; add a fast-path to avoid the write lock on the hot path by
doing a read-only check first (e.g., acquire read lock or check existence
without lock if safe) and early-return when the store key is not present or
already marked success, then only take the write lock to delete/modify when the
key exists; update storeBackoffManager.markSuccess (in retry_scheduler.go) to
perform the read-check → conditional write-lock → delete sequence to reduce lock
contention.
In `@logservice/logpuller/retry_scheduler.go`:
- Around line 251-258: markSuccess on storeBackoffManager is called very
frequently and always takes the write lock; add a fast-path read to avoid
unnecessary writes: first return on empty storeAddr, then use m.mu.RLock() to
check if the entry exists in m.states and if not release RLock and return; only
acquire the full m.mu.Lock() when the entry does exist and needs deletion.
Reference: markSuccess, storeBackoffManager, m.states, m.mu (called from
region_request_worker).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6ff0c398-0d0d-4337-883b-ed9ef452327c
📒 Files selected for processing (6)
logservice/logpuller/range_reload_aggregator.go
logservice/logpuller/region_event_handler.go
logservice/logpuller/region_request_worker.go
logservice/logpuller/retry_scheduler.go
logservice/logpuller/retry_scheduler_test.go
logservice/logpuller/subscription_client.go
/test all
[FORMAT CHECKER NOTIFICATION] Notice: To remove the
📖 For more info, you can check the "Contribute Code" section in the development guide.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
Bug Fixes
Tests