diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index 0de2306c47..81dd99988e 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -252,6 +252,7 @@ func (h *regionEventHandler) handleRegionError(state *regionFeedState) { func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *cdcpb.Event_Entries_) { regionID, _, _ := state.getRegionMeta() + state.updateRuntimeLastEvent(time.Now()) assembleRowEvent := func(regionID uint64, entry *cdcpb.Event_Row) common.RawKVEntry { var opType common.OpType switch entry.GetOpType() { @@ -277,6 +278,7 @@ func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *c switch entry.Type { case cdcpb.Event_INITIALIZED: state.setInitialized() + state.markRuntimeReplicating(time.Now()) log.Debug("region is initialized", zap.Int64("tableID", span.span.TableID), zap.Uint64("regionID", regionID), @@ -364,6 +366,7 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u } state.updateResolvedTs(resolvedTs) + state.updateRuntimeResolvedTs(resolvedTs, time.Now()) ts := uint64(0) shouldAdvance := false diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index d5fa7ed197..29b84248be 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -61,6 +61,25 @@ type regionRequestWorker struct { } } +func (s *regionRequestWorker) runtimeRegistry() *regionRuntimeRegistry { + if s.client == nil { + return nil + } + return s.client.regionRuntimeRegistry +} + +func (s *regionRequestWorker) markRegionRuntimeEnqueued(region regionInfo, now time.Time) { + if registry := s.runtimeRegistry(); registry != nil && region.runtimeKey.isValid() { + registry.setRequestEnqueueTime(region.runtimeKey, now) + } +} + +func (s *regionRequestWorker) markRegionRuntimeSent(region 
regionInfo, now time.Time) { + if registry := s.runtimeRegistry(); registry != nil && region.runtimeKey.isValid() { + registry.markWaitInitialized(region.runtimeKey, s.workerID, now) + } +} + func newRegionRequestWorker( ctx context.Context, client *subscriptionClient, @@ -431,6 +450,7 @@ func (s *regionRequestWorker) processRegionSendTask( // sentRequests visible in the same order and avoids leaving stale // requests in cleanup. s.requestCache.markSent(regionReq) + s.markRegionRuntimeSent(region, time.Now()) if err := doSend(s.createRegionRequest(region)); err != nil { state.markStopped(err) return err @@ -511,7 +531,11 @@ func (s *regionRequestWorker) clearRegionStates() map[SubscriptionID]regionFeedS // add adds a region request to the worker's cache // It blocks if the cache is full until there's space or ctx is cancelled func (s *regionRequestWorker) add(ctx context.Context, region regionInfo, force bool) (bool, error) { - return s.requestCache.add(ctx, region, force) + ok, err := s.requestCache.add(ctx, region, force) + if ok && err == nil { + s.markRegionRuntimeEnqueued(region, time.Now()) + } + return ok, err } func (s *regionRequestWorker) clearPendingRegions() []regionInfo { diff --git a/logservice/logpuller/region_runtime.go b/logservice/logpuller/region_runtime.go new file mode 100644 index 0000000000..7233121e91 --- /dev/null +++ b/logservice/logpuller/region_runtime.go @@ -0,0 +1,389 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package logpuller + +import ( + "sort" + "sync" + "time" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/tikv/client-go/v2/tikv" +) + +type regionPhase string + +const ( + regionPhaseUnknown regionPhase = "unknown" + regionPhaseDiscovered regionPhase = "discovered" + regionPhaseRangeLockWait regionPhase = "range_lock_wait" + regionPhaseQueued regionPhase = "queued" + regionPhaseRPCReady regionPhase = "rpc_ready" + regionPhaseWaitInitialized regionPhase = "wait_initialized" + regionPhaseReplicating regionPhase = "replicating" + regionPhaseRetryPending regionPhase = "retry_pending" + regionPhaseRemoved regionPhase = "removed" +) + +var regionRuntimePhases = []regionPhase{ + regionPhaseUnknown, + regionPhaseDiscovered, + regionPhaseRangeLockWait, + regionPhaseQueued, + regionPhaseRPCReady, + regionPhaseWaitInitialized, + regionPhaseReplicating, + regionPhaseRetryPending, + regionPhaseRemoved, +} + +type regionRuntimeIdentity struct { + subID SubscriptionID + regionID uint64 +} + +type regionRuntimeKey struct { + subID SubscriptionID + regionID uint64 + generation uint64 +} + +func (k regionRuntimeKey) isValid() bool { + return k.subID != InvalidSubscriptionID && k.regionID != 0 && k.generation != 0 +} + +type regionRuntimeState struct { + key regionRuntimeKey + + tableID int64 + span heartbeatpb.TableSpan + verID tikv.RegionVerID + + leaderStoreID uint64 + leaderPeerID uint64 + storeAddr string + workerID uint64 + + phase regionPhase + phaseEnterTime time.Time + + lastEventTime time.Time + lastResolvedTs uint64 + + lastError string + lastErrorTime time.Time + retryCount int + + // phaseEnterTime records the current lifecycle phase boundary. + // The fields below keep distinct historical milestones that are still useful + // after the phase moves forward. 
+ rangeLockAcquiredTime time.Time + requestEnqueueTime time.Time + requestRPCReadyTime time.Time + requestSendTime time.Time + initializedTime time.Time +} + +func (s regionRuntimeState) clone() regionRuntimeState { + s.span = cloneTableSpan(s.span) + return s +} + +func (s *regionRuntimeState) applyRegionInfo(region regionInfo) { + if region.subscribedSpan != nil { + s.tableID = region.subscribedSpan.span.TableID + } + s.span = cloneTableSpan(region.span) + s.verID = region.verID + if region.rpcCtx == nil { + return + } + + s.storeAddr = region.rpcCtx.Addr + if region.rpcCtx.Peer != nil { + s.leaderPeerID = region.rpcCtx.Peer.Id + s.leaderStoreID = region.rpcCtx.Peer.StoreId + } +} + +func cloneTableSpan(span heartbeatpb.TableSpan) heartbeatpb.TableSpan { + cloned := span + if len(span.StartKey) > 0 { + cloned.StartKey = append([]byte(nil), span.StartKey...) + } + if len(span.EndKey) > 0 { + cloned.EndKey = append([]byte(nil), span.EndKey...) + } + return cloned +} + +type regionRuntimeRegistry struct { + mu sync.RWMutex + + states map[regionRuntimeKey]*regionRuntimeState + generations map[regionRuntimeIdentity]uint64 +} + +func newRegionRuntimeRegistry() *regionRuntimeRegistry { + return ®ionRuntimeRegistry{ + states: make(map[regionRuntimeKey]*regionRuntimeState), + generations: make(map[regionRuntimeIdentity]uint64), + } +} + +func (r *regionRuntimeRegistry) allocKey(subID SubscriptionID, regionID uint64) regionRuntimeKey { + r.mu.Lock() + defer r.mu.Unlock() + + identity := regionRuntimeIdentity{subID: subID, regionID: regionID} + r.generations[identity]++ + return regionRuntimeKey{ + subID: subID, + regionID: regionID, + generation: r.generations[identity], + } +} + +func (r *regionRuntimeRegistry) upsert( + key regionRuntimeKey, + update func(*regionRuntimeState), +) regionRuntimeState { + r.mu.Lock() + defer r.mu.Unlock() + + state, ok := r.states[key] + if !ok { + state = ®ionRuntimeState{ + key: key, + phase: regionPhaseUnknown, + } + r.states[key] = 
state + } + if update != nil { + update(state) + } + return state.clone() +} + +func (r *regionRuntimeRegistry) transition( + key regionRuntimeKey, + phase regionPhase, + phaseEnterTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.phase = phase + state.phaseEnterTime = phaseEnterTime + }) +} + +func (r *regionRuntimeRegistry) updateRegionInfo( + key regionRuntimeKey, + region regionInfo, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.applyRegionInfo(region) + }) +} + +func (r *regionRuntimeRegistry) registerRegion( + key regionRuntimeKey, + region regionInfo, + discoveredTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.applyRegionInfo(region) + state.phase = regionPhaseDiscovered + state.phaseEnterTime = discoveredTime + }) +} + +func (r *regionRuntimeRegistry) updateLastEvent( + key regionRuntimeKey, + lastEventTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.lastEventTime = lastEventTime + }) +} + +func (r *regionRuntimeRegistry) updateResolvedTs( + key regionRuntimeKey, + resolvedTs uint64, + lastEventTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.lastResolvedTs = resolvedTs + if !lastEventTime.IsZero() { + state.lastEventTime = lastEventTime + } + }) +} + +func (r *regionRuntimeRegistry) recordError( + key regionRuntimeKey, + err error, + errTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + if err == nil { + state.lastError = "" + } else { + state.lastError = err.Error() + } + state.lastErrorTime = errTime + }) +} + +func (r *regionRuntimeRegistry) markRetryPending( + key regionRuntimeKey, + err error, + retryTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + if err == nil { + state.lastError = "" + } 
else { + state.lastError = err.Error() + } + state.lastErrorTime = retryTime + state.retryCount++ + state.phase = regionPhaseRetryPending + state.phaseEnterTime = retryTime + }) +} + +func (r *regionRuntimeRegistry) setRequestEnqueueTime( + key regionRuntimeKey, + enqueueTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.requestEnqueueTime = enqueueTime + }) +} + +func (r *regionRuntimeRegistry) markQueued( + key regionRuntimeKey, + acquiredTime time.Time, + queuedTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.rangeLockAcquiredTime = acquiredTime + state.phase = regionPhaseQueued + state.phaseEnterTime = queuedTime + }) +} + +func (r *regionRuntimeRegistry) markRPCReady( + key regionRuntimeKey, + rpcReadyTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.requestRPCReadyTime = rpcReadyTime + state.phase = regionPhaseRPCReady + state.phaseEnterTime = rpcReadyTime + }) +} + +func (r *regionRuntimeRegistry) markWaitInitialized( + key regionRuntimeKey, + workerID uint64, + sendTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.workerID = workerID + state.requestSendTime = sendTime + state.phase = regionPhaseWaitInitialized + state.phaseEnterTime = sendTime + }) +} + +func (r *regionRuntimeRegistry) markReplicating( + key regionRuntimeKey, + initializedTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.initializedTime = initializedTime + state.phase = regionPhaseReplicating + state.phaseEnterTime = initializedTime + }) +} + +func (r *regionRuntimeRegistry) get(key regionRuntimeKey) (regionRuntimeState, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + + state, ok := r.states[key] + if !ok { + return regionRuntimeState{}, false + } + return state.clone(), true +} + +func (r *regionRuntimeRegistry) 
snapshot() []regionRuntimeState { + r.mu.RLock() + defer r.mu.RUnlock() + + snapshots := make([]regionRuntimeState, 0, len(r.states)) + for _, state := range r.states { + snapshots = append(snapshots, state.clone()) + } + sort.Slice(snapshots, func(i, j int) bool { + left, right := snapshots[i].key, snapshots[j].key + if left.subID != right.subID { + return left.subID < right.subID + } + if left.regionID != right.regionID { + return left.regionID < right.regionID + } + return left.generation < right.generation + }) + return snapshots +} + +func (r *regionRuntimeRegistry) phaseCounts() map[regionPhase]int { + r.mu.RLock() + defer r.mu.RUnlock() + + counts := make(map[regionPhase]int, len(r.states)) + for _, state := range r.states { + counts[state.phase]++ + } + return counts +} + +func (r *regionRuntimeRegistry) remove(key regionRuntimeKey) bool { + r.mu.Lock() + defer r.mu.Unlock() + + if _, ok := r.states[key]; !ok { + return false + } + delete(r.states, key) + return true +} + +func (r *regionRuntimeRegistry) removeBySubscription(subID SubscriptionID) int { + r.mu.Lock() + defer r.mu.Unlock() + + removed := 0 + for key := range r.states { + if key.subID != subID { + continue + } + delete(r.states, key) + removed++ + } + return removed +} diff --git a/logservice/logpuller/region_runtime_integration_test.go b/logservice/logpuller/region_runtime_integration_test.go new file mode 100644 index 0000000000..854a1b5733 --- /dev/null +++ b/logservice/logpuller/region_runtime_integration_test.go @@ -0,0 +1,238 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package logpuller + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/pingcap/kvproto/pkg/errorpb" + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/logservice/logpuller/regionlock" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/pdutil" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" +) + +func TestScheduleRegionRequestUpdatesRuntimeRegistry(t *testing.T) { + client := &subscriptionClient{ + regionTaskQueue: NewPriorityQueue(), + regionRuntimeRegistry: newRegionRuntimeRegistry(), + pdClock: pdutil.NewClock4Test(), + } + + rawSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'a'}, + EndKey: []byte{'z'}, + } + consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false } + advanceResolvedTs := func(uint64) {} + subSpan := client.newSubscribedSpan(SubscriptionID(1), rawSpan, 100, consumeKVEvents, advanceResolvedTs, 0, false) + + regionSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'b'}, + EndKey: []byte{'c'}, + } + region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), regionSpan, nil, subSpan, false) + + client.scheduleRegionRequest(context.Background(), region, TaskLowPrior) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + task, err := client.regionTaskQueue.Pop(ctx) + require.NoError(t, err) + queued := task.GetRegionInfo() + require.True(t, queued.runtimeKey.isValid()) + + state, ok := client.regionRuntimeRegistry.get(queued.runtimeKey) + require.True(t, ok) + require.Equal(t, regionPhaseQueued, state.phase) 
+ require.Equal(t, uint64(10), state.verID.GetID()) + require.False(t, state.rangeLockAcquiredTime.IsZero()) +} + +func TestOnRegionFailUpdatesRuntimeRegistry(t *testing.T) { + client := &subscriptionClient{ + regionRuntimeRegistry: newRegionRuntimeRegistry(), + errCache: newErrCache(), + } + client.ctx, client.cancel = context.WithCancel(context.Background()) + defer client.cancel() + + rawSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'a'}, + EndKey: []byte{'z'}, + } + consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false } + advanceResolvedTs := func(uint64) {} + subSpan := client.newSubscribedSpan(SubscriptionID(1), rawSpan, 100, consumeKVEvents, advanceResolvedTs, 0, false) + + lockRes := subSpan.rangeLock.LockRange(context.Background(), []byte{'b'}, []byte{'c'}, 10, 1) + require.Equal(t, regionlock.LockRangeStatusSuccess, lockRes.Status) + + region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'b'}, + EndKey: []byte{'c'}, + }, nil, subSpan, false) + region.lockedRangeState = lockRes.LockedRangeState + + client.ensureRegionRuntime(®ion, time.Now()) + require.True(t, region.runtimeKey.isValid()) + + client.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{})) + + state, ok := client.regionRuntimeRegistry.get(region.runtimeKey) + require.True(t, ok) + require.Equal(t, regionPhaseDiscovered, state.phase) + require.Equal(t, "send request to store error", state.lastError) + require.Equal(t, 0, state.retryCount) +} + +func TestHandleResolvedTsUpdatesRuntimeRegistry(t *testing.T) { + client := &subscriptionClient{ + regionRuntimeRegistry: newRegionRuntimeRegistry(), + } + worker := ®ionRequestWorker{client: client} + + rawSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'a'}, + EndKey: []byte{'z'}, + } + subSpan := &subscribedSpan{ + subID: SubscriptionID(1), + startTs: 100, + span: rawSpan, + rangeLock: regionlock.NewRangeLock(1, 
rawSpan.StartKey, rawSpan.EndKey, 100), + filterLoop: false, + } + subSpan.resolvedTs.Store(100) + subSpan.resolvedTsUpdated.Store(time.Now().Unix()) + subSpan.advanceInterval = 0 + + lockRes := subSpan.rangeLock.LockRange(context.Background(), rawSpan.StartKey, rawSpan.EndKey, 10, 1) + require.Equal(t, regionlock.LockRangeStatusSuccess, lockRes.Status) + lockRes.LockedRangeState.Initialized.Store(true) + + region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), rawSpan, nil, subSpan, false) + region.lockedRangeState = lockRes.LockedRangeState + region.runtimeKey = client.regionRuntimeRegistry.allocKey(subSpan.subID, region.verID.GetID()) + client.regionRuntimeRegistry.updateRegionInfo(region.runtimeKey, region) + + state := newRegionFeedState(region, uint64(subSpan.subID), worker) + state.start() + + resolvedTs := uint64(200) + handleResolvedTs(subSpan, state, resolvedTs) + + stored, ok := client.regionRuntimeRegistry.get(region.runtimeKey) + require.True(t, ok) + require.Equal(t, resolvedTs, stored.lastResolvedTs) + require.False(t, stored.lastEventTime.IsZero()) +} + +func TestDoHandleErrorMarksRetryPendingForRetryableRegionError(t *testing.T) { + client := &subscriptionClient{ + regionRuntimeRegistry: newRegionRuntimeRegistry(), + regionTaskQueue: NewPriorityQueue(), + pdClock: pdutil.NewClock4Test(), + } + + rawSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'a'}, + EndKey: []byte{'z'}, + } + consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false } + advanceResolvedTs := func(uint64) {} + subSpan := client.newSubscribedSpan(SubscriptionID(1), rawSpan, 100, consumeKVEvents, advanceResolvedTs, 0, false) + + region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), rawSpan, nil, subSpan, false) + client.ensureRegionRuntime(®ion, time.Now()) + + err := client.doHandleError(context.Background(), newRegionErrorInfo(region, &eventError{ + err: &cdcpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{Reason: "busy"}}, + })) + 
require.NoError(t, err) + + state, ok := client.regionRuntimeRegistry.get(region.runtimeKey) + require.True(t, ok) + require.Equal(t, regionPhaseQueued, state.phase) + require.Equal(t, 1, state.retryCount) + require.Contains(t, state.lastError, "server_is_busy") +} + +func TestDoHandleErrorRemovesRuntimeForRangeReload(t *testing.T) { + client := &subscriptionClient{ + regionRuntimeRegistry: newRegionRuntimeRegistry(), + rangeTaskCh: make(chan rangeTask, 1), + } + + rawSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'a'}, + EndKey: []byte{'z'}, + } + consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false } + advanceResolvedTs := func(uint64) {} + subSpan := client.newSubscribedSpan(SubscriptionID(1), rawSpan, 100, consumeKVEvents, advanceResolvedTs, 0, false) + + region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), rawSpan, nil, subSpan, false) + client.ensureRegionRuntime(®ion, time.Now()) + + err := client.doHandleError(context.Background(), newRegionErrorInfo(region, &rpcCtxUnavailableErr{verID: region.verID})) + require.NoError(t, err) + + _, ok := client.regionRuntimeRegistry.get(region.runtimeKey) + require.False(t, ok) + + select { + case task := <-client.rangeTaskCh: + require.Equal(t, rawSpan, task.span) + require.Equal(t, subSpan, task.subscribedSpan) + case <-time.After(time.Second): + t.Fatal("expected range task to be scheduled") + } +} + +func TestDoHandleErrorRemovesRuntimeForCancelledRequest(t *testing.T) { + client := &subscriptionClient{ + regionRuntimeRegistry: newRegionRuntimeRegistry(), + } + + rawSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'a'}, + EndKey: []byte{'z'}, + } + consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false } + advanceResolvedTs := func(uint64) {} + subSpan := client.newSubscribedSpan(SubscriptionID(1), rawSpan, 100, consumeKVEvents, advanceResolvedTs, 0, false) + + region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), rawSpan, nil, subSpan, 
false) + client.ensureRegionRuntime(®ion, time.Now()) + + err := client.doHandleError(context.Background(), newRegionErrorInfo(region, &requestCancelledErr{})) + require.NoError(t, err) + + _, ok := client.regionRuntimeRegistry.get(region.runtimeKey) + require.False(t, ok) +} diff --git a/logservice/logpuller/region_runtime_test.go b/logservice/logpuller/region_runtime_test.go new file mode 100644 index 0000000000..919d51e521 --- /dev/null +++ b/logservice/logpuller/region_runtime_test.go @@ -0,0 +1,153 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package logpuller + +import ( + "errors" + "testing" + "time" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" +) + +func TestRegionRuntimeRegistryAllocKey(t *testing.T) { + registry := newRegionRuntimeRegistry() + + key1 := registry.allocKey(1, 101) + key2 := registry.allocKey(1, 101) + key3 := registry.allocKey(1, 102) + + require.Equal(t, regionRuntimeKey{subID: 1, regionID: 101, generation: 1}, key1) + require.Equal(t, regionRuntimeKey{subID: 1, regionID: 101, generation: 2}, key2) + require.Equal(t, regionRuntimeKey{subID: 1, regionID: 102, generation: 1}, key3) +} + +func TestRegionRuntimeRegistryUpdateAndSnapshot(t *testing.T) { + registry := newRegionRuntimeRegistry() + key := registry.allocKey(1, 101) + now := time.Unix(1700000000, 0) + + subSpan := &subscribedSpan{ + subID: 1, + span: heartbeatpb.TableSpan{ + TableID: 42, + StartKey: []byte("a"), + EndKey: []byte("z"), + }, + } + region := regionInfo{ + verID: tikv.NewRegionVerID(101, 2, 3), + span: heartbeatpb.TableSpan{ + TableID: 42, + StartKey: []byte("b"), + EndKey: []byte("c"), + }, + rpcCtx: &tikv.RPCContext{ + Addr: "tikv-1:20160", + Peer: &metapb.Peer{Id: 11, StoreId: 22}, + }, + subscribedSpan: subSpan, + } + + registry.registerRegion(key, region, now) + registry.setRequestEnqueueTime(key, now.Add(500*time.Millisecond)) + registry.markQueued(key, now.Add(time.Second), now.Add(2*time.Second)) + registry.markWaitInitialized(key, 7, now.Add(3*time.Second)) + registry.updateResolvedTs(key, 12345, now.Add(4*time.Second)) + registry.recordError(key, errors.New("store busy"), now.Add(5*time.Second)) + + state, ok := registry.get(key) + require.True(t, ok) + require.Equal(t, int64(42), state.tableID) + require.Equal(t, regionPhaseWaitInitialized, state.phase) + require.Equal(t, uint64(22), state.leaderStoreID) + require.Equal(t, uint64(11), state.leaderPeerID) + require.Equal(t, 
"tikv-1:20160", state.storeAddr) + require.Equal(t, uint64(7), state.workerID) + require.Equal(t, uint64(12345), state.lastResolvedTs) + require.Equal(t, "store busy", state.lastError) + require.Equal(t, 0, state.retryCount) + require.Equal(t, now.Add(time.Second), state.rangeLockAcquiredTime) + require.Equal(t, now.Add(3*time.Second), state.phaseEnterTime) + require.Equal(t, now.Add(3*time.Second), state.requestSendTime) + require.Equal(t, now.Add(500*time.Millisecond), state.requestEnqueueTime) + + snapshots := registry.snapshot() + require.Len(t, snapshots, 1) + require.Equal(t, state, snapshots[0]) + + snapshots[0].span.StartKey[0] = 'x' + updated, ok := registry.get(key) + require.True(t, ok) + require.Equal(t, []byte("b"), updated.span.StartKey) +} + +func TestRegionRuntimeRegistryMarkReplicating(t *testing.T) { + registry := newRegionRuntimeRegistry() + key := registry.allocKey(1, 101) + now := time.Unix(1700000100, 0) + + registry.markReplicating(key, now) + + state, ok := registry.get(key) + require.True(t, ok) + require.Equal(t, regionPhaseReplicating, state.phase) + require.Equal(t, now, state.initializedTime) + require.Equal(t, now, state.phaseEnterTime) +} + +func TestRegionRuntimeRegistryRemoveBySubscription(t *testing.T) { + registry := newRegionRuntimeRegistry() + key1 := registry.allocKey(1, 101) + key2 := registry.allocKey(1, 102) + key3 := registry.allocKey(2, 201) + + registry.transition(key1, regionPhaseDiscovered, time.Unix(1, 0)) + registry.transition(key2, regionPhaseRemoved, time.Unix(2, 0)) + registry.transition(key3, regionPhaseReplicating, time.Unix(3, 0)) + + require.Len(t, registry.snapshot(), 3) + require.Equal(t, 2, registry.removeBySubscription(1)) + + _, ok := registry.get(key1) + require.False(t, ok) + _, ok = registry.get(key2) + require.False(t, ok) + + state, ok := registry.get(key3) + require.True(t, ok) + require.Equal(t, regionPhaseReplicating, state.phase) + require.Len(t, registry.snapshot(), 1) +} + +func 
TestRegionRuntimeRegistryPhaseCounts(t *testing.T) { + registry := newRegionRuntimeRegistry() + now := time.Unix(1700000000, 0) + + key1 := registry.allocKey(1, 101) + key2 := registry.allocKey(1, 102) + key3 := registry.allocKey(2, 201) + + registry.transition(key1, regionPhaseQueued, now) + registry.transition(key2, regionPhaseQueued, now.Add(time.Second)) + registry.transition(key3, regionPhaseWaitInitialized, now.Add(2*time.Second)) + + counts := registry.phaseCounts() + require.Equal(t, 2, counts[regionPhaseQueued]) + require.Equal(t, 1, counts[regionPhaseWaitInitialized]) + require.Equal(t, 0, counts[regionPhaseReplicating]) +} diff --git a/logservice/logpuller/region_state.go b/logservice/logpuller/region_state.go index e9c21a7aad..496e001fb5 100644 --- a/logservice/logpuller/region_state.go +++ b/logservice/logpuller/region_state.go @@ -15,6 +15,7 @@ package logpuller import ( "sync" + "time" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/logpuller/regionlock" @@ -28,6 +29,10 @@ const ( ) type regionInfo struct { + // runtimeKey links this region info to a regionRuntimeRegistry entry. + // It is assigned by subscriptionClient when scheduling the region. + runtimeKey regionRuntimeKey + verID tikv.RegionVerID // The span of the region. // Note(dongmen): The span doesn't always represent the whole span of a region. 
@@ -194,3 +199,28 @@ func (s *regionFeedState) getRegionInfo() regionInfo { func (s *regionFeedState) getRegionMeta() (uint64, heartbeatpb.TableSpan, string) { return s.region.verID.GetID(), s.region.span, s.region.rpcCtx.Addr } + +func (s *regionFeedState) runtimeRegistry() *regionRuntimeRegistry { + if !s.region.runtimeKey.isValid() || s.worker == nil || s.worker.client == nil { + return nil + } + return s.worker.client.regionRuntimeRegistry +} + +func (s *regionFeedState) updateRuntimeLastEvent(now time.Time) { + if registry := s.runtimeRegistry(); registry != nil { + registry.updateLastEvent(s.region.runtimeKey, now) + } +} + +func (s *regionFeedState) markRuntimeReplicating(now time.Time) { + if registry := s.runtimeRegistry(); registry != nil { + registry.markReplicating(s.region.runtimeKey, now) + } +} + +func (s *regionFeedState) updateRuntimeResolvedTs(resolvedTs uint64, now time.Time) { + if registry := s.runtimeRegistry(); registry != nil { + registry.updateResolvedTs(s.region.runtimeKey, resolvedTs, now) + } +} diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 52552fb72d..b91944ab85 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -189,6 +189,8 @@ type subscriptionClient struct { metrics sharedClientMetrics clusterID uint64 + regionRuntimeRegistry *regionRuntimeRegistry + pd pd.Client regionCache *tikv.RegionCache pdClock pdutil.Clock @@ -224,6 +226,92 @@ type subscriptionClient struct { errCache *errCache } +func (s *subscriptionClient) ensureRegionRuntime(region *regionInfo, now time.Time) { + if s.regionRuntimeRegistry == nil { + return + } + if region == nil || region.subscribedSpan == nil { + return + } + if region.verID.GetID() == 0 { + return + } + if !region.runtimeKey.isValid() { + region.runtimeKey = s.regionRuntimeRegistry.allocKey(region.subscribedSpan.subID, region.verID.GetID()) + 
s.regionRuntimeRegistry.registerRegion(region.runtimeKey, *region, now) + } +} + +func (s *subscriptionClient) updateRegionRuntimeInfo(region regionInfo) { + if s.regionRuntimeRegistry == nil { + return + } + if !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.updateRegionInfo(region.runtimeKey, region) +} + +func (s *subscriptionClient) transitionRegionRuntime(region regionInfo, phase regionPhase, now time.Time) { + if s.regionRuntimeRegistry == nil { + return + } + if !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.transition(region.runtimeKey, phase, now) +} + +func (s *subscriptionClient) markRegionRetryPending(region regionInfo, err error, now time.Time) { + if s.regionRuntimeRegistry == nil || !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.markRetryPending(region.runtimeKey, err, now) +} + +func (s *subscriptionClient) markRegionRuntimeRPCReady(region regionInfo, now time.Time) { + if s.regionRuntimeRegistry == nil || !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.markRPCReady(region.runtimeKey, now) +} + +func (s *subscriptionClient) markRegionRuntimeQueued(region regionInfo, acquiredTime, queuedTime time.Time) { + if s.regionRuntimeRegistry == nil || !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.markQueued(region.runtimeKey, acquiredTime, queuedTime) +} + +func (s *subscriptionClient) recordRegionRuntimeError(region regionInfo, err error, now time.Time) { + if s.regionRuntimeRegistry == nil || !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.recordError(region.runtimeKey, err, now) +} + +func (s *subscriptionClient) regionRuntimePhaseCounts() map[regionPhase]int { + if s.regionRuntimeRegistry == nil { + return nil + } + return s.regionRuntimeRegistry.phaseCounts() +} + +func (s *subscriptionClient) removeSubscriptionRuntime(subID SubscriptionID) { + if s.regionRuntimeRegistry == nil { + return + } + 
s.regionRuntimeRegistry.removeBySubscription(subID) +} + +func (s *subscriptionClient) removeRegionRuntime(region regionInfo, now time.Time) { + if s.regionRuntimeRegistry == nil || !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.transition(region.runtimeKey, regionPhaseRemoved, now) + s.regionRuntimeRegistry.remove(region.runtimeKey) +} + // NewSubscriptionClient creates a client. func NewSubscriptionClient( config *SubscriptionClientConfig, @@ -240,6 +328,8 @@ func NewSubscriptionClient( pdClock: appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock), lockResolver: lockResolver, + regionRuntimeRegistry: newRegionRuntimeRegistry(), + credential: credential, rangeTaskCh: make(chan rangeTask, 1024), @@ -332,6 +422,12 @@ func (s *subscriptionClient) updateMetrics(ctx context.Context) error { metrics.SubscriptionClientRequestedRegionCount.WithLabelValues("pending").Set(float64(pendingRegionReqCount)) + if counts := s.regionRuntimePhaseCounts(); counts != nil { + for _, phase := range regionRuntimePhases { + metrics.SubscriptionClientRegionRuntimePhaseCount.WithLabelValues(string(phase)).Set(float64(counts[phase])) + } + } + count := 0 s.totalSpans.RLock() for _, rt := range s.totalSpans.spanMap { @@ -504,6 +600,8 @@ func (s *subscriptionClient) onTableDrained(rt *subscribedSpan) { log.Info("subscription client stop span is finished", zap.Uint64("subscriptionID", uint64(rt.subID))) + s.removeSubscriptionRuntime(rt.subID) + err := s.ds.RemovePath(rt.subID) if err != nil { log.Warn("subscription client remove path failed", @@ -517,6 +615,7 @@ func (s *subscriptionClient) onTableDrained(rt *subscribedSpan) { // Note: don't block the caller, otherwise there may be deadlock func (s *subscriptionClient) onRegionFail(errInfo regionErrorInfo) { + s.recordRegionRuntimeError(errInfo.regionInfo, errInfo.err, time.Now()) // unlock the range early to prevent blocking the range. 
if errInfo.subscribedSpan.rangeLock.UnlockRange( errInfo.span.StartKey, errInfo.span.EndKey, @@ -621,6 +720,8 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro if !ok { continue } + s.updateRegionRuntimeInfo(region) + s.markRegionRuntimeRPCReady(region, time.Now()) store := getStore(region.rpcCtx.Addr) worker := store.getRequestWorker() @@ -796,21 +897,27 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( // scheduleRegionRequest locks the region's range and send the region to regionTaskQueue, // which will be handled by handleRegions. func (s *subscriptionClient) scheduleRegionRequest(ctx context.Context, region regionInfo, priority TaskType) { + s.ensureRegionRuntime(&region, time.Now()) lockRangeResult := region.subscribedSpan.rangeLock.LockRange( ctx, region.span.StartKey, region.span.EndKey, region.verID.GetID(), region.verID.GetVer()) if lockRangeResult.Status == regionlock.LockRangeStatusWait { + s.transitionRegionRuntime(region, regionPhaseRangeLockWait, time.Now()) lockRangeResult = lockRangeResult.WaitFn() } switch lockRangeResult.Status { case regionlock.LockRangeStatusSuccess: region.lockedRangeState = lockRangeResult.LockedRangeState + s.markRegionRuntimeQueued(region, lockRangeResult.LockedRangeState.Created, time.Now()) s.regionTaskQueue.Push(NewRegionPriorityTask(priority, region, s.pdClock.CurrentTS())) case regionlock.LockRangeStatusStale: + s.removeRegionRuntime(region, time.Now()) for _, r := range lockRangeResult.RetryRanges { s.scheduleRangeRequest(ctx, r, region.subscribedSpan, region.filterLoop, priority) } + case regionlock.LockRangeStatusCancel: + s.removeRegionRuntime(region, time.Now()) default: return } @@ -855,26 +962,31 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr if notLeader := innerErr.GetNotLeader(); notLeader != nil { metricFeedNotLeaderCounter.Inc() s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx) +
s.markRegionRetryPending(errInfo.regionInfo, errInfo.err, time.Now()) s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskHighPrior) return nil } if innerErr.GetEpochNotMatch() != nil { metricFeedEpochNotMatchCounter.Inc() + s.removeRegionRuntime(errInfo.regionInfo, time.Now()) s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior) return nil } if innerErr.GetRegionNotFound() != nil { metricFeedRegionNotFoundCounter.Inc() + s.removeRegionRuntime(errInfo.regionInfo, time.Now()) s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior) return nil } if innerErr.GetCongested() != nil { metricKvCongestedCounter.Inc() + s.markRegionRetryPending(errInfo.regionInfo, errInfo.err, time.Now()) s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskLowPrior) return nil } if innerErr.GetServerIsBusy() != nil { metricKvIsBusyCounter.Inc() + s.markRegionRetryPending(errInfo.regionInfo, errInfo.err, time.Now()) s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskLowPrior) return nil } @@ -894,10 +1006,12 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr zap.Uint64("subscriptionID", uint64(errInfo.subscribedSpan.subID)), zap.Stringer("error", innerErr)) metricFeedUnknownErrorCounter.Inc() + s.markRegionRetryPending(errInfo.regionInfo, errInfo.err, time.Now()) s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskHighPrior) return nil case *rpcCtxUnavailableErr: metricFeedRPCCtxUnavailable.Inc() + s.removeRegionRuntime(errInfo.regionInfo, time.Now()) s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior) return nil case *getStoreErr: @@ -905,15 +1019,18 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) // cannot get the store the region belongs to, so we need to reload the region. 
s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err) + s.removeRegionRuntime(errInfo.regionInfo, time.Now()) s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior) return nil case *sendRequestToStoreErr: metricStoreSendRequestErr.Inc() bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err) + s.markRegionRetryPending(errInfo.regionInfo, errInfo.err, time.Now()) s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskHighPrior) return nil case *requestCancelledErr: + s.removeRegionRuntime(errInfo.regionInfo, time.Now()) // the corresponding subscription has been unsubscribed, just ignore. return nil default: diff --git a/pkg/metrics/log_puller.go b/pkg/metrics/log_puller.go index c9c25c2dda..b5bdd9390e 100644 --- a/pkg/metrics/log_puller.go +++ b/pkg/metrics/log_puller.go @@ -85,6 +85,13 @@ var ( Name: "resolve_lock_task_drop_count", Help: "The number of resolve lock tasks dropped before being processed", }) + SubscriptionClientRegionRuntimePhaseCount = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "subscription_client", + Name: "region_runtime_phase_count", + Help: "The number of regions in each runtime phase", + }, []string{"phase"}) SubscriptionClientRegionEventHandleDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -114,6 +121,7 @@ func initLogPullerMetrics(registry *prometheus.Registry) { registry.MustRegister(RegionRequestFinishScanDuration) registry.MustRegister(SubscriptionClientSubscribedRegionCount) registry.MustRegister(SubscriptionClientResolveLockTaskDropCounter) + registry.MustRegister(SubscriptionClientRegionRuntimePhaseCount) registry.MustRegister(SubscriptionClientRegionEventHandleDuration) registry.MustRegister(SubscriptionClientConsumeKVEventsCallbackDuration) }