maintainer,dispatcher: dedupe waiting dispatcherStatus #4841

asddongmen wants to merge 4 commits into pingcap:master
Conversation
Signed-off-by: dongmen <414110582@qq.com>
[APPROVALNOTIFIER] This PR is NOT APPROVED. This pull-request has not been approved by any approver yet. The full list of commands accepted by this bot can be found here. Details: needs approval from an approver in each of these files.
📝 Walkthrough

Replaced the dispatcher-exported block-status channel with a bounded in-process buffer (`BlockStatusBuffer`) that deduplicates pending statuses before they reach the heartbeat path.
Sequence Diagram(s)

sequenceDiagram
participant Dispatcher
participant BlockStatusBuffer
participant DispatcherManager
participant Queue as BlockStatusRequestQueue
participant Collector as HeartbeatCollector
Dispatcher->>BlockStatusBuffer: OfferBlockStatus(status)
Note over BlockStatusBuffer: Deduplicate WAITING by key
Dispatcher->>BlockStatusBuffer: OfferDoneBlockStatus(dispatcherID, blockTs, isSyncPoint, mode)
Note over BlockStatusBuffer: Coalesce DONE placeholders
DispatcherManager->>BlockStatusBuffer: TakeBlockStatus(ctx)
BlockStatusBuffer-->>DispatcherManager: TableSpanBlockStatus
DispatcherManager->>DispatcherManager: Classify by mode (default vs redo)
DispatcherManager->>Queue: Enqueue(blockStatusRequest)
Queue->>Queue: Filter duplicates by dedupeKey
Queue->>Collector: Dequeue()
Collector-->>Queue: BlockStatusRequestWithTargetID
Collector->>Collector: SendCommand(request)
Collector->>Queue: OnSendComplete(request)
Note over Queue: Clear in-flight tracking
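The buffer semantics in the diagram can be sketched in Go. This is a minimal illustration with simplified stand-in types: `statusKey` and its field set are hypothetical, not the real `heartbeatpb` structs, and the channel capacity is arbitrary.

```go
package main

import (
	"fmt"
	"sync"
)

// statusKey is a simplified stand-in for the real dedupe key
// (dispatcherID, blockTs, mode, isSyncPoint).
type statusKey struct {
	dispatcherID string
	blockTs      uint64
}

// BlockStatusBuffer coalesces identical WAITING statuses: a key is
// enqueued at most once until the consumer takes it.
type BlockStatusBuffer struct {
	mu      sync.Mutex
	pending map[statusKey]struct{}
	queue   chan statusKey
}

func NewBlockStatusBuffer(capacity int) *BlockStatusBuffer {
	return &BlockStatusBuffer{
		pending: make(map[statusKey]struct{}),
		queue:   make(chan statusKey, capacity),
	}
}

// Offer enqueues the key unless an identical one is already pending,
// and reports whether the status was actually enqueued.
func (b *BlockStatusBuffer) Offer(key statusKey) bool {
	b.mu.Lock()
	if _, dup := b.pending[key]; dup {
		b.mu.Unlock()
		return false
	}
	b.pending[key] = struct{}{}
	b.mu.Unlock()
	b.queue <- key
	return true
}

// Take removes one key; afterwards the same key may be offered again.
func (b *BlockStatusBuffer) Take() statusKey {
	key := <-b.queue
	b.mu.Lock()
	delete(b.pending, key)
	b.mu.Unlock()
	return key
}

func main() {
	buf := NewBlockStatusBuffer(16)
	k := statusKey{dispatcherID: "d1", blockTs: 100}
	fmt.Println(buf.Offer(k)) // true: first offer enqueues
	fmt.Println(buf.Offer(k)) // false: identical WAITING is coalesced
	buf.Take()
	fmt.Println(buf.Offer(k)) // true: key is re-offerable after Take
}
```

The same pending-map-plus-channel shape underlies both the WAITING dedup and the DONE-placeholder coalescing described above.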
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ❌ 2 failed (2 warnings) | ✅ 1 passed
Code Review
This pull request introduces a BlockStatusBuffer and deduplication logic in the BlockStatusRequestQueue to optimize the handling of dispatcher block statuses. By coalescing identical WAITING and DONE statuses and tracking in-flight requests, the changes aim to reduce memory amplification and redundant heartbeat reports. Feedback was provided regarding potential nil pointer dereferences when processing status IDs and suggestions to move non-state-dependent calculations outside of critical sections to reduce lock contention.
```go
func isWaitingBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool {
	return status != nil &&
		status.State != nil &&
		status.State.IsBlocked &&
		status.State.Stage == heartbeatpb.BlockStage_WAITING
}

func isDoneBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool {
	return status != nil &&
		status.State != nil &&
		status.State.IsBlocked &&
		status.State.Stage == heartbeatpb.BlockStage_DONE
}
```
The functions isWaitingBlockStatus and isDoneBlockStatus do not check if status.ID is non-nil. Since newBlockStatusKey (called in Offer and materialize) relies on status.ID to create a key, a nil status.ID will cause a panic. Although BasicDispatcher currently always sets this field, adding a defensive check here is safer for the buffer's integrity.
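A defensive variant adding the `status.ID` guard might look like this. The sketch uses simplified stand-ins for the `heartbeatpb` types; the field shapes are illustrative, not the real generated structs.

```go
package main

import "fmt"

// Simplified stand-ins for the heartbeatpb types referenced in the review.
type BlockStage int

const (
	BlockStage_NONE BlockStage = iota
	BlockStage_WAITING
	BlockStage_DONE
)

type DispatcherID struct{ Low, High uint64 }

type State struct {
	IsBlocked bool
	Stage     BlockStage
}

type TableSpanBlockStatus struct {
	ID    *DispatcherID
	State *State
}

// isWaitingBlockStatus additionally requires a non-nil ID, so that a
// later key construction that dereferences status.ID cannot panic.
func isWaitingBlockStatus(status *TableSpanBlockStatus) bool {
	return status != nil &&
		status.ID != nil && // defensive: key construction dereferences ID
		status.State != nil &&
		status.State.IsBlocked &&
		status.State.Stage == BlockStage_WAITING
}

func main() {
	noID := &TableSpanBlockStatus{State: &State{IsBlocked: true, Stage: BlockStage_WAITING}}
	ok := &TableSpanBlockStatus{ID: &DispatcherID{Low: 1, High: 2}, State: &State{IsBlocked: true, Stage: BlockStage_WAITING}}
	fmt.Println(isWaitingBlockStatus(noID), isWaitingBlockStatus(ok)) // false true
}
```

The same guard would apply symmetrically to `isDoneBlockStatus`.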
```go
key := blockStatusRequestDedupeKey{
	targetID:     request.TargetID,
	dispatcherID: common.NewDispatcherIDFromPB(status.ID),
	blockTs:      status.State.BlockTs,
	mode:         status.Mode,
	isSyncPoint:  status.State.IsSyncPoint,
	stage:        status.State.Stage,
}
```
The blockStatusRequestDedupeKey is calculated inside the critical section protected by q.mu. While common.NewDispatcherIDFromPB is likely fast, it is generally better practice to perform calculations that do not depend on the protected state outside of the lock to minimize contention. Additionally, similar to the feedback in block_status_buffer.go, there is no check for status.ID != nil before calling NewDispatcherIDFromPB, which could lead to a panic.
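The suggestion to hoist the key computation out of the critical section could be sketched as follows (simplified stand-in types; `queue`, `dedupeKey`, and `tryTrack` are illustrative names, not the PR's API):

```go
package main

import (
	"fmt"
	"sync"
)

// dedupeKey is a simplified stand-in for blockStatusRequestDedupeKey.
type dedupeKey struct {
	targetID     string
	dispatcherID string
	blockTs      uint64
}

type queue struct {
	mu       sync.Mutex
	inFlight map[dedupeKey]struct{}
}

// tryTrack computes the key before taking the lock, so the critical
// section only touches the protected map.
func (q *queue) tryTrack(targetID, dispatcherID string, blockTs uint64) bool {
	// Key construction does not read protected state, so it happens
	// outside the lock to keep the critical section short.
	key := dedupeKey{targetID: targetID, dispatcherID: dispatcherID, blockTs: blockTs}
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, dup := q.inFlight[key]; dup {
		return false
	}
	q.inFlight[key] = struct{}{}
	return true
}

func main() {
	q := &queue{inFlight: make(map[dedupeKey]struct{})}
	fmt.Println(q.tryTrack("t1", "d1", 100)) // true: first occurrence tracked
	fmt.Println(q.tryTrack("t1", "d1", 100)) // false: duplicate rejected
}
```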
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
downstreamadapter/dispatcher/helper.go (1)
271-283: ⚠️ Potential issue | 🟡 Minor

`OfferBlockStatus` can block the task scheduler on a full buffer.

`OfferBlockStatus` → `BlockStatusBuffer.Offer` performs blocking channel sends (`b.queue <- …` at lines 70, 74, 82 in `downstreamadapter/dispatcher/block_status_buffer.go`). If the consumer (`collectBlockStatusRequest`) stalls and the buffer fills, `ResendTask.Execute()` will block the `DispatcherTaskScheduler` worker indefinitely, starving other resend tasks. Line 74 sends unconditionally for non-done statuses without even a reservation guard. Risk is low in production (1M-sized buffer) but non-zero during prolonged maintainer stalls or high block-status pressure. No `TryOffer` non-blocking variant exists yet, and there are no metrics tracking buffer saturation. Consider adding either a non-blocking variant (drop if full, since the resend will retry in 5s anyway) or at minimum expose logging/metrics when buffer pressure builds.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatcher/helper.go` around lines 271-283, ResendTask.Execute currently calls OfferBlockStatus, which calls BlockStatusBuffer.Offer and can block the DispatcherTaskScheduler when the internal channel fills; change the code so Execute uses a non-blocking offer path (implement a TryOffer non-blocking variant on BlockStatusBuffer and have OfferBlockStatus call it, or add a non-blocking branch in OfferBlockStatus that uses a select with a default to drop the send) so that ResendTask.Execute never blocks the scheduler; when the non-blocking send fails, increment/log a metric and a debug/info log (include dispatcher.GetId() and the message) so buffer pressure is observable; keep the existing blocking Offer for explicit callers if needed and retain the resendTimeInterval retry behavior.

downstreamadapter/dispatchermanager/heartbeat_collector.go (1)
245-254: ⚠️ Potential issue | 🟠 Major

`OnSendComplete` should not run on `SendCommand` failure.

`OnSendComplete` is invoked unconditionally right after `SendCommand` at line 251, before the error is checked at line 252. When `SendCommand` fails, the in-flight deduplication tracking for this request's statuses is cleared even though nothing reached the maintainer. Consequences:

- For WAITING statuses this is recoverable because `ResendTask.Execute` re-offers them every 5s, allowing them to re-pass deduplication and get re-enqueued.
- For DONE statuses (pushed once via `BlockStatusBuffer.OfferDone` after write/pass completes), there is no periodic re-publisher. A transient send failure silently loses the completion signal, potentially leaving the maintainer stuck waiting on a DONE that will never arrive again.

Move the call so it only runs on success:

🛡️ Proposed fix

```diff
 err := c.mc.SendCommand(
 	messaging.NewSingleTargetMessage(
 		blockStatusRequestWithTargetID.TargetID,
 		messaging.MaintainerManagerTopic,
 		blockStatusRequestWithTargetID.Request,
 	))
-c.blockStatusReqQueue.OnSendComplete(blockStatusRequestWithTargetID)
 if err != nil {
 	log.Error("failed to send block status request message", zap.Error(err))
+} else {
+	c.blockStatusReqQueue.OnSendComplete(blockStatusRequestWithTargetID)
 }
```

Note: even this fix is incomplete. Keeping `OnSendComplete` only on success leaves failed request keys in `inFlightStatuses` indefinitely, blocking future attempts to send the same status. The real fix requires either (a) a TTL/expiry on in-flight keys, (b) explicit re-enqueue of failed requests, or (c) adding a resend path for DONE statuses equivalent to `ResendTask` for WAITING. Confirm the intended failure-recovery strategy.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatchermanager/heartbeat_collector.go` around lines 245 - 254, The call to blockStatusReqQueue.OnSendComplete is currently executed unconditionally right after c.mc.SendCommand; change the flow in the SendCommand handling so that OnSendComplete(blockStatusRequestWithTargetID) is only invoked when err == nil (i.e., move it after the nil-check of the SendCommand result and call it only on success), leaving the existing log.Error(...) for failures; additionally, address the longer-term recovery for inFlightStatuses/DONE messages by implementing one of: TTL/expiry for in-flight keys, explicit re-enqueue of failed requests, or a resend path for DONE statuses (update the logic around blockStatusReqQueue, inFlightStatuses, and BlockStatusBuffer accordingly) and confirm which strategy to use.
♻️ Duplicate comments (1)
downstreamadapter/dispatchermanager/heartbeat_queue.go (1)
106-117: ⚠️ Potential issue | 🟡 Minor

`OnSendComplete` semantics need to match the send outcome.

As currently wired from `heartbeat_collector.go` line 251, this method is invoked regardless of whether `SendCommand` actually succeeded. See the detailed finding in that file; the fix likely belongs at the caller (only call on success), but if you instead decide that `OnSendComplete` should model both outcomes, consider renaming it and/or taking an `err` parameter so callers cannot inadvertently clear in-flight tracking for a failed send.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatchermanager/heartbeat_queue.go` around lines 106 - 117, The current OnSendComplete on BlockStatusRequestQueue unconditionally clears in-flight tracking for a request even when SendCommand failed; either stop calling OnSendComplete on failure at the caller (heartbeat_collector.go send path) or change OnSendComplete to accept an error/result flag and only delete q.inFlightStatuses and q.requestStatusKeys when the send succeeded. Concretely: update the caller (SendCommand path) to only call OnSendComplete(request) on success, or change the method signature (e.g., OnSendResult(request *BlockStatusRequestWithTargetID, err error) or rename to OnSendSuccess) and branch to perform the delete logic only when err == nil, keeping locking and map cleanup as-is.
🧹 Nitpick comments (7)
downstreamadapter/dispatchermanager/heartbeat_queue.go (1)
132-171: In-place mutation of caller's slice is subtle; consider cloning.

`filtered := statuses[:0]` reuses `request.Request.BlockStatuses`' backing array, and the final `request.Request.BlockStatuses = filtered` mutates caller-observable state. This is safe only because `Enqueue` takes ownership, but it means any caller-side reference to the original slice (e.g., for logging) will observe a shrunken slice after `Enqueue` returns. Also, dropped entries in `statuses[len(filtered):]` retain their pointers until GC frees the enclosing array. Either document the ownership-transfer contract on `Enqueue`, or allocate a fresh slice:

♻️ Optional cleanup

```diff
-	statuses := request.Request.BlockStatuses
-	filtered := statuses[:0]
-	statusKeys := make([]blockStatusRequestDedupeKey, 0)
+	statuses := request.Request.BlockStatuses
+	filtered := make([]*heartbeatpb.TableSpanBlockStatus, 0, len(statuses))
+	statusKeys := make([]blockStatusRequestDedupeKey, 0)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatchermanager/heartbeat_queue.go` around lines 132-171, trackPendingStatuses currently does an in-place filter using filtered := statuses[:0], which reuses the original backing array and mutates request.Request.BlockStatuses (subtle ownership/leak issues); change it to allocate a fresh slice (e.g., make([]*pb.BlockStatus, 0, len(statuses)) and append kept items) and assign that new slice to request.Request.BlockStatuses so we don't shrink the caller's slice or retain dropped-element pointers, keeping the rest of the logic that updates q.queuedStatuses, q.inFlightStatuses and q.requestStatusKeys the same; alternatively, if you prefer not to change code, add a clear comment on Enqueue's ownership contract for request.Request.BlockStatuses (but prefer the fresh-slice approach for safety).

downstreamadapter/dispatcher/block_status_buffer_test.go (1)
26-168: LGTM — good coverage of the dedup contract.

The five tests cover the primary semantics: DONE dedup via `OfferDone`, WAITING dedup via `Offer`, re-queueability after `Take` clears pending, FIFO ordering across WAITING→DONE for the same key, and distinctness of DONE keys by `isSyncPoint`/`Mode`. Uses `require.*` from testify per guideline, and timeouts are deterministic (1s default, no sleeps).

One small nit: in `TestBlockStatusBufferAllowsWaitingAgainAfterTake` you re-offer the exact same `waiting` pointer (Line 100) and then assert `second.State.Stage == WAITING`. Because `materialize` returns `entry.status` verbatim for WAITING, `first` and `second` are pointer-equal, so the second assertion is effectively re-asserting on the same object. Consider asserting `require.Same(t, waiting, second)` or constructing a distinct-but-equivalent status to make the intent (buffer accepts a new instance for the same key) explicit.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatcher/block_status_buffer_test.go` around lines 26-168, The test re-offers the exact same waiting pointer in TestBlockStatusBufferAllowsWaitingAgainAfterTake, which makes the second assertion redundant because materialize returns the same object; change the test to either assert pointer equality with require.Same(t, waiting, second) or instead re-create a new, equal heartbeatpb.TableSpanBlockStatus instance (same ID/State/Mode) before the second buffer.Offer to prove the buffer accepts a distinct-but-equivalent object; update references to waiting, first, and second in that test accordingly.

downstreamadapter/dispatcher/basic_dispatcher_info.go (1)
231-237: `TakeBlockStatus`/`TakeBlockStatusWithTimeout` on `BasicDispatcher` appear to be test-only; `SharedInfo.TakeBlockStatusWithTimeout` uses `context.Background()`.

Two small observations on the take-side surface:

- Per the PR summary, production consumers (`dispatcher_manager.go::collectBlockStatusRequest`) take via `e.sharedInfo.TakeBlockStatus(ctx)` directly and never go through the dispatcher methods. The `BasicDispatcher.TakeBlockStatus`/`TakeBlockStatusWithTimeout` wrappers (Line 231-237) only exist to support `event_dispatcher_test.go`. Consider either removing them from the dispatcher (exposing via `SharedInfo` only) or renaming/commenting them as test helpers to avoid implying that individual dispatchers are a legitimate consumer of the shared buffer.
- `SharedInfo.TakeBlockStatusWithTimeout` (Line 282-290) derives its context from `context.Background()` rather than an externally supplied parent. That's fine for tests (what this method is used for today), but means the call cannot be cancelled earlier by a changefeed/manager shutdown — it will always wait out the full timeout. If this method ever gets used outside tests, consider taking a parent `ctx`:

♻️ Proposed signature for non-test usage

```diff
-func (s *SharedInfo) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+func (s *SharedInfo) TakeBlockStatusWithTimeout(parent context.Context, timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) {
+	ctx, cancel := context.WithTimeout(parent, timeout)
 	defer cancel()
 	status := s.TakeBlockStatus(ctx)
 	if status == nil {
 		return nil, false
 	}
 	return status, true
 }
```

Also applies to: 282-290
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatcher/basic_dispatcher_info.go` around lines 231-237, The BasicDispatcher methods TakeBlockStatus and TakeBlockStatusWithTimeout are only used by tests and duplicate SharedInfo behavior; remove these two wrapper methods (TakeBlockStatus, TakeBlockStatusWithTimeout) from BasicDispatcher or mark them clearly as test helpers to avoid implying they're for production use, and update callers to call SharedInfo.TakeBlockStatus / SharedInfo.TakeBlockStatusWithTimeout directly; additionally change SharedInfo.TakeBlockStatusWithTimeout to accept a parent context parameter instead of deriving context.Background() so callers can cancel early (update its signature and all call sites to pass ctx).

downstreamadapter/dispatchermanager/heartbeat_queue_test.go (2)
64-163: Optional: extract a helper for WAITING request construction to reduce duplication.

`newDoneBlockStatusRequest` (Line 232-249) keeps DONE-request test fixtures concise, but the four WAITING-request literals on Lines 69-85, 86-102, 116-132, 141-157, 201-217 are near-identical and expand the tests considerably. A small `newWaitingBlockStatusRequest(targetID, dispatcherID, blockTs)` helper mirroring the DONE one would shrink these tests and make diffs more readable.

Otherwise the tests correctly exercise queued/in-flight dedup + `OnSendComplete` semantics, distinct DONE keys across sync-point/mode, and WAITING/DONE distinctness. Uses `require.Same` for pointer identity, which is appropriate here.

Also applies to: 196-230
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatchermanager/heartbeat_queue_test.go` around lines 64 - 163, The test repeats four near-identical WAITING BlockStatusRequest fixtures; add a helper function newWaitingBlockStatusRequest(targetID, dispatcherID, blockTs) (mirroring existing newDoneBlockStatusRequest) and replace the literal constructions used in TestBlockStatusRequestQueueDeduplicatesQueuedAndInFlightWaiting (the variables first, second, third, fourth) with calls to this helper to reduce duplication and improve readability.
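A sketch of the suggested helper, using simplified stand-in types rather than the real `heartbeatpb` structs (the field shapes here are illustrative):

```go
package main

import "fmt"

// Simplified stand-ins for the request/status types used in the tests.
type BlockState struct {
	IsBlocked bool
	Stage     string
	BlockTs   uint64
}

type TableSpanBlockStatus struct {
	DispatcherID string
	State        *BlockState
}

type BlockStatusRequest struct {
	TargetID      string
	BlockStatuses []*TableSpanBlockStatus
}

// newWaitingBlockStatusRequest mirrors the existing newDoneBlockStatusRequest
// helper, collapsing the repeated WAITING fixture literals into one call.
func newWaitingBlockStatusRequest(targetID, dispatcherID string, blockTs uint64) *BlockStatusRequest {
	return &BlockStatusRequest{
		TargetID: targetID,
		BlockStatuses: []*TableSpanBlockStatus{{
			DispatcherID: dispatcherID,
			State:        &BlockState{IsBlocked: true, Stage: "WAITING", BlockTs: blockTs},
		}},
	}
}

func main() {
	req := newWaitingBlockStatusRequest("t1", "d1", 100)
	fmt.Println(req.TargetID, req.BlockStatuses[0].State.Stage, req.BlockStatuses[0].State.BlockTs)
}
```

Each test then varies only the three parameters that actually distinguish the fixtures.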
43-45: Consider raising the 20ms timeout to improve test robustness on loaded CI.

The Dequeue assertions at lines 43-45, 50-52, 112-114, and 135-137 rely on short 20ms context timeouts to verify that dedup is blocking requests. Under heavy CI load or with `-race` enabled, spurious latency can occasionally exceed this threshold. Since these are negative assertions (not performance tests), raising to 50-100ms adds safety without affecting what the tests measure. Monitor for timeout-related flakiness; adjust if you see occasional failures on busy runs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatchermanager/heartbeat_queue_test.go` around lines 43-45, The test uses a very short 20ms timeout when creating shortCtx (via context.WithTimeout) before calling queue.Dequeue, which makes negative assertions flaky under CI or -race; update the context timeout value from 20*time.Millisecond to a larger safe window (e.g., 75*time.Millisecond) for the shortCtx/shortCancel usages that wrap queue.Dequeue to ensure stability — search for the context.WithTimeout(..., 20*time.Millisecond) occurrences in the heartbeat_queue_test (the shortCtx/shortCancel definitions used with queue.Dequeue) and change them all to the chosen higher duration.

downstreamadapter/dispatcher/block_status_buffer.go (1)
61-101: Reserve→enqueue is not atomic; a crashed producer can permanently leak a pending key.
`reserveWaiting`/`reserveDone` insert into the pending map, then the caller sends to `b.queue` outside the lock. If the producer goroutine panics or is cancelled between the successful reservation and the channel send (or blocks on a full queue and then the process is forced to shut down), the key remains in `pendingWaiting`/`pendingDone` forever, which silently drops all future dedup-eligible statuses for that same `(dispatcherID, blockTs, mode, isSyncPoint)` tuple.

Given the 1M-capacity queue this is unlikely to bite in practice, but is worth protecting against. A simple fix is to use `defer` to release the reservation if the send doesn't happen:

♻️ Suggested defensive fix

```diff
 func (b *BlockStatusBuffer) Offer(status *heartbeatpb.TableSpanBlockStatus) {
 	if status == nil {
 		return
 	}
 	if isWaitingBlockStatus(status) {
 		key := newBlockStatusKey(status)
 		if !b.reserveWaiting(key) {
 			return
 		}
-		b.queue <- blockStatusQueueEntry{status: status}
+		enqueued := false
+		defer func() {
+			if !enqueued {
+				b.releaseWaiting(key)
+			}
+		}()
+		b.queue <- blockStatusQueueEntry{status: status}
+		enqueued = true
 		return
 	}
```

(and analogous changes for the DONE branch / `OfferDone`).
OfferDone).Optional; flag only if robustness under panic/forced shutdown is considered important for this subsystem.
Also applies to: 125-143
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatcher/block_status_buffer.go` around lines 61-101, The reserve->enqueue path in BlockStatusBuffer.Offer and OfferDone can leak entries in pendingWaiting/pendingDone if a panic or cancellation occurs between reserveWaiting/reserveDone and sending to b.queue; change both Offer and OfferDone so that immediately after a successful reserveWaiting/reserveDone you install a short-lived defer cleanup (or scoped rollback flag) that removes the pending entry unless the subsequent send to b.queue completed successfully (clear the defer/flip the flag after send); apply the same pattern for both the waiting and done branches (referencing reserveWaiting, reserveDone, pendingWaiting, pendingDone, BlockStatusBuffer.Offer, OfferDone, and blockStatusQueueEntry/b.queue) to ensure the reservation is released if the channel send does not happen.

downstreamadapter/dispatcher/basic_dispatcher.go (1)
989-1027: Note: non-blocking schedule DDL status (Stage_NONE) is not deduped in the buffer.

The `message` built here has `Stage: heartbeatpb.BlockStage_NONE` with `IsBlocked: false`, so it falls through `Offer()`'s pass-through branch in `BlockStatusBuffer` (neither `isWaitingBlockStatus` nor `isDoneBlockStatus` matches). The `resendTask` set on Line 1023/1025 will re-invoke `OfferBlockStatus(message)` periodically until ACKed by the maintainer, and each resend pushes a new entry into the 1M-capacity buffer.

This is not a correctness problem (the downstream `BlockStatusRequestQueue` in the dispatcher manager performs request-level dedup), but it does leave a gap in the PR's stated intent of deduplicating pending statuses at the dispatcher layer and can amplify buffer occupancy under slow ACK. Consider whether non-blocking schedule events should also go through a dedup key (e.g., by treating `(dispatcherID, blockTs)` as a distinct "scheduling" key), or add a brief comment explaining that dedup for this path is intentionally deferred to the upper queue.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatcher/basic_dispatcher.go` around lines 989 - 1027, The code builds a non-blocking TableSpanBlockStatus (message with Stage: heartbeatpb.BlockStage_NONE and State.IsBlocked=false) and registers a periodic resend (resendTaskMap.Set with newResendTask) which repeatedly calls OfferBlockStatus(message); because BlockStatusBuffer.Offer treats Stage_NONE as pass-through (not matching isWaitingBlockStatus/isDoneBlockStatus), each resend enqueues a duplicate into the 1M buffer; fix by deduping these non-blocking schedule events before enqueuing: add a scheduling-key check (e.g., key = (d.id, event.GetCommitTs())) in OfferBlockStatus or just before d.resendTaskMap.Set to skip re-Offer if the same key is already tracked, or alternatively convert the non-blocking message to use the same dedup path as blocking ones (mark waiting/done) so BlockStatusBuffer.Offer collapses duplicates; update references: message, Stage: heartbeatpb.BlockStage_NONE, resendTaskMap.Set, newResendTask, OfferBlockStatus, and BlockStatusBuffer.Offer/isWaitingBlockStatus/isDoneBlockStatus.
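The suggested scheduling-key dedup could be sketched like this (illustrative only; `schedKey` and `resendTracker` are hypothetical names, and the real fix would live inside `OfferBlockStatus` or around `resendTaskMap.Set`):

```go
package main

import (
	"fmt"
	"sync"
)

// schedKey is a hypothetical (dispatcherID, commitTs) key for
// non-blocking schedule statuses that bypass the WAITING/DONE dedup.
type schedKey struct {
	dispatcherID string
	commitTs     uint64
}

type resendTracker struct {
	mu      sync.Mutex
	pending map[schedKey]struct{}
}

// shouldOffer admits a schedule status only the first time its key is
// seen; subsequent resends of the same key are suppressed until acked.
func (t *resendTracker) shouldOffer(key schedKey) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, dup := t.pending[key]; dup {
		return false
	}
	t.pending[key] = struct{}{}
	return true
}

// ack releases the key once the maintainer acknowledges the status.
func (t *resendTracker) ack(key schedKey) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.pending, key)
}

func main() {
	tr := &resendTracker{pending: make(map[schedKey]struct{})}
	k := schedKey{dispatcherID: "d1", commitTs: 42}
	fmt.Println(tr.shouldOffer(k)) // true: first offer passes
	fmt.Println(tr.shouldOffer(k)) // false: resend suppressed
	tr.ack(k)
	fmt.Println(tr.shouldOffer(k)) // true again after ack
}
```

With such a guard, the periodic resend keeps firing but only the first unacked instance of each schedule status ever reaches the buffer.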
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e341cd4a-11f5-4550-8e01-df07805b486f
📒 Files selected for processing (13)
- downstreamadapter/dispatcher/basic_dispatcher.go
- downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go
- downstreamadapter/dispatcher/basic_dispatcher_info.go
- downstreamadapter/dispatcher/block_status_buffer.go
- downstreamadapter/dispatcher/block_status_buffer_test.go
- downstreamadapter/dispatcher/event_dispatcher_test.go
- downstreamadapter/dispatcher/helper.go
- downstreamadapter/dispatcher/redo_dispatcher_test.go
- downstreamadapter/dispatchermanager/dispatcher_manager.go
- downstreamadapter/dispatchermanager/dispatcher_manager_test.go
- downstreamadapter/dispatchermanager/heartbeat_collector.go
- downstreamadapter/dispatchermanager/heartbeat_queue.go
- downstreamadapter/dispatchermanager/heartbeat_queue_test.go
```go
	if !isDoneBlockStatus(status) {
		b.queue <- blockStatusQueueEntry{status: status}
		return
	}

	key := newBlockStatusKey(status)
	if !b.reserveDone(key) {
		return
	}
	b.queue <- blockStatusQueueEntry{doneKey: &key}
}
```
Minor: `Offer()`'s DONE path silently drops all State fields except key components.

When `Offer()` receives a DONE-staged status (Line 78-82), only the key (dispatcherID, blockTs, mode, isSyncPoint) is preserved via the `doneKey` placeholder; `materialize()` (Line 161-170) then reconstructs a minimal `TableSpanBlockStatus` that loses any additional State fields (`BlockTables`, `NeedDroppedTables`, `NeedAddedTables`, `UpdatedSchemas`) the caller may have populated.

In the current PR this path is unreachable — all DONE messages flow through `OfferDoneBlockStatus` / `OfferDone`, which by design only carry the key fields — so behavior is preserved. However, the public `Offer` signature accepts any `*TableSpanBlockStatus`, so a future caller that hands a DONE-staged status with extras into `Offer` would experience silent field loss.

Consider one of:

- Reject (or panic with a clear message) if `Offer` receives a DONE-staged status with non-zero auxiliary fields.
- Store the original `status` pointer for DONE too (skip the placeholder optimization for the `Offer` entry point) and only use the placeholder form from `OfferDone`.
- Add a doc comment on `Offer` explicitly stating that DONE statuses are reconstructed from key-only and must not carry extras.

Also applies to: 145-171
Signed-off-by: dongmen <414110582@qq.com>
Signed-off-by: dongmen <414110582@qq.com>
[FORMAT CHECKER NOTIFICATION] Notice: 📖 For more info, you can check the "Contribute Code" section in the development guide.
Actionable comments posted: 2
🧹 Nitpick comments (2)
maintainer/maintainer.go (1)
1191-1208: Nit: cumulative `droppedCount` in throttled log obscures per-window drop volume.

`droppedCount` is monotonically accumulated and logged each time the 5 s throttle fires, so operators cannot distinguish a quiet period (e.g., 1 new drop) from a burst (e.g., 10k new drops) without diffing two log lines. Consider logging the delta since the last log (or both) for better signal.

♻️ Proposed change
```diff
 func (m *Maintainer) recordDroppedStatusRequest(msgType messaging.IOType, reason string) {
 	m.droppedStatusRequests.Lock()
 	defer m.droppedStatusRequests.Unlock()
 	m.droppedStatusRequests.count++
 	now := time.Now()
 	if !m.droppedStatusRequests.lastLogTime.IsZero() && now.Sub(m.droppedStatusRequests.lastLogTime) < 5*time.Second {
 		return
 	}
+	sinceLast := m.droppedStatusRequests.count - m.droppedStatusRequests.lastLoggedCount
 	m.droppedStatusRequests.lastLogTime = now
+	m.droppedStatusRequests.lastLoggedCount = m.droppedStatusRequests.count
 	log.Warn("drop maintainer status request",
 		zap.Stringer("changefeedID", m.changefeedID),
 		zap.String("type", msgType.String()),
 		zap.String("reason", reason),
 		zap.Int("eventChLen", m.eventCh.Len()),
-		zap.Uint64("droppedCount", m.droppedStatusRequests.count))
+		zap.Uint64("droppedSinceLastLog", sinceLast),
+		zap.Uint64("droppedCount", m.droppedStatusRequests.count))
 }
```

(`lastLoggedCount uint64` would need to be added to the `droppedStatusRequests` struct.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@maintainer/maintainer.go` around lines 1191-1208: The throttled log in recordDroppedStatusRequest currently reports the cumulative droppedStatusRequests.count, hiding per-window drops; add a lastLoggedCount uint64 field to the droppedStatusRequests struct, compute delta := m.droppedStatusRequests.count - m.droppedStatusRequests.lastLoggedCount inside recordDroppedStatusRequest when emitting the log, include both the cumulative droppedCount and the per-window delta (e.g., "droppedDelta") in the zap fields, and set m.droppedStatusRequests.lastLoggedCount = m.droppedStatusRequests.count when you update lastLogTime so subsequent logs show the next window's delta.

maintainer/status_request_buffer.go (1)
115-150: `mergeHeartbeat` stores incoming proto pointers directly — consider defensive copy.

`entry.changefeedID`, `entry.watermark`, `entry.redoWatermark`, and each `entry.statuses[key] = status` are stored as pointers from the incoming `*heartbeatpb.HeartBeatRequest`. The entry sits in the buffer until drained (possibly multiple merges later). Combined with the in-place mutation in `pickWatermark` flagged above, this means a later merge can retroactively change the watermark observed by an earlier one still in the buffer, and any caller up the stack that reuses/recycles the proto can observe buffered mutations.

Not a bug today if the messaging layer treats incoming messages as read-only and doesn't pool them, but it's fragile. Consider copying the scalar fields / `Watermark` into the buffered entry; pointer reuse for `*TableSpanStatus` is fine if those objects are immutable post-send.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@maintainer/status_request_buffer.go` around lines 115 - 150, mergeHeartbeat currently stores incoming protobuf pointers (e.g., req.ChangefeedID, req.Watermark, req.RedoWatermark and req.Statuses elements) directly into the bufferedHeartbeatRequest, which can be mutated later; instead, make defensive copies when merging: copy scalar fields (assign a new string/ID value rather than pointer), clone Watermark/RedoWatermark via a deep-copy (e.g., proto.Clone or constructing new Watermark structs) before assigning to entry.watermark/entry.redoWatermark, and either clone or ensure immutability of each TableSpanStatus stored in entry.statuses (clone status before entry.statuses[key] = status if those protos can be mutated/recycled); update mergeHeartbeat and any helper usages (pickWatermark) to operate on and return copies rather than reusing incoming pointers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@maintainer/status_request_buffer.go`:
- Around line 1-10: Add the standard Apache 2.0 license header used across this
package to the top of this file (before the "package maintainer" line) so it
matches the banner in maintainer/maintainer.go; ensure the header text and
year/owner fields match the project's existing header format exactly and then
run the repository's check-copyright CI locally or re-run the pipeline.
- Around line 285-299: pickWatermark currently mutates the incoming
*heartbeatpb.Watermark objects and can drop a higher LastSyncedTs when the
candidate wins by Seq; change it to allocate and return a new
heartbeatpb.Watermark instead of reusing current or candidate, compute
Seq/CheckpointTs selection as before (choose the candidate when
candidate.Seq>current.Seq or equal Seq and
candidate.CheckpointTs>=current.CheckpointTs) but always set LastSyncedTs to the
max(current.LastSyncedTs, candidate.LastSyncedTs) without mutating either input,
and ensure nil handling for current/candidate still returns a freshly allocated
Watermark with the appropriate fields.
---
Nitpick comments:
In `@maintainer/maintainer.go`:
- Around line 1191-1208: The throttled log in recordDroppedStatusRequest
currently reports the cumulative droppedStatusRequests.count, hiding per-window
drops; add a lastLoggedCount uint64 field to the droppedStatusRequests struct,
compute delta := m.droppedStatusRequests.count -
m.droppedStatusRequests.lastLoggedCount inside recordDroppedStatusRequest when
emitting the log, include both the cumulative droppedCount and the per-window
delta (e.g., "droppedDelta") in the zap fields, and set
m.droppedStatusRequests.lastLoggedCount = m.droppedStatusRequests.count when you
update lastLogTime so subsequent logs show the next window's delta.
In `@maintainer/status_request_buffer.go`:
- Around line 115-150: mergeHeartbeat currently stores incoming protobuf
pointers (e.g., req.ChangefeedID, req.Watermark, req.RedoWatermark and
req.Statuses elements) directly into the bufferedHeartbeatRequest, which can be
mutated later; instead, make defensive copies when merging: copy scalar fields
(assign a new string/ID value rather than pointer), clone
Watermark/RedoWatermark via a deep-copy (e.g., proto.Clone or constructing new
Watermark structs) before assigning to entry.watermark/entry.redoWatermark, and
either clone or ensure immutability of each TableSpanStatus stored in
entry.statuses (clone status before entry.statuses[key] = status if those protos
can be mutated/recycled); update mergeHeartbeat and any helper usages
(pickWatermark) to operate on and return copies rather than reusing incoming
pointers.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4b012fd1-7a51-4209-8579-9f4da556b8b8
📒 Files selected for processing (3)
- maintainer/maintainer.go
- maintainer/maintainer_test.go
- maintainer/status_request_buffer.go
```go
package maintainer

import (
	"sync"

	"github.com/pingcap/ticdc/heartbeatpb"
	"github.com/pingcap/ticdc/pkg/common"
	"github.com/pingcap/ticdc/pkg/messaging"
	"github.com/pingcap/ticdc/pkg/node"
)
```
Missing Apache 2.0 copyright header — breaks check-copyright in CI.
The pipeline's check-copyright job reports that ./maintainer/status_request_buffer.go is missing the standard license header used by the rest of this package (see the banner in maintainer/maintainer.go). Please prepend it.
🛠️ Proposed fix (the standard Apache 2.0 header also includes the "WITHOUT WARRANTIES" line, added here)

```diff
+// Copyright 2026 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package maintainer
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package maintainer

import (
	"sync"

	"github.com/pingcap/ticdc/heartbeatpb"
	"github.com/pingcap/ticdc/pkg/common"
	"github.com/pingcap/ticdc/pkg/messaging"
	"github.com/pingcap/ticdc/pkg/node"
)
```
🧰 Tools
🪛 GitHub Actions: PR Build and Unit Test
[error] 1-1: check-copyright failed: The copyright information of the following file is incorrect: ./maintainer/status_request_buffer.go
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@maintainer/status_request_buffer.go` around lines 1 - 10, Add the standard
Apache 2.0 license header used across this package to the top of this file
(before the "package maintainer" line) so it matches the banner in
maintainer/maintainer.go; ensure the header text and year/owner fields match the
project's existing header format exactly and then run the repository's
check-copyright CI locally or re-run the pipeline.
```go
func pickWatermark(current, candidate *heartbeatpb.Watermark) *heartbeatpb.Watermark {
	if candidate == nil {
		return current
	}
	if current == nil {
		return candidate
	}
	if candidate.Seq > current.Seq || (candidate.Seq == current.Seq && candidate.CheckpointTs >= current.CheckpointTs) {
		current = candidate
	}
	if candidate.LastSyncedTs > current.LastSyncedTs {
		current.LastSyncedTs = candidate.LastSyncedTs
	}
	return current
}
```
`pickWatermark` drops the `LastSyncedTs` max and mutates the incoming proto object.

Two concerns in this helper:

1. **Lost `LastSyncedTs` max.** When `candidate` wins by `Seq` and has a smaller `LastSyncedTs` than the previously buffered `current`, the higher value is silently lost. The maintainer's normal heartbeat path (`onHeartbeatRequest`) treats `LastSyncedTs` as max-monotonic; the dedupe path should preserve that invariant — otherwise a buffered + drained heartbeat can regress `LastSyncedTs` compared to processing the same requests individually.

2. **Aliasing / in-place mutation of the incoming proto.** After `current = candidate`, the statement `current.LastSyncedTs = candidate.LastSyncedTs` (and more importantly, on the alternate branch, when `current` is the previously buffered watermark) mutates the `*heartbeatpb.Watermark` object the caller passed in or that a prior sender's `req.Watermark` still points at. That object may still be referenced by the messaging layer or by subsequent merges. Prefer allocating a fresh `Watermark` inside the buffer.
🛡️ Proposed fix — compute max(LastSyncedTs) explicitly and avoid mutating either input

```diff
 func pickWatermark(current, candidate *heartbeatpb.Watermark) *heartbeatpb.Watermark {
 	if candidate == nil {
 		return current
 	}
 	if current == nil {
-		return candidate
+		c := *candidate
+		return &c
 	}
-	if candidate.Seq > current.Seq || (candidate.Seq == current.Seq && candidate.CheckpointTs >= current.CheckpointTs) {
-		current = candidate
-	}
-	if candidate.LastSyncedTs > current.LastSyncedTs {
-		current.LastSyncedTs = candidate.LastSyncedTs
-	}
-	return current
+	maxLastSynced := current.LastSyncedTs
+	if candidate.LastSyncedTs > maxLastSynced {
+		maxLastSynced = candidate.LastSyncedTs
+	}
+	chosen := current
+	if candidate.Seq > current.Seq ||
+		(candidate.Seq == current.Seq && candidate.CheckpointTs >= current.CheckpointTs) {
+		chosen = candidate
+	}
+	out := *chosen
+	out.LastSyncedTs = maxLastSynced
+	return &out
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
func pickWatermark(current, candidate *heartbeatpb.Watermark) *heartbeatpb.Watermark {
	if candidate == nil {
		return current
	}
	if current == nil {
		c := *candidate
		return &c
	}
	maxLastSynced := current.LastSyncedTs
	if candidate.LastSyncedTs > maxLastSynced {
		maxLastSynced = candidate.LastSyncedTs
	}
	chosen := current
	if candidate.Seq > current.Seq ||
		(candidate.Seq == current.Seq && candidate.CheckpointTs >= current.CheckpointTs) {
		chosen = candidate
	}
	out := *chosen
	out.LastSyncedTs = maxLastSynced
	return &out
}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@maintainer/status_request_buffer.go` around lines 285 - 299, pickWatermark
currently mutates the incoming *heartbeatpb.Watermark objects and can drop a
higher LastSyncedTs when the candidate wins by Seq; change it to allocate and
return a new heartbeatpb.Watermark instead of reusing current or candidate,
compute Seq/CheckpointTs selection as before (choose the candidate when
candidate.Seq>current.Seq or equal Seq and
candidate.CheckpointTs>=current.CheckpointTs) but always set LastSyncedTs to the
max(current.LastSyncedTs, candidate.LastSyncedTs) without mutating either input,
and ensure nil handling for current/candidate still returns a freshly allocated
Watermark with the appropriate fields.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
Bug Fixes
Performance
New Features
Tests