Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 42 additions & 14 deletions downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ import (
"go.uber.org/zap/zapcore"
)

const dispatcherWarnLogInterval = 10 * time.Second

func shouldLogDispatcherWarning(lastLogTime *atomic.Int64, interval time.Duration) bool {
now := time.Now().UnixNano()
for {
last := lastLogTime.Load()
if last != 0 && now-last < interval.Nanoseconds() {
return false
}
Comment on lines +44 to +46
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The duration check `now - last < interval.Nanoseconds()`, which compares wall-clock Unix nanoseconds, is not robust against system clock adjustments. If the system clock is set backwards, the subtraction can yield a negative value, which is always less than the interval, so logging is suppressed until the wall clock catches up to the previously stored timestamp.

Consider using time.Since() or explicitly handling negative deltas to avoid silencing logs for extended periods during clock jumps.

if lastLogTime.CAS(last, now) {
return true
}
}
}
Comment on lines +40 to +51
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for rate-limiting log messages is duplicated across multiple files in this PR (e.g., basic_dispatcher.go, dispatcher_manager.go, dispatcher_stat.go, subscription_client.go, maintainer.go, remote_target.go, and module_node_manager.go).

Furthermore, the implementation is inconsistent: some files use go.uber.org/atomic (calling .CAS()), while others use sync/atomic (calling .CompareAndSwap()).

Please refactor this into a single utility function in a common package (e.g., pkg/util) to improve maintainability and ensure consistency across the codebase.


// DispatcherService defines the interface for providing dispatcher information and basic event handling.
type DispatcherService interface {
GetId() common.DispatcherID
Expand Down Expand Up @@ -211,6 +226,11 @@ type BasicDispatcher struct {
// This field prevents a race condition where TryClose is called while events are being processed.
// In this corner case, `tableProgress` might be empty, which could lead to the dispatcher being removed prematurely.
duringHandleEvents atomic.Bool
// Limit repeated warnings on hot paths such as stale events, removing state, and errCh backpressure.
lastErrorChannelFullLogTime atomic.Int64
lastRemovingLogTime atomic.Int64
lastStaleEventLogTime atomic.Int64
lastUnexpectedEventLogTime atomic.Int64

seq uint64
mode int64
Expand Down Expand Up @@ -539,10 +559,12 @@ func (d *BasicDispatcher) HandleError(err error) {
select {
case d.sharedInfo.errCh <- err:
default:
log.Error("error channel is full, discard error",
zap.Stringer("changefeedID", d.sharedInfo.changefeedID),
zap.Stringer("dispatcherID", d.id),
zap.Error(err))
if shouldLogDispatcherWarning(&d.lastErrorChannelFullLogTime, dispatcherWarnLogInterval) {
log.Error("error channel is full, discard error",
zap.Stringer("changefeedID", d.sharedInfo.changefeedID),
zap.Stringer("dispatcherID", d.id),
zap.Error(err))
}
}
}

Expand All @@ -563,7 +585,9 @@ func (d *BasicDispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeC
// Return true if should block the dispatcher.
func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) bool {
if d.GetRemovingStatus() {
log.Warn("dispatcher is removing", zap.Any("id", d.id))
if shouldLogDispatcherWarning(&d.lastRemovingLogTime, dispatcherWarnLogInterval) {
log.Warn("dispatcher is removing", zap.Any("id", d.id))
}
return true
}

Expand Down Expand Up @@ -592,12 +616,14 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
event := dispatcherEvent.Event
// Pre-check, make sure the event is not stale
if event.GetCommitTs() < d.GetResolvedTs() {
log.Warn("Received a stale event, should ignore it",
zap.Uint64("dispatcherResolvedTs", d.GetResolvedTs()),
zap.Uint64("eventCommitTs", event.GetCommitTs()),
zap.Uint64("seq", event.GetSeq()),
zap.Int("eventType", event.GetType()),
zap.Stringer("dispatcher", d.id))
if shouldLogDispatcherWarning(&d.lastStaleEventLogTime, dispatcherWarnLogInterval) {
log.Warn("Received a stale event, should ignore it",
zap.Uint64("dispatcherResolvedTs", d.GetResolvedTs()),
zap.Uint64("eventCommitTs", event.GetCommitTs()),
zap.Uint64("seq", event.GetSeq()),
zap.Int("eventType", event.GetType()),
zap.Stringer("dispatcher", d.id))
}
continue
}

Expand Down Expand Up @@ -708,9 +734,11 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
})
d.DealWithBlockEvent(syncPoint)
case commonEvent.TypeHandshakeEvent:
log.Warn("Receive handshake event unexpectedly",
zap.Stringer("dispatcher", d.id),
zap.Any("event", event))
if shouldLogDispatcherWarning(&d.lastUnexpectedEventLogTime, dispatcherWarnLogInterval) {
log.Warn("Receive handshake event unexpectedly",
zap.Stringer("dispatcher", d.id),
zap.Any("event", event))
}
default:
log.Panic("Unexpected event type",
zap.Int("eventType", event.GetType()),
Expand Down
17 changes: 14 additions & 3 deletions downstreamadapter/dispatcher/event_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ func (d *EventDispatcher) cache(dispatcherEvents []DispatcherEvent, wakeCallback
d.cacheEvents.Lock()
defer d.cacheEvents.Unlock()
if d.GetRemovingStatus() {
log.Warn("dispatcher has removed", zap.Any("id", d.id))
if shouldLogDispatcherWarning(&d.lastRemovingLogTime, dispatcherWarnLogInterval) {
log.Warn("dispatcher has removed", zap.Any("id", d.id))
}
return
}
// Here we have to create a new event slice, because dispatcherEvents will be cleaned up in dynamic stream
Expand Down Expand Up @@ -174,14 +176,23 @@ func (d *EventDispatcher) EmitBootstrap() bool {
ts := d.GetStartTs()
schemaStore := appcontext.GetService[schemastore.SchemaStore](appcontext.SchemaStore)
currentTables := make([]*common.TableInfo, 0, len(tables))
// lastBootstrapWarnLogTime records when logBootstrapWarning last emitted a
// warning. The zero value means no warning has been emitted yet.
lastBootstrapWarnLogTime := time.Time{}
// logBootstrapWarning forwards msg and fields to log.Warn, but at most once
// per dispatcherWarnLogInterval; calls that fall inside the window are dropped
// without logging. The timestamp is a plain captured local, so the limit is
// scoped to this invocation of the enclosing function and the closure does no
// synchronization of its own.
logBootstrapWarning := func(msg string, fields ...zap.Field) {
now := time.Now()
// Suppress the warning when a previous one was emitted less than one interval ago.
if !lastBootstrapWarnLogTime.IsZero() && now.Sub(lastBootstrapWarnLogTime) < dispatcherWarnLogInterval {
return
}
// Record the emission time before logging so the next call measures from here.
lastBootstrapWarnLogTime = now
log.Warn(msg, fields...)
}
meta := common.KeyspaceMeta{
ID: d.tableSpan.KeyspaceID,
Name: d.sharedInfo.changefeedID.Keyspace(),
}
for _, table := range tables {
err := schemaStore.RegisterTable(meta, table, ts)
if err != nil {
log.Warn("register table to schemaStore failed",
logBootstrapWarning("register table to schemaStore failed",
zap.Int64("tableID", table),
zap.Uint64("startTs", ts),
zap.Error(err),
Expand All @@ -190,7 +201,7 @@ func (d *EventDispatcher) EmitBootstrap() bool {
}
tableInfo, err := schemaStore.GetTableInfo(meta, table, ts)
if err != nil {
log.Warn("get table info failed, just ignore",
logBootstrapWarning("get table info failed, just ignore",
zap.Stringer("changefeed", d.sharedInfo.changefeedID),
zap.Error(err))
continue
Expand Down
40 changes: 32 additions & 8 deletions downstreamadapter/dispatchermanager/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@ import (
"go.uber.org/zap"
)

const dispatcherManagerWarnLogInterval = 10 * time.Second

// shouldLogDispatcherManagerWarning reports whether a rate-limited log line
// guarded by lastLogTime may be emitted now. When it returns true it has
// already stored the current time in lastLogTime.
//
// lastLogTime holds the wall-clock time, in Unix nanoseconds, of the last
// emitted log line; zero means nothing has been logged yet. interval is the
// minimum spacing between two emitted log lines. When several goroutines race
// on the same lastLogTime, the compare-and-swap lets only one of them claim
// each stored timestamp; the losers reload and re-evaluate.
func shouldLogDispatcherManagerWarning(lastLogTime *atomic.Int64, interval time.Duration) bool {
	for {
		last := lastLogTime.Load()
		// Sample the clock after the load: every stored value was read from the
		// clock before it was stored, so a negative delta here can only come
		// from the wall clock being stepped backwards, not from a concurrent
		// caller winning the CAS with a slightly later timestamp.
		now := time.Now().UnixNano()
		elapsed := now - last
		// A negative delta is treated as an expired window. Otherwise a backward
		// clock step would silence this log until the wall clock caught up with
		// the stored timestamp.
		if last != 0 && elapsed >= 0 && elapsed < interval.Nanoseconds() {
			return false
		}
		if lastLogTime.CompareAndSwap(last, now) {
			return true
		}
	}
}

/*
DispatcherManager manages dispatchers for a changefeed instance with responsibilities including:

Expand Down Expand Up @@ -136,6 +151,11 @@ type DispatcherManager struct {
cancel context.CancelFunc
wg sync.WaitGroup

lastErrorChannelFullLogTime atomic.Int64
lastCollectErrLogTime atomic.Int64
lastRedoMetaErrLogTime atomic.Int64
lastRedoMetaInvariantLogTime atomic.Int64

// removeTaskHandles stores the task handles for async dispatcher removal
// map[common.DispatcherID]*threadpool.TaskHandle
removeTaskHandles sync.Map
Expand Down Expand Up @@ -514,10 +534,12 @@ func (e *DispatcherManager) handleError(ctx context.Context, err error) {
return
case e.sharedInfo.GetErrCh() <- err:
default:
log.Error("error channel is full, discard error",
zap.Stringer("changefeedID", e.changefeedID),
zap.Error(err),
)
if shouldLogDispatcherManagerWarning(&e.lastErrorChannelFullLogTime, dispatcherManagerWarnLogInterval) {
log.Error("error channel is full, discard error",
zap.Stringer("changefeedID", e.changefeedID),
zap.Error(err),
)
}
}
}
}
Expand All @@ -530,10 +552,12 @@ func (e *DispatcherManager) collectErrors(ctx context.Context) {
return
case err := <-e.sharedInfo.GetErrCh():
if !errors.Is(errors.Cause(err), context.Canceled) {
log.Error("Event Dispatcher Manager Meets Error",
zap.Stringer("changefeedID", e.changefeedID),
zap.Error(err),
)
if shouldLogDispatcherManagerWarning(&e.lastCollectErrLogTime, dispatcherManagerWarnLogInterval) {
log.Error("Event Dispatcher Manager Meets Error",
zap.Stringer("changefeedID", e.changefeedID),
zap.Error(err),
)
}

// report error to maintainer
var message heartbeatpb.HeartBeatRequest
Expand Down
12 changes: 9 additions & 3 deletions downstreamadapter/dispatchermanager/dispatcher_manager_redo.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,9 @@ func (e *DispatcherManager) UpdateRedoMeta(checkpointTs, resolvedTs uint64) {
// only update meta on the one node
d := e.GetTableTriggerRedoDispatcher()
if d == nil {
log.Warn("should not reach here. only update redo meta on the tableTriggerRedoDispatcher")
if shouldLogDispatcherManagerWarning(&e.lastRedoMetaInvariantLogTime, dispatcherManagerWarnLogInterval) {
log.Warn("should not reach here. only update redo meta on the tableTriggerRedoDispatcher")
}
return
}
d.UpdateMeta(checkpointTs, resolvedTs)
Expand All @@ -309,7 +311,9 @@ func (e *DispatcherManager) collectRedoMeta(ctx context.Context) error {
return ctx.Err()
case <-ticker.C:
if e.GetTableTriggerRedoDispatcher() == nil {
log.Error("should not reach here. only collect redo meta on the tableTriggerRedoDispatcher")
if shouldLogDispatcherManagerWarning(&e.lastRedoMetaInvariantLogTime, dispatcherManagerWarnLogInterval) {
log.Error("should not reach here. only collect redo meta on the tableTriggerRedoDispatcher")
}
continue
}
logMeta := e.GetTableTriggerRedoDispatcher().GetFlushedMeta()
Expand All @@ -326,7 +330,9 @@ func (e *DispatcherManager) collectRedoMeta(ctx context.Context) error {
},
))
if err != nil {
log.Error("failed to send redo request message", zap.Error(err))
if shouldLogDispatcherManagerWarning(&e.lastRedoMetaErrLogTime, dispatcherManagerWarnLogInterval) {
log.Error("failed to send redo request message", zap.Error(err))
}
}
preResolvedTs = logMeta.ResolvedTs
}
Expand Down
20 changes: 15 additions & 5 deletions downstreamadapter/dispatchermanager/heartbeat_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type HeartBeatCollector struct {
wg sync.WaitGroup
cancel context.CancelFunc
isClosed atomic.Bool

lastHeartbeatReqErrLogTime atomic.Int64
lastBlockStatusErrLogTime atomic.Int64
lastUnknownMessageLogTime atomic.Int64
}

func NewHeartBeatCollector(serverId node.ID) *HeartBeatCollector {
Expand Down Expand Up @@ -225,7 +229,9 @@ func (c *HeartBeatCollector) sendHeartBeatMessages(ctx context.Context) error {
heartBeatRequestWithTargetID.Request,
))
if err != nil {
log.Error("failed to send heartbeat request message", zap.Error(err))
if shouldLogDispatcherManagerWarning(&c.lastHeartbeatReqErrLogTime, dispatcherManagerWarnLogInterval) {
log.Error("failed to send heartbeat request message", zap.Error(err))
}
}
}
}
Expand All @@ -249,7 +255,9 @@ func (c *HeartBeatCollector) sendBlockStatusMessages(ctx context.Context) error
blockStatusRequestWithTargetID.Request,
))
if err != nil {
log.Error("failed to send block status request message", zap.Error(err))
if shouldLogDispatcherManagerWarning(&c.lastBlockStatusErrLogTime, dispatcherManagerWarnLogInterval) {
log.Error("failed to send block status request message", zap.Error(err))
}
}
}
}
Expand Down Expand Up @@ -291,9 +299,11 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ
common.NewChangefeedGIDFromPB(mergeDispatcherRequest.ChangefeedID),
NewMergeDispatcherRequest(mergeDispatcherRequest))
default:
log.Warn("unknown message type, ignore it",
zap.String("type", msg.Type.String()),
zap.Any("message", msg.Message))
if shouldLogDispatcherManagerWarning(&c.lastUnknownMessageLogTime, dispatcherManagerWarnLogInterval) {
log.Warn("unknown message type, ignore it",
zap.String("type", msg.Type.String()),
zap.Any("message", msg.Message))
}
}
return nil
}
Expand Down
Loading
Loading