
maintainer,dispatcher: dedupe waiting dispatcherStatus #4841

Open
asddongmen wants to merge 4 commits into pingcap:master from asddongmen:0416-maintainer-dedupe-waiting

Conversation

@asddongmen
Collaborator

@asddongmen asddongmen commented Apr 17, 2026

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • Bug Fixes

    • Improved deduplication of block-status messages to avoid redundant WAITING/DONE processing.
  • Performance

    • Switched to bounded buffering and non-blocking handoff for block-status events, reducing contention and message overhead.
  • New Features

    • Added internal buffering for maintainer status-request events to aggregate and coalesce requests per sender.
  • Tests

    • Added unit tests covering buffer deduplication, ordering, and maintainer buffering behavior.

@ti-chi-bot ti-chi-bot Bot added the do-not-merge/needs-linked-issue and release-note (Denotes a PR that will be considered when it comes time to generate release notes) labels on Apr 17, 2026
@ti-chi-bot

ti-chi-bot Bot commented Apr 17, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign sdojjy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai
Contributor

coderabbitai Bot commented Apr 17, 2026

📝 Walkthrough

Walkthrough

Replaced dispatcher-exported block-status channel with a bounded in-process buffer (BlockStatusBuffer) and method-based handoff APIs (OfferBlockStatus, OfferDoneBlockStatus, TakeBlockStatus*). Refactored consumers to dequeue with context/timeouts and added deduplication and in‑flight tracking at the queue/collector layers.

Changes

  • Dispatcher Interface & Core Info (downstreamadapter/dispatcher/basic_dispatcher.go, downstreamadapter/dispatcher/basic_dispatcher_info.go): Removed GetBlockStatusesChan(); added OfferBlockStatus and OfferDoneBlockStatus. NewSharedInfo now accepts blockStatusBufferSize int. Added TakeBlockStatus, TakeBlockStatusWithTimeout, TryTakeBlockStatus, and BlockStatusLen.
  • Block Status Buffer (downstreamadapter/dispatcher/block_status_buffer.go, downstreamadapter/dispatcher/block_status_buffer_test.go): New BlockStatusBuffer type with bounded queue, deduplication/coalescing for WAITING/DONE per key, lazy DONE materialization, and APIs Offer, OfferDone, Take, TryTake, Len. Tests cover dedupe, ordering, and mode/isSyncPoint distinctions.
  • Dispatcher Usage & Tests (downstreamadapter/dispatcher/helper.go, downstreamadapter/dispatcher/event_dispatcher_test.go, downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go, downstreamadapter/dispatcher/redo_dispatcher_test.go): Replaced direct channel sends/reads with OfferBlockStatus and TakeBlockStatusWithTimeout. Tests updated to pass buffer-size ints (e.g., 128, 1) instead of block-status channels and to assert via take APIs. ResendTask.Execute() now calls OfferBlockStatus.
  • Dispatcher Manager & Collector (downstreamadapter/dispatchermanager/dispatcher_manager.go, downstreamadapter/dispatchermanager/dispatcher_manager_test.go, downstreamadapter/dispatchermanager/heartbeat_collector.go): NewDispatcherManager now passes a buffer-size int. collectBlockStatusRequest reads via TakeBlockStatus(ctx) and batches with a timeout context; uses BlockStatusLen() for metrics. sendBlockStatusMessages now calls OnSendComplete() after successful sends.
  • Heartbeat Queue Deduplication (downstreamadapter/dispatchermanager/heartbeat_queue.go, downstreamadapter/dispatchermanager/heartbeat_queue_test.go): Enhanced BlockStatusRequestQueue with mutexed dedupe/in‑flight tracking, request filtering of duplicable WAITING/DONE statuses, and OnSendComplete to clear in-flight state. New tests validate deduplication, per-key distinctions, and completion behavior.
  • Maintainer Buffered Requests (maintainer/maintainer.go, maintainer/status_request_buffer.go, maintainer/maintainer_test.go): Added internal buffering for status-request events, statusRequestNotifyCh, buffering/dedup logic, and handleBufferedStatusRequests draining behavior. pushEvent now tries the buffered path and returns a bool; added tests for buffering and drop behavior.
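The coalescing contract the walkthrough describes can be sketched in Go. This is a simplified, single-goroutine illustration with hypothetical names (statusKey, buffer, newBuffer); the real BlockStatusBuffer guards its pending maps with a mutex and distinguishes WAITING from DONE entries.

```go
package main

import "fmt"

// statusKey is a hypothetical dedup key mirroring the tuple described in
// the walkthrough: (dispatcherID, blockTs, isSyncPoint, mode).
type statusKey struct {
	dispatcherID string
	blockTs      uint64
	isSyncPoint  bool
	mode         int64
}

// buffer coalesces pending statuses: while a key is queued and not yet
// taken, identical offers are dropped. Single-goroutine sketch only.
type buffer struct {
	queue   chan statusKey
	pending map[statusKey]struct{}
}

func newBuffer(size int) *buffer {
	return &buffer{
		queue:   make(chan statusKey, size),
		pending: make(map[statusKey]struct{}),
	}
}

// Offer enqueues the key unless an identical one is already pending.
func (b *buffer) Offer(k statusKey) bool {
	if _, dup := b.pending[k]; dup {
		return false // coalesced: duplicate dropped
	}
	b.pending[k] = struct{}{}
	b.queue <- k
	return true
}

// Take dequeues a key and clears its pending mark, so the same key can be
// offered (and therefore resent) again later.
func (b *buffer) Take() statusKey {
	k := <-b.queue
	delete(b.pending, k)
	return k
}

func main() {
	b := newBuffer(8)
	k := statusKey{dispatcherID: "d1", blockTs: 100}
	fmt.Println(b.Offer(k)) // true: first offer enqueues
	fmt.Println(b.Offer(k)) // false: deduped while pending
	b.Take()
	fmt.Println(b.Offer(k)) // true: accepted again after Take
}
```

Taking an entry clears its pending mark, which is what lets the periodic resend path re-enqueue the same WAITING status after a consumer has drained it.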

Sequence Diagram(s)

sequenceDiagram
    participant Dispatcher
    participant BlockStatusBuffer
    participant DispatcherManager
    participant Queue as BlockStatusRequestQueue
    participant Collector as HeartbeatCollector

    Dispatcher->>BlockStatusBuffer: OfferBlockStatus(status)
    Note over BlockStatusBuffer: Deduplicate WAITING by key
    Dispatcher->>BlockStatusBuffer: OfferDoneBlockStatus(dispatcherID, blockTs, isSyncPoint, mode)
    Note over BlockStatusBuffer: Coalesce DONE placeholders

    DispatcherManager->>BlockStatusBuffer: TakeBlockStatus(ctx)
    BlockStatusBuffer-->>DispatcherManager: TableSpanBlockStatus
    DispatcherManager->>DispatcherManager: Classify by mode (default vs redo)
    DispatcherManager->>Queue: Enqueue(blockStatusRequest)

    Queue->>Queue: Filter duplicates by dedupeKey
    Queue->>Collector: Dequeue()
    Collector-->>Queue: BlockStatusRequestWithTargetID
    Collector->>Collector: SendCommand(request)
    Collector->>Queue: OnSendComplete(request)
    Note over Queue: Clear in-flight tracking

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

lgtm, approved, release-note-none

Suggested reviewers

  • wk989898
  • flowbehappy
  • lidezhu
  • hongyunyan

Poem

🐰 I offer statuses with a hop and a wink,
My buffer keeps duplicates from making you blink,
TAKE with a timeout, OFFER with grace,
The queue tracks in‑flight, keeps order in place.
Hop on, code—let dedupe win the race!

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

  • Description check ⚠️ Warning: The PR description is incomplete. It contains only the template structure with no substantive details about what problem is solved, what changes were made, test coverage, or release notes. Resolution: fill in the PR description with the issue being closed, a summary of changes and how they work, which tests are included, answers to compatibility/regression questions, and appropriate release notes.
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 22.50%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
  • Title check ✅ Passed: The title clearly references the main change: deduplication of waiting dispatcher status messages in the maintainer and dispatcher components.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



@ti-chi-bot ti-chi-bot Bot added the size/XXL (Denotes a PR that changes 1000+ lines, ignoring generated files) label on Apr 17, 2026

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a BlockStatusBuffer and deduplication logic in the BlockStatusRequestQueue to optimize the handling of dispatcher block statuses. By coalescing identical WAITING and DONE statuses and tracking in-flight requests, the changes aim to reduce memory amplification and redundant heartbeat reports. Feedback was provided regarding potential nil pointer dereferences when processing status IDs and suggestions to move non-state-dependent calculations outside of critical sections to reduce lock contention.

Comment on lines +182 to +194
func isWaitingBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool {
    return status != nil &&
        status.State != nil &&
        status.State.IsBlocked &&
        status.State.Stage == heartbeatpb.BlockStage_WAITING
}

func isDoneBlockStatus(status *heartbeatpb.TableSpanBlockStatus) bool {
    return status != nil &&
        status.State != nil &&
        status.State.IsBlocked &&
        status.State.Stage == heartbeatpb.BlockStage_DONE
}


medium

The functions isWaitingBlockStatus and isDoneBlockStatus do not check if status.ID is non-nil. Since newBlockStatusKey (called in Offer and materialize) relies on status.ID to create a key, a nil status.ID will cause a panic. Although BasicDispatcher currently always sets this field, adding a defensive check here is safer for the buffer's integrity.
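A defensive variant along the lines of this suggestion might look as follows. The types here are simplified stand-ins for the heartbeatpb messages, not the actual generated code:

```go
package main

import "fmt"

// Simplified stand-ins for the heartbeatpb shapes referenced in the review.
type DispatcherID struct{ Low, High uint64 }

type BlockState struct {
	IsBlocked bool
	Stage     int
}

type TableSpanBlockStatus struct {
	ID    *DispatcherID
	State *BlockState
}

const stageWaiting = 1 // stand-in for heartbeatpb.BlockStage_WAITING

// isWaitingBlockStatus includes the suggested status.ID nil check, so key
// construction downstream can never dereference a nil ID.
func isWaitingBlockStatus(status *TableSpanBlockStatus) bool {
	return status != nil &&
		status.ID != nil && // defensive: key construction relies on ID
		status.State != nil &&
		status.State.IsBlocked &&
		status.State.Stage == stageWaiting
}

func main() {
	noID := &TableSpanBlockStatus{State: &BlockState{IsBlocked: true, Stage: stageWaiting}}
	fmt.Println(isWaitingBlockStatus(noID)) // false: nil ID rejected instead of panicking later
}
```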

Comment on lines +146 to +153
key := blockStatusRequestDedupeKey{
    targetID:     request.TargetID,
    dispatcherID: common.NewDispatcherIDFromPB(status.ID),
    blockTs:      status.State.BlockTs,
    mode:         status.Mode,
    isSyncPoint:  status.State.IsSyncPoint,
    stage:        status.State.Stage,
}

medium

The blockStatusRequestDedupeKey is calculated inside the critical section protected by q.mu. While common.NewDispatcherIDFromPB is likely fast, it is generally better practice to perform calculations that do not depend on the protected state outside of the lock to minimize contention. Additionally, similar to the feedback in block_status_buffer.go, there is no check for status.ID != nil before calling NewDispatcherIDFromPB, which could lead to a panic.
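The suggested restructuring, computing the key before entering the critical section, can be sketched like this (hypothetical queue and key shapes, not the actual BlockStatusRequestQueue):

```go
package main

import (
	"fmt"
	"sync"
)

// dedupeKey is a hypothetical stand-in for blockStatusRequestDedupeKey.
type dedupeKey struct {
	targetID string
	blockTs  uint64
}

type queue struct {
	mu   sync.Mutex
	seen map[dedupeKey]struct{}
}

// tryTrack builds the key before taking the lock, so the critical section
// covers only the shared-state lookup and insert.
func (q *queue) tryTrack(targetID string, blockTs uint64) bool {
	key := dedupeKey{targetID: targetID, blockTs: blockTs} // pure computation, outside the lock

	q.mu.Lock()
	defer q.mu.Unlock()
	if _, dup := q.seen[key]; dup {
		return false // duplicate: already queued or in flight
	}
	q.seen[key] = struct{}{}
	return true
}

func main() {
	q := &queue{seen: make(map[dedupeKey]struct{})}
	fmt.Println(q.tryTrack("node-1", 100)) // true: first occurrence tracked
	fmt.Println(q.tryTrack("node-1", 100)) // false: duplicate filtered
}
```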

Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
downstreamadapter/dispatcher/helper.go (1)

271-283: ⚠️ Potential issue | 🟡 Minor

OfferBlockStatus can block the task scheduler on a full buffer.

OfferBlockStatus → BlockStatusBuffer.Offer performs blocking channel sends (b.queue <- … at lines 70, 74, 82 in downstreamadapter/dispatcher/block_status_buffer.go). If the consumer (collectBlockStatusRequest) stalls and the buffer fills, ResendTask.Execute() will block the DispatcherTaskScheduler worker indefinitely, starving other resend tasks. Line 74 sends unconditionally for non-done statuses without even a reservation guard.

Risk is low in production (1M-sized buffer) but non-zero during prolonged maintainer stalls or high block status pressure. No TryOffer non-blocking variant exists yet, and there are no metrics tracking buffer saturation. Consider adding either a non-blocking variant (drop if full since resend will retry in 5s anyway) or at minimum expose logging/metrics when buffer pressure builds.
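A TryOffer of the kind proposed here is the standard select-with-default pattern. The sketch below uses a stripped-down buffer holding strings rather than the real blockStatusQueueEntry, and omits the pending-key bookkeeping:

```go
package main

import "fmt"

// buffer is a minimal stand-in for BlockStatusBuffer's bounded channel.
type buffer struct{ queue chan string }

// TryOffer attempts a non-blocking send: a select with a default branch
// drops the entry when the channel is full, so the caller never blocks.
func (b *buffer) TryOffer(s string) bool {
	select {
	case b.queue <- s:
		return true
	default:
		return false // full: the resend loop will retry later anyway
	}
}

func main() {
	b := &buffer{queue: make(chan string, 1)}
	fmt.Println(b.TryOffer("a")) // true: capacity available
	fmt.Println(b.TryOffer("b")) // false: capacity 1 exhausted, entry dropped
}
```

In the resend path this drop is acceptable because, as the comment notes, ResendTask re-offers the same status on its next tick; a metric or log on the false branch would make buffer pressure observable.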

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatcher/helper.go` around lines 271 - 283,
ResendTask.Execute currently calls OfferBlockStatus which calls
BlockStatusBuffer.Offer and can block the DispatcherTaskScheduler when the
internal channel fills; change the code so Execute uses a non-blocking offer
path (implement a TryOffer non-blocking variant on BlockStatusBuffer and have
OfferBlockStatus call it, or add a non-blocking branch in OfferBlockStatus that
uses a select with a default to drop the send) so that ResendTask.Execute never
blocks the scheduler; when the non-blocking send fails increment/log a metric
and a debug/info log (include dispatcher.GetId() and message) so buffer pressure
is observable; keep the existing blocking Offer for explicit callers if needed
and retain resendTimeInterval retry behavior.
downstreamadapter/dispatchermanager/heartbeat_collector.go (1)

245-254: ⚠️ Potential issue | 🟠 Major

OnSendComplete should not run on SendCommand failure.

OnSendComplete is invoked unconditionally right after SendCommand at line 251, before the error is checked at line 252. When SendCommand fails, the in-flight deduplication tracking for this request's statuses is cleared even though nothing reached the maintainer. Consequences:

  • For WAITING statuses this is recoverable because ResendTask.Execute re-offers them every 5s, allowing them to re-pass deduplication and get re-enqueued.
  • For DONE statuses (pushed once via BlockStatusBuffer.OfferDone after write/pass completes), there is no periodic re-publisher. A transient send failure silently loses the completion signal, potentially leaving the maintainer stuck waiting on a DONE that will never arrive again.

Move the call so it only runs on success:

🛡️ Proposed fix
 			err := c.mc.SendCommand(
 				messaging.NewSingleTargetMessage(
 					blockStatusRequestWithTargetID.TargetID,
 					messaging.MaintainerManagerTopic,
 					blockStatusRequestWithTargetID.Request,
 				))
-			c.blockStatusReqQueue.OnSendComplete(blockStatusRequestWithTargetID)
 			if err != nil {
 				log.Error("failed to send block status request message", zap.Error(err))
+			} else {
+				c.blockStatusReqQueue.OnSendComplete(blockStatusRequestWithTargetID)
 			}

Note: even this fix is incomplete. Keeping OnSendComplete only on success leaves failed request keys in inFlightStatuses indefinitely, blocking future attempts to send the same status. The real fix requires either (a) a TTL/expiry on in-flight keys, (b) explicit re-enqueue of failed requests, or (c) adding a resend path for DONE statuses equivalent to ResendTask for WAITING. Confirm the intended failure-recovery strategy.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatchermanager/heartbeat_collector.go` around lines 245
- 254, The call to blockStatusReqQueue.OnSendComplete is currently executed
unconditionally right after c.mc.SendCommand; change the flow in the SendCommand
handling so that OnSendComplete(blockStatusRequestWithTargetID) is only invoked
when err == nil (i.e., move it after the nil-check of the SendCommand result and
call it only on success), leaving the existing log.Error(...) for failures;
additionally, address the longer-term recovery for inFlightStatuses/DONE
messages by implementing one of: TTL/expiry for in-flight keys, explicit
re-enqueue of failed requests, or a resend path for DONE statuses (update the
logic around blockStatusReqQueue, inFlightStatuses, and BlockStatusBuffer
accordingly) and confirm which strategy to use.
♻️ Duplicate comments (1)
downstreamadapter/dispatchermanager/heartbeat_queue.go (1)

106-117: ⚠️ Potential issue | 🟡 Minor

OnSendComplete semantics need to match send outcome.

As currently wired from heartbeat_collector.go Line 251, this method is invoked regardless of whether SendCommand actually succeeded. See the detailed finding in that file — the fix likely belongs at the caller (only call on success), but if you instead decide that OnSendComplete should model both outcomes, consider renaming it and/or taking an err parameter so callers cannot inadvertently clear in-flight tracking for a failed send.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatchermanager/heartbeat_queue.go` around lines 106 -
117, The current OnSendComplete on BlockStatusRequestQueue unconditionally
clears in-flight tracking for a request even when SendCommand failed; either
stop calling OnSendComplete on failure at the caller (heartbeat_collector.go
send path) or change OnSendComplete to accept an error/result flag and only
delete q.inFlightStatuses and q.requestStatusKeys when the send succeeded.
Concretely: update the caller (SendCommand path) to only call
OnSendComplete(request) on success, or change the method signature (e.g.,
OnSendResult(request *BlockStatusRequestWithTargetID, err error) or rename to
OnSendSuccess) and branch to perform the delete logic only when err == nil,
keeping locking and map cleanup as-is.
🧹 Nitpick comments (7)
downstreamadapter/dispatchermanager/heartbeat_queue.go (1)

132-171: In-place mutation of caller's slice is subtle; consider cloning.

filtered := statuses[:0] reuses request.Request.BlockStatuses' backing array and the final request.Request.BlockStatuses = filtered mutates caller-observable state. This is safe only because Enqueue takes ownership, but it means any caller-side reference to the original slice (e.g., for logging) will observe a shrunken slice after Enqueue returns. Also, dropped entries in statuses[len(filtered):] retain their pointers until GC frees the enclosing array.

Either document the ownership transfer contract on Enqueue, or allocate a fresh slice:

♻️ Optional cleanup
-	statuses := request.Request.BlockStatuses
-	filtered := statuses[:0]
-	statusKeys := make([]blockStatusRequestDedupeKey, 0)
+	statuses := request.Request.BlockStatuses
+	filtered := make([]*heartbeatpb.TableSpanBlockStatus, 0, len(statuses))
+	statusKeys := make([]blockStatusRequestDedupeKey, 0)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatchermanager/heartbeat_queue.go` around lines 132 -
171, trackPendingStatuses currently does an in-place filter using filtered :=
statuses[:0] which reuses the original backing array and mutates
request.Request.BlockStatuses (subtle ownership/leak issues); change to allocate
a fresh slice (e.g., make([]*pb.BlockStatus, 0, len(statuses)) and append kept
items) and assign that new slice to request.Request.BlockStatuses so we don't
shrink the caller's slice or retain dropped-element pointers, keep the rest of
the logic that updates q.queuedStatuses, q.inFlightStatuses and
q.requestStatusKeys the same; alternatively, if you prefer not to change code,
add a clear comment on Enqueue's ownership contract for
request.Request.BlockStatuses (but prefer the fresh-slice approach for safety).
downstreamadapter/dispatcher/block_status_buffer_test.go (1)

26-168: LGTM — good coverage of the dedup contract.

The five tests cover the primary semantics: DONE dedup via OfferDone, WAITING dedup via Offer, re-queueability after Take clears pending, FIFO ordering across WAITING→DONE for the same key, and distinctness of DONE keys by isSyncPoint/Mode. Uses require.* from testify per guideline, and timeouts are deterministic (1s default, no sleeps).

One small nit: in TestBlockStatusBufferAllowsWaitingAgainAfterTake you re-offer the exact same waiting pointer (Line 100) and then assert second.State.Stage == WAITING. Because materialize returns entry.status verbatim for WAITING, first and second are pointer-equal, so the second assertion is effectively re-asserting on the same object. Consider asserting require.Same(t, waiting, second) or constructing a distinct-but-equivalent status to make the intent (buffer accepts a new instance for the same key) explicit.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatcher/block_status_buffer_test.go` around lines 26 -
168, The test re-offers the exact same waiting pointer in
TestBlockStatusBufferAllowsWaitingAgainAfterTake which makes the second
assertion redundant because materialize returns the same object; change the test
to either assert pointer equality with require.Same(t, waiting, second) or
instead re-create a new, equal heartbeatpb.TableSpanBlockStatus instance (same
ID/State/Mode) before the second buffer.Offer to prove the buffer accepts a
distinct-but-equivalent object; update references to waiting, first, and second
in that test accordingly.
downstreamadapter/dispatcher/basic_dispatcher_info.go (1)

231-237: TakeBlockStatus/TakeBlockStatusWithTimeout on BasicDispatcher appear to be test-only; SharedInfo.TakeBlockStatusWithTimeout uses context.Background().

Two small observations on the take-side surface:

  1. Per the PR summary, production consumers (dispatcher_manager.go::collectBlockStatusRequest) take via e.sharedInfo.TakeBlockStatus(ctx) directly and never go through the dispatcher methods. The BasicDispatcher.TakeBlockStatus / TakeBlockStatusWithTimeout wrappers (Line 231-237) only exist to support event_dispatcher_test.go. Consider either removing them from the dispatcher (exposing via SharedInfo only) or renaming/commenting them as test helpers to avoid implying that individual dispatchers are a legitimate consumer of the shared buffer.

  2. SharedInfo.TakeBlockStatusWithTimeout (Line 282-290) derives its context from context.Background() rather than an externally supplied parent. That's fine for tests (what this method is used for today), but means the call cannot be cancelled earlier by a changefeed/manager shutdown — it will always wait out the full timeout. If this method ever gets used outside tests, consider taking a parent ctx:

♻️ Proposed signature for non-test usage
-func (s *SharedInfo) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+func (s *SharedInfo) TakeBlockStatusWithTimeout(parent context.Context, timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) {
+	ctx, cancel := context.WithTimeout(parent, timeout)
 	defer cancel()
 	status := s.TakeBlockStatus(ctx)
 	if status == nil {
 		return nil, false
 	}
 	return status, true
 }

Also applies to: 282-290

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatcher/basic_dispatcher_info.go` around lines 231 -
237, The BasicDispatcher methods TakeBlockStatus and TakeBlockStatusWithTimeout
are only used by tests and duplicate SharedInfo behavior; remove these two
wrapper methods (TakeBlockStatus, TakeBlockStatusWithTimeout) from
BasicDispatcher or mark them clearly as test helpers to avoid implying they’re
for production use, and update callers to call SharedInfo.TakeBlockStatus /
SharedInfo.TakeBlockStatusWithTimeout directly; additionally change
SharedInfo.TakeBlockStatusWithTimeout to accept a parent context parameter
instead of deriving context.Background() so callers can cancel early (update its
signature and all call sites to pass ctx).
downstreamadapter/dispatchermanager/heartbeat_queue_test.go (2)

64-163: Optional: extract a helper for WAITING request construction to reduce duplication.

newDoneBlockStatusRequest (Line 232-249) keeps DONE-request test fixtures concise, but the four WAITING-request literals on Lines 69-85, 86-102, 116-132, 141-157, 201-217 are near-identical and expand the tests considerably. A small newWaitingBlockStatusRequest(targetID, dispatcherID, blockTs) helper mirroring the DONE one would shrink these tests and make diffs more readable.

Otherwise the tests correctly exercise queued/in-flight dedup + OnSendComplete semantics, distinct DONE keys across sync-point/mode, and WAITING/DONE distinctness. Uses require.Same for pointer identity which is appropriate here.

Also applies to: 196-230

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatchermanager/heartbeat_queue_test.go` around lines 64
- 163, The test repeats four near-identical WAITING BlockStatusRequest fixtures;
add a helper function newWaitingBlockStatusRequest(targetID, dispatcherID,
blockTs) (mirroring existing newDoneBlockStatusRequest) and replace the literal
constructions used in
TestBlockStatusRequestQueueDeduplicatesQueuedAndInFlightWaiting (the variables
first, second, third, fourth) with calls to this helper to reduce duplication
and improve readability.

43-45: Consider raising the 20ms timeout to improve test robustness on loaded CI.

The Dequeue assertions at lines 43-45, 50-52, 112-114, and 135-137 rely on short 20ms context timeouts to verify that dedup is blocking requests. Under heavy CI load or with -race enabled, spurious latency can occasionally exceed this threshold. Since these are negative assertions (not performance tests), raising to 50–100ms adds safety without affecting what the tests measure. Monitor for timeout-related flakiness; adjust if you see occasional failures on busy runs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatchermanager/heartbeat_queue_test.go` around lines 43
- 45, The test uses a very short 20ms timeout when creating shortCtx (via
context.WithTimeout) before calling queue.Dequeue, which makes negative
assertions flaky under CI or -race; update the context timeout value from
20*time.Millisecond to a larger safe window (e.g., 75*time.Millisecond) for the
shortCtx/shortCancel usages that wrap queue.Dequeue to ensure stability—search
for the context.WithTimeout(..., 20*time.Millisecond) occurrences in the
heartbeat_queue_test (the shortCtx/shortCancel definitions used with
queue.Dequeue) and change them all to the chosen higher duration.
downstreamadapter/dispatcher/block_status_buffer.go (1)

61-101: Reserve→enqueue is not atomic; a crashed producer can permanently leak a pending key.

reserveWaiting/reserveDone insert into the pending map, then the caller sends to b.queue outside the lock. If the producer goroutine panics or is cancelled between the successful reservation and the channel send (or blocks on a full queue and then the process is forced to shut down), the key remains in pendingWaiting/pendingDone forever, which silently drops all future dedup-eligible statuses for that same (dispatcherID, blockTs, mode, isSyncPoint) tuple.

Given the 1M-capacity queue this is unlikely to bite in practice, but is worth protecting against. A simple fix is to use defer to release the reservation if the send doesn't happen:

♻️ Suggested defensive fix
 func (b *BlockStatusBuffer) Offer(status *heartbeatpb.TableSpanBlockStatus) {
 	if status == nil {
 		return
 	}
 	if isWaitingBlockStatus(status) {
 		key := newBlockStatusKey(status)
 		if !b.reserveWaiting(key) {
 			return
 		}
-		b.queue <- blockStatusQueueEntry{status: status}
+		enqueued := false
+		defer func() {
+			if !enqueued {
+				b.releaseWaiting(key)
+			}
+		}()
+		b.queue <- blockStatusQueueEntry{status: status}
+		enqueued = true
 		return
 	}

(and analogous changes for the DONE branch / OfferDone).

Optional; flag only if robustness under panic/forced shutdown is considered important for this subsystem.

Also applies to: 125-143

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatcher/block_status_buffer.go` around lines 61 - 101,
The reserve->enqueue path in BlockStatusBuffer.Offer and OfferDone can leak
entries in pendingWaiting/pendingDone if a panic or cancellation occurs between
reserveWaiting/reserveDone and sending to b.queue; change both Offer and
OfferDone so that immediately after a successful reserveWaiting/reserveDone you
install a short-lived defer cleanup (or scoped rollback flag) that removes the
pending entry unless the subsequent send to b.queue completed successfully
(clear the defer/flip the flag after send); apply the same pattern for both the
waiting and done branches (referencing reserveWaiting, reserveDone,
pendingWaiting, pendingDone, BlockStatusBuffer.Offer, OfferDone, and
blockStatusQueueEntry/b.queue) to ensure the reservation is released if the
channel send does not happen.
downstreamadapter/dispatcher/basic_dispatcher.go (1)

989-1027: Note: non-blocking schedule DDL status (Stage_NONE) is not deduped in the buffer.

The message built here has Stage: heartbeatpb.BlockStage_NONE with IsBlocked: false, so it falls through Offer()'s pass-through branch in BlockStatusBuffer (neither isWaitingBlockStatus nor isDoneBlockStatus matches). The resendTask set on Line 1023/1025 will re-invoke OfferBlockStatus(message) periodically until ACKed by maintainer, and each resend pushes a new entry into the 1M-capacity buffer.

This is not a correctness problem (the downstream BlockStatusRequestQueue in the dispatcher manager performs request-level dedup), but it does leave a gap in the PR's stated intent of deduplicating pending statuses at the dispatcher layer and can amplify buffer occupancy under slow ACK. Consider whether non-blocking schedule events should also go through a dedup key (e.g., by treating (dispatcherID, blockTs) as a distinct "scheduling" key), or add a brief comment explaining that dedup for this path is intentionally deferred to the upper queue.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatcher/basic_dispatcher.go` around lines 989 - 1027,
The code builds a non-blocking TableSpanBlockStatus (message with Stage:
heartbeatpb.BlockStage_NONE and State.IsBlocked=false) and registers a periodic
resend (resendTaskMap.Set with newResendTask) which repeatedly calls
OfferBlockStatus(message); because BlockStatusBuffer.Offer treats Stage_NONE as
pass-through (not matching isWaitingBlockStatus/isDoneBlockStatus), each resend
enqueues a duplicate into the 1M buffer; fix by deduping these non-blocking
schedule events before enqueuing: add a scheduling-key check (e.g., key = (d.id,
event.GetCommitTs())) in OfferBlockStatus or just before d.resendTaskMap.Set to
skip re-Offer if the same key is already tracked, or alternatively convert the
non-blocking message to use the same dedup path as blocking ones (mark
waiting/done) so BlockStatusBuffer.Offer collapses duplicates; update
references: message, Stage: heartbeatpb.BlockStage_NONE, resendTaskMap.Set,
newResendTask, OfferBlockStatus, and
BlockStatusBuffer.Offer/isWaitingBlockStatus/isDoneBlockStatus.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@downstreamadapter/dispatcher/helper.go`:
- Around line 271-283: ResendTask.Execute currently calls OfferBlockStatus which
calls BlockStatusBuffer.Offer and can block the DispatcherTaskScheduler when the
internal channel fills; change the code so Execute uses a non-blocking offer
path (implement a TryOffer non-blocking variant on BlockStatusBuffer and have
OfferBlockStatus call it, or add a non-blocking branch in OfferBlockStatus that
uses a select with a default to drop the send) so that ResendTask.Execute never
blocks the scheduler; when the non-blocking send fails increment/log a metric
and a debug/info log (include dispatcher.GetId() and message) so buffer pressure
is observable; keep the existing blocking Offer for explicit callers if needed
and retain resendTimeInterval retry behavior.
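A sketch of the non-blocking `TryOffer` variant this comment asks for, with stand-in types (`blockStatus`, `buffer`) rather than the real `BlockStatusBuffer`; the key mechanism is the `select` with a `default` branch so the scheduler never parks on a full channel:

```go
package main

import "fmt"

// blockStatus stands in for *heartbeatpb.TableSpanBlockStatus (illustrative).
type blockStatus struct{ id string }

// buffer is a minimal stand-in for BlockStatusBuffer with a bounded channel.
type buffer struct {
	queue   chan blockStatus
	dropped int
}

// TryOffer enqueues without blocking; it reports whether the status was
// accepted so the caller can bump a metric / log instead of stalling.
func (b *buffer) TryOffer(s blockStatus) bool {
	select {
	case b.queue <- s:
		return true
	default:
		b.dropped++ // in real code: increment a metric and emit a throttled log
		return false
	}
}

func main() {
	b := &buffer{queue: make(chan blockStatus, 1)}
	fmt.Println(b.TryOffer(blockStatus{id: "a"})) // true: slot available
	fmt.Println(b.TryOffer(blockStatus{id: "b"})) // false: queue full, dropped
	fmt.Println(b.dropped)
}
```

Because the resend task fires again after `resendTimeInterval`, a dropped send is retried naturally; the metric only needs to make the pressure observable.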

In `@downstreamadapter/dispatchermanager/heartbeat_collector.go`:
- Around line 245-254: The call to blockStatusReqQueue.OnSendComplete is
currently executed unconditionally right after c.mc.SendCommand; change the flow
in the SendCommand handling so that
OnSendComplete(blockStatusRequestWithTargetID) is only invoked when err == nil
(i.e., move it after the nil-check of the SendCommand result and call it only on
success), leaving the existing log.Error(...) for failures; additionally,
address the longer-term recovery for inFlightStatuses/DONE messages by
implementing one of: TTL/expiry for in-flight keys, explicit re-enqueue of
failed requests, or a resend path for DONE statuses (update the logic around
blockStatusReqQueue, inFlightStatuses, and BlockStatusBuffer accordingly) and
confirm which strategy to use.

---

Duplicate comments:
In `@downstreamadapter/dispatchermanager/heartbeat_queue.go`:
- Around line 106-117: The current OnSendComplete on BlockStatusRequestQueue
unconditionally clears in-flight tracking for a request even when SendCommand
failed; either stop calling OnSendComplete on failure at the caller
(heartbeat_collector.go send path) or change OnSendComplete to accept an
error/result flag and only delete q.inFlightStatuses and q.requestStatusKeys
when the send succeeded. Concretely: update the caller (SendCommand path) to
only call OnSendComplete(request) on success, or change the method signature
(e.g., OnSendResult(request *BlockStatusRequestWithTargetID, err error) or
rename to OnSendSuccess) and branch to perform the delete logic only when err ==
nil, keeping locking and map cleanup as-is.

---

Nitpick comments:
In `@downstreamadapter/dispatcher/basic_dispatcher_info.go`:
- Around line 231-237: The BasicDispatcher methods TakeBlockStatus and
TakeBlockStatusWithTimeout are only used by tests and duplicate SharedInfo
behavior; remove these two wrapper methods (TakeBlockStatus,
TakeBlockStatusWithTimeout) from BasicDispatcher or mark them clearly as test
helpers to avoid implying they’re for production use, and update callers to call
SharedInfo.TakeBlockStatus / SharedInfo.TakeBlockStatusWithTimeout directly;
additionally change SharedInfo.TakeBlockStatusWithTimeout to accept a parent
context parameter instead of deriving context.Background() so callers can cancel
early (update its signature and all call sites to pass ctx).

In `@downstreamadapter/dispatcher/basic_dispatcher.go`:
- Around line 989-1027: The code builds a non-blocking TableSpanBlockStatus
(message with Stage: heartbeatpb.BlockStage_NONE and State.IsBlocked=false) and
registers a periodic resend (resendTaskMap.Set with newResendTask) which
repeatedly calls OfferBlockStatus(message); because BlockStatusBuffer.Offer
treats Stage_NONE as pass-through (not matching
isWaitingBlockStatus/isDoneBlockStatus), each resend enqueues a duplicate into
the 1M buffer; fix by deduping these non-blocking schedule events before
enqueuing: add a scheduling-key check (e.g., key = (d.id, event.GetCommitTs()))
in OfferBlockStatus or just before d.resendTaskMap.Set to skip re-Offer if the
same key is already tracked, or alternatively convert the non-blocking message
to use the same dedup path as blocking ones (mark waiting/done) so
BlockStatusBuffer.Offer collapses duplicates; update references: message, Stage:
heartbeatpb.BlockStage_NONE, resendTaskMap.Set, newResendTask, OfferBlockStatus,
and BlockStatusBuffer.Offer/isWaitingBlockStatus/isDoneBlockStatus.

In `@downstreamadapter/dispatcher/block_status_buffer_test.go`:
- Around line 26-168: The test re-offers the exact same waiting pointer in
TestBlockStatusBufferAllowsWaitingAgainAfterTake which makes the second
assertion redundant because materialize returns the same object; change the test
to either assert pointer equality with require.Same(t, waiting, second) or
instead re-create a new, equal heartbeatpb.TableSpanBlockStatus instance (same
ID/State/Mode) before the second buffer.Offer to prove the buffer accepts a
distinct-but-equivalent object; update references to waiting, first, and second
in that test accordingly.

In `@downstreamadapter/dispatcher/block_status_buffer.go`:
- Around line 61-101: The reserve->enqueue path in BlockStatusBuffer.Offer and
OfferDone can leak entries in pendingWaiting/pendingDone if a panic or
cancellation occurs between reserveWaiting/reserveDone and sending to b.queue;
change both Offer and OfferDone so that immediately after a successful
reserveWaiting/reserveDone you install a short-lived defer cleanup (or scoped
rollback flag) that removes the pending entry unless the subsequent send to
b.queue completed successfully (clear the defer/flip the flag after send); apply
the same pattern for both the waiting and done branches (referencing
reserveWaiting, reserveDone, pendingWaiting, pendingDone,
BlockStatusBuffer.Offer, OfferDone, and blockStatusQueueEntry/b.queue) to ensure
the reservation is released if the channel send does not happen.
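The reserve-then-rollback pattern described above can be sketched like this, with simplified stand-ins (`reservations` for the pending maps, `string` keys for `blockStatusKey`); the deferred cleanup releases the reservation on any path where the channel send did not complete:

```go
package main

import (
	"fmt"
	"sync"
)

// reservations mimics the pendingWaiting/pendingDone maps behind a mutex.
type reservations struct {
	mu      sync.Mutex
	pending map[string]struct{}
}

// offer reserves a key, then enqueues non-blockingly; the deferred cleanup
// rolls the reservation back unless the send to queue actually happened,
// so a panic or full queue cannot leak the pending entry.
func (r *reservations) offer(key string, queue chan string) bool {
	r.mu.Lock()
	if _, dup := r.pending[key]; dup {
		r.mu.Unlock()
		return false
	}
	r.pending[key] = struct{}{}
	r.mu.Unlock()

	sent := false
	defer func() {
		if !sent { // rollback: release the reservation on any failure path
			r.mu.Lock()
			delete(r.pending, key)
			r.mu.Unlock()
		}
	}()

	select {
	case queue <- key:
		sent = true
	default: // queue full: the defer above rolls the reservation back
	}
	return sent
}

func main() {
	r := &reservations{pending: make(map[string]struct{})}
	q := make(chan string, 1)
	fmt.Println(r.offer("k1", q)) // true: enqueued, reservation kept
	fmt.Println(r.offer("k2", q)) // false: queue full, reservation rolled back
	<-q
	fmt.Println(r.offer("k2", q)) // true: no leaked entry blocking the retry
}
```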

In `@downstreamadapter/dispatchermanager/heartbeat_queue_test.go`:
- Around line 64-163: The test repeats four near-identical WAITING
BlockStatusRequest fixtures; add a helper function
newWaitingBlockStatusRequest(targetID, dispatcherID, blockTs) (mirroring
existing newDoneBlockStatusRequest) and replace the literal constructions used
in TestBlockStatusRequestQueueDeduplicatesQueuedAndInFlightWaiting (the
variables first, second, third, fourth) with calls to this helper to reduce
duplication and improve readability.
- Around line 43-45: The test uses a very short 20ms timeout when creating
shortCtx (via context.WithTimeout) before calling queue.Dequeue, which makes
negative assertions flaky under CI or -race; update the context timeout value
from 20*time.Millisecond to a larger safe window (e.g., 75*time.Millisecond) for
the shortCtx/shortCancel usages that wrap queue.Dequeue to ensure
stability—search for the context.WithTimeout(..., 20*time.Millisecond)
occurrences in the heartbeat_queue_test (the shortCtx/shortCancel definitions
used with queue.Dequeue) and change them all to the chosen higher duration.

In `@downstreamadapter/dispatchermanager/heartbeat_queue.go`:
- Around line 132-171: trackPendingStatuses currently does an in-place filter
using filtered := statuses[:0] which reuses the original backing array and
mutates request.Request.BlockStatuses (subtle ownership/leak issues); change to
allocate a fresh slice (e.g., make([]*pb.BlockStatus, 0, len(statuses)) and
append kept items) and assign that new slice to request.Request.BlockStatuses so
we don't shrink the caller's slice or retain dropped-element pointers, keep the
rest of the logic that updates q.queuedStatuses, q.inFlightStatuses and
q.requestStatusKeys the same; alternatively, if you prefer not to change code,
add a clear comment on Enqueue's ownership contract for
request.Request.BlockStatuses (but prefer the fresh-slice approach for safety).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e341cd4a-11f5-4550-8e01-df07805b486f

📥 Commits

Reviewing files that changed from the base of the PR and between d9288d3 and e4432d9.

📒 Files selected for processing (13)
  • downstreamadapter/dispatcher/basic_dispatcher.go
  • downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go
  • downstreamadapter/dispatcher/basic_dispatcher_info.go
  • downstreamadapter/dispatcher/block_status_buffer.go
  • downstreamadapter/dispatcher/block_status_buffer_test.go
  • downstreamadapter/dispatcher/event_dispatcher_test.go
  • downstreamadapter/dispatcher/helper.go
  • downstreamadapter/dispatcher/redo_dispatcher_test.go
  • downstreamadapter/dispatchermanager/dispatcher_manager.go
  • downstreamadapter/dispatchermanager/dispatcher_manager_test.go
  • downstreamadapter/dispatchermanager/heartbeat_collector.go
  • downstreamadapter/dispatchermanager/heartbeat_queue.go
  • downstreamadapter/dispatchermanager/heartbeat_queue_test.go

Comment on lines +73 to +83
if !isDoneBlockStatus(status) {
b.queue <- blockStatusQueueEntry{status: status}
return
}

key := newBlockStatusKey(status)
if !b.reserveDone(key) {
return
}
b.queue <- blockStatusQueueEntry{doneKey: &key}
}
Contributor
⚠️ Potential issue | 🟡 Minor

Minor: Offer()'s DONE path silently drops all State fields except key components.

When Offer() receives a DONE-staged status (Line 78-82), only the key (dispatcherID, blockTs, mode, isSyncPoint) is preserved via the doneKey placeholder; materialize() (Line 161-170) then reconstructs a minimal TableSpanBlockStatus that loses any additional State fields (BlockTables, NeedDroppedTables, NeedAddedTables, UpdatedSchemas) the caller may have populated.

In the current PR this path is unreachable — all DONE messages flow through OfferDoneBlockStatus / OfferDone which by design only carry the key fields — so behavior is preserved. However, the public Offer signature accepts any *TableSpanBlockStatus, so a future caller that hands a DONE-staged status with extras into Offer would experience silent field loss.

Consider one of:

  • Reject (or panic with a clear message) if Offer receives a DONE-staged status with non-zero auxiliary fields.
  • Store the original status pointer for DONE too (skip the placeholder optimization for the Offer entry point) and only use the placeholder form from OfferDone.
  • Add a doc comment on Offer explicitly stating that DONE statuses are reconstructed from key-only and must not carry extras.

Also applies to: 145-171

Signed-off-by: dongmen <414110582@qq.com>
Signed-off-by: dongmen <414110582@qq.com>
@ti-chi-bot

ti-chi-bot Bot commented Apr 17, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

Contributor

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
maintainer/maintainer.go (1)

1191-1208: Nit: cumulative droppedCount in throttled log obscures per-window drop volume.

droppedCount is monotonically accumulated and logged each time the 5 s throttle fires, so operators cannot distinguish a quiet period (e.g., 1 new drop) from a burst (e.g., 10k new drops) without diffing two log lines. Consider logging the delta since the last log (or both) for better signal.

♻️ Proposed change
 func (m *Maintainer) recordDroppedStatusRequest(msgType messaging.IOType, reason string) {
 	m.droppedStatusRequests.Lock()
 	defer m.droppedStatusRequests.Unlock()
 
 	m.droppedStatusRequests.count++
 	now := time.Now()
 	if !m.droppedStatusRequests.lastLogTime.IsZero() && now.Sub(m.droppedStatusRequests.lastLogTime) < 5*time.Second {
 		return
 	}
+	sinceLast := m.droppedStatusRequests.count - m.droppedStatusRequests.lastLoggedCount
 	m.droppedStatusRequests.lastLogTime = now
+	m.droppedStatusRequests.lastLoggedCount = m.droppedStatusRequests.count
 
 	log.Warn("drop maintainer status request",
 		zap.Stringer("changefeedID", m.changefeedID),
 		zap.String("type", msgType.String()),
 		zap.String("reason", reason),
 		zap.Int("eventChLen", m.eventCh.Len()),
-		zap.Uint64("droppedCount", m.droppedStatusRequests.count))
+		zap.Uint64("droppedSinceLastLog", sinceLast),
+		zap.Uint64("droppedCount", m.droppedStatusRequests.count))
 }

(lastLoggedCount uint64 would need to be added to the droppedStatusRequests struct.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/maintainer.go` around lines 1191 - 1208, The throttled log in
recordDroppedStatusRequest currently reports the cumulative
droppedStatusRequests.count, hiding per-window drops; add a lastLoggedCount
uint64 field to the droppedStatusRequests struct, compute delta :=
m.droppedStatusRequests.count - m.droppedStatusRequests.lastLoggedCount inside
recordDroppedStatusRequest when emitting the log, include both the cumulative
droppedCount and the per-window delta (e.g., "droppedDelta") in the zap fields,
and set m.droppedStatusRequests.lastLoggedCount = m.droppedStatusRequests.count
when you update lastLogTime so subsequent logs show the next window's delta.
maintainer/status_request_buffer.go (1)

115-150: mergeHeartbeat stores incoming proto pointers directly — consider defensive copy.

entry.changefeedID, entry.watermark, entry.redoWatermark, and each entry.statuses[key] = status are stored as pointers from the incoming *heartbeatpb.HeartBeatRequest. The entry sits in the buffer until drained (possibly multiple merges later). Combined with the in-place mutation in pickWatermark flagged above, this means a later merge can retroactively change the watermark observed by an earlier one still in the buffer, and any caller up the stack that reuses/recycles the proto can observe buffered mutations.

Not a bug today if the messaging layer treats incoming messages as read-only and doesn't pool them, but it's fragile. Consider copying the scalar fields / Watermark into the buffered entry; pointer reuse for *TableSpanStatus is fine if those objects are immutable post-send.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/status_request_buffer.go` around lines 115 - 150, mergeHeartbeat
currently stores incoming protobuf pointers (e.g., req.ChangefeedID,
req.Watermark, req.RedoWatermark and req.Statuses elements) directly into the
bufferedHeartbeatRequest, which can be mutated later; instead, make defensive
copies when merging: copy scalar fields (assign a new string/ID value rather
than pointer), clone Watermark/RedoWatermark via a deep-copy (e.g., proto.Clone
or constructing new Watermark structs) before assigning to
entry.watermark/entry.redoWatermark, and either clone or ensure immutability of
each TableSpanStatus stored in entry.statuses (clone status before
entry.statuses[key] = status if those protos can be mutated/recycled); update
mergeHeartbeat and any helper usages (pickWatermark) to operate on and return
copies rather than reusing incoming pointers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@maintainer/status_request_buffer.go`:
- Around line 1-10: Add the standard Apache 2.0 license header used across this
package to the top of this file (before the "package maintainer" line) so it
matches the banner in maintainer/maintainer.go; ensure the header text and
year/owner fields match the project's existing header format exactly and then
run the repository's check-copyright CI locally or re-run the pipeline.
- Around line 285-299: pickWatermark currently mutates the incoming
*heartbeatpb.Watermark objects and can drop a higher LastSyncedTs when the
candidate wins by Seq; change it to allocate and return a new
heartbeatpb.Watermark instead of reusing current or candidate, compute
Seq/CheckpointTs selection as before (choose the candidate when
candidate.Seq>current.Seq or equal Seq and
candidate.CheckpointTs>=current.CheckpointTs) but always set LastSyncedTs to the
max(current.LastSyncedTs, candidate.LastSyncedTs) without mutating either input,
and ensure nil handling for current/candidate still returns a freshly allocated
Watermark with the appropriate fields.

---

Nitpick comments:
In `@maintainer/maintainer.go`:
- Around line 1191-1208: The throttled log in recordDroppedStatusRequest
currently reports the cumulative droppedStatusRequests.count, hiding per-window
drops; add a lastLoggedCount uint64 field to the droppedStatusRequests struct,
compute delta := m.droppedStatusRequests.count -
m.droppedStatusRequests.lastLoggedCount inside recordDroppedStatusRequest when
emitting the log, include both the cumulative droppedCount and the per-window
delta (e.g., "droppedDelta") in the zap fields, and set
m.droppedStatusRequests.lastLoggedCount = m.droppedStatusRequests.count when you
update lastLogTime so subsequent logs show the next window's delta.

In `@maintainer/status_request_buffer.go`:
- Around line 115-150: mergeHeartbeat currently stores incoming protobuf
pointers (e.g., req.ChangefeedID, req.Watermark, req.RedoWatermark and
req.Statuses elements) directly into the bufferedHeartbeatRequest, which can be
mutated later; instead, make defensive copies when merging: copy scalar fields
(assign a new string/ID value rather than pointer), clone
Watermark/RedoWatermark via a deep-copy (e.g., proto.Clone or constructing new
Watermark structs) before assigning to entry.watermark/entry.redoWatermark, and
either clone or ensure immutability of each TableSpanStatus stored in
entry.statuses (clone status before entry.statuses[key] = status if those protos
can be mutated/recycled); update mergeHeartbeat and any helper usages
(pickWatermark) to operate on and return copies rather than reusing incoming
pointers.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4b012fd1-7a51-4209-8579-9f4da556b8b8

📥 Commits

Reviewing files that changed from the base of the PR and between e4432d9 and 73c97af.

📒 Files selected for processing (3)
  • maintainer/maintainer.go
  • maintainer/maintainer_test.go
  • maintainer/status_request_buffer.go

Comment on lines +1 to +10
package maintainer

import (
"sync"

"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/node"
)
Contributor
⚠️ Potential issue | 🔴 Critical

Missing Apache 2.0 copyright header — breaks check-copyright in CI.

The pipeline's check-copyright job reports that ./maintainer/status_request_buffer.go is missing the standard license header used by the rest of this package (see the banner in maintainer/maintainer.go). Please prepend it.

🛠️ Proposed fix
+// Copyright 2026 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package maintainer
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

package maintainer

import (
	"sync"

	"github.com/pingcap/ticdc/heartbeatpb"
	"github.com/pingcap/ticdc/pkg/common"
	"github.com/pingcap/ticdc/pkg/messaging"
	"github.com/pingcap/ticdc/pkg/node"
)

After:

// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package maintainer

import (
	"sync"

	"github.com/pingcap/ticdc/heartbeatpb"
	"github.com/pingcap/ticdc/pkg/common"
	"github.com/pingcap/ticdc/pkg/messaging"
	"github.com/pingcap/ticdc/pkg/node"
)
🧰 Tools
🪛 GitHub Actions: PR Build and Unit Test

[error] 1-1: check-copyright failed: The copyright information of the following file is incorrect: ./maintainer/status_request_buffer.go

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/status_request_buffer.go` around lines 1 - 10, Add the standard
Apache 2.0 license header used across this package to the top of this file
(before the "package maintainer" line) so it matches the banner in
maintainer/maintainer.go; ensure the header text and year/owner fields match the
project's existing header format exactly and then run the repository's
check-copyright CI locally or re-run the pipeline.

Comment on lines +285 to +299
func pickWatermark(current, candidate *heartbeatpb.Watermark) *heartbeatpb.Watermark {
if candidate == nil {
return current
}
if current == nil {
return candidate
}
if candidate.Seq > current.Seq || (candidate.Seq == current.Seq && candidate.CheckpointTs >= current.CheckpointTs) {
current = candidate
}
if candidate.LastSyncedTs > current.LastSyncedTs {
current.LastSyncedTs = candidate.LastSyncedTs
}
return current
}
Contributor
⚠️ Potential issue | 🟠 Major

pickWatermark drops LastSyncedTs max and mutates the incoming proto object.

Two concerns in this helper:

  1. Lost LastSyncedTs max. When candidate wins by Seq and has a smaller LastSyncedTs than the previously buffered current, the higher value is silently lost. The maintainer's normal heartbeat path (onHeartbeatRequest) treats LastSyncedTs as max-monotonic; the dedupe path should preserve that invariant — otherwise a buffered + drained heartbeat can regress LastSyncedTs compared to processing the same requests individually.

  2. Aliasing / in-place mutation of the incoming proto. After current = candidate, the next statement current.LastSyncedTs = candidate.LastSyncedTs (and more importantly, on the alternate branch, current.LastSyncedTs = candidate.LastSyncedTs when current is the previously buffered watermark) mutates the *heartbeatpb.Watermark object the caller passed in or that a prior sender's req.Watermark still points at. That object may still be referenced by the messaging layer or by subsequent merges. Prefer allocating a fresh Watermark inside the buffer.

🛡️ Proposed fix — compute max(LastSyncedTs) explicitly and avoid mutating either input
 func pickWatermark(current, candidate *heartbeatpb.Watermark) *heartbeatpb.Watermark {
 	if candidate == nil {
 		return current
 	}
 	if current == nil {
-		return candidate
+		c := *candidate
+		return &c
 	}
-	if candidate.Seq > current.Seq || (candidate.Seq == current.Seq && candidate.CheckpointTs >= current.CheckpointTs) {
-		current = candidate
-	}
-	if candidate.LastSyncedTs > current.LastSyncedTs {
-		current.LastSyncedTs = candidate.LastSyncedTs
-	}
-	return current
+	maxLastSynced := current.LastSyncedTs
+	if candidate.LastSyncedTs > maxLastSynced {
+		maxLastSynced = candidate.LastSyncedTs
+	}
+	chosen := current
+	if candidate.Seq > current.Seq ||
+		(candidate.Seq == current.Seq && candidate.CheckpointTs >= current.CheckpointTs) {
+		chosen = candidate
+	}
+	out := *chosen
+	out.LastSyncedTs = maxLastSynced
+	return &out
 }
📝 Committable suggestion

Suggested change

Before:

func pickWatermark(current, candidate *heartbeatpb.Watermark) *heartbeatpb.Watermark {
	if candidate == nil {
		return current
	}
	if current == nil {
		return candidate
	}
	if candidate.Seq > current.Seq || (candidate.Seq == current.Seq && candidate.CheckpointTs >= current.CheckpointTs) {
		current = candidate
	}
	if candidate.LastSyncedTs > current.LastSyncedTs {
		current.LastSyncedTs = candidate.LastSyncedTs
	}
	return current
}

After:

func pickWatermark(current, candidate *heartbeatpb.Watermark) *heartbeatpb.Watermark {
	if candidate == nil {
		return current
	}
	if current == nil {
		c := *candidate
		return &c
	}
	maxLastSynced := current.LastSyncedTs
	if candidate.LastSyncedTs > maxLastSynced {
		maxLastSynced = candidate.LastSyncedTs
	}
	chosen := current
	if candidate.Seq > current.Seq ||
		(candidate.Seq == current.Seq && candidate.CheckpointTs >= current.CheckpointTs) {
		chosen = candidate
	}
	out := *chosen
	out.LastSyncedTs = maxLastSynced
	return &out
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/status_request_buffer.go` around lines 285 - 299, pickWatermark
currently mutates the incoming *heartbeatpb.Watermark objects and can drop a
higher LastSyncedTs when the candidate wins by Seq; change it to allocate and
return a new heartbeatpb.Watermark instead of reusing current or candidate,
compute Seq/CheckpointTs selection as before (choose the candidate when
candidate.Seq>current.Seq or equal Seq and
candidate.CheckpointTs>=current.CheckpointTs) but always set LastSyncedTs to the
max(current.LastSyncedTs, candidate.LastSyncedTs) without mutating either input,
and ensure nil handling for current/candidate still returns a freshly allocated
Watermark with the appropriate fields.


Labels

do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files.
