diff --git a/downstreamadapter/dispatcher/basic_dispatcher.go b/downstreamadapter/dispatcher/basic_dispatcher.go
index 0c31275a4c..dfed2d7405 100644
--- a/downstreamadapter/dispatcher/basic_dispatcher.go
+++ b/downstreamadapter/dispatcher/basic_dispatcher.go
@@ -35,6 +35,21 @@ import (
 	"go.uber.org/zap/zapcore"
 )
 
+const dispatcherWarnLogInterval = 10 * time.Second
+
+func shouldLogDispatcherWarning(lastLogTime *atomic.Int64, interval time.Duration) bool {
+	now := time.Now().UnixNano()
+	for {
+		last := lastLogTime.Load()
+		if last != 0 && now-last < interval.Nanoseconds() {
+			return false
+		}
+		if lastLogTime.CompareAndSwap(last, now) {
+			return true
+		}
+	}
+}
+
 // DispatcherService defines the interface for providing dispatcher information and basic event handling.
 type DispatcherService interface {
 	GetId() common.DispatcherID
@@ -211,6 +226,11 @@ type BasicDispatcher struct {
 	// This field prevents a race condition where TryClose is called while events are being processed.
 	// In this corner case, `tableProgress` might be empty, which could lead to the dispatcher being removed prematurely.
 	duringHandleEvents atomic.Bool
+	// Limit repeated warnings on hot paths such as stale events, removing state, and errCh backpressure.
+ lastErrorChannelFullLogTime atomic.Int64 + lastRemovingLogTime atomic.Int64 + lastStaleEventLogTime atomic.Int64 + lastUnexpectedEventLogTime atomic.Int64 seq uint64 mode int64 @@ -539,10 +559,12 @@ func (d *BasicDispatcher) HandleError(err error) { select { case d.sharedInfo.errCh <- err: default: - log.Error("error channel is full, discard error", - zap.Stringer("changefeedID", d.sharedInfo.changefeedID), - zap.Stringer("dispatcherID", d.id), - zap.Error(err)) + if shouldLogDispatcherWarning(&d.lastErrorChannelFullLogTime, dispatcherWarnLogInterval) { + log.Error("error channel is full, discard error", + zap.Stringer("changefeedID", d.sharedInfo.changefeedID), + zap.Stringer("dispatcherID", d.id), + zap.Error(err)) + } } } @@ -563,7 +585,9 @@ func (d *BasicDispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeC // Return true if should block the dispatcher. func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeCallback func()) bool { if d.GetRemovingStatus() { - log.Warn("dispatcher is removing", zap.Any("id", d.id)) + if shouldLogDispatcherWarning(&d.lastRemovingLogTime, dispatcherWarnLogInterval) { + log.Warn("dispatcher is removing", zap.Any("id", d.id)) + } return true } @@ -592,12 +616,14 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC event := dispatcherEvent.Event // Pre-check, make sure the event is not stale if event.GetCommitTs() < d.GetResolvedTs() { - log.Warn("Received a stale event, should ignore it", - zap.Uint64("dispatcherResolvedTs", d.GetResolvedTs()), - zap.Uint64("eventCommitTs", event.GetCommitTs()), - zap.Uint64("seq", event.GetSeq()), - zap.Int("eventType", event.GetType()), - zap.Stringer("dispatcher", d.id)) + if shouldLogDispatcherWarning(&d.lastStaleEventLogTime, dispatcherWarnLogInterval) { + log.Warn("Received a stale event, should ignore it", + zap.Uint64("dispatcherResolvedTs", d.GetResolvedTs()), + zap.Uint64("eventCommitTs", event.GetCommitTs()), + 
zap.Uint64("seq", event.GetSeq()), + zap.Int("eventType", event.GetType()), + zap.Stringer("dispatcher", d.id)) + } continue } @@ -708,9 +734,11 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC }) d.DealWithBlockEvent(syncPoint) case commonEvent.TypeHandshakeEvent: - log.Warn("Receive handshake event unexpectedly", - zap.Stringer("dispatcher", d.id), - zap.Any("event", event)) + if shouldLogDispatcherWarning(&d.lastUnexpectedEventLogTime, dispatcherWarnLogInterval) { + log.Warn("Receive handshake event unexpectedly", + zap.Stringer("dispatcher", d.id), + zap.Any("event", event)) + } default: log.Panic("Unexpected event type", zap.Int("eventType", event.GetType()), diff --git a/downstreamadapter/dispatcher/event_dispatcher.go b/downstreamadapter/dispatcher/event_dispatcher.go index 65464b919b..1c77ed3879 100644 --- a/downstreamadapter/dispatcher/event_dispatcher.go +++ b/downstreamadapter/dispatcher/event_dispatcher.go @@ -109,7 +109,9 @@ func (d *EventDispatcher) cache(dispatcherEvents []DispatcherEvent, wakeCallback d.cacheEvents.Lock() defer d.cacheEvents.Unlock() if d.GetRemovingStatus() { - log.Warn("dispatcher has removed", zap.Any("id", d.id)) + if shouldLogDispatcherWarning(&d.lastRemovingLogTime, dispatcherWarnLogInterval) { + log.Warn("dispatcher has removed", zap.Any("id", d.id)) + } return } // Here we have to create a new event slice, because dispatcherEvents will be cleaned up in dynamic stream @@ -174,6 +176,15 @@ func (d *EventDispatcher) EmitBootstrap() bool { ts := d.GetStartTs() schemaStore := appcontext.GetService[schemastore.SchemaStore](appcontext.SchemaStore) currentTables := make([]*common.TableInfo, 0, len(tables)) + lastBootstrapWarnLogTime := time.Time{} + logBootstrapWarning := func(msg string, fields ...zap.Field) { + now := time.Now() + if !lastBootstrapWarnLogTime.IsZero() && now.Sub(lastBootstrapWarnLogTime) < dispatcherWarnLogInterval { + return + } + lastBootstrapWarnLogTime = now + log.Warn(msg, 
fields...) + } meta := common.KeyspaceMeta{ ID: d.tableSpan.KeyspaceID, Name: d.sharedInfo.changefeedID.Keyspace(), @@ -181,7 +192,7 @@ func (d *EventDispatcher) EmitBootstrap() bool { for _, table := range tables { err := schemaStore.RegisterTable(meta, table, ts) if err != nil { - log.Warn("register table to schemaStore failed", + logBootstrapWarning("register table to schemaStore failed", zap.Int64("tableID", table), zap.Uint64("startTs", ts), zap.Error(err), @@ -190,7 +201,7 @@ func (d *EventDispatcher) EmitBootstrap() bool { } tableInfo, err := schemaStore.GetTableInfo(meta, table, ts) if err != nil { - log.Warn("get table info failed, just ignore", + logBootstrapWarning("get table info failed, just ignore", zap.Stringer("changefeed", d.sharedInfo.changefeedID), zap.Error(err)) continue diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager.go b/downstreamadapter/dispatchermanager/dispatcher_manager.go index 128f8cbe1e..fda965e7ca 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager.go @@ -45,6 +45,21 @@ import ( "go.uber.org/zap" ) +const dispatcherManagerWarnLogInterval = 10 * time.Second + +func shouldLogDispatcherManagerWarning(lastLogTime *atomic.Int64, interval time.Duration) bool { + now := time.Now().UnixNano() + for { + last := lastLogTime.Load() + if last != 0 && now-last < interval.Nanoseconds() { + return false + } + if lastLogTime.CompareAndSwap(last, now) { + return true + } + } +} + /* DispatcherManager manages dispatchers for a changefeed instance with responsibilities including: @@ -136,6 +151,11 @@ type DispatcherManager struct { cancel context.CancelFunc wg sync.WaitGroup + lastErrorChannelFullLogTime atomic.Int64 + lastCollectErrLogTime atomic.Int64 + lastRedoMetaErrLogTime atomic.Int64 + lastRedoMetaInvariantLogTime atomic.Int64 + // removeTaskHandles stores the task handles for async dispatcher removal // 
map[common.DispatcherID]*threadpool.TaskHandle removeTaskHandles sync.Map @@ -514,10 +534,12 @@ func (e *DispatcherManager) handleError(ctx context.Context, err error) { return case e.sharedInfo.GetErrCh() <- err: default: - log.Error("error channel is full, discard error", - zap.Stringer("changefeedID", e.changefeedID), - zap.Error(err), - ) + if shouldLogDispatcherManagerWarning(&e.lastErrorChannelFullLogTime, dispatcherManagerWarnLogInterval) { + log.Error("error channel is full, discard error", + zap.Stringer("changefeedID", e.changefeedID), + zap.Error(err), + ) + } } } } @@ -530,10 +552,12 @@ func (e *DispatcherManager) collectErrors(ctx context.Context) { return case err := <-e.sharedInfo.GetErrCh(): if !errors.Is(errors.Cause(err), context.Canceled) { - log.Error("Event Dispatcher Manager Meets Error", - zap.Stringer("changefeedID", e.changefeedID), - zap.Error(err), - ) + if shouldLogDispatcherManagerWarning(&e.lastCollectErrLogTime, dispatcherManagerWarnLogInterval) { + log.Error("Event Dispatcher Manager Meets Error", + zap.Stringer("changefeedID", e.changefeedID), + zap.Error(err), + ) + } // report error to maintainer var message heartbeatpb.HeartBeatRequest diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager_redo.go b/downstreamadapter/dispatchermanager/dispatcher_manager_redo.go index 8412716ac7..f8eaca747b 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager_redo.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager_redo.go @@ -288,7 +288,9 @@ func (e *DispatcherManager) UpdateRedoMeta(checkpointTs, resolvedTs uint64) { // only update meta on the one node d := e.GetTableTriggerRedoDispatcher() if d == nil { - log.Warn("should not reach here. only update redo meta on the tableTriggerRedoDispatcher") + if shouldLogDispatcherManagerWarning(&e.lastRedoMetaInvariantLogTime, dispatcherManagerWarnLogInterval) { + log.Warn("should not reach here. 
only update redo meta on the tableTriggerRedoDispatcher") + } return } d.UpdateMeta(checkpointTs, resolvedTs) @@ -309,7 +311,9 @@ func (e *DispatcherManager) collectRedoMeta(ctx context.Context) error { return ctx.Err() case <-ticker.C: if e.GetTableTriggerRedoDispatcher() == nil { - log.Error("should not reach here. only collect redo meta on the tableTriggerRedoDispatcher") + if shouldLogDispatcherManagerWarning(&e.lastRedoMetaInvariantLogTime, dispatcherManagerWarnLogInterval) { + log.Error("should not reach here. only collect redo meta on the tableTriggerRedoDispatcher") + } continue } logMeta := e.GetTableTriggerRedoDispatcher().GetFlushedMeta() @@ -326,7 +330,9 @@ func (e *DispatcherManager) collectRedoMeta(ctx context.Context) error { }, )) if err != nil { - log.Error("failed to send redo request message", zap.Error(err)) + if shouldLogDispatcherManagerWarning(&e.lastRedoMetaErrLogTime, dispatcherManagerWarnLogInterval) { + log.Error("failed to send redo request message", zap.Error(err)) + } } preResolvedTs = logMeta.ResolvedTs } diff --git a/downstreamadapter/dispatchermanager/heartbeat_collector.go b/downstreamadapter/dispatchermanager/heartbeat_collector.go index 99ef678b14..138c22f418 100644 --- a/downstreamadapter/dispatchermanager/heartbeat_collector.go +++ b/downstreamadapter/dispatchermanager/heartbeat_collector.go @@ -63,6 +63,10 @@ type HeartBeatCollector struct { wg sync.WaitGroup cancel context.CancelFunc isClosed atomic.Bool + + lastHeartbeatReqErrLogTime atomic.Int64 + lastBlockStatusErrLogTime atomic.Int64 + lastUnknownMessageLogTime atomic.Int64 } func NewHeartBeatCollector(serverId node.ID) *HeartBeatCollector { @@ -225,7 +229,9 @@ func (c *HeartBeatCollector) sendHeartBeatMessages(ctx context.Context) error { heartBeatRequestWithTargetID.Request, )) if err != nil { - log.Error("failed to send heartbeat request message", zap.Error(err)) + if shouldLogDispatcherManagerWarning(&c.lastHeartbeatReqErrLogTime, dispatcherManagerWarnLogInterval) { + 
log.Error("failed to send heartbeat request message", zap.Error(err)) + } } } } @@ -249,7 +255,9 @@ func (c *HeartBeatCollector) sendBlockStatusMessages(ctx context.Context) error blockStatusRequestWithTargetID.Request, )) if err != nil { - log.Error("failed to send block status request message", zap.Error(err)) + if shouldLogDispatcherManagerWarning(&c.lastBlockStatusErrLogTime, dispatcherManagerWarnLogInterval) { + log.Error("failed to send block status request message", zap.Error(err)) + } } } } @@ -291,9 +299,11 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ common.NewChangefeedGIDFromPB(mergeDispatcherRequest.ChangefeedID), NewMergeDispatcherRequest(mergeDispatcherRequest)) default: - log.Warn("unknown message type, ignore it", - zap.String("type", msg.Type.String()), - zap.Any("message", msg.Message)) + if shouldLogDispatcherManagerWarning(&c.lastUnknownMessageLogTime, dispatcherManagerWarnLogInterval) { + log.Warn("unknown message type, ignore it", + zap.String("type", msg.Type.String()), + zap.Any("message", msg.Message)) + } } return nil } diff --git a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go index f91e5222f2..08932622e5 100644 --- a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go +++ b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go @@ -35,6 +35,21 @@ import ( "go.uber.org/zap" ) +const dispatcherOrchestratorWarnLogInterval = 10 * time.Second + +func shouldLogDispatcherOrchestratorWarning(lastLogTime *atomic.Int64, interval time.Duration) bool { + now := time.Now().UnixNano() + for { + last := lastLogTime.Load() + if last != 0 && now-last < interval.Nanoseconds() { + return false + } + if lastLogTime.CompareAndSwap(last, now) { + return true + } + } +} + // DispatcherOrchestrator coordinates the creation, deletion, and management of event dispatcher managers // for different change 
feeds based on maintainer bootstrap messages. type DispatcherOrchestrator struct { @@ -50,6 +65,11 @@ type DispatcherOrchestrator struct { closed atomic.Bool // msgGuardWaitGroup waits for in-flight RecvMaintainerRequest handlers before shutdown. msgGuardWaitGroup util.GuardedWaitGroup + + lastUnknownMessageLogTime atomic.Int64 + lastHandleMessageErrLogTime atomic.Int64 + lastSendResponseErrLogTime atomic.Int64 + lastCreateManagerErrLogTime atomic.Int64 } func New() *DispatcherOrchestrator { @@ -87,9 +107,11 @@ func (m *DispatcherOrchestrator) RecvMaintainerRequest( key, ok := getPendingMessageKey(msg) if !ok { - log.Warn("unknown message type, drop message", - zap.String("type", msg.Type.String()), - zap.Any("message", msg.Message)) + if shouldLogDispatcherOrchestratorWarning(&m.lastUnknownMessageLogTime, dispatcherOrchestratorWarnLogInterval) { + log.Warn("unknown message type, drop message", + zap.String("type", msg.Type.String()), + zap.Any("message", msg.Message)) + } return nil } @@ -130,21 +152,29 @@ func (m *DispatcherOrchestrator) handleMessages() { switch req := msg.Message[0].(type) { case *heartbeatpb.MaintainerBootstrapRequest: if err := m.handleBootstrapRequest(msg.From, req); err != nil { - log.Error("failed to handle bootstrap request", zap.Error(err)) + if shouldLogDispatcherOrchestratorWarning(&m.lastHandleMessageErrLogTime, dispatcherOrchestratorWarnLogInterval) { + log.Error("failed to handle bootstrap request", zap.Error(err)) + } } case *heartbeatpb.MaintainerPostBootstrapRequest: // Only the event dispatcher manager with table trigger event dispatcher will receive the post bootstrap request if err := m.handlePostBootstrapRequest(msg.From, req); err != nil { - log.Error("failed to handle post bootstrap request", zap.Error(err)) + if shouldLogDispatcherOrchestratorWarning(&m.lastHandleMessageErrLogTime, dispatcherOrchestratorWarnLogInterval) { + log.Error("failed to handle post bootstrap request", zap.Error(err)) + } } case 
*heartbeatpb.MaintainerCloseRequest: if err := m.handleCloseRequest(msg.From, req); err != nil { - log.Error("failed to handle close request", zap.Error(err)) + if shouldLogDispatcherOrchestratorWarning(&m.lastHandleMessageErrLogTime, dispatcherOrchestratorWarnLogInterval) { + log.Error("failed to handle close request", zap.Error(err)) + } } default: - log.Warn("unknown message type, ignore it", - zap.String("type", msg.Type.String()), - zap.Any("message", msg.Message)) + if shouldLogDispatcherOrchestratorWarning(&m.lastUnknownMessageLogTime, dispatcherOrchestratorWarnLogInterval) { + log.Warn("unknown message type, ignore it", + zap.String("type", msg.Type.String()), + zap.Any("message", msg.Message)) + } } m.msgQueue.Done(key) @@ -182,8 +212,10 @@ func (m *DispatcherOrchestrator) handleBootstrapRequest( ) // Fast return the error to maintainer. if err != nil { - log.Error("failed to create new dispatcher manager", - zap.Any("changefeedID", cfId.Name()), zap.Duration("duration", time.Since(start)), zap.Error(err)) + if shouldLogDispatcherOrchestratorWarning(&m.lastCreateManagerErrLogTime, dispatcherOrchestratorWarnLogInterval) { + log.Error("failed to create new dispatcher manager", + zap.Any("changefeedID", cfId.Name()), zap.Duration("duration", time.Since(start)), zap.Error(err)) + } appcontext.GetService[*dispatchermanager.HeartBeatCollector](appcontext.HeartbeatCollector).RemoveDispatcherManager(cfId) @@ -196,8 +228,10 @@ func (m *DispatcherOrchestrator) handleBootstrapRequest( Message: err.Error(), }, } - log.Error("create new dispatcher manager failed", - zap.Any("changefeedID", cfId.Name()), zap.Duration("duration", time.Since(start)), zap.Error(err)) + if shouldLogDispatcherOrchestratorWarning(&m.lastCreateManagerErrLogTime, dispatcherOrchestratorWarnLogInterval) { + log.Error("create new dispatcher manager failed", + zap.Any("changefeedID", cfId.Name()), zap.Duration("duration", time.Since(start)), zap.Error(err)) + } return m.sendResponse(from, 
messaging.MaintainerManagerTopic, response) } @@ -396,7 +430,9 @@ func createBootstrapResponse( func (m *DispatcherOrchestrator) sendResponse(to node.ID, topic string, msg messaging.IOTypeT) error { message := messaging.NewSingleTargetMessage(to, topic, msg) if err := m.mc.SendCommand(message); err != nil { - log.Error("failed to send response", zap.Error(err)) + if shouldLogDispatcherOrchestratorWarning(&m.lastSendResponseErrLogTime, dispatcherOrchestratorWarnLogInterval) { + log.Error("failed to send response", zap.Error(err)) + } return err } return nil diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go index 0b1862279e..bde9590632 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat.go +++ b/downstreamadapter/eventcollector/dispatcher_stat.go @@ -16,6 +16,7 @@ package eventcollector import ( "sync" "sync/atomic" + "time" "github.com/pingcap/log" "github.com/pingcap/ticdc/downstreamadapter/dispatcher" @@ -30,6 +31,21 @@ import ( "go.uber.org/zap" ) +const dispatcherIssueLogInterval = 10 * time.Second + +func shouldLogDispatcherIssue(lastLogTime *atomic.Int64, interval time.Duration) bool { + now := time.Now().UnixNano() + for { + last := lastLogTime.Load() + if last != 0 && now-last < interval.Nanoseconds() { + return false + } + if lastLogTime.CompareAndSwap(last, now) { + return true + } + } +} + type dispatcherConnState struct { sync.RWMutex // 1) if eventServiceID is set to a remote event service, @@ -153,6 +169,9 @@ type dispatcherStat struct { // tableInfoVersion is the latest table info version of the dispatcher's corresponding table. 
// It is updated by ddl event tableInfoVersion atomic.Uint64 + + lastOrderingIssueLogTime atomic.Int64 + lastStaleEventLogTime atomic.Int64 } func newDispatcherStat( @@ -276,10 +295,12 @@ func (d *dispatcherStat) getDispatcherID() common.DispatcherID { func (d *dispatcherStat) verifyEventSequence(event dispatcher.DispatcherEvent, state *dispatcherEpochState) bool { // check the invariant that handshake event is the first event of every epoch if event.GetType() != commonEvent.TypeHandshakeEvent && state.lastEventSeq.Load() == 0 { - log.Warn("receive non-handshake event before handshake event, reset the dispatcher", - zap.Stringer("changefeedID", d.target.GetChangefeedID()), - zap.Stringer("dispatcher", d.getDispatcherID()), - zap.Any("event", event.Event)) + if shouldLogDispatcherIssue(&d.lastOrderingIssueLogTime, dispatcherIssueLogInterval) { + log.Warn("receive non-handshake event before handshake event, reset the dispatcher", + zap.Stringer("changefeedID", d.target.GetChangefeedID()), + zap.Stringer("dispatcher", d.getDispatcherID()), + zap.Any("event", event.Event)) + } return false } @@ -309,15 +330,17 @@ func (d *dispatcherStat) verifyEventSequence(event dispatcher.DispatcherEvent, s } if event.GetSeq() != expectedSeq { - log.Warn("receive an out-of-order event, reset the dispatcher", - zap.Stringer("changefeedID", d.target.GetChangefeedID()), - zap.Stringer("dispatcher", d.getDispatcherID()), - zap.String("eventType", commonEvent.TypeToString(event.GetType())), - zap.Uint64("lastEventSeq", lastEventSeq), - zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()), - zap.Uint64("receivedSeq", event.GetSeq()), - zap.Uint64("expectedSeq", expectedSeq), - zap.Uint64("commitTs", event.GetCommitTs())) + if shouldLogDispatcherIssue(&d.lastOrderingIssueLogTime, dispatcherIssueLogInterval) { + log.Warn("receive an out-of-order event, reset the dispatcher", + zap.Stringer("changefeedID", d.target.GetChangefeedID()), + zap.Stringer("dispatcher", d.getDispatcherID()), 
+ zap.String("eventType", commonEvent.TypeToString(event.GetType())), + zap.Uint64("lastEventSeq", lastEventSeq), + zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()), + zap.Uint64("receivedSeq", event.GetSeq()), + zap.Uint64("expectedSeq", expectedSeq), + zap.Uint64("commitTs", event.GetCommitTs())) + } return false } case commonEvent.TypeBatchDMLEvent: @@ -331,15 +354,17 @@ func (d *dispatcherStat) verifyEventSequence(event dispatcher.DispatcherEvent, s expectedSeq := state.lastEventSeq.Add(1) if e.Seq != expectedSeq { - log.Warn("receive an out-of-order batch DML event, reset the dispatcher", - zap.Stringer("changefeedID", d.target.GetChangefeedID()), - zap.Stringer("dispatcher", d.getDispatcherID()), - zap.String("eventType", commonEvent.TypeToString(event.GetType())), - zap.Uint64("lastEventSeq", state.lastEventSeq.Load()), - zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()), - zap.Uint64("receivedSeq", e.Seq), - zap.Uint64("expectedSeq", expectedSeq), - zap.Uint64("commitTs", e.CommitTs)) + if shouldLogDispatcherIssue(&d.lastOrderingIssueLogTime, dispatcherIssueLogInterval) { + log.Warn("receive an out-of-order batch DML event, reset the dispatcher", + zap.Stringer("changefeedID", d.target.GetChangefeedID()), + zap.Stringer("dispatcher", d.getDispatcherID()), + zap.String("eventType", commonEvent.TypeToString(event.GetType())), + zap.Uint64("lastEventSeq", state.lastEventSeq.Load()), + zap.Uint64("lastEventCommitTs", d.lastEventCommitTs.Load()), + zap.Uint64("receivedSeq", e.Seq), + zap.Uint64("expectedSeq", expectedSeq), + zap.Uint64("commitTs", e.CommitTs)) + } return false } } @@ -366,13 +391,15 @@ func (d *dispatcherStat) shouldForwardEventByCommitTs(event dispatcher.Dispatche } } if shouldIgnore { - log.Warn("receive a event older than sendCommitTs, ignore it", - zap.Stringer("changefeedID", d.target.GetChangefeedID()), - zap.Int64("tableID", d.target.GetTableSpan().TableID), - zap.Stringer("dispatcher", d.getDispatcherID()), - 
zap.Any("event", event.Event), - zap.Uint64("eventCommitTs", event.GetCommitTs()), - zap.Uint64("sentCommitTs", d.lastEventCommitTs.Load())) + if shouldLogDispatcherIssue(&d.lastStaleEventLogTime, dispatcherIssueLogInterval) { + log.Warn("receive a event older than sendCommitTs, ignore it", + zap.Stringer("changefeedID", d.target.GetChangefeedID()), + zap.Int64("tableID", d.target.GetTableSpan().TableID), + zap.Stringer("dispatcher", d.getDispatcherID()), + zap.Any("event", event.Event), + zap.Uint64("eventCommitTs", event.GetCommitTs()), + zap.Uint64("sentCommitTs", d.lastEventCommitTs.Load())) + } return false } return true @@ -685,10 +712,12 @@ func (d *dispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent) return } if event.GetSeq() != 1 { - log.Warn("should not happen: handshake event sequence number is not 1", - zap.Stringer("changefeedID", d.target.GetChangefeedID()), - zap.Stringer("dispatcher", d.getDispatcherID()), - zap.Uint64("sequence", event.GetSeq())) + if shouldLogDispatcherIssue(&d.lastOrderingIssueLogTime, dispatcherIssueLogInterval) { + log.Warn("should not happen: handshake event sequence number is not 1", + zap.Stringer("changefeedID", d.target.GetChangefeedID()), + zap.Stringer("dispatcher", d.getDispatcherID()), + zap.Uint64("sequence", event.GetSeq())) + } return } tableInfo := handshakeEvent.TableInfo diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index f1388f0e42..0e518ba25c 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -37,9 +37,10 @@ import ( ) const ( - receiveChanSize = 1024 * 8 - commonMsgRetryQuota = 3 // The number of retries for most droppable dispatcher requests. - eventServiceHeartbeatInterval = time.Second + receiveChanSize = 1024 * 8 + commonMsgRetryQuota = 3 // The number of retries for most droppable dispatcher requests. 
+ eventServiceHeartbeatInterval = time.Second + dispatcherRequestRetryLogInterval = 10 * time.Second ) // DispatcherMessage is the message send to EventService. @@ -146,6 +147,10 @@ type EventCollector struct { metricDSPendingQueue prometheus.Gauge metricDSEventChanSizeRedo prometheus.Gauge metricDSPendingQueueRedo prometheus.Gauge + + lastDispatcherRequestDropLogTime atomic.Int64 + lastUnknownMessageLogTime atomic.Int64 + lastCongestionControlErrLogTime atomic.Int64 } func New(serverId node.ID) *EventCollector { @@ -460,6 +465,7 @@ func (c *EventCollector) processDSFeedback(ctx context.Context) error { } func (c *EventCollector) sendDispatcherRequests(ctx context.Context) error { + lastRetryLogTime := time.Time{} for { select { case <-ctx.Done(): @@ -472,13 +478,19 @@ func (c *EventCollector) sendDispatcherRequests(ctx context.Context) error { if appErr, ok := err.(errors.AppError); ok && appErr.Type == errors.ErrorTypeMessageCongested { sleepInterval = 1 * time.Second } - log.Info("failed to send dispatcher request message, try again later", - zap.String("message", req.Message.String()), - zap.Duration("sleepInterval", sleepInterval), - zap.Error(err)) + now := time.Now() + if lastRetryLogTime.IsZero() || now.Sub(lastRetryLogTime) >= dispatcherRequestRetryLogInterval { + log.Info("failed to send dispatcher request message, try again later", + zap.String("message", req.Message.String()), + zap.Duration("sleepInterval", sleepInterval), + zap.Error(err)) + lastRetryLogTime = now + } if !req.decrAndCheckRetry() { - log.Warn("dispatcher request retry limit exceeded, dropping request", - zap.String("message", req.Message.String())) + if shouldLogDispatcherIssue(&c.lastDispatcherRequestDropLogTime, dispatcherIssueLogInterval) { + log.Warn("dispatcher request retry limit exceeded, dropping request", + zap.String("message", req.Message.String())) + } continue } // Put the request back to the channel for later retry. 
@@ -543,9 +555,11 @@ func (c *EventCollector) MessageCenterHandler(ctx context.Context, targetMessage case *event.DispatcherHeartbeatResponse: c.handleDispatcherHeartbeatResponse(targetMessage) default: - log.Warn("unknown message type, ignore it", - zap.String("type", targetMessage.Type.String()), - zap.Any("msg", msg)) + if shouldLogDispatcherIssue(&c.lastUnknownMessageLogTime, dispatcherIssueLogInterval) { + log.Warn("unknown message type, ignore it", + zap.String("type", targetMessage.Type.String()), + zap.Any("msg", msg)) + } } } return nil @@ -563,9 +577,11 @@ func (c *EventCollector) RedoMessageCenterHandler(ctx context.Context, targetMes } return nil } - log.Warn("unknown message type, ignore it", - zap.String("type", targetMessage.Type.String()), - zap.Any("msg", targetMessage)) + if shouldLogDispatcherIssue(&c.lastUnknownMessageLogTime, dispatcherIssueLogInterval) { + log.Warn("unknown message type, ignore it", + zap.String("type", targetMessage.Type.String()), + zap.Any("msg", targetMessage)) + } return nil } @@ -599,9 +615,11 @@ func (c *EventCollector) runDispatchMessage(ctx context.Context, inCh <-chan *me ds.Push(e.GetDispatcherID(), dispatcherEvent) } default: - log.Warn("unknown message type, ignore it", - zap.String("type", targetMessage.Type.String()), - zap.Any("msg", msg)) + if shouldLogDispatcherIssue(&c.lastUnknownMessageLogTime, dispatcherIssueLogInterval) { + log.Warn("unknown message type, ignore it", + zap.String("type", targetMessage.Type.String()), + zap.Any("msg", msg)) + } } } } @@ -622,7 +640,9 @@ func (c *EventCollector) controlCongestion(ctx context.Context) error { if len(m.GetAvailables()) != 0 { msg := messaging.NewSingleTargetMessage(serverID, messaging.EventServiceTopic, m) if err := c.mc.SendCommand(msg); err != nil { - log.Warn("send congestion control message failed", zap.Error(err)) + if shouldLogDispatcherIssue(&c.lastCongestionControlErrLogTime, dispatcherIssueLogInterval) { + log.Warn("send congestion control message 
failed", zap.Error(err)) + } } } } diff --git a/downstreamadapter/eventcollector/log_coordinator_client.go b/downstreamadapter/eventcollector/log_coordinator_client.go index 383db9410f..cbb7e174b1 100644 --- a/downstreamadapter/eventcollector/log_coordinator_client.go +++ b/downstreamadapter/eventcollector/log_coordinator_client.go @@ -41,6 +41,10 @@ type LogCoordinatorClient struct { coordinatorInfo atomic.Value logCoordinatorRequestChan *chann.DrainableChann[*logservicepb.ReusableEventServiceRequest] enableRemoteEventService bool + + lastCoordinatorUnavailableLogTime atomic.Int64 + lastSendCoordinatorErrLogTime atomic.Int64 + lastUnknownMessageLogTime atomic.Int64 } func newLogCoordinatorClient(eventCollector *EventCollector) *LogCoordinatorClient { @@ -66,9 +70,11 @@ func (l *LogCoordinatorClient) MessageCenterHandler(_ context.Context, targetMes dispatcher.setRemoteCandidates(msg.Nodes) } default: - log.Warn("unknown message type, ignore it", - zap.String("type", targetMessage.Type.String()), - zap.Any("msg", msg)) + if shouldLogDispatcherIssue(&l.lastUnknownMessageLogTime, dispatcherIssueLogInterval) { + log.Warn("unknown message type, ignore it", + zap.String("type", targetMessage.Type.String()), + zap.Any("msg", msg)) + } } } return nil @@ -81,7 +87,9 @@ func (l *LogCoordinatorClient) run(ctx context.Context) error { return context.Cause(ctx) case req := <-l.logCoordinatorRequestChan.Out(): if l.getCoordinatorInfo() == "" { - log.Info("coordinator info is empty, try send request later") + if shouldLogDispatcherIssue(&l.lastCoordinatorUnavailableLogTime, dispatcherIssueLogInterval) { + log.Info("coordinator info is empty, try send request later") + } l.logCoordinatorRequestChan.In() <- req // Since the log coordinator isn't ready and won't be available soon, processing later requests would be pointless. // Thus, we apply a longer sleep interval here. 
@@ -95,7 +103,9 @@ func (l *LogCoordinatorClient) run(ctx context.Context) error {
 			msg := messaging.NewSingleTargetMessage(coordinatorID, logCoordinatorTopic, req)
 			err := l.mc.SendCommand(msg)
 			if err != nil {
-				log.Info("fail to send dispatcher request message to log coordinator, try again later", zap.Error(err))
+				if shouldLogDispatcherIssue(&l.lastSendCoordinatorErrLogTime, dispatcherIssueLogInterval) {
+					log.Info("fail to send dispatcher request message to log coordinator, try again later", zap.Error(err))
+				}
 				time.Sleep(sleepInterval)
 			} else {
 				break
diff --git a/downstreamadapter/sink/kafka/sink.go b/downstreamadapter/sink/kafka/sink.go
index 7ecc21c9b8..b65a100901 100644
--- a/downstreamadapter/sink/kafka/sink.go
+++ b/downstreamadapter/sink/kafka/sink.go
@@ -36,9 +36,23 @@ import (
 
 const (
 	// batchSize is the maximum size of the number of messages in a batch.
-	batchSize = 2048
+	batchSize                = 2048
+	kafkaSinkWarnLogInterval = 10 * time.Second
 )
 
+func shouldLogKafkaSinkWarning(lastLogTime *atomic.Int64, interval time.Duration) bool {
+	now := time.Now().UnixNano()
+	for {
+		last := lastLogTime.Load()
+		if last != 0 && now-last < interval.Nanoseconds() {
+			return false
+		}
+		if lastLogTime.CompareAndSwap(last, now) {
+			return true
+		}
+	}
+}
+
 type sink struct {
 	changefeedID commonType.ChangeFeedID
 
@@ -61,6 +75,8 @@ type sink struct {
 	// isNormal indicate whether the sink is in the normal state.
isNormal *atomic.Bool ctx context.Context + + lastSendErrLogTime atomic.Int64 } func (s *sink) SinkType() commonType.SinkType { @@ -406,10 +422,12 @@ func (s *sink) sendMessages(ctx context.Context) error { future.Key.Topic, future.Key.Partition, message); err != nil { - log.Error("kafka sink send message failed", - zap.String("keyspace", s.changefeedID.Keyspace()), - zap.String("changefeed", s.changefeedID.Name()), - zap.Error(err)) + if shouldLogKafkaSinkWarning(&s.lastSendErrLogTime, kafkaSinkWarnLogInterval) { + log.Error("kafka sink send message failed", + zap.String("keyspace", s.changefeedID.Keyspace()), + zap.String("changefeed", s.changefeedID.Name()), + zap.Error(err)) + } return 0, 0, err } return message.GetRowsCount(), int64(message.Length()), nil diff --git a/downstreamadapter/sink/mysql/sink.go b/downstreamadapter/sink/mysql/sink.go index 13ed5e95bc..c661a3560f 100644 --- a/downstreamadapter/sink/mysql/sink.go +++ b/downstreamadapter/sink/mysql/sink.go @@ -37,8 +37,22 @@ import ( const ( // defaultConflictDetectorSlots indicates the default slot count of conflict detector. TODO:check this defaultConflictDetectorSlots uint64 = 16 * 1024 + mysqlSinkWarnLogInterval = 10 * time.Second ) +func shouldLogMySQLSinkWarning(lastLogTime *atomic.Int64, interval time.Duration) bool { + now := time.Now().UnixNano() + for { + last := lastLogTime.Load() + if last != 0 && now-last < interval.Nanoseconds() { + return false + } + if lastLogTime.CAS(last, now) { + return true + } + } +} + // Sink is responsible for writing data to mysql downstream. // Including DDL and DML. type Sink struct { @@ -68,6 +82,8 @@ type Sink struct { // variable @@tidb_cdc_active_active_sync_stats and is shared by all DML writers. // It is nil when disabled or unsupported by downstream. 
activeActiveSyncStatsCollector *mysql.ActiveActiveSyncStatsCollector + + lastProgressUpdateErrLogTime atomic.Int64 } // Verify is used to verify the sink uri and config is valid @@ -323,9 +339,11 @@ func (s *Sink) AddCheckpointTs(ts uint64) { } if err := s.progressTableWriter.Flush(ts); err != nil { - log.Warn("failed to update active active progress table", - zap.String("changefeed", s.changefeedID.DisplayName.String()), - zap.Error(err)) + if shouldLogMySQLSinkWarning(&s.lastProgressUpdateErrLogTime, mysqlSinkWarnLogInterval) { + log.Warn("failed to update active active progress table", + zap.String("changefeed", s.changefeedID.DisplayName.String()), + zap.Error(err)) + } return } } diff --git a/downstreamadapter/sink/pulsar/dml_producer.go b/downstreamadapter/sink/pulsar/dml_producer.go index 8bf226184c..90d3e685bd 100644 --- a/downstreamadapter/sink/pulsar/dml_producer.go +++ b/downstreamadapter/sink/pulsar/dml_producer.go @@ -16,6 +16,7 @@ package pulsar import ( "context" "sync" + "sync/atomic" "time" pulsarClient "github.com/apache/pulsar-client-go/pulsar" @@ -30,6 +31,21 @@ import ( "go.uber.org/zap" ) +const pulsarProducerWarnLogInterval = 10 * time.Second + +func shouldLogPulsarProducerWarning(lastLogTime *atomic.Int64, interval time.Duration) bool { + now := time.Now().UnixNano() + for { + last := lastLogTime.Load() + if last != 0 && now-last < interval.Nanoseconds() { + return false + } + if lastLogTime.CompareAndSwap(last, now) { + return true + } + } +} + // dmlProducer is the interface for the pulsar DML message producer. type dmlProducer interface { // AsyncSendMessage sends a message asynchronously. @@ -65,6 +81,9 @@ type dmlProducers struct { failpointCh chan error // closeCh is send error errChan chan error + + lastAsyncSendErrLogTime atomic.Int64 + lastErrChanFullLogTime atomic.Int64 } // newDMLProducers creates a new pulsar producer. 
@@ -165,12 +184,14 @@ func (p *dmlProducers) asyncSendMessage( // fail if err != nil { e := errors.WrapError(errors.ErrPulsarAsyncSendMessage, err) - log.Error("Pulsar DML producer async send error", - zap.String("keyspace", p.changefeedID.Keyspace()), - zap.String("changefeed", p.changefeedID.ID().String()), - zap.Int("messageSize", len(m.Payload)), - zap.String("topic", topic), - zap.Error(err)) + if shouldLogPulsarProducerWarning(&p.lastAsyncSendErrLogTime, pulsarProducerWarnLogInterval) { + log.Error("Pulsar DML producer async send error", + zap.String("keyspace", p.changefeedID.Keyspace()), + zap.String("changefeed", p.changefeedID.ID().String()), + zap.Int("messageSize", len(m.Payload)), + zap.String("topic", topic), + zap.Error(err)) + } pulsar.IncPublishedDMLFail(topic, p.changefeedID.String()) // use this select to avoid send error to a closed channel // the ctx will always be called before the errChan is closed @@ -179,8 +200,10 @@ func (p *dmlProducers) asyncSendMessage( return case p.errChan <- e: default: - log.Warn("Error channel is full in pulsar DML producer", - zap.Stringer("changefeed", p.changefeedID), zap.Error(e)) + if shouldLogPulsarProducerWarning(&p.lastErrChanFullLogTime, pulsarProducerWarnLogInterval) { + log.Warn("Error channel is full in pulsar DML producer", + zap.Stringer("changefeed", p.changefeedID), zap.Error(e)) + } } } else if message.Callback != nil { // success diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index 0de2306c47..61c437124b 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -228,7 +228,9 @@ func (h *regionEventHandler) OnDrop(event regionEvent) interface{} { zap.Bool("stateIsStale", state.isStale()), zap.Uint64("workerID", state.worker.workerID), } - log.Warn("drop region event", fields...) 
+ if shouldLogLogPullerWarning(&h.subClient.lastDroppedRegionEventLogTime, logPullerWarnLogInterval) { + log.Warn("drop region event", fields...) + } return nil } @@ -401,13 +403,15 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u decreaseLag := float64(nextResolvedPhyTs-resolvedPhyTs) / 1e3 const largeResolvedTsAdvanceStepInSecs = 30 if decreaseLag > largeResolvedTsAdvanceStepInSecs { - log.Warn("resolved ts advance step is too large", - zap.Uint64("subID", uint64(span.subID)), - zap.Int64("tableID", span.span.TableID), - zap.Uint64("regionID", regionID), - zap.Uint64("resolvedTs", ts), - zap.Uint64("lastResolvedTs", lastResolvedTs), - zap.Float64("decreaseLag(s)", decreaseLag)) + if shouldLogLogPullerWarning(&state.worker.client.lastResolvedTsJumpLogTime, logPullerWarnLogInterval) { + log.Warn("resolved ts advance step is too large", + zap.Uint64("subID", uint64(span.subID)), + zap.Int64("tableID", span.span.TableID), + zap.Uint64("regionID", regionID), + zap.Uint64("resolvedTs", ts), + zap.Uint64("lastResolvedTs", lastResolvedTs), + zap.Float64("decreaseLag(s)", decreaseLag)) + } } span.resolvedTs.Store(ts) span.resolvedTsUpdated.Store(time.Now().Unix()) diff --git a/logservice/logpuller/region_req_cache.go b/logservice/logpuller/region_req_cache.go index 07dfd49d11..270938689b 100644 --- a/logservice/logpuller/region_req_cache.go +++ b/logservice/logpuller/region_req_cache.go @@ -30,6 +30,7 @@ const ( addReqRetryInterval = time.Millisecond * 1 addReqRetryLimit = 3 abnormalRequestDurationInSec = 60 * 60 * 2 // 2 hours + requestCacheWarnLogInterval = 10 * time.Second ) // regionReq represents a wrapped region request with state @@ -73,6 +74,8 @@ type requestCache struct { spaceAvailable chan struct{} lastCheckStaleRequestTime atomic.Time + lastOverwriteLogTime time.Time + lastStaleRequestLogTime time.Time } func newRequestCache(maxPendingCount int) *requestCache { @@ -163,13 +166,17 @@ func (c *requestCache) markSent(req 
regionReq) { } if oldReq, exists := m[req.regionInfo.verID.GetID()]; exists { - log.Warn("region request overwritten", - zap.Uint64("subID", uint64(req.regionInfo.subscribedSpan.subID)), - zap.Uint64("regionID", req.regionInfo.verID.GetID()), - zap.Float64("oldAgeSec", time.Since(oldReq.createTime).Seconds()), - zap.Float64("newAgeSec", time.Since(req.createTime).Seconds()), - zap.Int("pendingCount", int(c.pendingCount.Load())), - zap.Int("pendingQueueLen", len(c.pendingQueue))) + now := time.Now() + if c.lastOverwriteLogTime.IsZero() || now.Sub(c.lastOverwriteLogTime) >= requestCacheWarnLogInterval { + log.Warn("region request overwritten", + zap.Uint64("subID", uint64(req.regionInfo.subscribedSpan.subID)), + zap.Uint64("regionID", req.regionInfo.verID.GetID()), + zap.Float64("oldAgeSec", time.Since(oldReq.createTime).Seconds()), + zap.Float64("newAgeSec", time.Since(req.createTime).Seconds()), + zap.Int("pendingCount", int(c.pendingCount.Load())), + zap.Int("pendingQueueLen", len(c.pendingQueue))) + c.lastOverwriteLogTime = now + } c.markDone() } m[req.regionInfo.verID.GetID()] = req @@ -241,15 +248,19 @@ func (c *requestCache) clearStaleRequest() { regionReq.regionInfo.lockedRangeState.Initialized.Load() || regionReq.isStale() { c.markDone() - log.Warn("region worker delete stale region request", - zap.Uint64("subID", uint64(subID)), - zap.Uint64("regionID", regionID), - zap.Int("pendingCount", int(c.pendingCount.Load())), - zap.Int("pendingQueueLen", len(c.pendingQueue)), - zap.Bool("isRegionStopped", regionReq.regionInfo.isStopped()), - zap.Bool("isSubscribedSpanStopped", regionReq.regionInfo.subscribedSpan.stopped.Load()), - zap.Bool("isStale", regionReq.isStale()), - zap.Time("createTime", regionReq.createTime)) + now := time.Now() + if c.lastStaleRequestLogTime.IsZero() || now.Sub(c.lastStaleRequestLogTime) >= requestCacheWarnLogInterval { + log.Warn("region worker delete stale region request", + zap.Uint64("subID", uint64(subID)), + zap.Uint64("regionID", 
regionID), + zap.Int("pendingCount", int(c.pendingCount.Load())), + zap.Int("pendingQueueLen", len(c.pendingQueue)), + zap.Bool("isRegionStopped", regionReq.regionInfo.isStopped()), + zap.Bool("isSubscribedSpanStopped", regionReq.regionInfo.subscribedSpan.stopped.Load()), + zap.Bool("isStale", regionReq.isStale()), + zap.Time("createTime", regionReq.createTime)) + c.lastStaleRequestLogTime = now + } delete(regionReqs, regionID) } else { reqCount++ diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index fafe7fcade..6cae7bbc7d 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -59,6 +59,9 @@ type regionRequestWorker struct { subscriptions map[SubscriptionID]regionFeedStates } + + lastConnectionIssueLogTime atomic.Int64 + lastUnexpectedEventLogTime atomic.Int64 } func newRegionRequestWorker( @@ -108,10 +111,12 @@ func newRegionRequestWorker( if errors.Cause(err) == context.Canceled { return nil } - log.Error("event feed check store version fails", - zap.Uint64("workerID", worker.workerID), - zap.String("addr", worker.store.storeAddr), - zap.Error(err)) + if shouldLogLogPullerWarning(&worker.lastConnectionIssueLogTime, logPullerWarnLogInterval) { + log.Error("event feed check store version fails", + zap.Uint64("workerID", worker.workerID), + zap.String("addr", worker.store.storeAddr), + zap.Error(err)) + } if cerror.Is(err, cerror.ErrGetAllStoresFailed) { regionErr = &getStoreErr{} } else { @@ -173,10 +178,12 @@ func (s *regionRequestWorker) run(ctx context.Context, credential *security.Cred g, gctx := errgroup.WithContext(ctx) conn, err := Connect(gctx, credential, s.store.storeAddr) if err != nil { - log.Warn("region request worker create grpc stream failed", - zap.Uint64("workerID", s.workerID), - zap.String("addr", s.store.storeAddr), - zap.Error(err)) + if shouldLogLogPullerWarning(&s.lastConnectionIssueLogTime, logPullerWarnLogInterval) { + 
log.Warn("region request worker create grpc stream failed", + zap.Uint64("workerID", s.workerID), + zap.String("addr", s.store.storeAddr), + zap.Error(err)) + } // Close the connection if it was partially created to prevent goroutine leaks if conn != nil && conn.Conn != nil { _ = conn.Conn.Close() @@ -268,15 +275,18 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event) log.Panic("unknown event type", zap.Any("event", event)) } s.client.pushRegionEventToDS(SubscriptionID(event.RequestId), regionEvent) - } else { - switch event.Event.(type) { - case *cdcpb.Event_Error: - // it is normal to receive region error after deregister a subscription - log.Debug("region request worker receives an error for a stale region, ignore it", - zap.Uint64("workerID", s.workerID), - zap.Uint64("subscriptionID", uint64(subscriptionID)), - zap.Uint64("regionID", event.RegionId)) - default: + continue + } + + switch event.Event.(type) { + case *cdcpb.Event_Error: + // it is normal to receive region error after deregister a subscription + log.Debug("region request worker receives an error for a stale region, ignore it", + zap.Uint64("workerID", s.workerID), + zap.Uint64("subscriptionID", uint64(subscriptionID)), + zap.Uint64("regionID", event.RegionId)) + default: + if shouldLogLogPullerWarning(&s.lastUnexpectedEventLogTime, logPullerWarnLogInterval) { log.Warn("region request worker receives a region event for an untracked region", zap.Uint64("workerID", s.workerID), zap.Uint64("subscriptionID", uint64(subscriptionID)), @@ -292,10 +302,12 @@ func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.Res s.client.metrics.batchResolvedSize.Observe(float64(len(resolvedTsEvent.Regions))) // TODO: resolvedTsEvent.Ts be 0 is impossible, we need find the root cause. 
if resolvedTsEvent.Ts == 0 { - log.Warn("region request worker receives a resolved ts event with zero value, ignore it", - zap.Uint64("workerID", s.workerID), - zap.Uint64("subscriptionID", resolvedTsEvent.RequestId), - zap.Any("regionIDs", resolvedTsEvent.Regions)) + if shouldLogLogPullerWarning(&s.lastUnexpectedEventLogTime, logPullerWarnLogInterval) { + log.Warn("region request worker receives a resolved ts event with zero value, ignore it", + zap.Uint64("workerID", s.workerID), + zap.Uint64("subscriptionID", resolvedTsEvent.RequestId), + zap.Any("regionIDs", resolvedTsEvent.Regions)) + } return } // Avoid allocating a huge states slice when resolvedTsEvent.Regions is large. @@ -321,11 +333,13 @@ func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.Res } continue } - log.Warn("region request worker receives a resolved ts event for an untracked region", - zap.Uint64("workerID", s.workerID), - zap.Uint64("subscriptionID", uint64(subscriptionID)), - zap.Uint64("regionID", regionID), - zap.Uint64("resolvedTs", resolvedTsEvent.Ts)) + if shouldLogLogPullerWarning(&s.lastUnexpectedEventLogTime, logPullerWarnLogInterval) { + log.Warn("region request worker receives a resolved ts event for an untracked region", + zap.Uint64("workerID", s.workerID), + zap.Uint64("subscriptionID", uint64(subscriptionID)), + zap.Uint64("regionID", regionID), + zap.Uint64("resolvedTs", resolvedTsEvent.Ts)) + } } flush() } @@ -337,12 +351,14 @@ func (s *regionRequestWorker) processRegionSendTask( ) error { doSend := func(req *cdcpb.ChangeDataRequest) error { if err := conn.Client.Send(req); err != nil { - log.Warn("region request worker send request to grpc stream failed", - zap.Uint64("workerID", s.workerID), - zap.Uint64("subscriptionID", req.RequestId), - zap.Uint64("regionID", req.RegionId), - zap.String("addr", s.store.storeAddr), - zap.Error(err)) + if shouldLogLogPullerWarning(&s.lastConnectionIssueLogTime, logPullerWarnLogInterval) { + log.Warn("region request 
worker send request to grpc stream failed", + zap.Uint64("workerID", s.workerID), + zap.Uint64("subscriptionID", req.RequestId), + zap.Uint64("regionID", req.RegionId), + zap.String("addr", s.store.storeAddr), + zap.Error(err)) + } return errors.Trace(err) } // TODO: add a metric? diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 52552fb72d..de94f63aea 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -53,10 +53,11 @@ const ( // don't need to force reload region anymore. regionScheduleReload = false - loadRegionRetryInterval time.Duration = 100 * time.Millisecond - resolveLockMinInterval time.Duration = 10 * time.Second - resolveLockTickInterval time.Duration = 2 * time.Second - resolveLockFence time.Duration = 4 * time.Second + loadRegionRetryInterval time.Duration = 100 * time.Millisecond + resolveLockMinInterval time.Duration = 10 * time.Second + resolveLockTickInterval time.Duration = 2 * time.Second + resolveLockFence time.Duration = 4 * time.Second + logPullerWarnLogInterval time.Duration = 10 * time.Second // resolveLastRunGCThreshold is the size threshold to GC resolveLastRun and drop stale entries. resolveLastRunGCThreshold = 1024 @@ -106,6 +107,19 @@ type rangeTask struct { const kvEventsCacheMaxSize = 32 +func shouldLogLogPullerWarning(lastLogTime *atomic.Int64, interval time.Duration) bool { + now := time.Now().UnixNano() + for { + last := lastLogTime.Load() + if last != 0 && now-last < interval.Nanoseconds() { + return false + } + if lastLogTime.CompareAndSwap(last, now) { + return true + } + } +} + // subscribedSpan represents a span to subscribe. // It contains a sub span of a table(or the total span of a table), // the startTs of the table, and the output event channel. 
@@ -140,6 +154,10 @@ type subscribedSpan struct { lastAdvanceTime atomic.Int64 + lastLoadRegionErrLogTime atomic.Int64 + lastLoadRegionHolesLogTime atomic.Int64 + lastUnknownCdcErrLogTime atomic.Int64 + initialized atomic.Bool resolvedTsUpdated atomic.Int64 resolvedTs atomic.Uint64 @@ -198,9 +216,12 @@ type subscriptionClient struct { ds dynstream.DynamicStream[int, SubscriptionID, regionEvent, *subscribedSpan, *regionEventHandler] // the following three fields are used to manage feedback from ds and notify other goroutines - mu sync.Mutex - cond *sync.Cond - paused atomic.Bool + mu sync.Mutex + cond *sync.Cond + paused atomic.Bool + lastResolveLockErrLogTime atomic.Int64 + lastDroppedRegionEventLogTime atomic.Int64 + lastResolvedTsJumpLogTime atomic.Int64 // the credential to connect tikv credential *security.Credential @@ -737,10 +758,12 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( backoff := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) regions, err := s.regionCache.BatchLoadRegionsWithKeyRange(backoff, nextSpan.StartKey, nextSpan.EndKey, limit) if err != nil { - log.Warn("subscription client load regions failed", - zap.Uint64("subscriptionID", uint64(subscribedSpan.subID)), - zap.Any("span", common.FormatTableSpan(&nextSpan)), - zap.Error(err)) + if shouldLogLogPullerWarning(&subscribedSpan.lastLoadRegionErrLogTime, logPullerWarnLogInterval) { + log.Warn("subscription client load regions failed", + zap.Uint64("subscriptionID", uint64(subscribedSpan.subID)), + zap.Any("span", common.FormatTableSpan(&nextSpan)), + zap.Error(err)) + } backoffBeforeLoad = true continue } @@ -752,9 +775,11 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( } regionMetas = regionlock.CutRegionsLeftCoverSpan(regionMetas, nextSpan) if len(regionMetas) == 0 { - log.Warn("subscription client load regions with holes", - zap.Uint64("subscriptionID", uint64(subscribedSpan.subID)), - zap.Any("span", common.FormatTableSpan(&nextSpan))) + if 
shouldLogLogPullerWarning(&subscribedSpan.lastLoadRegionHolesLogTime, logPullerWarnLogInterval) { + log.Warn("subscription client load regions with holes", + zap.Uint64("subscriptionID", uint64(subscribedSpan.subID)), + zap.Any("span", common.FormatTableSpan(&nextSpan))) + } backoffBeforeLoad = true continue } @@ -890,9 +915,11 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr return cerror.ErrClusterIDMismatch.GenWithStackByArgs(mismatch.Current, mismatch.Request) } - log.Warn("empty or unknown cdc error", - zap.Uint64("subscriptionID", uint64(errInfo.subscribedSpan.subID)), - zap.Stringer("error", innerErr)) + if shouldLogLogPullerWarning(&errInfo.subscribedSpan.lastUnknownCdcErrLogTime, logPullerWarnLogInterval) { + log.Warn("empty or unknown cdc error", + zap.Uint64("subscriptionID", uint64(errInfo.subscribedSpan.subID)), + zap.Stringer("error", innerErr)) + } metricFeedUnknownErrorCounter.Inc() s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskHighPrior) return nil @@ -1009,13 +1036,15 @@ func (s *subscriptionClient) handleResolveLockTasks(ctx context.Context) error { } if err := s.lockResolver.Resolve(ctx, keyspaceID, regionID, targetTs); err != nil { - log.Warn("subscription client resolve lock fail", - zap.Uint32("keyspaceID", keyspaceID), - zap.Uint64("regionID", regionID), - zap.Uint64("targetTs", targetTs), - zap.Time("lastRun", lastRun), - zap.Any("state", state), - zap.Error(err)) + if shouldLogLogPullerWarning(&s.lastResolveLockErrLogTime, logPullerWarnLogInterval) { + log.Warn("subscription client resolve lock fail", + zap.Uint32("keyspaceID", keyspaceID), + zap.Uint64("regionID", regionID), + zap.Uint64("targetTs", targetTs), + zap.Time("lastRun", lastRun), + zap.Any("state", state), + zap.Error(err)) + } } resolveLastRun[regionID] = time.Now() } diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index 8b27b11d65..37863ea360 100644 --- 
a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -46,10 +46,11 @@ import ( // and we will pull ddl job from `resolved_ts` at restart if the current gc ts is smaller than resolved_ts. const ( - snapshotSchemaKeyPrefix = "ss_" - snapshotTableKeyPrefix = "st_" - snapshotPartitionKeyPrefix = "sp_" - ddlKeyPrefix = "ds_" + snapshotSchemaKeyPrefix = "ss_" + snapshotTableKeyPrefix = "st_" + snapshotPartitionKeyPrefix = "sp_" + ddlKeyPrefix = "ds_" + schemaSnapshotRetryLogInterval = 10 * time.Second ) func gcTsKey() []byte { @@ -590,13 +591,25 @@ func persistSchemaSnapshot( snapTs uint64, collectMetaInfo bool, ) (map[int64]*BasicDatabaseInfo, map[int64]*BasicTableInfo, map[int64]BasicPartitionInfo, error) { + var ( + lastListDatabasesLogTime time.Time + lastGetTablesLogTime time.Time + ) + logRetry := func(lastLogTime *time.Time, msg string, fields ...zap.Field) { + now := time.Now() + if !lastLogTime.IsZero() && now.Sub(*lastLogTime) < schemaSnapshotRetryLogInterval { + return + } + log.Warn(msg, fields...) 
+ *lastLogTime = now + } for { meta := getSnapshotMeta(tiStore, snapTs) start := time.Now() dbInfos, err := meta.ListDatabases() if err != nil { time.Sleep(100 * time.Millisecond) - log.Warn("list databases failed, retrying", zap.Error(err)) + logRetry(&lastListDatabasesLogTime, "list databases failed, retrying", zap.Error(err)) continue } @@ -667,7 +680,7 @@ func persistSchemaSnapshot( } time.Sleep(100 * time.Millisecond) - log.Warn("get tables failed", zap.Error(err)) + logRetry(&lastGetTablesLogTime, "get tables failed", zap.Error(err)) } } diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 394bc62f1f..f6c273c015 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -678,7 +678,7 @@ func (p *persistentStorage) persistUpperBoundPeriodically(ctx context.Context) { case <-ticker.C: p.mu.Lock() if !p.upperBoundChanged { - log.Warn("schema store upper bound not changed") + log.Debug("schema store upper bound not changed") p.mu.Unlock() continue } diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go index eacacf74f8..982f6f62d0 100644 --- a/logservice/schemastore/schema_store.go +++ b/logservice/schemastore/schema_store.go @@ -593,6 +593,8 @@ func (s *schemaStore) acquireInitialGCSafePoint( keyspaceMeta common.KeyspaceMeta, gcKeeper *schemaStoreGCKeeper, ) (uint64, error) { + const retryLogInterval = 10 * time.Second + lastLogTime := time.Time{} for { // Read the current lower bound first, then install a dedicated GC barrier // for this schema store instance before any snapshot or incremental pull starts. 
@@ -608,8 +610,12 @@ func (s *schemaStore) acquireInitialGCSafePoint( } } - log.Warn("prepare schema store gc safepoint failed, will retry in 1s", - zap.Any("keyspace", keyspaceMeta), zap.Error(err)) + now := time.Now() + if lastLogTime.IsZero() || now.Sub(lastLogTime) >= retryLogInterval { + log.Warn("prepare schema store gc safepoint failed, will retry in 1s", + zap.Any("keyspace", keyspaceMeta), zap.Error(err)) + lastLogTime = now + } select { case <-ctx.Done(): return 0, errors.Trace(err) diff --git a/logservice/schemastore/unsorted_cache.go b/logservice/schemastore/unsorted_cache.go index a2c1c24cfa..ed098307b6 100644 --- a/logservice/schemastore/unsorted_cache.go +++ b/logservice/schemastore/unsorted_cache.go @@ -35,7 +35,7 @@ func ddlEventLess(a, b DDLJobWithCommitTs) bool { func newDDLCache() *ddlCache { return &ddlCache{ - ddlEvents: btree.NewG[DDLJobWithCommitTs](16, ddlEventLess), + ddlEvents: btree.NewG(16, ddlEventLess), } } diff --git a/maintainer/barrier_event.go b/maintainer/barrier_event.go index cf81e05a5e..7f61279d30 100644 --- a/maintainer/barrier_event.go +++ b/maintainer/barrier_event.go @@ -32,6 +32,8 @@ import ( "go.uber.org/zap" ) +const barrierEventWarnLogInterval = 10 * time.Second + // BarrierEvent is a barrier event that reported by dispatchers, note is a block multiple dispatchers // all of these dispatchers should report the same event type BarrierEvent struct { @@ -73,7 +75,9 @@ type BarrierEvent struct { rangeChecker range_checker.RangeChecker lastResendTime time.Time - lastWarningLogTime time.Time + lastWarningLogTime time.Time + lastDispatcherMissingLogTime time.Time + lastWriterMissingLogTime time.Time } func NewBlockEvent(cfID common.ChangeFeedID, @@ -306,10 +310,14 @@ func (be *BarrierEvent) addDispatchersToRangeChecker() { func (be *BarrierEvent) markDispatcherEventDone(dispatcherID common.DispatcherID) { replicaSpan := be.spanController.GetTaskByID(dispatcherID) if replicaSpan == nil { - log.Warn("dispatcher not found, 
ignore", - zap.String("changefeed", be.cfID.Name()), - zap.String("dispatcher", dispatcherID.String()), - zap.Int64("mode", be.mode)) + now := time.Now() + if be.lastDispatcherMissingLogTime.IsZero() || now.Sub(be.lastDispatcherMissingLogTime) >= barrierEventWarnLogInterval { + log.Warn("dispatcher not found, ignore", + zap.String("changefeed", be.cfID.Name()), + zap.String("dispatcher", dispatcherID.String()), + zap.Int64("mode", be.mode)) + be.lastDispatcherMissingLogTime = now + } return } @@ -585,7 +593,7 @@ func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage { } var msgs []*messaging.TargetMessage defer func() { - if time.Since(be.lastWarningLogTime) > time.Second*10 { + if time.Since(be.lastWarningLogTime) > barrierEventWarnLogInterval { if be.rangeChecker != nil { log.Warn("barrier event is not resolved", zap.String("changefeed", be.cfID.Name()), @@ -616,7 +624,7 @@ func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage { // still waiting for all dispatcher to reach the block commit ts if !be.selected.Load() { - if time.Since(be.lastWarningLogTime) > time.Second*10 { + if time.Since(be.lastWarningLogTime) > barrierEventWarnLogInterval { log.Info("barrier event is not being selected", zap.String("changefeed", be.cfID.Name()), zap.Uint64("commitTs", be.commitTs), @@ -635,12 +643,16 @@ func (be *BarrierEvent) resend(mode int64) []*messaging.TargetMessage { // resend write action stm := be.spanController.GetTaskByID(be.writerDispatcher) if stm == nil || stm.GetNodeID() == "" { - log.Warn("writer dispatcher not found", - zap.String("changefeed", be.cfID.Name()), - zap.String("dispatcher", be.writerDispatcher.String()), - zap.Uint64("commitTs", be.commitTs), - zap.Bool("isSyncPoint", be.isSyncPoint), - zap.Int64("mode", be.mode)) + now := time.Now() + if be.lastWriterMissingLogTime.IsZero() || now.Sub(be.lastWriterMissingLogTime) >= barrierEventWarnLogInterval { + log.Warn("writer dispatcher not found", + zap.String("changefeed", 
be.cfID.Name()), + zap.String("dispatcher", be.writerDispatcher.String()), + zap.Uint64("commitTs", be.commitTs), + zap.Bool("isSyncPoint", be.isSyncPoint), + zap.Int64("mode", be.mode)) + be.lastWriterMissingLogTime = now + } // choose a new one as the writer // it only can happen then the split and merge happens to a table, and the writeDispatcher is not the table trigger event dispatcher diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 15a16cd984..4d4e648288 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -47,8 +47,9 @@ import ( ) const ( - periodEventInterval = time.Millisecond * 100 - periodRedoInterval = time.Second * 1 + periodEventInterval = time.Millisecond * 100 + periodRedoInterval = time.Second * 1 + progressAdvanceBlockedLogInterval = 10 * time.Second ) // Maintainer is response for handle changefeed replication tasks. Maintainer should: @@ -163,6 +164,30 @@ type Maintainer struct { redoScheduledTaskGauge prometheus.Gauge redoSpanCountGauge prometheus.Gauge redoTableCountGauge prometheus.Gauge + + lastCheckpointAdvanceBlockedLogTime atomic.Int64 + lastRedoAdvanceBlockedLogTime atomic.Int64 +} + +func shouldLogAdvanceBlockedWarning(lastLogTime *atomic.Int64, interval time.Duration) bool { + now := time.Now().UnixNano() + for { + last := lastLogTime.Load() + if last != 0 && now-last < interval.Nanoseconds() { + return false + } + if lastLogTime.CompareAndSwap(last, now) { + return true + } + } +} + +func nodeIDsToStrings(ids []node.ID) []string { + res := make([]string, 0, len(ids)) + for _, id := range ids { + res = append(res, id.String()) + } + return res } // NewMaintainer create the maintainer for the changefeed @@ -565,12 +590,15 @@ func (m *Maintainer) handleRedoMetaTsMessage(ctx context.Context) { return case <-ticker.C: if !m.initialized.Load() { - log.Warn("can not advance redoTs since not bootstrapped", - zap.Stringer("changefeedID", m.changefeedID)) + if 
shouldLogAdvanceBlockedWarning(&m.lastRedoAdvanceBlockedLogTime, progressAdvanceBlockedLogInterval) { + log.Warn("can not advance redoTs since not bootstrapped", + zap.Stringer("changefeedID", m.changefeedID)) + } break } needUpdate := false updateCheckpointTs := true + missingNodes := make([]node.ID, 0) newWatermark := heartbeatpb.NewMaxWatermark() // Calculate operator and barrier constraints first to ensure atomicity. @@ -590,14 +618,21 @@ func (m *Maintainer) handleRedoMetaTsMessage(ctx context.Context) { watermark, ok := m.redoTsByCapture.Get(id) if !ok { updateCheckpointTs = false - log.Warn("redo checkpointTs can not be advanced, since missing capture heartbeat", - zap.Stringer("changefeedID", m.changefeedID), - zap.Any("node", id)) + missingNodes = append(missingNodes, id) continue } newWatermark.UpdateMin(watermark) } + if !updateCheckpointTs && + len(missingNodes) > 0 && + shouldLogAdvanceBlockedWarning(&m.lastRedoAdvanceBlockedLogTime, progressAdvanceBlockedLogInterval) { + log.Warn("redo checkpointTs can not be advanced, since missing capture heartbeat", + zap.Stringer("changefeedID", m.changefeedID), + zap.Int("missingNodeCount", len(missingNodes)), + zap.Strings("missingNodes", nodeIDsToStrings(missingNodes))) + } + newWatermark.UpdateMin(heartbeatpb.Watermark{CheckpointTs: minRedoCheckpointTsForScheduler, ResolvedTs: minRedoCheckpointTsForScheduler}) newWatermark.UpdateMin(heartbeatpb.Watermark{CheckpointTs: minRedoCheckpointTsForBarrier, ResolvedTs: minRedoCheckpointTsForBarrier}) @@ -640,10 +675,12 @@ func (m *Maintainer) calCheckpointTs(ctx context.Context) { return case <-ticker.C: if !m.initialized.Load() { - log.Warn("can not advance checkpointTs since not bootstrapped", - zap.Stringer("changefeedID", m.changefeedID), - zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), - zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs)) + if shouldLogAdvanceBlockedWarning(&m.lastCheckpointAdvanceBlockedLogTime, progressAdvanceBlockedLogInterval) { 
+ log.Warn("can not advance checkpointTs since not bootstrapped", + zap.Stringer("changefeedID", m.changefeedID), + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), + zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs)) + } break } @@ -699,6 +736,7 @@ func (m *Maintainer) calculateNewCheckpointTs() (*heartbeatpb.Watermark, bool) { // Step 2: Apply heartbeat constraints from all nodes updateCheckpointTs := true + missingNodes := make([]node.ID, 0) for _, id := range m.bootstrapper.GetAllNodeIDs() { // maintainer node has the table trigger event dispatcher if id != m.selfNode.ID && m.controller.spanController.GetTaskSizeByNodeID(id) <= 0 { @@ -708,10 +746,7 @@ func (m *Maintainer) calculateNewCheckpointTs() (*heartbeatpb.Watermark, bool) { watermark, ok := m.checkpointTsByCapture.Get(id) if !ok { updateCheckpointTs = false - log.Warn("checkpointTs can not be advanced, since missing capture heartbeat", - zap.Stringer("changefeedID", m.changefeedID), zap.Any("node", id), - zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), - zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs)) + missingNodes = append(missingNodes, id) continue } // Apply heartbeat constraint - can only make checkpointTs smaller (safer) @@ -719,6 +754,15 @@ func (m *Maintainer) calculateNewCheckpointTs() (*heartbeatpb.Watermark, bool) { } if !updateCheckpointTs { + if len(missingNodes) > 0 && + shouldLogAdvanceBlockedWarning(&m.lastCheckpointAdvanceBlockedLogTime, progressAdvanceBlockedLogInterval) { + log.Warn("checkpointTs can not be advanced, since missing capture heartbeat", + zap.Stringer("changefeedID", m.changefeedID), + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), + zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs), + zap.Int("missingNodeCount", len(missingNodes)), + zap.Strings("missingNodes", nodeIDsToStrings(missingNodes))) + } return nil, false } diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 
469dc31fc1..209ad0f490 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -17,6 +17,7 @@ import ( "context" "encoding/json" "sync" + "sync/atomic" "time" "github.com/pingcap/log" @@ -30,6 +31,21 @@ import ( "go.uber.org/zap" ) +const maintainerManagerWarnLogInterval = 10 * time.Second + +func shouldLogMaintainerManagerWarning(lastLogTime *atomic.Int64, interval time.Duration) bool { + now := time.Now().UnixNano() + for { + last := lastLogTime.Load() + if last != 0 && now-last < interval.Nanoseconds() { + return false + } + if lastLogTime.CompareAndSwap(last, now) { + return true + } + } +} + // Manager is the manager of all changefeed maintainer in a ticdc server, each ticdc server will // start a Manager when the ticdc server is startup. It responsible for: // 1. Handle bootstrap command from coordinator and report all changefeed maintainer status. @@ -51,6 +67,12 @@ type Manager struct { msgCh chan *messaging.TargetMessage taskScheduler threadpool.ThreadPool + + lastUnknownMessageLogTime atomic.Int64 + lastSendCommandErrLogTime atomic.Int64 + lastInvalidCoordinatorLogTime atomic.Int64 + lastSendBootstrapRespErrLogTime atomic.Int64 + lastMaintainerNotFoundLogTime atomic.Int64 } // NewMaintainerManager create a changefeed maintainer manager instance @@ -112,9 +134,11 @@ func (m *Manager) recvMessages(ctx context.Context, msg *messaging.TargetMessage req := msg.Message[0].(*heartbeatpb.RedoResolvedTsProgressMessage) return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) default: - log.Warn("unknown message type, ignore it", - zap.String("type", msg.Type.String()), - zap.Any("message", msg.Message)) + if shouldLogMaintainerManagerWarning(&m.lastUnknownMessageLogTime, maintainerManagerWarnLogInterval) { + log.Warn("unknown message type, ignore it", + zap.String("type", msg.Type.String()), + zap.Any("message", msg.Message)) + } } return nil } @@ -164,10 +188,12 @@ func (m *Manager) 
sendMessages(msg *heartbeatpb.MaintainerHeartbeat) { target := m.newCoordinatorTopicMessage(msg) err := m.mc.SendCommand(target) if err != nil { - log.Warn("send command failed", - zap.Stringer("from", m.nodeInfo.ID), - zap.Stringer("target", target.To), - zap.Error(err)) + if shouldLogMaintainerManagerWarning(&m.lastSendCommandErrLogTime, maintainerManagerWarnLogInterval) { + log.Warn("send command failed", + zap.Stringer("from", m.nodeInfo.ID), + zap.Stringer("target", target.To), + zap.Error(err)) + } } } @@ -184,9 +210,11 @@ func (m *Manager) Close(_ context.Context) error { func (m *Manager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) { req := msg.Message[0].(*heartbeatpb.CoordinatorBootstrapRequest) if m.coordinatorVersion > req.Version { - log.Warn("ignore invalid coordinator version", - zap.Int64("coordinatorVersion", m.coordinatorVersion), - zap.Int64("version", req.Version)) + if shouldLogMaintainerManagerWarning(&m.lastInvalidCoordinatorLogTime, maintainerManagerWarnLogInterval) { + log.Warn("ignore invalid coordinator version", + zap.Int64("coordinatorVersion", m.coordinatorVersion), + zap.Int64("version", req.Version)) + } return } m.coordinatorID = msg.From @@ -205,10 +233,12 @@ func (m *Manager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) { msg = m.newCoordinatorTopicMessage(response) err := m.mc.SendCommand(msg) if err != nil { - log.Warn("send bootstrap response failed", - zap.Stringer("coordinatorID", m.coordinatorID), - zap.Int64("coordinatorVersion", m.coordinatorVersion), - zap.Error(err)) + if shouldLogMaintainerManagerWarning(&m.lastSendBootstrapRespErrLogTime, maintainerManagerWarnLogInterval) { + log.Warn("send bootstrap response failed", + zap.Stringer("coordinatorID", m.coordinatorID), + zap.Int64("coordinatorVersion", m.coordinatorVersion), + zap.Error(err)) + } } log.Info("new coordinator online, bootstrap response already sent", @@ -247,10 +277,12 @@ func (m *Manager) onRemoveMaintainerRequest(msg 
*messaging.TargetMessage) *heart maintainer, ok := m.maintainers.Load(changefeedID) if !ok { if !req.Cascade { - log.Warn("ignore remove maintainer request, "+ - "since the maintainer not found", - zap.Stringer("changefeedID", changefeedID), - zap.Any("request", req)) + if shouldLogMaintainerManagerWarning(&m.lastMaintainerNotFoundLogTime, maintainerManagerWarnLogInterval) { + log.Warn("ignore remove maintainer request, "+ + "since the maintainer not found", + zap.Stringer("changefeedID", changefeedID), + zap.Any("request", req)) + } return &heartbeatpb.MaintainerStatus{ ChangefeedID: req.GetId(), State: heartbeatpb.ComponentState_Stopped, @@ -276,10 +308,12 @@ func (m *Manager) onDispatchMaintainerRequest( msg *messaging.TargetMessage, ) *heartbeatpb.MaintainerStatus { if m.coordinatorID != msg.From { - log.Warn("ignore invalid coordinator id", - zap.Any("request", msg), - zap.Any("coordinatorID", m.coordinatorID), - zap.Stringer("from", msg.From)) + if shouldLogMaintainerManagerWarning(&m.lastInvalidCoordinatorLogTime, maintainerManagerWarnLogInterval) { + log.Warn("ignore invalid coordinator id", + zap.Any("request", msg), + zap.Any("coordinatorID", m.coordinatorID), + zap.Stringer("from", msg.From)) + } return nil } switch msg.Type { @@ -289,7 +323,9 @@ func (m *Manager) onDispatchMaintainerRequest( case messaging.TypeRemoveMaintainerRequest: return m.onRemoveMaintainerRequest(msg) default: - log.Warn("unknown message type", zap.Any("message", msg.Message)) + if shouldLogMaintainerManagerWarning(&m.lastUnknownMessageLogTime, maintainerManagerWarnLogInterval) { + log.Warn("unknown message type", zap.Any("message", msg.Message)) + } } return nil } @@ -339,9 +375,11 @@ func (m *Manager) dispatcherMaintainerMessage( ) error { c, ok := m.maintainers.Load(changefeed) if !ok { - log.Warn("maintainer is not found", - zap.Stringer("changefeedID", changefeed), - zap.String("message", msg.String())) + if shouldLogMaintainerManagerWarning(&m.lastMaintainerNotFoundLogTime, 
maintainerManagerWarnLogInterval) { + log.Warn("maintainer is not found", + zap.Stringer("changefeedID", changefeed), + zap.String("message", msg.String())) + } return nil } select { diff --git a/pkg/common/kv_entry.go b/pkg/common/kv_entry.go index 3296801ec5..5afd9c073d 100644 --- a/pkg/common/kv_entry.go +++ b/pkg/common/kv_entry.go @@ -16,6 +16,8 @@ package common import ( "encoding/binary" "fmt" + + "github.com/pingcap/ticdc/pkg/util" ) // OpType for the kv, delete or put @@ -122,7 +124,7 @@ func (v *RawKVEntry) String() string { // TODO: redact values. return fmt.Sprintf( "OpType: %v, Key: %s, Value: %s, OldValue: %s, StartTs: %d, CRTs: %d, RegionID: %d", - v.OpType, string(v.Key), string(v.Value), string(v.OldValue), v.StartTs, v.CRTs, v.RegionID) + v.OpType, string(v.Key), util.RedactBytes(v.Value), util.RedactBytes(v.OldValue), v.StartTs, v.CRTs, v.RegionID) } // GetSize return the size of the RawKVEntry in bytes diff --git a/pkg/messaging/message_center.go b/pkg/messaging/message_center.go index faa94f555d..683714ad34 100644 --- a/pkg/messaging/message_center.go +++ b/pkg/messaging/message_center.go @@ -172,12 +172,14 @@ func (mc *messageCenter) checkRemoteTarget(ctx context.Context) { mc.remoteTargets.RLock() for _, target := range mc.remoteTargets.m { if err := target.getErr(); err != nil { - log.Warn("remote target error, reset the connection", - zap.Stringer("localID", mc.id), - zap.String("localAddr", mc.addr), - zap.Stringer("remoteID", target.targetId), - zap.String("remoteAddr", target.targetAddr), - zap.Error(err)) + if target.shouldLogConnectionIssue(time.Now(), connectionIssueLogInterval) { + log.Warn("remote target error, reset the connection", + zap.Stringer("localID", mc.id), + zap.String("localAddr", mc.addr), + zap.Stringer("remoteID", target.targetId), + zap.String("remoteAddr", target.targetAddr), + zap.Error(err)) + } target.resetConnect() } } diff --git a/pkg/messaging/remote_target.go b/pkg/messaging/remote_target.go index 
51dd973df6..22e385b664 100644 --- a/pkg/messaging/remote_target.go +++ b/pkg/messaging/remote_target.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -36,9 +37,10 @@ import ( ) const ( - reconnectInterval = 2 * time.Second - streamTypeEvent = "event" - streamTypeCommand = "command" + reconnectInterval = 2 * time.Second + connectionIssueLogInterval = 10 * time.Second + streamTypeEvent = "event" + streamTypeCommand = "command" eventRecvCh = "eventRecvCh" commandRecvCh = "commandRecvCh" @@ -79,6 +81,8 @@ type remoteMessageTarget struct { errCh chan error + lastConnectionIssueLogTime atomic.Int64 + ctx context.Context cancel context.CancelFunc @@ -113,6 +117,18 @@ func (s *remoteMessageTarget) isReadyToSend() bool { return ready } +func (s *remoteMessageTarget) shouldLogConnectionIssue(now time.Time, interval time.Duration) bool { + for { + last := s.lastConnectionIssueLogTime.Load() + if last != 0 && now.UnixNano()-last < interval.Nanoseconds() { + return false + } + if s.lastConnectionIssueLogTime.CompareAndSwap(last, now.UnixNano()) { + return true + } + } +} + // Send an event message to the remote target func (s *remoteMessageTarget) sendEvent(msg ...*TargetMessage) error { if !s.isReadyToSend() { @@ -219,7 +235,9 @@ func newRemoteMessageTarget( err := rt.connect() if err != nil { - log.Error("Failed to connect to remote target", zap.Error(err)) + if rt.shouldLogConnectionIssue(time.Now(), connectionIssueLogInterval) { + log.Error("Failed to connect to remote target", zap.Error(err)) + } rt.collectErr(err) } @@ -367,7 +385,6 @@ func (s *remoteMessageTarget) connect() error { }) if outerErr != nil { - log.Error("Failed to connect to remote target", zap.Error(outerErr)) return outerErr } @@ -417,7 +434,9 @@ LOOP: // Reconnect err := s.connect() if err != nil { - log.Error("Failed to connect to remote target", zap.Error(err)) + if s.shouldLogConnectionIssue(time.Now(), connectionIssueLogInterval) { + 
log.Error("Failed to connect to remote target", zap.Error(err)) + } s.collectErr(err) } @@ -505,11 +524,13 @@ func (s *remoteMessageTarget) runSendMessages(ctx context.Context, streamType st case <-ctx.Done(): return ctx.Err() case <-time.After(500 * time.Millisecond): - log.Warn("remote target stream is not ready, wait and check again", - zap.Stringer("localID", s.messageCenterID), - zap.String("localAddr", s.localAddr), - zap.Stringer("remoteID", s.targetId), - zap.String("remoteAddr", s.targetAddr)) + if s.shouldLogConnectionIssue(time.Now(), connectionIssueLogInterval) { + log.Warn("remote target stream is not ready, wait and check again", + zap.Stringer("localID", s.messageCenterID), + zap.String("localAddr", s.localAddr), + zap.Stringer("remoteID", s.targetId), + zap.String("remoteAddr", s.targetAddr)) + } continue } } @@ -581,11 +602,13 @@ func (s *remoteMessageTarget) runReceiveMessages(ctx context.Context, streamType case <-ctx.Done(): return ctx.Err() case <-time.After(500 * time.Millisecond): - log.Warn("remote target stream is not ready, wait and check again", - zap.Stringer("localID", s.messageCenterID), - zap.String("localAddr", s.localAddr), - zap.Stringer("remoteID", s.targetId), - zap.String("remoteAddr", s.targetAddr)) + if s.shouldLogConnectionIssue(time.Now(), connectionIssueLogInterval) { + log.Warn("remote target stream is not ready, wait and check again", + zap.Stringer("localID", s.messageCenterID), + zap.String("localAddr", s.localAddr), + zap.Stringer("remoteID", s.targetId), + zap.String("remoteAddr", s.targetAddr)) + } continue } } diff --git a/server/watcher/module_node_manager.go b/server/watcher/module_node_manager.go index b54bc327a1..d140a94378 100644 --- a/server/watcher/module_node_manager.go +++ b/server/watcher/module_node_manager.go @@ -30,6 +30,8 @@ import ( const NodeManagerName = "node-manager" +const getCoordinatorErrLogInterval = 10 * time.Second + type ( NodeChangeHandler func(map[node.ID]*node.Info) OwnerChangeHandler 
func(newOwnerKeys string) @@ -43,6 +45,8 @@ type NodeManager struct { coordinatorID atomic.Value nodes atomic.Pointer[map[node.ID]*node.Info] + lastGetCoordinatorErrLogTime atomic.Int64 + nodeChangeHandlers struct { sync.RWMutex m map[node.ID]NodeChangeHandler @@ -79,6 +83,14 @@ func (c *NodeManager) Name() string { return NodeManagerName } +func (c *NodeManager) shouldLogGetCoordinatorErr(now time.Time) bool { + last := c.lastGetCoordinatorErrLogTime.Load() + if last != 0 && now.UnixNano()-last < getCoordinatorErrLogInterval.Nanoseconds() { + return false + } + return c.lastGetCoordinatorErrLogTime.CompareAndSwap(last, now.UnixNano()) +} + // Tick is triggered by the server update events func (c *NodeManager) Tick( _ context.Context, @@ -94,7 +106,9 @@ func (c *NodeManager) Tick( oldCoordinatorID := c.coordinatorID.Load().(string) newCoordinatorID, err := c.etcdClient.GetOwnerID(context.Background()) if err != nil { - log.Warn("get coordinator id failed, will retry in next tick", zap.Error(err)) + if c.shouldLogGetCoordinatorErr(time.Now()) { + log.Warn("get coordinator id failed, will retry in next tick", zap.Error(err)) + } return state, nil }