-
Notifications
You must be signed in to change notification settings - Fork 49
*: reduce log print frequency #4701
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,21 @@ import ( | |
| "go.uber.org/zap/zapcore" | ||
| ) | ||
|
|
||
| const dispatcherWarnLogInterval = 10 * time.Second | ||
|
|
||
| func shouldLogDispatcherWarning(lastLogTime *atomic.Int64, interval time.Duration) bool { | ||
| now := time.Now().UnixNano() | ||
| for { | ||
| last := lastLogTime.Load() | ||
| if last != 0 && now-last < interval.Nanoseconds() { | ||
| return false | ||
| } | ||
| if lastLogTime.CAS(last, now) { | ||
| return true | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
+40
to
+51
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The logic for rate-limiting log messages is duplicated across multiple files in this PR (e.g., Furthermore, the implementation is inconsistent: some files use Please refactor this into a single utility function in a common package (e.g., |
||
|
|
||
| // DispatcherService defines the interface for providing dispatcher information and basic event handling. | ||
| type DispatcherService interface { | ||
| GetId() common.DispatcherID | ||
|
|
@@ -211,6 +226,11 @@ type BasicDispatcher struct { | |
| // This field prevents a race condition where TryClose is called while events are being processed. | ||
| // In this corner case, `tableProgress` might be empty, which could lead to the dispatcher being removed prematurely. | ||
| duringHandleEvents atomic.Bool | ||
| // Limit repeated warnings on hot paths such as stale events, removing state, and errCh backpressure. | ||
| lastErrorChannelFullLogTime atomic.Int64 | ||
| lastRemovingLogTime atomic.Int64 | ||
| lastStaleEventLogTime atomic.Int64 | ||
| lastUnexpectedEventLogTime atomic.Int64 | ||
|
|
||
| seq uint64 | ||
| mode int64 | ||
|
|
@@ -539,10 +559,12 @@ func (d *BasicDispatcher) HandleError(err error) { | |
| select { | ||
| case d.sharedInfo.errCh <- err: | ||
| default: | ||
| log.Error("error channel is full, discard error", | ||
| zap.Stringer("changefeedID", d.sharedInfo.changefeedID), | ||
| zap.Stringer("dispatcherID", d.id), | ||
| zap.Error(err)) | ||
| if shouldLogDispatcherWarning(&d.lastErrorChannelFullLogTime, dispatcherWarnLogInterval) { | ||
| log.Error("error channel is full, discard error", | ||
| zap.Stringer("changefeedID", d.sharedInfo.changefeedID), | ||
| zap.Stringer("dispatcherID", d.id), | ||
| zap.Error(err)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -563,7 +585,9 @@ func (d *BasicDispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeC | |
| // Return true if should block the dispatcher. | ||
| func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) bool { | ||
| if d.GetRemovingStatus() { | ||
| log.Warn("dispatcher is removing", zap.Any("id", d.id)) | ||
| if shouldLogDispatcherWarning(&d.lastRemovingLogTime, dispatcherWarnLogInterval) { | ||
| log.Warn("dispatcher is removing", zap.Any("id", d.id)) | ||
| } | ||
| return true | ||
| } | ||
|
|
||
|
|
@@ -592,12 +616,14 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC | |
| event := dispatcherEvent.Event | ||
| // Pre-check, make sure the event is not stale | ||
| if event.GetCommitTs() < d.GetResolvedTs() { | ||
| log.Warn("Received a stale event, should ignore it", | ||
| zap.Uint64("dispatcherResolvedTs", d.GetResolvedTs()), | ||
| zap.Uint64("eventCommitTs", event.GetCommitTs()), | ||
| zap.Uint64("seq", event.GetSeq()), | ||
| zap.Int("eventType", event.GetType()), | ||
| zap.Stringer("dispatcher", d.id)) | ||
| if shouldLogDispatcherWarning(&d.lastStaleEventLogTime, dispatcherWarnLogInterval) { | ||
| log.Warn("Received a stale event, should ignore it", | ||
| zap.Uint64("dispatcherResolvedTs", d.GetResolvedTs()), | ||
| zap.Uint64("eventCommitTs", event.GetCommitTs()), | ||
| zap.Uint64("seq", event.GetSeq()), | ||
| zap.Int("eventType", event.GetType()), | ||
| zap.Stringer("dispatcher", d.id)) | ||
| } | ||
| continue | ||
| } | ||
|
|
||
|
|
@@ -708,9 +734,11 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC | |
| }) | ||
| d.DealWithBlockEvent(syncPoint) | ||
| case commonEvent.TypeHandshakeEvent: | ||
| log.Warn("Receive handshake event unexpectedly", | ||
| zap.Stringer("dispatcher", d.id), | ||
| zap.Any("event", event)) | ||
| if shouldLogDispatcherWarning(&d.lastUnexpectedEventLogTime, dispatcherWarnLogInterval) { | ||
| log.Warn("Receive handshake event unexpectedly", | ||
| zap.Stringer("dispatcher", d.id), | ||
| zap.Any("event", event)) | ||
| } | ||
| default: | ||
| log.Panic("Unexpected event type", | ||
| zap.Int("eventType", event.GetType()), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The duration check `now - last < interval.Nanoseconds()` using Unix nanoseconds is not robust against system clock adjustments. If the system clock is set backwards, the result of the subtraction can be negative, which will be less than the interval, causing logging to be suppressed until the wall clock catches up to the previously stored timestamp.

Consider using `time.Since()` or explicitly handling negative deltas to avoid silencing logs for extended periods during clock jumps.