Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions logservice/logpuller/region_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,6 @@ func (h *regionEventHandler) Path(event regionEvent) SubscriptionID {
}

func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent) bool {
startTime := time.Now()
hasEntries := false
hasResolved := false
hasError := false
defer func() {
eventType := "error"
switch {
case hasEntries && hasResolved:
eventType = "mixed"
case hasEntries:
eventType = "entries"
case hasResolved:
eventType = "resolved"
case hasError:
eventType = "error"
}
metrics.SubscriptionClientRegionEventHandleDuration.WithLabelValues(eventType).Observe(time.Since(startTime).Seconds())
}()

if len(span.kvEventsCache) != 0 {
log.Panic("kvEventsCache is not empty",
zap.Int("kvEventsCacheLen", len(span.kvEventsCache)),
Expand All @@ -114,15 +95,12 @@ func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent)
newResolvedTs := uint64(0)
for _, event := range events {
if len(event.states) == 1 && event.states[0].isStale() {
hasError = true
h.handleRegionError(event.states[0])
continue
}
if event.entries != nil {
hasEntries = true
handleEventEntries(span, event.mustFirstState(), event.entries)
} else if event.resolvedTs != 0 {
hasResolved = true
for _, state := range event.states {
resolvedTs := handleResolvedTs(span, state, event.resolvedTs)
if resolvedTs > newResolvedTs {
Expand All @@ -141,17 +119,9 @@ func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent)
if len(span.kvEventsCache) > 0 {
metricsEventCount.Add(float64(len(span.kvEventsCache)))
await := span.consumeKVEvents(span.kvEventsCache, func() {
start := time.Now()
span.clearKVEventsCache()
metrics.SubscriptionClientConsumeKVEventsCallbackDuration.WithLabelValues("clearCache").Observe(time.Since(start).Seconds())

start = time.Now()
tryAdvanceResolvedTs()
metrics.SubscriptionClientConsumeKVEventsCallbackDuration.WithLabelValues("advanceResolvedTs").Observe(time.Since(start).Seconds())

start = time.Now()
h.subClient.wakeSubscription(span.subID)
metrics.SubscriptionClientConsumeKVEventsCallbackDuration.WithLabelValues("wakeSubscription").Observe(time.Since(start).Seconds())
})
// if not await, the wake callback will not be called, we need clear the cache manually.
if !await {
Expand Down
3 changes: 1 addition & 2 deletions logservice/logpuller/region_req_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ func (c *requestCache) add(ctx context.Context, region regionInfo, force bool) (
return false, ctx.Err()
case c.pendingQueue <- req:
c.pendingCount.Inc()
cost := time.Since(start)
metrics.SubscriptionClientAddRegionRequestDuration.Observe(cost.Seconds())
metrics.SubscriptionClientAddRegionRequestDuration.Observe(time.Since(start).Seconds())
return true, nil
case <-ticker.C:
addReqRetryLimit--
Expand Down
1 change: 0 additions & 1 deletion logservice/logpuller/region_request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,6 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(events []*cdcpb.Event)
func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.ResolvedTs) {
subscriptionID := SubscriptionID(resolvedTsEvent.RequestId)
metricsResolvedTsCount.Add(float64(len(resolvedTsEvent.Regions)))
s.client.metrics.batchResolvedSize.Observe(float64(len(resolvedTsEvent.Regions)))
// TODO: resolvedTsEvent.Ts be 0 is impossible, we need find the root cause.
if resolvedTsEvent.Ts == 0 {
log.Warn("region request worker receives a resolved ts event with zero value, ignore it",
Expand Down
4 changes: 0 additions & 4 deletions logservice/logpuller/region_request_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/ticdc/logservice/logpuller/regionlock"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikv"
"google.golang.org/grpc"
Expand Down Expand Up @@ -150,9 +149,6 @@ func newDispatchResolvedTsTestWorker(regionCount int) (*regionRequestWorker, *mo
ds := &mockRegionEventDynamicStream{}
worker := &regionRequestWorker{
client: &subscriptionClient{
metrics: sharedClientMetrics{
batchResolvedSize: prometheus.ObserverFunc(func(float64) {}),
},
ds: ds,
},
}
Expand Down
13 changes: 0 additions & 13 deletions logservice/logpuller/subscription_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/pingcap/ticdc/pkg/spanz"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/prometheus/client_golang/prometheus"
kvclientv2 "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -158,10 +157,6 @@ type SubscriptionClientConfig struct {
RegionRequestWorkerPerStore uint
}

type sharedClientMetrics struct {
batchResolvedSize prometheus.Observer
}

// subscriptionClient is used to subscribe events of table ranges from TiKV.
// All exported Methods are thread-safe.
type SubscriptionClient interface {
Expand All @@ -186,7 +181,6 @@ type subscriptionClient struct {
ctx context.Context
cancel context.CancelFunc
config *SubscriptionClientConfig
metrics sharedClientMetrics
clusterID uint64

pd pd.Client
Expand Down Expand Up @@ -265,8 +259,6 @@ func NewSubscriptionClient(
ds.Start()
subClient.ds = ds
subClient.cond = sync.NewCond(&subClient.mu)

subClient.initMetrics()
return subClient
}

Expand All @@ -279,11 +271,6 @@ func (s *subscriptionClient) AllocSubscriptionID() SubscriptionID {
return SubscriptionID(subscriptionIDGen.Add(1))
}

func (s *subscriptionClient) initMetrics() {
// TODO: fix metrics
s.metrics.batchResolvedSize = metrics.BatchResolvedEventSize.WithLabelValues("event-store")
}

func (s *subscriptionClient) updateMetrics(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
Expand Down
20 changes: 0 additions & 20 deletions pkg/metrics/log_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,24 +85,6 @@ var (
Name: "resolve_lock_task_drop_count",
Help: "The number of resolve lock tasks dropped before being processed",
})

SubscriptionClientRegionEventHandleDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "subscription_client",
Name: "region_event_handle_duration",
Help: "duration (s) for subscription client to handle region events and build KV cache",
Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 28), // 40us to 1.5h
}, []string{"type"}) // types: entries, resolved, mixed, error.

SubscriptionClientConsumeKVEventsCallbackDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "subscription_client",
Name: "consume_kv_events_callback_duration",
Help: "duration (s) from calling consumeKVEvents to wake callback execution",
Buckets: prometheus.ExponentialBuckets(0.00004, 2.0, 28), // 40us to 1.5h
}, []string{"type"})
)

func initLogPullerMetrics(registry *prometheus.Registry) {
Expand All @@ -114,6 +96,4 @@ func initLogPullerMetrics(registry *prometheus.Registry) {
registry.MustRegister(RegionRequestFinishScanDuration)
registry.MustRegister(SubscriptionClientSubscribedRegionCount)
registry.MustRegister(SubscriptionClientResolveLockTaskDropCounter)
registry.MustRegister(SubscriptionClientRegionEventHandleDuration)
registry.MustRegister(SubscriptionClientConsumeKVEventsCallbackDuration)
}
9 changes: 0 additions & 9 deletions pkg/metrics/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,6 @@ var (
Name: "channel_size",
Help: "size of each channel in kv client",
}, []string{"channel"})
BatchResolvedEventSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "batch_resolved_event_size",
Help: "The number of region in one batch resolved ts event",
Buckets: prometheus.ExponentialBuckets(1, 2, 16),
}, []string{"type"})
LockResolveDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Expand Down Expand Up @@ -108,7 +100,6 @@ func initPullerMetrics(registry *prometheus.Registry) {
registry.MustRegister(RegionWorkerProcessDuration)
registry.MustRegister(RegionWorkerTotalDuration)
registry.MustRegister(EventFeedErrorCounter)
registry.MustRegister(BatchResolvedEventSize)
registry.MustRegister(eventSize)
registry.MustRegister(PullerEventCounter)
registry.MustRegister(clientChannelSize)
Expand Down
Loading