dispatcher,dispatchermanager: deduplicate pending done statuses#4814

Open
hongyunyan wants to merge 2 commits into pingcap:master from hongyunyan:codex/done-dedupe-v1

Conversation

Collaborator

@hongyunyan hongyunyan commented Apr 13, 2026

What problem does this PR solve?

Issue Number: ref #0

In large table-count DDL scenarios, maintainer responses can cause repeated local DONE statuses to accumulate before the dispatcher side drains them. The previous implementation materialized a fresh protobuf object for every repeated DONE and queued all of them, which amplified memory usage with a very large number of identical small objects.

What is changed and how it works?

Background:

  • Maintainer and dispatcher DONE semantics remain unchanged.
  • This change only reduces local pending-memory amplification on the dispatcher side.

Motivation:

  • DB/ALL block handling can fan out the same DONE result to many dispatchers.
  • Repeated resends previously created duplicate pending DONE protobuf objects and duplicate queue entries.

Summary:

  • Add BlockStatusBuffer between dispatchers and dispatcher manager to keep ordering while coalescing identical pending DONE statuses.
  • Delay DONE protobuf materialization until the dispatcher manager drains the local buffer.
  • Route dispatcher resend tasks through the same buffer so pending identical DONE statuses are coalesced consistently.
  • Add queue-level dedupe in BlockStatusRequestQueue so identical DONE statuses that are already queued or in flight are not enqueued again.
  • Add focused tests for identical DONE dedupe, WAITING pass-through, and key separation by mode and isSyncPoint.
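The coalescing idea behind BlockStatusBuffer can be sketched as follows. This is a minimal single-goroutine illustration, not the PR's implementation: the type and method names (doneKey, blockStatusBuffer, OfferDone, TryTake) are hypothetical, the real buffer holds protobuf statuses and needs mutex protection, and the dedupe key fields are taken from the PR description (dispatcherID, blockTs, mode, isSyncPoint).

```go
package main

import "fmt"

// doneKey is a hypothetical dedupe key mirroring the fields the PR says
// identical DONE statuses are coalesced on.
type doneKey struct {
	dispatcherID string
	blockTs      uint64
	mode         int64
	isSyncPoint  bool
}

// blockStatusBuffer sketches the coalescing idea: pending DONE statuses are
// tracked as keys and only materialized when taken, so N identical offers
// cost one map entry instead of N protobuf objects.
type blockStatusBuffer struct {
	queue       chan doneKey
	pendingDone map[doneKey]struct{}
}

func newBlockStatusBuffer(capacity int) *blockStatusBuffer {
	return &blockStatusBuffer{
		queue:       make(chan doneKey, capacity),
		pendingDone: make(map[doneKey]struct{}),
	}
}

// OfferDone enqueues a DONE status unless an identical one is already pending.
func (b *blockStatusBuffer) OfferDone(k doneKey) bool {
	if _, dup := b.pendingDone[k]; dup {
		return false // coalesced with the already-pending identical DONE
	}
	b.pendingDone[k] = struct{}{}
	b.queue <- k
	return true
}

// TryTake drains one entry and releases its dedupe reservation, so a later
// identical DONE can be reported again.
func (b *blockStatusBuffer) TryTake() (doneKey, bool) {
	select {
	case k := <-b.queue:
		delete(b.pendingDone, k)
		return k, true
	default:
		return doneKey{}, false
	}
}

func main() {
	buf := newBlockStatusBuffer(16)
	k := doneKey{dispatcherID: "d1", blockTs: 100, mode: 0, isSyncPoint: false}
	fmt.Println(buf.OfferDone(k)) // true: first offer is queued
	fmt.Println(buf.OfferDone(k)) // false: identical pending DONE coalesced
	_, ok := buf.TryTake()
	fmt.Println(ok) // true
	fmt.Println(buf.OfferDone(k)) // true: key released after take
}
```

Distinct values of mode or isSyncPoint produce distinct keys, so those statuses are never merged, matching the key-separation tests listed above.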

Check List

Tests

  • Unit test

    • go test ./downstreamadapter/dispatchermanager
    • `go test ./downstreamadapter/dispatcher -run 'Test(BlockStatusBuffer|BlockingDDLFlushBeforeWaitingAndWriteDoesNotFlushAgain|HoldBlockEventUntilNoResendTasks|HandleEventsRejectActiveActiveTableWhenDisabled|HandleEventsRejectSoftDeleteTableWhenDisabled|HandleEventsIgnoreSpecialTableOnNonMySQLSink)$'`

Questions

Will it cause performance regression or break compatibility?

No protocol or semantic change is introduced. The change only coalesces identical pending DONE statuses locally and delays protobuf materialization until drain time. This is intended to reduce memory pressure and queue amplification on the hot path.

Do you need to update user documentation, design documentation or monitoring documentation?

No.

Release note

None

Summary by CodeRabbit

  • Refactor

    • Restructured internal block status communication mechanism for improved efficiency and deduplication handling.
    • Enhanced DONE block status message deduplication to prevent redundant duplicate messages.
  • Bug Fixes

    • Optimized pass-action resend timing to reduce unnecessary resends after status updates, improving system responsiveness.

@ti-chi-bot ti-chi-bot Bot added do-not-merge/needs-linked-issue release-note-none Denotes a PR that doesn't merit a release note. labels Apr 13, 2026
@ti-chi-bot

ti-chi-bot Bot commented Apr 13, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign wlwilliamx for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai
Contributor

coderabbitai Bot commented Apr 13, 2026

📝 Walkthrough

Walkthrough

The PR refactors block status reporting from raw Go channels to a new bounded BlockStatusBuffer abstraction with built-in deduplication for DONE statuses. The dispatcher interface is updated to replace GetBlockStatusesChan() with OfferBlockStatus(), all callsites are updated accordingly, and the dispatcher manager's collection loop is refactored to use context-aware takeout methods. Additionally, queue-level deduplication for in-flight DONE entries and barrier event fanout-pass timing improvements are introduced.

Changes

Cohort / File(s) Summary
Dispatcher Interface & Core API
downstreamadapter/dispatcher/basic_dispatcher.go
Replaced GetBlockStatusesChan() with OfferBlockStatus(status) method; updated all internal send paths (HandleDispatcherStatus, reportBlockedEventDone, DealWithBlockEvent, reportBlockedEventToMaintainer) to use the new offer API instead of direct channel sends.
Block Status Buffer Implementation
downstreamadapter/dispatcher/block_status_buffer.go
New bounded in-memory queue for TableSpanBlockStatus with deduplication of DONE entries by (dispatcherID, blockTs, mode, isSyncPoint). Provides Offer, OfferDone, Take(ctx), TryTake(), and Len() methods; DONE statuses are materialized only when consumed.
SharedInfo & Dispatcher Methods
downstreamadapter/dispatcher/basic_dispatcher_info.go
Replaced blockStatusesChan field with blockStatusBuffer *BlockStatusBuffer; changed NewSharedInfo to accept blockStatusBufferSize int instead of channel; added dispatcher methods OfferBlockStatus, OfferDoneBlockStatus, TakeBlockStatus(ctx), TakeBlockStatusWithTimeout(timeout) and corresponding SharedInfo methods.
Dispatcher Manager Collection Loop
downstreamadapter/dispatchermanager/dispatcher_manager.go
Refactored collectBlockStatusRequest to continuously pull via TakeBlockStatus(ctx) with short-timeout batching; removed time.Timer pattern; updated metric source to use BlockStatusLen(); changed NewSharedInfo call to pass 1024*1024 capacity value instead of channel.
Block Status Request Queue Deduplication
downstreamadapter/dispatchermanager/heartbeat_queue.go
Added deduplication tracking for DONE statuses in flight: trackPendingDone filters duplicates during enqueue, markInFlight marks as in-flight on dequeue, OnSendComplete clears in-flight state; uses requestDoneKeys, queuedDone, inFlightDone maps with mutex protection.
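The queued/in-flight lifecycle described above can be sketched with a small state tracker. This is an illustrative model only: the names (doneTracker, TrackPendingDone, MarkInFlight, OnSendComplete) follow the walkthrough, but the key type, locking, and surrounding queue are simplified away, and the real queue keys would be derived from the block status fields rather than plain strings.

```go
package main

import "fmt"

// doneTracker models the lifecycle the walkthrough describes for a DONE key
// in BlockStatusRequestQueue: filtered while queued or in flight, and
// re-admitted only after the send completes.
type doneTracker struct {
	queuedDone   map[string]struct{}
	inFlightDone map[string]struct{}
}

func newDoneTracker() *doneTracker {
	return &doneTracker{
		queuedDone:   map[string]struct{}{},
		inFlightDone: map[string]struct{}{},
	}
}

// TrackPendingDone returns false when an identical DONE is already queued or
// in flight, so the caller skips the enqueue entirely.
func (t *doneTracker) TrackPendingDone(key string) bool {
	if _, ok := t.queuedDone[key]; ok {
		return false
	}
	if _, ok := t.inFlightDone[key]; ok {
		return false
	}
	t.queuedDone[key] = struct{}{}
	return true
}

// MarkInFlight moves a key from queued to in-flight on dequeue.
func (t *doneTracker) MarkInFlight(key string) {
	delete(t.queuedDone, key)
	t.inFlightDone[key] = struct{}{}
}

// OnSendComplete clears in-flight state once the request has been sent,
// re-opening the key for future DONE reports.
func (t *doneTracker) OnSendComplete(key string) {
	delete(t.inFlightDone, key)
}

func main() {
	t := newDoneTracker()
	fmt.Println(t.TrackPendingDone("done/d1/100")) // true: first enqueue
	fmt.Println(t.TrackPendingDone("done/d1/100")) // false: already queued
	t.MarkInFlight("done/d1/100")
	fmt.Println(t.TrackPendingDone("done/d1/100")) // false: still in flight
	t.OnSendComplete("done/d1/100")
	fmt.Println(t.TrackPendingDone("done/d1/100")) // true: re-admitted
}
```

Note that when OnSendComplete runs determines correctness: calling it for failed sends re-opens the key early, which is exactly the ordering issue one of the review comments below raises.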
Test Infrastructure Updates
downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go, downstreamadapter/dispatcher/event_dispatcher_test.go, downstreamadapter/dispatcher/redo_dispatcher_test.go, downstreamadapter/dispatchermanager/dispatcher_manager_test.go
Updated test helpers to pass buffer capacity integers (1, 128, 1024*1024) instead of channel allocations to NewSharedInfo; replaced GetBlockStatusesChan() reads with TakeBlockStatusWithTimeout() in event_dispatcher_test.go.
Block Status Buffer & Queue Tests
downstreamadapter/dispatcher/block_status_buffer_test.go, downstreamadapter/dispatchermanager/heartbeat_queue_test.go
Added comprehensive tests for buffer deduplication (pending DONE coalescing, distinct keys for varying isSyncPoint/mode), ordering retention, and queue deduplication across queued/in-flight states with send-complete lifecycle.
Resend Tasks & Heartbeat Collection
downstreamadapter/dispatcher/helper.go, downstreamadapter/dispatchermanager/heartbeat_collector.go
Updated ResendTask.Execute() to call dispatcher.OfferBlockStatus() instead of direct channel send; added OnSendComplete(blockStatusRequestWithTargetID) callback after SendCommand in heartbeat collector.
Barrier Event Fanout-Pass Timing
maintainer/barrier_event.go, maintainer/barrier_event_test.go
Added lastStatusReceivedTime, passActionSent state tracking; introduced 1-second quiet window for fanout pass-action resends after status receipt (via markStatusReceived()); optimized resend timing to capture single time.Now() and defer lastResendTime update to actual send points; added isFanoutPassAction() helper and corresponding test cases.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

design/refactoring, component/dispatcher

Suggested reviewers

  • wk989898
  • lidezhu
  • flowbehappy

Poem

🐰 A buffer blooms where channels grew,
With dedup magic, old threads anew.
Fanout pass pauses, heartbeats align,
The dispatcher's dance, now truly divine!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 8.33% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The PR title 'dispatcher,dispatchermanager: deduplicate pending done statuses' accurately and concisely summarizes the main change: adding deduplication logic for DONE statuses across the dispatcher and dispatcher manager components.
Description check ✅ Passed The PR description includes all required sections: problem statement (issue reference, memory amplification), summary of changes (BlockStatusBuffer, delayed materialization, queue-level dedupe), test coverage (unit tests specified), and answers to compatibility questions (no breaking changes).


Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.11.4)

Command failed



@ti-chi-bot ti-chi-bot Bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Apr 13, 2026

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a BlockStatusBuffer and updates the BlockStatusRequestQueue to coalesce and deduplicate identical DONE block statuses. These changes are designed to reduce local memory amplification and improve the efficiency of reporting dispatcher statuses to the maintainer by replacing direct channel communication with a structured buffering and batching mechanism. I have no feedback to provide as no review comments were submitted.

@ti-chi-bot

ti-chi-bot Bot commented Apr 20, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
downstreamadapter/dispatchermanager/heartbeat_collector.go (1)

245-253: ⚠️ Potential issue | 🟠 Major

Only mark block-status sends complete after SendCommand succeeds.

OnSendComplete clears the queue’s in-flight DONE tracking. Calling it before checking err treats failed sends as completed and can reopen the same DONE key for duplicate enqueueing.

🐛 Proposed fix
 			err := c.mc.SendCommand(
 				messaging.NewSingleTargetMessage(
 					blockStatusRequestWithTargetID.TargetID,
 					messaging.MaintainerManagerTopic,
 					blockStatusRequestWithTargetID.Request,
 				))
-			c.blockStatusReqQueue.OnSendComplete(blockStatusRequestWithTargetID)
 			if err != nil {
 				log.Error("failed to send block status request message", zap.Error(err))
+				continue
 			}
+			c.blockStatusReqQueue.OnSendComplete(blockStatusRequestWithTargetID)

If failed sends should be retried/released, add a distinct failure path instead of using the success-completion hook.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatchermanager/heartbeat_collector.go` around lines 245
- 253, The code calls blockStatusReqQueue.OnSendComplete before checking the
SendCommand error, which marks the request done even on failure; move the call
to blockStatusReqQueue.OnSendComplete to the success path (after
c.mc.SendCommand returns nil) and add an explicit failure path that does not
call OnSendComplete (and optionally logs and/or requeues/releases the
blockStatusRequestWithTargetID) so failed sends remain in-flight for retry;
update the logic around c.mc.SendCommand, blockStatusReqQueue.OnSendComplete,
and error handling for blockStatusRequestWithTargetID.TargetID /
blockStatusRequestWithTargetID.Request accordingly.
🧹 Nitpick comments (3)
maintainer/barrier_event.go (1)

113-114: Nit: lastStatusReceivedTime initialized to time.Now() implies a status was just received.

Semantically, this field represents the last time a dispatcher reported progress for this barrier; seeding it with time.Now() at construction (before any dispatcher has reported) is a small white lie. It doesn't cause incorrect behavior today because passActionSent starts false, so the first pass-action branch entry always sends and the quiet-window check is bypassed. Still, initializing to the zero value (or skipping assignment) would match the intent and avoid accidental suppression if passActionSent ever gets flipped via some future path before a real status arrives.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/barrier_event.go` around lines 113 - 114, The struct field
lastStatusReceivedTime should not be initialized to time.Now() in the
constructor; instead set it to the zero time (or omit the assignment) so it
accurately represents "no status received yet"—update the initialization in
barrier_event.go where lastStatusReceivedTime and lastWarningLogTime are set
(referencing lastStatusReceivedTime and passActionSent logic) to leave
lastStatusReceivedTime at its zero value while leaving lastWarningLogTime as
appropriate, ensuring existing passActionSent handling still triggers the first
pass action.
downstreamadapter/dispatcher/block_status_buffer.go (1)

59-150: Back-pressure and dedupe-window semantics worth documenting.

Three related subtleties that are correct as-is but should be understood by future readers:

  1. Offer blocks on a full queue while holding a reserved key. reserveDone at Line 74/92 marks the key in pendingDone before the blocking b.queue <- ... send. If the dispatcher manager stops draining (shutdown, stall), every subsequent OfferDone for the same key will silently be dropped (since reserveDone returns false), while the original offerer is blocked on the channel send. That's acceptable because the dropped entries would be redundant with the one that's already queued, but it does mean liveness of DONE reporting is tied to the manager's TakeBlockStatus loop.

  2. Narrow race between dequeue and pendingDone delete. In materialize (Line 130-138) the key is removed from pendingDone after the entry is pulled off the channel. During that tiny window, a concurrent identical OfferDone will see the key still reserved and be dropped. This is fine (the taken entry will be delivered downstream), but strictly speaking it means "identical DONE after the previous has already been dequeued" can be lost if timed exactly within that window. The downstream consumer (maintainer) is idempotent w.r.t. duplicate DONE acks, so this doesn't cause incorrect behavior — a short comment here would help the next reader.

  3. Len() does not include in-flight or pending. The metric wired up in collectBlockStatusRequest uses b.Len() = len(channel), which excludes the one slot consumed between <-b.queue and materialize. Fine for queue-depth monitoring, just flagging in case anyone correlates this with pendingDone size later.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatcher/block_status_buffer.go` around lines 59 - 150,
Add short clarifying comments documenting the back-pressure and dedupe-window
semantics: in Offer/OfferDone (around where reserveDone is called) note that
reserveDone marks pendingDone before the potentially-blocking send so offers for
the same key will be dropped while the original sender blocks; in materialize
(where delete(b.pendingDone, key) occurs) document the narrow race where an
identical OfferDone arriving between channel receive and the pendingDone
deletion will be dropped and why that is acceptable; and next to Len() note that
it returns only the channel length and does not include the slot consumed
in-flight or entries tracked in pendingDone. Reference the methods
BlockStatusBuffer.Offer/OfferDone, reserveDone, materialize, and Len in these
comments.
downstreamadapter/dispatcher/basic_dispatcher_info.go (1)

282-290: Nit: the ok bool return in TakeBlockStatusWithTimeout is redundant with status != nil.

BlockStatusBuffer.Take only returns nil on ctx.Done(), so status == nil and ok == false are equivalent here. Either drop the bool and have callers check nil, or document the intent (e.g., "ok=false on timeout, reserved for future non-nil sentinel values"). Minor; purely for API clarity.

Proposed simplification
-func (s *SharedInfo) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
-	defer cancel()
-	status := s.TakeBlockStatus(ctx)
-	if status == nil {
-		return nil, false
-	}
-	return status, true
-}
+func (s *SharedInfo) TakeBlockStatusWithTimeout(timeout time.Duration) *heartbeatpb.TableSpanBlockStatus {
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+	return s.TakeBlockStatus(ctx)
+}

Note: this changes the public signature — would need to update test helpers (event_dispatcher_test.go, basic_dispatcher_active_active_test.go) that consume the bool return.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatcher/basic_dispatcher_info.go` around lines 282 -
290, The method TakeBlockStatusWithTimeout currently returns
(*heartbeatpb.TableSpanBlockStatus, bool) where the bool is redundant because
BlockStatusBuffer.Take only returns nil on ctx.Done(); change
TakeBlockStatusWithTimeout to return only (*heartbeatpb.TableSpanBlockStatus)
(return nil on timeout) by removing the bool, update its implementation (remove
the ok handling) and then update all callers and test helpers that expect the
two-value signature (notably event_dispatcher_test.go and
basic_dispatcher_active_active_test.go) to check for nil instead of checking the
bool; ensure any exported documentation/comments reflect that nil means
timeout/closed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@downstreamadapter/dispatchermanager/dispatcher_manager.go`:
- Around line 575-613: The first-taken status returned by
e.sharedInfo.TakeBlockStatus(ctx) can be dropped if ctx is cancelled during the
10ms batching window because the code checks ctx.Err() and returns before
calling enqueueBlockStatus; fix by moving the enqueue logic ahead of the
ctx.Err() check so you always flush blockStatusMessage and
redoBlockStatusMessage (call enqueueBlockStatus for common.DefaultMode and
common.RedoMode when their slices are non-empty) before returning on parent
context cancellation; keep the batching loop and metric update
(e.metricBlockStatusesChanLen.Set(e.sharedInfo.BlockStatusLen())) as-is but
ensure enqueueBlockStatus is invoked prior to checking ctx.Err() after cancel(),
referencing TakeBlockStatus, enqueueBlockStatus, blockStatusMessage,
redoBlockStatusMessage, ctx, and batchCtx.

In `@maintainer/barrier_event.go`:
- Around line 693-710: The quiet-window check uses fanoutPassResendQuietInterval
which equals the hard resend gate (1s), making suppression ineffective; also
passActionSent is set before sendPassAction(mode) completes so it may be flipped
even when no action is actually sent (e.g., InfluenceType_DB empty span). Fix by
making fanoutPassResendQuietInterval strictly larger than the resend gate or
change the suppression check to compare now.Sub(be.lastResendTime) against
fanoutPassResendQuietInterval consistently, and move the be.passActionSent =
true assignment to after sendPassAction(mode) only when sendPassAction reports
an actual send (i.e., only mark passActionSent when sendPassAction indicates
success/that a message was emitted). Ensure references:
fanoutPassResendQuietInterval, isFanoutPassAction(), passActionSent,
lastResendTime, sendPassAction(mode).

---

Outside diff comments:
In `@downstreamadapter/dispatchermanager/heartbeat_collector.go`:
- Around line 245-253: The code calls blockStatusReqQueue.OnSendComplete before
checking the SendCommand error, which marks the request done even on failure;
move the call to blockStatusReqQueue.OnSendComplete to the success path (after
c.mc.SendCommand returns nil) and add an explicit failure path that does not
call OnSendComplete (and optionally logs and/or requeues/releases the
blockStatusRequestWithTargetID) so failed sends remain in-flight for retry;
update the logic around c.mc.SendCommand, blockStatusReqQueue.OnSendComplete,
and error handling for blockStatusRequestWithTargetID.TargetID /
blockStatusRequestWithTargetID.Request accordingly.

---

Nitpick comments:
In `@downstreamadapter/dispatcher/basic_dispatcher_info.go`:
- Around line 282-290: The method TakeBlockStatusWithTimeout currently returns
(*heartbeatpb.TableSpanBlockStatus, bool) where the bool is redundant because
BlockStatusBuffer.Take only returns nil on ctx.Done(); change
TakeBlockStatusWithTimeout to return only (*heartbeatpb.TableSpanBlockStatus)
(return nil on timeout) by removing the bool, update its implementation (remove
the ok handling) and then update all callers and test helpers that expect the
two-value signature (notably event_dispatcher_test.go and
basic_dispatcher_active_active_test.go) to check for nil instead of checking the
bool; ensure any exported documentation/comments reflect that nil means
timeout/closed.

In `@downstreamadapter/dispatcher/block_status_buffer.go`:
- Around line 59-150: Add short clarifying comments documenting the
back-pressure and dedupe-window semantics: in Offer/OfferDone (around where
reserveDone is called) note that reserveDone marks pendingDone before the
potentially-blocking send so offers for the same key will be dropped while the
original sender blocks; in materialize (where delete(b.pendingDone, key) occurs)
document the narrow race where an identical OfferDone arriving between channel
receive and the pendingDone deletion will be dropped and why that is acceptable;
and next to Len() note that it returns only the channel length and does not
include the slot consumed in-flight or entries tracked in pendingDone. Reference
the methods BlockStatusBuffer.Offer/OfferDone, reserveDone, materialize, and Len
in these comments.

In `@maintainer/barrier_event.go`:
- Around line 113-114: The struct field lastStatusReceivedTime should not be
initialized to time.Now() in the constructor; instead set it to the zero time
(or omit the assignment) so it accurately represents "no status received
yet"—update the initialization in barrier_event.go where lastStatusReceivedTime
and lastWarningLogTime are set (referencing lastStatusReceivedTime and
passActionSent logic) to leave lastStatusReceivedTime at its zero value while
leaving lastWarningLogTime as appropriate, ensuring existing passActionSent
handling still triggers the first pass action.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 262cfc2f-5556-4b2b-aaa1-0c4402f5af4d

📥 Commits

Reviewing files that changed from the base of the PR and between 0a418b4 and 1f3f40a.

📒 Files selected for processing (15)
  • downstreamadapter/dispatcher/basic_dispatcher.go
  • downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go
  • downstreamadapter/dispatcher/basic_dispatcher_info.go
  • downstreamadapter/dispatcher/block_status_buffer.go
  • downstreamadapter/dispatcher/block_status_buffer_test.go
  • downstreamadapter/dispatcher/event_dispatcher_test.go
  • downstreamadapter/dispatcher/helper.go
  • downstreamadapter/dispatcher/redo_dispatcher_test.go
  • downstreamadapter/dispatchermanager/dispatcher_manager.go
  • downstreamadapter/dispatchermanager/dispatcher_manager_test.go
  • downstreamadapter/dispatchermanager/heartbeat_collector.go
  • downstreamadapter/dispatchermanager/heartbeat_queue.go
  • downstreamadapter/dispatchermanager/heartbeat_queue_test.go
  • maintainer/barrier_event.go
  • maintainer/barrier_event_test.go

Comment on lines 575 to 613
for {
	blockStatusMessage := make([]*heartbeatpb.TableSpanBlockStatus, 0)
	redoBlockStatusMessage := make([]*heartbeatpb.TableSpanBlockStatus, 0)
	blockStatus := e.sharedInfo.TakeBlockStatus(ctx)
	if blockStatus == nil {
		return
	}
	if common.IsDefaultMode(blockStatus.Mode) {
		blockStatusMessage = append(blockStatusMessage, blockStatus)
	} else {
		redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus)
	}

	batchCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	for {
		blockStatus = e.sharedInfo.TakeBlockStatus(batchCtx)
		if blockStatus == nil {
			break
		}
		if common.IsDefaultMode(blockStatus.Mode) {
			blockStatusMessage = append(blockStatusMessage, blockStatus)
		} else {
			redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus)
		}
	}
	cancel()
	if ctx.Err() != nil {
		return
	}

	e.metricBlockStatusesChanLen.Set(float64(e.sharedInfo.BlockStatusLen()))
	if len(blockStatusMessage) != 0 {
		enqueueBlockStatus(blockStatusMessage, common.DefaultMode)
	}
	if len(redoBlockStatusMessage) != 0 {
		enqueueBlockStatus(redoBlockStatusMessage, common.RedoMode)
	}
}
}
Contributor


⚠️ Potential issue | 🟡 Minor

Minor: first-taken status is dropped if the parent context cancels during the 10ms batch window.

If ctx is canceled after the outer TakeBlockStatus(ctx) at Line 578 returns a non-nil status but before or during the batch loop, the if ctx.Err() != nil { return } at Line 601 returns without calling enqueueBlockStatus. The first status is effectively dropped. The old timer-based implementation had an equivalent drop window on shutdown, so this isn't a regression, but it's worth being aware of — if you want to be strict about not losing any status acks during graceful shutdown, you could perform the enqueueBlockStatus flush first and then check ctx.Err() for whether to continue the loop.

The rest of the refactor looks good: Take(batchCtx) returning nil correctly terminates the batch on both timeout and parent cancellation, and materialize only runs on successful dequeue so the dropped select branch just leaves the entry queued for the next iteration.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@downstreamadapter/dispatchermanager/dispatcher_manager.go` around lines 575 -
613, The first-taken status returned by e.sharedInfo.TakeBlockStatus(ctx) can be
dropped if ctx is cancelled during the 10ms batching window because the code
checks ctx.Err() and returns before calling enqueueBlockStatus; fix by moving
the enqueue logic ahead of the ctx.Err() check so you always flush
blockStatusMessage and redoBlockStatusMessage (call enqueueBlockStatus for
common.DefaultMode and common.RedoMode when their slices are non-empty) before
returning on parent context cancellation; keep the batching loop and metric
update (e.metricBlockStatusesChanLen.Set(e.sharedInfo.BlockStatusLen())) as-is
but ensure enqueueBlockStatus is invoked prior to checking ctx.Err() after
cancel(), referencing TakeBlockStatus, enqueueBlockStatus, blockStatusMessage,
redoBlockStatusMessage, ctx, and batchCtx.

Comment on lines +693 to +710
	if be.passActionSent && be.isFanoutPassAction() &&
		now.Sub(be.lastStatusReceivedTime) < fanoutPassResendQuietInterval {
		return nil
	}
	be.passActionSent = true
	be.lastResendTime = now
	return be.sendPassAction(mode)
}
return msgs
}

func (be *BarrierEvent) isFanoutPassAction() bool {
	if be.blockedDispatchers == nil {
		return false
	}
	return be.blockedDispatchers.InfluenceType == heartbeatpb.InfluenceType_All ||
		be.blockedDispatchers.InfluenceType == heartbeatpb.InfluenceType_DB
}
Contributor


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

Go time.Now() monotonic clock semantics for repeated Sub comparisons

💡 Result:

In Go, time.Now() returns a time.Time value containing both a wall clock reading (for telling time) and a monotonic clock reading (for measuring time intervals). The monotonic clock is monotonically non-decreasing within the process lifetime and is used automatically by comparison methods (After, Before, Equal, Compare) and Sub when both time.Time values have monotonic readings. For repeated Sub comparisons (e.g., t2.Sub(t1), t3.Sub(t2) where t1, t2, t3 are from time.Now()): if both operands have monotonic clock readings (true for time.Now() values), Sub uses ONLY the monotonic readings, ignoring wall time. This ensures correct, positive durations even if the wall clock jumps backward due to NTP adjustments or manual changes. Example: start := time.Now(); ...; elapsed := time.Now().Sub(start) always yields a non-negative Duration approximating real elapsed time. Caveats: the monotonic clock may pause during system sleep; absolute monotonic values are process-local and unexported; serialization (e.g., JSON) strips monotonic info; mixing time.Now() with parsed/constructed times falls back to wall time. This behavior, introduced in Go 1.9, prevents bugs like negative durations from clock skew. Official docs confirm: "If Times t and u both contain monotonic clock readings, ... t.Sub(u) [uses] the monotonic clock readings alone" (pkg.go.dev/time).

🏁 Script executed:

# First, let me find the file and check its size
find . -name "barrier_event.go" -type f

Repository: pingcap/ticdc

Length of output: 86


🏁 Script executed:

# Check file size first
wc -l ./maintainer/barrier_event.go

Repository: pingcap/ticdc

Length of output: 90


🏁 Script executed:

# Examine key lines mentioned in the review
# First, let's look at line 35 and surrounding context
sed -n '30,40p' ./maintainer/barrier_event.go

# Line 113-114 (initialization)
sed -n '110,120p' ./maintainer/barrier_event.go

# Line 223 (passActionSent reset)
sed -n '220,230p' ./maintainer/barrier_event.go

# Line 298-300 (markStatusReceived)
sed -n '295,305p' ./maintainer/barrier_event.go

# Line 597 (outer resend gate)
sed -n '594,604p' ./maintainer/barrier_event.go

# Lines 693-710 (code snippet)
sed -n '690,715p' ./maintainer/barrier_event.go

Repository: pingcap/ticdc

Length of output: 2845


🏁 Script executed:

# Search for where markStatusReceived is called
rg "markStatusReceived" ./maintainer/barrier_event.go -A 2 -B 2

# Search for markDispatcherEventDone to verify the claim
rg "markDispatcherEventDone" ./maintainer/barrier_event.go -A 5 -B 2

# Also check where NewBarrierEvent is defined to confirm initialization
rg "func NewBarrierEvent" ./maintainer/barrier_event.go -A 20

Repository: pingcap/ticdc

Length of output: 575


🏁 Script executed:

# Search for NewBarrierEvent constructor
rg "func NewBarrierEvent" ./maintainer/barrier_event.go -A 30

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

# Try to find the constructor with different patterns
rg "NewBarrierEvent|BarrierEvent{" ./maintainer/barrier_event.go -A 25

# Or look for struct initialization
rg "lastStatusReceivedTime.*time.Now" ./maintainer/barrier_event.go -B 5 -A 2

Repository: pingcap/ticdc

Length of output: 618


🏁 Script executed:

# Look at sendPassAction implementation
rg "func \(be \*BarrierEvent\) sendPassAction" ./maintainer/barrier_event.go -A 30

Repository: pingcap/ticdc

Length of output: 1081


Fanout quiet interval timing: equal to the outer resend gate, limiting practical suppression effect.

`fanoutPassResendQuietInterval` at line 35 is 1s, identical to the minimum interval enforced at line 597 (`now.Sub(be.lastResendTime) < time.Second`). By the time the quiet-window check runs at line 695, `now.Sub(be.lastResendTime) >= 1s` is already guaranteed, so suppression only fires when a status update arrived strictly between the last resend and now. This is correct, but the quiet window's practical benefit is limited: any batching delay in the dispatcher manager makes the check a no-op, since both gates are synchronized to the same 1s cadence.

Additionally, at line 697, `be.passActionSent = true` is assigned before `be.sendPassAction(mode)` is called. For `InfluenceType_DB` with an empty span set (which invokes `rangeChecker.MarkCovered()` and returns nil), the flag is still flipped. This is harmless today because the event is considered covered and the resend loop exits via the higher-level lifecycle, but it is worth tracking in case future code paths need to reset this flag to trigger a re-send.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@maintainer/barrier_event.go` around lines 693-710, the quiet-window check
uses `fanoutPassResendQuietInterval`, which equals the hard resend gate (1s),
making suppression largely ineffective; also, `passActionSent` is set before
`sendPassAction(mode)` completes, so it may be flipped even when no action is
actually sent (e.g., `InfluenceType_DB` with an empty span). Fix by making
`fanoutPassResendQuietInterval` strictly larger than the resend gate, or by
comparing `now.Sub(be.lastResendTime)` against `fanoutPassResendQuietInterval`
consistently; and move the `be.passActionSent = true` assignment to after
`sendPassAction(mode)`, marking it only when `sendPassAction` reports that a
message was actually emitted. Relevant references:
`fanoutPassResendQuietInterval`, `isFanoutPassAction()`, `passActionSent`,
`lastResendTime`, `sendPassAction(mode)`.


Labels

do-not-merge/needs-linked-issue · release-note-none (a PR that doesn't merit a release note) · size/XXL (a PR that changes 1000+ lines, ignoring generated files)


1 participant