diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index 0de2306c47..59c60a5827 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -86,25 +86,6 @@ func (h *regionEventHandler) Path(event regionEvent) SubscriptionID { } func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent) bool { - startTime := time.Now() - hasEntries := false - hasResolved := false - hasError := false - defer func() { - eventType := "error" - switch { - case hasEntries && hasResolved: - eventType = "mixed" - case hasEntries: - eventType = "entries" - case hasResolved: - eventType = "resolved" - case hasError: - eventType = "error" - } - metrics.SubscriptionClientRegionEventHandleDuration.WithLabelValues(eventType).Observe(time.Since(startTime).Seconds()) - }() - if len(span.kvEventsCache) != 0 { log.Panic("kvEventsCache is not empty", zap.Int("kvEventsCacheLen", len(span.kvEventsCache)), @@ -114,15 +95,12 @@ func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent) newResolvedTs := uint64(0) for _, event := range events { if len(event.states) == 1 && event.states[0].isStale() { - hasError = true h.handleRegionError(event.states[0]) continue } if event.entries != nil { - hasEntries = true handleEventEntries(span, event.mustFirstState(), event.entries) } else if event.resolvedTs != 0 { - hasResolved = true for _, state := range event.states { resolvedTs := handleResolvedTs(span, state, event.resolvedTs) if resolvedTs > newResolvedTs { @@ -141,17 +119,9 @@ func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent) if len(span.kvEventsCache) > 0 { metricsEventCount.Add(float64(len(span.kvEventsCache))) await := span.consumeKVEvents(span.kvEventsCache, func() { - start := time.Now() span.clearKVEventsCache() - metrics.SubscriptionClientConsumeKVEventsCallbackDuration.WithLabelValues("clearCache").Observe(time.Since(start).Seconds()) - - start = time.Now() tryAdvanceResolvedTs() - metrics.SubscriptionClientConsumeKVEventsCallbackDuration.WithLabelValues("advanceResolvedTs").Observe(time.Since(start).Seconds()) - - start = time.Now() h.subClient.wakeSubscription(span.subID) - metrics.SubscriptionClientConsumeKVEventsCallbackDuration.WithLabelValues("wakeSubscription").Observe(time.Since(start).Seconds()) }) // if not await, the wake callback will not be called, we need clear the cache manually. if !await { diff --git a/logservice/logpuller/region_req_cache.go b/logservice/logpuller/region_req_cache.go index b4478cab07..c3b3fd2b37 100644 --- a/logservice/logpuller/region_req_cache.go +++ b/logservice/logpuller/region_req_cache.go @@ -109,8 +109,7 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) ( return false, ctx.Err() case c.pendingQueue <- req: c.pendingCount.Inc() - cost := time.Since(start) - metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds()) + metrics.SubscriptionClientAddRegionRequestDuration.Observe(time.Since(start).Seconds()) return true, nil case <-ticker.C: addReqRetryLimit-- diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index d5fa7ed197..0c201189f2 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -289,7 +289,6 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event) func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.ResolvedTs) { subscriptionID := SubscriptionID(resolvedTsEvent.RequestId) metricsResolvedTsCount.Add(float64(len(resolvedTsEvent.Regions))) - s.client.metrics.batchResolvedSize.Observe(float64(len(resolvedTsEvent.Regions))) // TODO: resolvedTsEvent.Ts be 0 is impossible, we need find the root cause. if resolvedTsEvent.Ts == 0 { log.Warn("region request worker receives a resolved ts event with zero value, ignore it", diff --git a/logservice/logpuller/region_request_worker_test.go b/logservice/logpuller/region_request_worker_test.go index e711aff283..e59bfdc3aa 100644 --- a/logservice/logpuller/region_request_worker_test.go +++ b/logservice/logpuller/region_request_worker_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/ticdc/logservice/logpuller/regionlock" "github.com/pingcap/ticdc/utils/dynstream" - "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" "google.golang.org/grpc" @@ -150,9 +149,6 @@ func newDispatchResolvedTsTestWorker(regionCount int) (*regionRequestWorker, *mo ds := &mockRegionEventDynamicStream{} worker := ®ionRequestWorker{ client: &subscriptionClient{ - metrics: sharedClientMetrics{ - batchResolvedSize: prometheus.ObserverFunc(func(float64) {}), - }, ds: ds, }, } diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 52552fb72d..708d7467ac 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/ticdc/pkg/spanz" "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/ticdc/utils/dynstream" - "github.com/prometheus/client_golang/prometheus" kvclientv2 "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" @@ -158,10 +157,6 @@ type SubscriptionClientConfig struct { RegionRequestWorkerPerStore uint } -type sharedClientMetrics struct { - batchResolvedSize prometheus.Observer -} - // subscriptionClient is used to subscribe events of table ranges from TiKV. // All exported Methods are thread-safe. type SubscriptionClient interface { @@ -186,7 +181,6 @@ type subscriptionClient struct { ctx context.Context cancel context.CancelFunc config *SubscriptionClientConfig - metrics sharedClientMetrics clusterID uint64 pd pd.Client @@ -265,8 +259,6 @@ func NewSubscriptionClient( ds.Start() subClient.ds = ds subClient.cond = sync.NewCond(&subClient.mu) - - subClient.initMetrics() return subClient } @@ -279,11 +271,6 @@ func (s *subscriptionClient) AllocSubscriptionID() SubscriptionID { return SubscriptionID(subscriptionIDGen.Add(1)) } -func (s *subscriptionClient) initMetrics() { - // TODO: fix metrics - s.metrics.batchResolvedSize = metrics.BatchResolvedEventSize.WithLabelValues("event-store") -} - func (s *subscriptionClient) updateMetrics(ctx context.Context) error { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() diff --git a/pkg/metrics/log_puller.go b/pkg/metrics/log_puller.go index c9c25c2dda..02346d5528 100644 --- a/pkg/metrics/log_puller.go +++ b/pkg/metrics/log_puller.go @@ -85,24 +85,6 @@ var ( Name: "resolve_lock_task_drop_count", Help: "The number of resolve lock tasks dropped before being processed", }) - - SubscriptionClientRegionEventHandleDuration = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "ticdc", - Subsystem: "subscription_client", - Name: "region_event_handle_duration", - Help: "duration (s) for subscription client to handle region events and build KV cache", - Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 28), // 40us to 1.5h - }, []string{"type"}) // types: entries, resolved, mixed, error. - - SubscriptionClientConsumeKVEventsCallbackDuration = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "ticdc", - Subsystem: "subscription_client", - Name: "consume_kv_events_callback_duration", - Help: "duration (s) from calling consumeKVEvents to wake callback execution", - Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 28), // 40us to 1.5h - }, []string{"type"}) ) func initLogPullerMetrics(registry *prometheus.Registry) { @@ -114,6 +96,4 @@ func initLogPullerMetrics(registry *prometheus.Registry) { registry.MustRegister(RegionRequestFinishScanDuration) registry.MustRegister(SubscriptionClientSubscribedRegionCount) registry.MustRegister(SubscriptionClientResolveLockTaskDropCounter) - registry.MustRegister(SubscriptionClientRegionEventHandleDuration) - registry.MustRegister(SubscriptionClientConsumeKVEventsCallbackDuration) } diff --git a/pkg/metrics/puller.go b/pkg/metrics/puller.go index af416aea8c..f49bda3630 100644 --- a/pkg/metrics/puller.go +++ b/pkg/metrics/puller.go @@ -50,14 +50,6 @@ var ( Name: "channel_size", Help: "size of each channel in kv client", }, []string{"channel"}) - BatchResolvedEventSize = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "ticdc", - Subsystem: "kvclient", - Name: "batch_resolved_event_size", - Help: "The number of region in one batch resolved ts event", - Buckets: prometheus.ExponentialBuckets(1, 2, 16), - }, []string{"type"}) LockResolveDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "ticdc", @@ -108,7 +100,6 @@ func initPullerMetrics(registry *prometheus.Registry) { registry.MustRegister(RegionWorkerProcessDuration) registry.MustRegister(RegionWorkerTotalDuration) registry.MustRegister(EventFeedErrorCounter) - registry.MustRegister(BatchResolvedEventSize) registry.MustRegister(eventSize) registry.MustRegister(PullerEventCounter) registry.MustRegister(clientChannelSize)