From d40852561004dc9811a4fe09345103ad96fd509a Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sun, 5 Apr 2026 16:06:40 +0800 Subject: [PATCH 1/2] remove unnecessary metrics --- logservice/logpuller/region_event_handler.go | 37 ------------------- logservice/logpuller/region_req_cache.go | 3 +- logservice/logpuller/region_request_worker.go | 2 - .../logpuller/region_request_worker_test.go | 4 -- logservice/logpuller/subscription_client.go | 13 ------- pkg/metrics/log_puller.go | 20 ---------- pkg/metrics/puller.go | 17 --------- 7 files changed, 1 insertion(+), 95 deletions(-) diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index 0de2306c47..acc67ade7f 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" - "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/ticdc/utils/dynstream" @@ -28,11 +27,6 @@ import ( "go.uber.org/zap" ) -var ( - metricsResolvedTsCount = metrics.PullerEventCounter.WithLabelValues("resolved_ts") - metricsEventCount = metrics.PullerEventCounter.WithLabelValues("event") -) - const ( DataGroupEntriesOrResolvedTs = 1 DataGroupError = 2 @@ -86,25 +80,6 @@ func (h *regionEventHandler) Path(event regionEvent) SubscriptionID { } func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent) bool { - startTime := time.Now() - hasEntries := false - hasResolved := false - hasError := false - defer func() { - eventType := "error" - switch { - case hasEntries && hasResolved: - eventType = "mixed" - case hasEntries: - eventType = "entries" - case hasResolved: - eventType = "resolved" - case hasError: - eventType = "error" - } - metrics.SubscriptionClientRegionEventHandleDuration.WithLabelValues(eventType).Observe(time.Since(startTime).Seconds()) - }() - if len(span.kvEventsCache) != 0 { log.Panic("kvEventsCache is not empty", zap.Int("kvEventsCacheLen", len(span.kvEventsCache)), @@ -114,15 +89,12 @@ func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent) newResolvedTs := uint64(0) for _, event := range events { if len(event.states) == 1 && event.states[0].isStale() { - hasError = true h.handleRegionError(event.states[0]) continue } if event.entries != nil { - hasEntries = true handleEventEntries(span, event.mustFirstState(), event.entries) } else if event.resolvedTs != 0 { - hasResolved = true for _, state := range event.states { resolvedTs := handleResolvedTs(span, state, event.resolvedTs) if resolvedTs > newResolvedTs { @@ -139,19 +111,10 @@ func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent) } } if len(span.kvEventsCache) > 0 { - metricsEventCount.Add(float64(len(span.kvEventsCache))) await := span.consumeKVEvents(span.kvEventsCache, func() { - start := time.Now() span.clearKVEventsCache() - metrics.SubscriptionClientConsumeKVEventsCallbackDuration.WithLabelValues("clearCache").Observe(time.Since(start).Seconds()) - - start = time.Now() tryAdvanceResolvedTs() - metrics.SubscriptionClientConsumeKVEventsCallbackDuration.WithLabelValues("advanceResolvedTs").Observe(time.Since(start).Seconds()) - - start = time.Now() h.subClient.wakeSubscription(span.subID) - metrics.SubscriptionClientConsumeKVEventsCallbackDuration.WithLabelValues("wakeSubscription").Observe(time.Since(start).Seconds()) }) // if not await, the wake callback will not be called, we need clear the cache manually. if !await { diff --git a/logservice/logpuller/region_req_cache.go b/logservice/logpuller/region_req_cache.go index b4478cab07..c3b3fd2b37 100644 --- a/logservice/logpuller/region_req_cache.go +++ b/logservice/logpuller/region_req_cache.go @@ -109,8 +109,7 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) ( return false, ctx.Err() case c.pendingQueue <- req: c.pendingCount.Inc() - cost := time.Since(start) - metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds()) + metrics.SubscriptionClientAddRegionRequestDuration.Observe(time.Since(start).Seconds()) return true, nil case <-ticker.C: addReqRetryLimit-- diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index d5fa7ed197..09d3b7e6f0 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -288,8 +288,6 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event) func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.ResolvedTs) { subscriptionID := SubscriptionID(resolvedTsEvent.RequestId) - metricsResolvedTsCount.Add(float64(len(resolvedTsEvent.Regions))) - s.client.metrics.batchResolvedSize.Observe(float64(len(resolvedTsEvent.Regions))) // TODO: resolvedTsEvent.Ts be 0 is impossible, we need find the root cause. if resolvedTsEvent.Ts == 0 { log.Warn("region request worker receives a resolved ts event with zero value, ignore it", diff --git a/logservice/logpuller/region_request_worker_test.go b/logservice/logpuller/region_request_worker_test.go index e711aff283..e59bfdc3aa 100644 --- a/logservice/logpuller/region_request_worker_test.go +++ b/logservice/logpuller/region_request_worker_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/ticdc/logservice/logpuller/regionlock" "github.com/pingcap/ticdc/utils/dynstream" - "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" "google.golang.org/grpc" @@ -150,9 +149,6 @@ func newDispatchResolvedTsTestWorker(regionCount int) (*regionRequestWorker, *mo ds := &mockRegionEventDynamicStream{} worker := ®ionRequestWorker{ client: &subscriptionClient{ - metrics: sharedClientMetrics{ - batchResolvedSize: prometheus.ObserverFunc(func(float64) {}), - }, ds: ds, }, } diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 52552fb72d..708d7467ac 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/ticdc/utils/dynstream" - "github.com/prometheus/client_golang/prometheus" kvclientv2 "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" @@ -158,10 +157,6 @@ type SubscriptionClientConfig struct { RegionRequestWorkerPerStore uint } -type sharedClientMetrics struct { - batchResolvedSize prometheus.Observer -} - // subscriptionClient is used to subscribe events of table ranges from TiKV. // All exported Methods are thread-safe. type SubscriptionClient interface { @@ -186,7 +181,6 @@ type subscriptionClient struct { ctx context.Context cancel context.CancelFunc config *SubscriptionClientConfig - metrics sharedClientMetrics clusterID uint64 pd pd.Client @@ -265,8 +259,6 @@ func NewSubscriptionClient( ds.Start() subClient.ds = ds subClient.cond = sync.NewCond(&subClient.mu) - - subClient.initMetrics() return subClient } @@ -279,11 +271,6 @@ func (s *subscriptionClient) AllocSubscriptionID() SubscriptionID { return SubscriptionID(subscriptionIDGen.Add(1)) } -func (s *subscriptionClient) initMetrics() { - // TODO: fix metrics - s.metrics.batchResolvedSize = metrics.BatchResolvedEventSize.WithLabelValues("event-store") -} - func (s *subscriptionClient) updateMetrics(ctx context.Context) error { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() diff --git a/pkg/metrics/log_puller.go b/pkg/metrics/log_puller.go index c9c25c2dda..02346d5528 100644 --- a/pkg/metrics/log_puller.go +++ b/pkg/metrics/log_puller.go @@ -85,24 +85,6 @@ var ( Name: "resolve_lock_task_drop_count", Help: "The number of resolve lock tasks dropped before being processed", }) - - SubscriptionClientRegionEventHandleDuration = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "ticdc", - Subsystem: "subscription_client", - Name: "region_event_handle_duration", - Help: "duration (s) for subscription client to handle region events and build KV cache", - Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 28), // 40us to 1.5h - }, []string{"type"}) // types: entries, resolved, mixed, error. - - SubscriptionClientConsumeKVEventsCallbackDuration = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "ticdc", - Subsystem: "subscription_client", - Name: "consume_kv_events_callback_duration", - Help: "duration (s) from calling consumeKVEvents to wake callback execution", - Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 28), // 40us to 1.5h - }, []string{"type"}) ) func initLogPullerMetrics(registry *prometheus.Registry) { @@ -114,6 +96,4 @@ func initLogPullerMetrics(registry *prometheus.Registry) { registry.MustRegister(RegionRequestFinishScanDuration) registry.MustRegister(SubscriptionClientSubscribedRegionCount) registry.MustRegister(SubscriptionClientResolveLockTaskDropCounter) - registry.MustRegister(SubscriptionClientRegionEventHandleDuration) - registry.MustRegister(SubscriptionClientConsumeKVEventsCallbackDuration) } diff --git a/pkg/metrics/puller.go b/pkg/metrics/puller.go index af416aea8c..d1da522364 100644 --- a/pkg/metrics/puller.go +++ b/pkg/metrics/puller.go @@ -36,13 +36,6 @@ var ( Help: "Size of KV events.", Buckets: prometheus.ExponentialBuckets(16, 2, 25), }, []string{"type"}) - PullerEventCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "ticdc", - Subsystem: "kvclient", - Name: "pull_event_count", - Help: "event count received by this puller", - }, []string{"type"}) clientChannelSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", @@ -50,14 +43,6 @@ var ( Name: "channel_size", Help: "size of each channel in kv client", }, []string{"channel"}) - BatchResolvedEventSize = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "ticdc", - Subsystem: "kvclient", - Name: "batch_resolved_event_size", - Help: "The number of region in one batch resolved ts event", - Buckets: prometheus.ExponentialBuckets(1, 2, 16), - }, []string{"type"}) LockResolveDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "ticdc", @@ -108,9 +93,7 @@ func initPullerMetrics(registry *prometheus.Registry) { registry.MustRegister(RegionWorkerProcessDuration) registry.MustRegister(RegionWorkerTotalDuration) registry.MustRegister(EventFeedErrorCounter) - registry.MustRegister(BatchResolvedEventSize) registry.MustRegister(eventSize) - registry.MustRegister(PullerEventCounter) registry.MustRegister(clientChannelSize) registry.MustRegister(LockResolveDuration) registry.MustRegister(regionWorkerQueueDuration) From 9999bcb3d2dd22117037d04af6e89415d2029afb Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sun, 5 Apr 2026 16:11:04 +0800 Subject: [PATCH 2/2] f --- logservice/logpuller/region_event_handler.go | 7 +++++++ logservice/logpuller/region_request_worker.go | 1 + pkg/metrics/puller.go | 8 ++++++++ 3 files changed, 16 insertions(+) diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index acc67ade7f..59c60a5827 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/ticdc/utils/dynstream" @@ -27,6 +28,11 @@ import ( "go.uber.org/zap" ) +var ( + metricsResolvedTsCount = metrics.PullerEventCounter.WithLabelValues("resolved_ts") + metricsEventCount = metrics.PullerEventCounter.WithLabelValues("event") +) + const ( DataGroupEntriesOrResolvedTs = 1 DataGroupError = 2 @@ -111,6 +117,7 @@ func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent) } } if len(span.kvEventsCache) > 0 { + metricsEventCount.Add(float64(len(span.kvEventsCache))) await := span.consumeKVEvents(span.kvEventsCache, func() { span.clearKVEventsCache() tryAdvanceResolvedTs() diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index 09d3b7e6f0..0c201189f2 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -288,6 +288,7 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event) func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.ResolvedTs) { subscriptionID := SubscriptionID(resolvedTsEvent.RequestId) + metricsResolvedTsCount.Add(float64(len(resolvedTsEvent.Regions))) // TODO: resolvedTsEvent.Ts be 0 is impossible, we need find the root cause. if resolvedTsEvent.Ts == 0 { log.Warn("region request worker receives a resolved ts event with zero value, ignore it", diff --git a/pkg/metrics/puller.go b/pkg/metrics/puller.go index d1da522364..f49bda3630 100644 --- a/pkg/metrics/puller.go +++ b/pkg/metrics/puller.go @@ -36,6 +36,13 @@ var ( Help: "Size of KV events.", Buckets: prometheus.ExponentialBuckets(16, 2, 25), }, []string{"type"}) + PullerEventCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "kvclient", + Name: "pull_event_count", + Help: "event count received by this puller", + }, []string{"type"}) clientChannelSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", @@ -94,6 +101,7 @@ func initPullerMetrics(registry *prometheus.Registry) { registry.MustRegister(RegionWorkerTotalDuration) registry.MustRegister(EventFeedErrorCounter) registry.MustRegister(eventSize) + registry.MustRegister(PullerEventCounter) registry.MustRegister(clientChannelSize) registry.MustRegister(LockResolveDuration) registry.MustRegister(regionWorkerQueueDuration)