From 7241f6a0465eb9d4f85381ec9e71c9816f21ebb6 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sun, 5 Apr 2026 17:37:05 +0800 Subject: [PATCH 1/6] logpuller: add region runtime registry scaffold --- logservice/logpuller/region_runtime.go | 334 ++++++++++++++++++++ logservice/logpuller/region_runtime_test.go | 119 +++++++ logservice/logpuller/subscription_client.go | 4 + 3 files changed, 457 insertions(+) create mode 100644 logservice/logpuller/region_runtime.go create mode 100644 logservice/logpuller/region_runtime_test.go diff --git a/logservice/logpuller/region_runtime.go b/logservice/logpuller/region_runtime.go new file mode 100644 index 0000000000..a40beb2146 --- /dev/null +++ b/logservice/logpuller/region_runtime.go @@ -0,0 +1,334 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package logpuller + +import ( + "sort" + "sync" + "time" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/tikv/client-go/v2/tikv" +) + +type regionPhase string + +const ( + regionPhaseUnknown regionPhase = "unknown" + regionPhaseDiscovered regionPhase = "discovered" + regionPhaseRangeLockWait regionPhase = "range_lock_wait" + regionPhaseRangeLocked regionPhase = "range_locked" + regionPhaseQueued regionPhase = "queued" + regionPhaseRPCReady regionPhase = "rpc_ready" + regionPhaseSent regionPhase = "sent" + regionPhaseWaitInitialized regionPhase = "wait_initialized" + regionPhaseReplicating regionPhase = "replicating" + regionPhaseStuck regionPhase = "stuck" + regionPhaseRetryPending regionPhase = "retry_pending" + regionPhaseRemoved regionPhase = "removed" + regionPhaseDeregistering regionPhase = "deregistering" +) + +type regionRuntimeIdentity struct { + subID SubscriptionID + regionID uint64 +} + +type regionRuntimeKey struct { + subID SubscriptionID + regionID uint64 + generation uint64 +} + +type regionRuntimeState struct { + key regionRuntimeKey + + tableID int64 + span heartbeatpb.TableSpan + verID tikv.RegionVerID + + leaderStoreID uint64 + leaderPeerID uint64 + storeAddr string + workerID uint64 + + phase regionPhase + phaseEnterTime time.Time + + lastEventTime time.Time + lastResolvedTs uint64 + + lastError string + lastErrorTime time.Time + retryCount int + + requestEnqueueTime time.Time + requestRPCReadyTime time.Time + requestSendTime time.Time + initializedTime time.Time + replicatingTime time.Time +} + +func (s regionRuntimeState) clone() regionRuntimeState { + s.span = cloneTableSpan(s.span) + return s +} + +func (s *regionRuntimeState) applyRegionInfo(region regionInfo) { + if region.subscribedSpan != nil { + s.tableID = region.subscribedSpan.span.TableID + } + s.span = cloneTableSpan(region.span) + s.verID = region.verID + if region.rpcCtx == nil { + return + } + + s.storeAddr = region.rpcCtx.Addr + if region.rpcCtx.Peer != nil { + 
s.leaderPeerID = region.rpcCtx.Peer.Id + s.leaderStoreID = region.rpcCtx.Peer.StoreId + } +} + +func cloneTableSpan(span heartbeatpb.TableSpan) heartbeatpb.TableSpan { + cloned := span + if len(span.StartKey) > 0 { + cloned.StartKey = append([]byte(nil), span.StartKey...) + } + if len(span.EndKey) > 0 { + cloned.EndKey = append([]byte(nil), span.EndKey...) + } + return cloned +} + +type regionRuntimeRegistry struct { + mu sync.RWMutex + + states map[regionRuntimeKey]*regionRuntimeState + generations map[regionRuntimeIdentity]uint64 +} + +func newRegionRuntimeRegistry() *regionRuntimeRegistry { + return &regionRuntimeRegistry{ + states: make(map[regionRuntimeKey]*regionRuntimeState), + generations: make(map[regionRuntimeIdentity]uint64), + } +} + +func (r *regionRuntimeRegistry) allocKey(subID SubscriptionID, regionID uint64) regionRuntimeKey { + r.mu.Lock() + defer r.mu.Unlock() + + identity := regionRuntimeIdentity{subID: subID, regionID: regionID} + r.generations[identity]++ + return regionRuntimeKey{ + subID: subID, + regionID: regionID, + generation: r.generations[identity], + } +} + +func (r *regionRuntimeRegistry) upsert( + key regionRuntimeKey, + update func(*regionRuntimeState), +) regionRuntimeState { + r.mu.Lock() + defer r.mu.Unlock() + + state, ok := r.states[key] + if !ok { + state = &regionRuntimeState{ + key: key, + phase: regionPhaseUnknown, + } + r.states[key] = state + } + if update != nil { + update(state) + } + return state.clone() +} + +func (r *regionRuntimeRegistry) transition( + key regionRuntimeKey, + phase regionPhase, + phaseEnterTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.phase = phase + state.phaseEnterTime = phaseEnterTime + }) +} + +func (r *regionRuntimeRegistry) updateRegionInfo( + key regionRuntimeKey, + region regionInfo, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.applyRegionInfo(region) + }) +} + +func (r *regionRuntimeRegistry) 
updateWorker(key regionRuntimeKey, workerID uint64) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.workerID = workerID + }) +} + +func (r *regionRuntimeRegistry) updateLastEvent( + key regionRuntimeKey, + lastEventTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.lastEventTime = lastEventTime + }) +} + +func (r *regionRuntimeRegistry) updateResolvedTs( + key regionRuntimeKey, + resolvedTs uint64, + lastEventTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.lastResolvedTs = resolvedTs + if !lastEventTime.IsZero() { + state.lastEventTime = lastEventTime + } + }) +} + +func (r *regionRuntimeRegistry) recordError( + key regionRuntimeKey, + err error, + errTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + if err == nil { + state.lastError = "" + } else { + state.lastError = err.Error() + } + state.lastErrorTime = errTime + }) +} + +func (r *regionRuntimeRegistry) incRetry(key regionRuntimeKey) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.retryCount++ + }) +} + +func (r *regionRuntimeRegistry) setRequestEnqueueTime( + key regionRuntimeKey, + enqueueTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.requestEnqueueTime = enqueueTime + }) +} + +func (r *regionRuntimeRegistry) setRPCReadyTime( + key regionRuntimeKey, + rpcReadyTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.requestRPCReadyTime = rpcReadyTime + }) +} + +func (r *regionRuntimeRegistry) setRequestSendTime( + key regionRuntimeKey, + sendTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.requestSendTime = sendTime + }) +} + +func (r *regionRuntimeRegistry) setInitializedTime( + key regionRuntimeKey, + initializedTime 
time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.initializedTime = initializedTime + }) +} + +func (r *regionRuntimeRegistry) setReplicatingTime( + key regionRuntimeKey, + replicatingTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.replicatingTime = replicatingTime + }) +} + +func (r *regionRuntimeRegistry) get(key regionRuntimeKey) (regionRuntimeState, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + + state, ok := r.states[key] + if !ok { + return regionRuntimeState{}, false + } + return state.clone(), true +} + +func (r *regionRuntimeRegistry) snapshot() []regionRuntimeState { + r.mu.RLock() + defer r.mu.RUnlock() + + snapshots := make([]regionRuntimeState, 0, len(r.states)) + for _, state := range r.states { + snapshots = append(snapshots, state.clone()) + } + sort.Slice(snapshots, func(i, j int) bool { + left, right := snapshots[i].key, snapshots[j].key + if left.subID != right.subID { + return left.subID < right.subID + } + if left.regionID != right.regionID { + return left.regionID < right.regionID + } + return left.generation < right.generation + }) + return snapshots +} + +func (r *regionRuntimeRegistry) remove(key regionRuntimeKey) bool { + r.mu.Lock() + defer r.mu.Unlock() + + if _, ok := r.states[key]; !ok { + return false + } + delete(r.states, key) + return true +} + +func (r *regionRuntimeRegistry) removeBySubscription(subID SubscriptionID) int { + r.mu.Lock() + defer r.mu.Unlock() + + removed := 0 + for key := range r.states { + if key.subID != subID { + continue + } + delete(r.states, key) + removed++ + } + return removed +} diff --git a/logservice/logpuller/region_runtime_test.go b/logservice/logpuller/region_runtime_test.go new file mode 100644 index 0000000000..074b71d83b --- /dev/null +++ b/logservice/logpuller/region_runtime_test.go @@ -0,0 +1,119 @@ +// Copyright 2026 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package logpuller + +import ( + "errors" + "testing" + "time" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" +) + +func TestRegionRuntimeRegistryAllocKey(t *testing.T) { + registry := newRegionRuntimeRegistry() + + key1 := registry.allocKey(1, 101) + key2 := registry.allocKey(1, 101) + key3 := registry.allocKey(1, 102) + + require.Equal(t, regionRuntimeKey{subID: 1, regionID: 101, generation: 1}, key1) + require.Equal(t, regionRuntimeKey{subID: 1, regionID: 101, generation: 2}, key2) + require.Equal(t, regionRuntimeKey{subID: 1, regionID: 102, generation: 1}, key3) +} + +func TestRegionRuntimeRegistryUpdateAndSnapshot(t *testing.T) { + registry := newRegionRuntimeRegistry() + key := registry.allocKey(1, 101) + now := time.Unix(1700000000, 0) + + subSpan := &subscribedSpan{ + subID: 1, + span: heartbeatpb.TableSpan{ + TableID: 42, + StartKey: []byte("a"), + EndKey: []byte("z"), + }, + } + region := regionInfo{ + verID: tikv.NewRegionVerID(101, 2, 3), + span: heartbeatpb.TableSpan{ + TableID: 42, + StartKey: []byte("b"), + EndKey: []byte("c"), + }, + rpcCtx: &tikv.RPCContext{ + Addr: "tikv-1:20160", + Peer: &metapb.Peer{Id: 11, StoreId: 22}, + }, + subscribedSpan: subSpan, + } + + registry.updateRegionInfo(key, region) + registry.transition(key, regionPhaseQueued, now) + registry.updateWorker(key, 7) + registry.updateResolvedTs(key, 12345, now.Add(time.Second)) + 
registry.recordError(key, errors.New("store busy"), now.Add(2*time.Second)) + registry.incRetry(key) + registry.setRequestEnqueueTime(key, now.Add(3*time.Second)) + + state, ok := registry.get(key) + require.True(t, ok) + require.Equal(t, int64(42), state.tableID) + require.Equal(t, regionPhaseQueued, state.phase) + require.Equal(t, uint64(22), state.leaderStoreID) + require.Equal(t, uint64(11), state.leaderPeerID) + require.Equal(t, "tikv-1:20160", state.storeAddr) + require.Equal(t, uint64(7), state.workerID) + require.Equal(t, uint64(12345), state.lastResolvedTs) + require.Equal(t, "store busy", state.lastError) + require.Equal(t, 1, state.retryCount) + require.Equal(t, now.Add(3*time.Second), state.requestEnqueueTime) + + snapshots := registry.snapshot() + require.Len(t, snapshots, 1) + require.Equal(t, state, snapshots[0]) + + snapshots[0].span.StartKey[0] = 'x' + updated, ok := registry.get(key) + require.True(t, ok) + require.Equal(t, []byte("b"), updated.span.StartKey) +} + +func TestRegionRuntimeRegistryRemoveBySubscription(t *testing.T) { + registry := newRegionRuntimeRegistry() + key1 := registry.allocKey(1, 101) + key2 := registry.allocKey(1, 102) + key3 := registry.allocKey(2, 201) + + registry.transition(key1, regionPhaseDiscovered, time.Unix(1, 0)) + registry.transition(key2, regionPhaseRemoved, time.Unix(2, 0)) + registry.transition(key3, regionPhaseReplicating, time.Unix(3, 0)) + + require.Len(t, registry.snapshot(), 3) + require.Equal(t, 2, registry.removeBySubscription(1)) + + _, ok := registry.get(key1) + require.False(t, ok) + _, ok = registry.get(key2) + require.False(t, ok) + + state, ok := registry.get(key3) + require.True(t, ok) + require.Equal(t, regionPhaseReplicating, state.phase) + require.Len(t, registry.snapshot(), 1) +} diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 52552fb72d..498d93f885 100644 --- a/logservice/logpuller/subscription_client.go +++ 
b/logservice/logpuller/subscription_client.go @@ -189,6 +189,8 @@ type subscriptionClient struct { metrics sharedClientMetrics clusterID uint64 + regionRuntimeRegistry *regionRuntimeRegistry + pd pd.Client regionCache *tikv.RegionCache pdClock pdutil.Clock @@ -240,6 +242,8 @@ func NewSubscriptionClient( pdClock: appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock), lockResolver: lockResolver, + regionRuntimeRegistry: newRegionRuntimeRegistry(), + credential: credential, rangeTaskCh: make(chan rangeTask, 1024), From 595bb36051c7dc87b15df4a16478f5eb44f538ee Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sun, 5 Apr 2026 18:20:28 +0800 Subject: [PATCH 2/6] f --- logservice/logpuller/region_runtime.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/logservice/logpuller/region_runtime.go b/logservice/logpuller/region_runtime.go index a40beb2146..b9977b26f8 100644 --- a/logservice/logpuller/region_runtime.go +++ b/logservice/logpuller/region_runtime.go @@ -28,16 +28,12 @@ const ( regionPhaseUnknown regionPhase = "unknown" regionPhaseDiscovered regionPhase = "discovered" regionPhaseRangeLockWait regionPhase = "range_lock_wait" - regionPhaseRangeLocked regionPhase = "range_locked" regionPhaseQueued regionPhase = "queued" regionPhaseRPCReady regionPhase = "rpc_ready" - regionPhaseSent regionPhase = "sent" regionPhaseWaitInitialized regionPhase = "wait_initialized" regionPhaseReplicating regionPhase = "replicating" - regionPhaseStuck regionPhase = "stuck" regionPhaseRetryPending regionPhase = "retry_pending" regionPhaseRemoved regionPhase = "removed" - regionPhaseDeregistering regionPhase = "deregistering" ) type regionRuntimeIdentity struct { @@ -73,11 +69,12 @@ type regionRuntimeState struct { lastErrorTime time.Time retryCount int - requestEnqueueTime time.Time - requestRPCReadyTime time.Time - requestSendTime time.Time - initializedTime time.Time - replicatingTime time.Time + rangeLockAcquiredTime time.Time + 
requestEnqueueTime time.Time + requestRPCReadyTime time.Time + requestSendTime time.Time + initializedTime time.Time + replicatingTime time.Time } func (s regionRuntimeState) clone() regionRuntimeState { @@ -239,6 +236,15 @@ func (r *regionRuntimeRegistry) setRequestEnqueueTime( }) } +func (r *regionRuntimeRegistry) setRangeLockAcquiredTime( + key regionRuntimeKey, + acquiredTime time.Time, +) regionRuntimeState { + return r.upsert(key, func(state *regionRuntimeState) { + state.rangeLockAcquiredTime = acquiredTime + }) +} + func (r *regionRuntimeRegistry) setRPCReadyTime( key regionRuntimeKey, rpcReadyTime time.Time, From 30f6c75ff1567a9fc814b0b4a428064a3728c746 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 7 Apr 2026 00:49:11 +0800 Subject: [PATCH 3/6] r --- logservice/logpuller/region_event_handler.go | 13 ++ logservice/logpuller/region_request_worker.go | 11 +- logservice/logpuller/region_runtime.go | 4 + .../region_runtime_integration_test.go | 148 ++++++++++++++++++ logservice/logpuller/region_state.go | 4 + logservice/logpuller/subscription_client.go | 66 ++++++++ 6 files changed, 245 insertions(+), 1 deletion(-) create mode 100644 logservice/logpuller/region_runtime_integration_test.go diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index 0de2306c47..c00be189e0 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -252,6 +252,9 @@ func (h *regionEventHandler) handleRegionError(state *regionFeedState) { func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *cdcpb.Event_Entries_) { regionID, _, _ := state.getRegionMeta() + if state.region.runtimeKey.isValid() && state.worker != nil && state.worker.client != nil && state.worker.client.regionRuntimeRegistry != nil { + state.worker.client.regionRuntimeRegistry.updateLastEvent(state.region.runtimeKey, time.Now()) + } assembleRowEvent := func(regionID uint64, entry 
*cdcpb.Event_Row) common.RawKVEntry { var opType common.OpType switch entry.GetOpType() { @@ -277,6 +280,13 @@ func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *c switch entry.Type { case cdcpb.Event_INITIALIZED: state.setInitialized() + if state.region.runtimeKey.isValid() && state.worker != nil && state.worker.client != nil && state.worker.client.regionRuntimeRegistry != nil { + now := time.Now() + registry := state.worker.client.regionRuntimeRegistry + registry.setInitializedTime(state.region.runtimeKey, now) + registry.setReplicatingTime(state.region.runtimeKey, now) + registry.transition(state.region.runtimeKey, regionPhaseReplicating, now) + } log.Debug("region is initialized", zap.Int64("tableID", span.span.TableID), zap.Uint64("regionID", regionID), @@ -364,6 +374,9 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u } state.updateResolvedTs(resolvedTs) + if state.region.runtimeKey.isValid() && state.worker != nil && state.worker.client != nil && state.worker.client.regionRuntimeRegistry != nil { + state.worker.client.regionRuntimeRegistry.updateResolvedTs(state.region.runtimeKey, resolvedTs, time.Now()) + } ts := uint64(0) shouldAdvance := false diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index d5fa7ed197..daa2aceaa5 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -431,6 +431,11 @@ func (s *regionRequestWorker) processRegionSendTask( // sentRequests visible in the same order and avoids leaving stale // requests in cleanup. 
s.requestCache.markSent(regionReq) + if s.client != nil && s.client.regionRuntimeRegistry != nil && region.runtimeKey.isValid() { + s.client.regionRuntimeRegistry.updateWorker(region.runtimeKey, s.workerID) + s.client.regionRuntimeRegistry.setRequestSendTime(region.runtimeKey, time.Now()) + s.client.regionRuntimeRegistry.transition(region.runtimeKey, regionPhaseWaitInitialized, time.Now()) + } if err := doSend(s.createRegionRequest(region)); err != nil { state.markStopped(err) return err @@ -511,7 +516,11 @@ func (s *regionRequestWorker) clearRegionStates() map[SubscriptionID]regionFeedS // add adds a region request to the worker's cache // It blocks if the cache is full until there's space or ctx is cancelled func (s *regionRequestWorker) add(ctx context.Context, region regionInfo, force bool) (bool, error) { - return s.requestCache.add(ctx, region, force) + ok, err := s.requestCache.add(ctx, region, force) + if ok && err == nil && s.client != nil && s.client.regionRuntimeRegistry != nil && region.runtimeKey.isValid() { + s.client.regionRuntimeRegistry.setRequestEnqueueTime(region.runtimeKey, time.Now()) + } + return ok, err } func (s *regionRequestWorker) clearPendingRegions() []regionInfo { diff --git a/logservice/logpuller/region_runtime.go b/logservice/logpuller/region_runtime.go index b9977b26f8..25991f2d50 100644 --- a/logservice/logpuller/region_runtime.go +++ b/logservice/logpuller/region_runtime.go @@ -47,6 +47,10 @@ type regionRuntimeKey struct { generation uint64 } +func (k regionRuntimeKey) isValid() bool { + return k.subID != InvalidSubscriptionID && k.regionID != 0 && k.generation != 0 +} + type regionRuntimeState struct { key regionRuntimeKey diff --git a/logservice/logpuller/region_runtime_integration_test.go b/logservice/logpuller/region_runtime_integration_test.go new file mode 100644 index 0000000000..7a9ce64cf0 --- /dev/null +++ b/logservice/logpuller/region_runtime_integration_test.go @@ -0,0 +1,148 @@ +// Copyright 2026 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package logpuller + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/logservice/logpuller/regionlock" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/pdutil" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" +) + +func TestScheduleRegionRequestUpdatesRuntimeRegistry(t *testing.T) { + client := &subscriptionClient{ + regionTaskQueue: NewPriorityQueue(), + regionRuntimeRegistry: newRegionRuntimeRegistry(), + pdClock: pdutil.NewClock4Test(), + } + + rawSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'a'}, + EndKey: []byte{'z'}, + } + consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false } + advanceResolvedTs := func(uint64) {} + subSpan := client.newSubscribedSpan(SubscriptionID(1), rawSpan, 100, consumeKVEvents, advanceResolvedTs, 0, false) + + regionSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'b'}, + EndKey: []byte{'c'}, + } + region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), regionSpan, nil, subSpan, false) + + client.scheduleRegionRequest(context.Background(), region, TaskLowPrior) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + task, err := client.regionTaskQueue.Pop(ctx) + require.NoError(t, err) + queued := task.GetRegionInfo() + require.True(t, queued.runtimeKey.isValid()) + + state, ok := client.regionRuntimeRegistry.get(queued.runtimeKey) + 
require.True(t, ok) + require.Equal(t, regionPhaseQueued, state.phase) + require.Equal(t, uint64(10), state.verID.GetID()) + require.False(t, state.rangeLockAcquiredTime.IsZero()) +} + +func TestOnRegionFailUpdatesRuntimeRegistry(t *testing.T) { + client := &subscriptionClient{ + regionRuntimeRegistry: newRegionRuntimeRegistry(), + errCache: newErrCache(), + } + client.ctx, client.cancel = context.WithCancel(context.Background()) + defer client.cancel() + + rawSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'a'}, + EndKey: []byte{'z'}, + } + consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false } + advanceResolvedTs := func(uint64) {} + subSpan := client.newSubscribedSpan(SubscriptionID(1), rawSpan, 100, consumeKVEvents, advanceResolvedTs, 0, false) + + lockRes := subSpan.rangeLock.LockRange(context.Background(), []byte{'b'}, []byte{'c'}, 10, 1) + require.Equal(t, regionlock.LockRangeStatusSuccess, lockRes.Status) + + region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'b'}, + EndKey: []byte{'c'}, + }, nil, subSpan, false) + region.lockedRangeState = lockRes.LockedRangeState + + client.ensureRegionRuntime(&region, time.Now()) + require.True(t, region.runtimeKey.isValid()) + + client.onRegionFail(newRegionErrorInfo(region, &sendRequestToStoreErr{})) + + state, ok := client.regionRuntimeRegistry.get(region.runtimeKey) + require.True(t, ok) + require.Equal(t, regionPhaseRetryPending, state.phase) + require.Equal(t, "send request to store error", state.lastError) + require.Equal(t, 1, state.retryCount) +} + +func TestHandleResolvedTsUpdatesRuntimeRegistry(t *testing.T) { + client := &subscriptionClient{ + regionRuntimeRegistry: newRegionRuntimeRegistry(), + } + worker := &regionRequestWorker{client: client} + + rawSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'a'}, + EndKey: []byte{'z'}, + } + subSpan := &subscribedSpan{ + subID: SubscriptionID(1), + startTs: 
100, + span: rawSpan, + rangeLock: regionlock.NewRangeLock(1, rawSpan.StartKey, rawSpan.EndKey, 100), + filterLoop: false, + } + subSpan.resolvedTs.Store(100) + subSpan.resolvedTsUpdated.Store(time.Now().Unix()) + subSpan.advanceInterval = 0 + + lockRes := subSpan.rangeLock.LockRange(context.Background(), rawSpan.StartKey, rawSpan.EndKey, 10, 1) + require.Equal(t, regionlock.LockRangeStatusSuccess, lockRes.Status) + lockRes.LockedRangeState.Initialized.Store(true) + + region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), rawSpan, nil, subSpan, false) + region.lockedRangeState = lockRes.LockedRangeState + region.runtimeKey = client.regionRuntimeRegistry.allocKey(subSpan.subID, region.verID.GetID()) + client.regionRuntimeRegistry.updateRegionInfo(region.runtimeKey, region) + + state := newRegionFeedState(region, uint64(subSpan.subID), worker) + state.start() + + resolvedTs := uint64(200) + handleResolvedTs(subSpan, state, resolvedTs) + + stored, ok := client.regionRuntimeRegistry.get(region.runtimeKey) + require.True(t, ok) + require.Equal(t, resolvedTs, stored.lastResolvedTs) + require.False(t, stored.lastEventTime.IsZero()) +} diff --git a/logservice/logpuller/region_state.go b/logservice/logpuller/region_state.go index e9c21a7aad..f57642d967 100644 --- a/logservice/logpuller/region_state.go +++ b/logservice/logpuller/region_state.go @@ -28,6 +28,10 @@ const ( ) type regionInfo struct { + // runtimeKey links this region info to a regionRuntimeRegistry entry. + // It is assigned by subscriptionClient when scheduling the region. + runtimeKey regionRuntimeKey + verID tikv.RegionVerID // The span of the region. // Note(dongmen): The span doesn't always represent the whole span of a region. 
diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 498d93f885..ecd3fb7157 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -226,6 +226,43 @@ type subscriptionClient struct { errCache *errCache } +func (s *subscriptionClient) ensureRegionRuntime(region *regionInfo, now time.Time) { + if s.regionRuntimeRegistry == nil { + return + } + if region == nil || region.subscribedSpan == nil { + return + } + if region.verID.GetID() == 0 { + return + } + if !region.runtimeKey.isValid() { + region.runtimeKey = s.regionRuntimeRegistry.allocKey(region.subscribedSpan.subID, region.verID.GetID()) + s.regionRuntimeRegistry.updateRegionInfo(region.runtimeKey, *region) + s.regionRuntimeRegistry.transition(region.runtimeKey, regionPhaseDiscovered, now) + } +} + +func (s *subscriptionClient) updateRegionRuntimeInfo(region regionInfo) { + if s.regionRuntimeRegistry == nil { + return + } + if !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.updateRegionInfo(region.runtimeKey, region) +} + +func (s *subscriptionClient) transitionRegionRuntime(region regionInfo, phase regionPhase, now time.Time) { + if s.regionRuntimeRegistry == nil { + return + } + if !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.transition(region.runtimeKey, phase, now) +} + // NewSubscriptionClient creates a client. 
func NewSubscriptionClient( config *SubscriptionClientConfig, @@ -508,6 +545,10 @@ func (s *subscriptionClient) onTableDrained(rt *subscribedSpan) { log.Info("subscription client stop span is finished", zap.Uint64("subscriptionID", uint64(rt.subID))) + if s.regionRuntimeRegistry != nil { + s.regionRuntimeRegistry.removeBySubscription(rt.subID) + } + err := s.ds.RemovePath(rt.subID) if err != nil { log.Warn("subscription client remove path failed", @@ -521,6 +562,11 @@ func (s *subscriptionClient) onTableDrained(rt *subscribedSpan) { // Note: don't block the caller, otherwise there may be deadlock func (s *subscriptionClient) onRegionFail(errInfo regionErrorInfo) { + if s.regionRuntimeRegistry != nil && errInfo.runtimeKey.isValid() { + s.regionRuntimeRegistry.recordError(errInfo.runtimeKey, errInfo.err, time.Now()) + s.regionRuntimeRegistry.incRetry(errInfo.runtimeKey) + s.regionRuntimeRegistry.transition(errInfo.runtimeKey, regionPhaseRetryPending, time.Now()) + } // unlock the range early to prevent blocking the range. if errInfo.subscribedSpan.rangeLock.UnlockRange( errInfo.span.StartKey, errInfo.span.EndKey, @@ -625,6 +671,11 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro if !ok { continue } + s.updateRegionRuntimeInfo(region) + if s.regionRuntimeRegistry != nil && region.runtimeKey.isValid() { + s.regionRuntimeRegistry.setRPCReadyTime(region.runtimeKey, time.Now()) + } + s.transitionRegionRuntime(region, regionPhaseRPCReady, time.Now()) store := getStore(region.rpcCtx.Addr) worker := store.getRequestWorker() @@ -800,21 +851,36 @@ func (s *subscriptionClient) divideSpanAndScheduleRegionRequests( // scheduleRegionRequest locks the region's range and send the region to regionTaskQueue, // which will be handled by handleRegions. 
func (s *subscriptionClient) scheduleRegionRequest(ctx context.Context, region regionInfo, priority TaskType) { + s.ensureRegionRuntime(&region, time.Now()) lockRangeResult := region.subscribedSpan.rangeLock.LockRange( ctx, region.span.StartKey, region.span.EndKey, region.verID.GetID(), region.verID.GetVer()) if lockRangeResult.Status == regionlock.LockRangeStatusWait { + s.transitionRegionRuntime(region, regionPhaseRangeLockWait, time.Now()) lockRangeResult = lockRangeResult.WaitFn() } switch lockRangeResult.Status { case regionlock.LockRangeStatusSuccess: region.lockedRangeState = lockRangeResult.LockedRangeState + if s.regionRuntimeRegistry != nil && region.runtimeKey.isValid() { + s.regionRuntimeRegistry.setRangeLockAcquiredTime(region.runtimeKey, lockRangeResult.LockedRangeState.Created) + } + s.transitionRegionRuntime(region, regionPhaseQueued, time.Now()) s.regionTaskQueue.Push(NewRegionPriorityTask(priority, region, s.pdClock.CurrentTS())) case regionlock.LockRangeStatusStale: + s.transitionRegionRuntime(region, regionPhaseRemoved, time.Now()) + if s.regionRuntimeRegistry != nil && region.runtimeKey.isValid() { + s.regionRuntimeRegistry.remove(region.runtimeKey) + } for _, r := range lockRangeResult.RetryRanges { s.scheduleRangeRequest(ctx, r, region.subscribedSpan, region.filterLoop, priority) } + case regionlock.LockRangeStatusCancel: + s.transitionRegionRuntime(region, regionPhaseRemoved, time.Now()) + if s.regionRuntimeRegistry != nil && region.runtimeKey.isValid() { + s.regionRuntimeRegistry.remove(region.runtimeKey) + } default: return } From 962e988f7a847f238091cfabcfb32e73aa2eadba Mon Sep 17 00:00:00 2001 From: lidezhu Date: Fri, 10 Apr 2026 18:07:45 +0800 Subject: [PATCH 4/6] fix --- logservice/logpuller/region_runtime.go | 23 +++++ .../region_runtime_integration_test.go | 94 ++++++++++++++++++- logservice/logpuller/region_runtime_test.go | 18 ++++ logservice/logpuller/subscription_client.go | 36 ++++++- pkg/metrics/log_puller.go | 8 ++ 5 files
changed, 175 insertions(+), 4 deletions(-) diff --git a/logservice/logpuller/region_runtime.go b/logservice/logpuller/region_runtime.go index 25991f2d50..297ba83704 100644 --- a/logservice/logpuller/region_runtime.go +++ b/logservice/logpuller/region_runtime.go @@ -36,6 +36,18 @@ const ( regionPhaseRemoved regionPhase = "removed" ) +var regionRuntimePhases = []regionPhase{ + regionPhaseUnknown, + regionPhaseDiscovered, + regionPhaseRangeLockWait, + regionPhaseQueued, + regionPhaseRPCReady, + regionPhaseWaitInitialized, + regionPhaseReplicating, + regionPhaseRetryPending, + regionPhaseRemoved, +} + type regionRuntimeIdentity struct { subID SubscriptionID regionID uint64 @@ -317,6 +329,17 @@ func (r *regionRuntimeRegistry) snapshot() []regionRuntimeState { return snapshots } +func (r *regionRuntimeRegistry) phaseCounts() map[regionPhase]int { + r.mu.RLock() + defer r.mu.RUnlock() + + counts := make(map[regionPhase]int, len(r.states)) + for _, state := range r.states { + counts[state.phase]++ + } + return counts +} + func (r *regionRuntimeRegistry) remove(key regionRuntimeKey) bool { r.mu.Lock() defer r.mu.Unlock() diff --git a/logservice/logpuller/region_runtime_integration_test.go b/logservice/logpuller/region_runtime_integration_test.go index 7a9ce64cf0..854a1b5733 100644 --- a/logservice/logpuller/region_runtime_integration_test.go +++ b/logservice/logpuller/region_runtime_integration_test.go @@ -18,6 +18,8 @@ import ( "testing" "time" + "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/logpuller/regionlock" "github.com/pingcap/ticdc/pkg/common" @@ -99,9 +101,9 @@ func TestOnRegionFailUpdatesRuntimeRegistry(t *testing.T) { state, ok := client.regionRuntimeRegistry.get(region.runtimeKey) require.True(t, ok) - require.Equal(t, regionPhaseRetryPending, state.phase) + require.Equal(t, regionPhaseDiscovered, state.phase) require.Equal(t, "send request to store 
error", state.lastError) - require.Equal(t, 1, state.retryCount) + require.Equal(t, 0, state.retryCount) } func TestHandleResolvedTsUpdatesRuntimeRegistry(t *testing.T) { @@ -146,3 +148,91 @@ func TestHandleResolvedTsUpdatesRuntimeRegistry(t *testing.T) { require.Equal(t, resolvedTs, stored.lastResolvedTs) require.False(t, stored.lastEventTime.IsZero()) } + +func TestDoHandleErrorMarksRetryPendingForRetryableRegionError(t *testing.T) { + client := &subscriptionClient{ + regionRuntimeRegistry: newRegionRuntimeRegistry(), + regionTaskQueue: NewPriorityQueue(), + pdClock: pdutil.NewClock4Test(), + } + + rawSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'a'}, + EndKey: []byte{'z'}, + } + consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false } + advanceResolvedTs := func(uint64) {} + subSpan := client.newSubscribedSpan(SubscriptionID(1), rawSpan, 100, consumeKVEvents, advanceResolvedTs, 0, false) + + region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), rawSpan, nil, subSpan, false) + client.ensureRegionRuntime(&region, time.Now()) + + err := client.doHandleError(context.Background(), newRegionErrorInfo(region, &eventError{ + err: &cdcpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{Reason: "busy"}}, + })) + require.NoError(t, err) + + state, ok := client.regionRuntimeRegistry.get(region.runtimeKey) + require.True(t, ok) + require.Equal(t, regionPhaseQueued, state.phase) + require.Equal(t, 1, state.retryCount) + require.Contains(t, state.lastError, "server_is_busy") +} + +func TestDoHandleErrorRemovesRuntimeForRangeReload(t *testing.T) { + client := &subscriptionClient{ + regionRuntimeRegistry: newRegionRuntimeRegistry(), + rangeTaskCh: make(chan rangeTask, 1), + } + + rawSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'a'}, + EndKey: []byte{'z'}, + } + consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false } + advanceResolvedTs := func(uint64) {} + subSpan :=
client.newSubscribedSpan(SubscriptionID(1), rawSpan, 100, consumeKVEvents, advanceResolvedTs, 0, false) + + region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), rawSpan, nil, subSpan, false) + client.ensureRegionRuntime(&region, time.Now()) + + err := client.doHandleError(context.Background(), newRegionErrorInfo(region, &rpcCtxUnavailableErr{verID: region.verID})) + require.NoError(t, err) + + _, ok := client.regionRuntimeRegistry.get(region.runtimeKey) + require.False(t, ok) + + select { + case task := <-client.rangeTaskCh: + require.Equal(t, rawSpan, task.span) + require.Equal(t, subSpan, task.subscribedSpan) + case <-time.After(time.Second): + t.Fatal("expected range task to be scheduled") + } +} + +func TestDoHandleErrorRemovesRuntimeForCancelledRequest(t *testing.T) { + client := &subscriptionClient{ + regionRuntimeRegistry: newRegionRuntimeRegistry(), + } + + rawSpan := heartbeatpb.TableSpan{ + TableID: 1, + StartKey: []byte{'a'}, + EndKey: []byte{'z'}, + } + consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false } + advanceResolvedTs := func(uint64) {} + subSpan := client.newSubscribedSpan(SubscriptionID(1), rawSpan, 100, consumeKVEvents, advanceResolvedTs, 0, false) + + region := newRegionInfo(tikv.NewRegionVerID(10, 1, 1), rawSpan, nil, subSpan, false) + client.ensureRegionRuntime(&region, time.Now()) + + err := client.doHandleError(context.Background(), newRegionErrorInfo(region, &requestCancelledErr{})) + require.NoError(t, err) + + _, ok := client.regionRuntimeRegistry.get(region.runtimeKey) + require.False(t, ok) +} diff --git a/logservice/logpuller/region_runtime_test.go b/logservice/logpuller/region_runtime_test.go index 074b71d83b..d1bf0a900b 100644 --- a/logservice/logpuller/region_runtime_test.go +++ b/logservice/logpuller/region_runtime_test.go @@ -117,3 +117,21 @@ func TestRegionRuntimeRegistryRemoveBySubscription(t *testing.T) { require.Equal(t, regionPhaseReplicating, state.phase) require.Len(t, registry.snapshot(), 1) } +
+func TestRegionRuntimeRegistryPhaseCounts(t *testing.T) { + registry := newRegionRuntimeRegistry() + now := time.Unix(1700000000, 0) + + key1 := registry.allocKey(1, 101) + key2 := registry.allocKey(1, 102) + key3 := registry.allocKey(2, 201) + + registry.transition(key1, regionPhaseQueued, now) + registry.transition(key2, regionPhaseQueued, now.Add(time.Second)) + registry.transition(key3, regionPhaseWaitInitialized, now.Add(2*time.Second)) + + counts := registry.phaseCounts() + require.Equal(t, 2, counts[regionPhaseQueued]) + require.Equal(t, 1, counts[regionPhaseWaitInitialized]) + require.Equal(t, 0, counts[regionPhaseReplicating]) +} diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index ecd3fb7157..97d17ea361 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -263,6 +263,23 @@ func (s *subscriptionClient) transitionRegionRuntime(region regionInfo, phase re s.regionRuntimeRegistry.transition(region.runtimeKey, phase, now) } +func (s *subscriptionClient) markRegionRetryPending(region regionInfo, err error, now time.Time) { + if s.regionRuntimeRegistry == nil || !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.recordError(region.runtimeKey, err, now) + s.regionRuntimeRegistry.incRetry(region.runtimeKey) + s.regionRuntimeRegistry.transition(region.runtimeKey, regionPhaseRetryPending, now) +} + +func (s *subscriptionClient) removeRegionRuntime(region regionInfo, now time.Time) { + if s.regionRuntimeRegistry == nil || !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.transition(region.runtimeKey, regionPhaseRemoved, now) + s.regionRuntimeRegistry.remove(region.runtimeKey) +} + // NewSubscriptionClient creates a client. 
func NewSubscriptionClient( config *SubscriptionClientConfig, @@ -373,6 +390,13 @@ func (s *subscriptionClient) updateMetrics(ctx context.Context) error { metrics.SubscriptionClientRequestedRegionCount.WithLabelValues("pending").Set(float64(pendingRegionReqCount)) + if s.regionRuntimeRegistry != nil { + counts := s.regionRuntimeRegistry.phaseCounts() + for _, phase := range regionRuntimePhases { + metrics.SubscriptionClientRegionRuntimePhaseCount.WithLabelValues(string(phase)).Set(float64(counts[phase])) + } + } + count := 0 s.totalSpans.RLock() for _, rt := range s.totalSpans.spanMap { @@ -564,8 +588,6 @@ func (s *subscriptionClient) onTableDrained(rt *subscribedSpan) { func (s *subscriptionClient) onRegionFail(errInfo regionErrorInfo) { if s.regionRuntimeRegistry != nil && errInfo.runtimeKey.isValid() { s.regionRuntimeRegistry.recordError(errInfo.runtimeKey, errInfo.err, time.Now()) - s.regionRuntimeRegistry.incRetry(errInfo.runtimeKey) - s.regionRuntimeRegistry.transition(errInfo.runtimeKey, regionPhaseRetryPending, time.Now()) } // unlock the range early to prevent blocking the range. 
if errInfo.subscribedSpan.rangeLock.UnlockRange( @@ -925,26 +947,31 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr if notLeader := innerErr.GetNotLeader(); notLeader != nil { metricFeedNotLeaderCounter.Inc() s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx) + s.markRegionRetryPending(errInfo.regionInfo, errInfo.err, time.Now()) s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskHighPrior) return nil } if innerErr.GetEpochNotMatch() != nil { metricFeedEpochNotMatchCounter.Inc() + s.removeRegionRuntime(errInfo.regionInfo, time.Now()) s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior) return nil } if innerErr.GetRegionNotFound() != nil { metricFeedRegionNotFoundCounter.Inc() + s.removeRegionRuntime(errInfo.regionInfo, time.Now()) s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior) return nil } if innerErr.GetCongested() != nil { metricKvCongestedCounter.Inc() + s.markRegionRetryPending(errInfo.regionInfo, errInfo.err, time.Now()) s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskLowPrior) return nil } if innerErr.GetServerIsBusy() != nil { metricKvIsBusyCounter.Inc() + s.markRegionRetryPending(errInfo.regionInfo, errInfo.err, time.Now()) s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskLowPrior) return nil } @@ -964,10 +991,12 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr zap.Uint64("subscriptionID", uint64(errInfo.subscribedSpan.subID)), zap.Stringer("error", innerErr)) metricFeedUnknownErrorCounter.Inc() + s.markRegionRetryPending(errInfo.regionInfo, errInfo.err, time.Now()) s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskHighPrior) return nil case *rpcCtxUnavailableErr: metricFeedRPCCtxUnavailable.Inc() + s.removeRegionRuntime(errInfo.regionInfo, time.Now()) s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, 
errInfo.filterLoop, TaskHighPrior) return nil case *getStoreErr: @@ -975,15 +1004,18 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) // cannot get the store the region belongs to, so we need to reload the region. s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err) + s.removeRegionRuntime(errInfo.regionInfo, time.Now()) s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior) return nil case *sendRequestToStoreErr: metricStoreSendRequestErr.Inc() bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff) s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err) + s.markRegionRetryPending(errInfo.regionInfo, errInfo.err, time.Now()) s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskHighPrior) return nil case *requestCancelledErr: + s.removeRegionRuntime(errInfo.regionInfo, time.Now()) // the corresponding subscription has been unsubscribed, just ignore. 
return nil default: diff --git a/pkg/metrics/log_puller.go b/pkg/metrics/log_puller.go index c9c25c2dda..b5bdd9390e 100644 --- a/pkg/metrics/log_puller.go +++ b/pkg/metrics/log_puller.go @@ -85,6 +85,13 @@ var ( Name: "resolve_lock_task_drop_count", Help: "The number of resolve lock tasks dropped before being processed", }) + SubscriptionClientRegionRuntimePhaseCount = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "subscription_client", + Name: "region_runtime_phase_count", + Help: "The number of regions in each runtime phase", + }, []string{"phase"}) SubscriptionClientRegionEventHandleDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -114,6 +121,7 @@ func initLogPullerMetrics(registry *prometheus.Registry) { registry.MustRegister(RegionRequestFinishScanDuration) registry.MustRegister(SubscriptionClientSubscribedRegionCount) registry.MustRegister(SubscriptionClientResolveLockTaskDropCounter) + registry.MustRegister(SubscriptionClientRegionRuntimePhaseCount) registry.MustRegister(SubscriptionClientRegionEventHandleDuration) registry.MustRegister(SubscriptionClientConsumeKVEventsCallbackDuration) } From 339268361bd87f58646a049ec6b1e863e530ceac Mon Sep 17 00:00:00 2001 From: lidezhu Date: Fri, 10 Apr 2026 20:20:18 +0800 Subject: [PATCH 5/6] fix --- logservice/logpuller/region_event_handler.go | 16 ++-------- logservice/logpuller/region_request_worker.go | 31 ++++++++++++++----- logservice/logpuller/region_state.go | 28 +++++++++++++++++ 3 files changed, 55 insertions(+), 20 deletions(-) diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go index c00be189e0..81dd99988e 100644 --- a/logservice/logpuller/region_event_handler.go +++ b/logservice/logpuller/region_event_handler.go @@ -252,9 +252,7 @@ func (h *regionEventHandler) handleRegionError(state *regionFeedState) { func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries 
*cdcpb.Event_Entries_) { regionID, _, _ := state.getRegionMeta() - if state.region.runtimeKey.isValid() && state.worker != nil && state.worker.client != nil && state.worker.client.regionRuntimeRegistry != nil { - state.worker.client.regionRuntimeRegistry.updateLastEvent(state.region.runtimeKey, time.Now()) - } + state.updateRuntimeLastEvent(time.Now()) assembleRowEvent := func(regionID uint64, entry *cdcpb.Event_Row) common.RawKVEntry { var opType common.OpType switch entry.GetOpType() { @@ -280,13 +278,7 @@ func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *c switch entry.Type { case cdcpb.Event_INITIALIZED: state.setInitialized() - if state.region.runtimeKey.isValid() && state.worker != nil && state.worker.client != nil && state.worker.client.regionRuntimeRegistry != nil { - now := time.Now() - registry := state.worker.client.regionRuntimeRegistry - registry.setInitializedTime(state.region.runtimeKey, now) - registry.setReplicatingTime(state.region.runtimeKey, now) - registry.transition(state.region.runtimeKey, regionPhaseReplicating, now) - } + state.markRuntimeReplicating(time.Now()) log.Debug("region is initialized", zap.Int64("tableID", span.span.TableID), zap.Uint64("regionID", regionID), @@ -374,9 +366,7 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u } state.updateResolvedTs(resolvedTs) - if state.region.runtimeKey.isValid() && state.worker != nil && state.worker.client != nil && state.worker.client.regionRuntimeRegistry != nil { - state.worker.client.regionRuntimeRegistry.updateResolvedTs(state.region.runtimeKey, resolvedTs, time.Now()) - } + state.updateRuntimeResolvedTs(resolvedTs, time.Now()) ts := uint64(0) shouldAdvance := false diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index daa2aceaa5..74aa4fa2be 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -61,6 +61,27 
@@ type regionRequestWorker struct { } } +func (s *regionRequestWorker) runtimeRegistry() *regionRuntimeRegistry { + if s.client == nil { + return nil + } + return s.client.regionRuntimeRegistry +} + +func (s *regionRequestWorker) markRegionRuntimeEnqueued(region regionInfo, now time.Time) { + if registry := s.runtimeRegistry(); registry != nil && region.runtimeKey.isValid() { + registry.setRequestEnqueueTime(region.runtimeKey, now) + } +} + +func (s *regionRequestWorker) markRegionRuntimeSent(region regionInfo, now time.Time) { + if registry := s.runtimeRegistry(); registry != nil && region.runtimeKey.isValid() { + registry.updateWorker(region.runtimeKey, s.workerID) + registry.setRequestSendTime(region.runtimeKey, now) + registry.transition(region.runtimeKey, regionPhaseWaitInitialized, now) + } +} + func newRegionRequestWorker( ctx context.Context, client *subscriptionClient, @@ -431,11 +452,7 @@ func (s *regionRequestWorker) processRegionSendTask( // sentRequests visible in the same order and avoids leaving stale // requests in cleanup. 
s.requestCache.markSent(regionReq) - if s.client != nil && s.client.regionRuntimeRegistry != nil && region.runtimeKey.isValid() { - s.client.regionRuntimeRegistry.updateWorker(region.runtimeKey, s.workerID) - s.client.regionRuntimeRegistry.setRequestSendTime(region.runtimeKey, time.Now()) - s.client.regionRuntimeRegistry.transition(region.runtimeKey, regionPhaseWaitInitialized, time.Now()) - } + s.markRegionRuntimeSent(region, time.Now()) if err := doSend(s.createRegionRequest(region)); err != nil { state.markStopped(err) return err @@ -517,8 +534,8 @@ func (s *regionRequestWorker) clearRegionStates() map[SubscriptionID]regionFeedS // It blocks if the cache is full until there's space or ctx is cancelled func (s *regionRequestWorker) add(ctx context.Context, region regionInfo, force bool) (bool, error) { ok, err := s.requestCache.add(ctx, region, force) - if ok && err == nil && s.client != nil && s.client.regionRuntimeRegistry != nil && region.runtimeKey.isValid() { - s.client.regionRuntimeRegistry.setRequestEnqueueTime(region.runtimeKey, time.Now()) + if ok && err == nil { + s.markRegionRuntimeEnqueued(region, time.Now()) } return ok, err } diff --git a/logservice/logpuller/region_state.go b/logservice/logpuller/region_state.go index f57642d967..cb96d8777e 100644 --- a/logservice/logpuller/region_state.go +++ b/logservice/logpuller/region_state.go @@ -15,6 +15,7 @@ package logpuller import ( "sync" + "time" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/logpuller/regionlock" @@ -198,3 +199,30 @@ func (s *regionFeedState) getRegionInfo() regionInfo { func (s *regionFeedState) getRegionMeta() (uint64, heartbeatpb.TableSpan, string) { return s.region.verID.GetID(), s.region.span, s.region.rpcCtx.Addr } + +func (s *regionFeedState) runtimeRegistry() *regionRuntimeRegistry { + if !s.region.runtimeKey.isValid() || s.worker == nil || s.worker.client == nil { + return nil + } + return s.worker.client.regionRuntimeRegistry +} + +func (s 
*regionFeedState) updateRuntimeLastEvent(now time.Time) { + if registry := s.runtimeRegistry(); registry != nil { + registry.updateLastEvent(s.region.runtimeKey, now) + } +} + +func (s *regionFeedState) markRuntimeReplicating(now time.Time) { + if registry := s.runtimeRegistry(); registry != nil { + registry.setInitializedTime(s.region.runtimeKey, now) + registry.setReplicatingTime(s.region.runtimeKey, now) + registry.transition(s.region.runtimeKey, regionPhaseReplicating, now) + } +} + +func (s *regionFeedState) updateRuntimeResolvedTs(resolvedTs uint64, now time.Time) { + if registry := s.runtimeRegistry(); registry != nil { + registry.updateResolvedTs(s.region.runtimeKey, resolvedTs, now) + } +} From 1e89e15b3c650e5b53484aebd8d078f5529ba485 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Fri, 10 Apr 2026 20:48:26 +0800 Subject: [PATCH 6/6] fix --- logservice/logpuller/region_request_worker.go | 4 +- logservice/logpuller/region_runtime.go | 56 ++++++++++----- logservice/logpuller/region_runtime_test.go | 36 +++++++--- logservice/logpuller/region_state.go | 4 +- logservice/logpuller/subscription_client.go | 71 +++++++++++-------- 5 files changed, 110 insertions(+), 61 deletions(-) diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go index 74aa4fa2be..29b84248be 100644 --- a/logservice/logpuller/region_request_worker.go +++ b/logservice/logpuller/region_request_worker.go @@ -76,9 +76,7 @@ func (s *regionRequestWorker) markRegionRuntimeEnqueued(region regionInfo, now t func (s *regionRequestWorker) markRegionRuntimeSent(region regionInfo, now time.Time) { if registry := s.runtimeRegistry(); registry != nil && region.runtimeKey.isValid() { - registry.updateWorker(region.runtimeKey, s.workerID) - registry.setRequestSendTime(region.runtimeKey, now) - registry.transition(region.runtimeKey, regionPhaseWaitInitialized, now) + registry.markWaitInitialized(region.runtimeKey, s.workerID, now) } } diff --git 
a/logservice/logpuller/region_runtime.go b/logservice/logpuller/region_runtime.go index 297ba83704..7233121e91 100644 --- a/logservice/logpuller/region_runtime.go +++ b/logservice/logpuller/region_runtime.go @@ -85,12 +85,14 @@ type regionRuntimeState struct { lastErrorTime time.Time retryCount int + // phaseEnterTime records the current lifecycle phase boundary. + // The fields below keep distinct historical milestones that are still useful + // after the phase moves forward. rangeLockAcquiredTime time.Time requestEnqueueTime time.Time requestRPCReadyTime time.Time requestSendTime time.Time initializedTime time.Time - replicatingTime time.Time } func (s regionRuntimeState) clone() regionRuntimeState { @@ -194,9 +196,15 @@ func (r *regionRuntimeRegistry) updateRegionInfo( }) } -func (r *regionRuntimeRegistry) updateWorker(key regionRuntimeKey, workerID uint64) regionRuntimeState { +func (r *regionRuntimeRegistry) registerRegion( + key regionRuntimeKey, + region regionInfo, + discoveredTime time.Time, +) regionRuntimeState { return r.upsert(key, func(state *regionRuntimeState) { - state.workerID = workerID + state.applyRegionInfo(region) + state.phase = regionPhaseDiscovered + state.phaseEnterTime = discoveredTime }) } @@ -237,9 +245,21 @@ func (r *regionRuntimeRegistry) recordError( }) } -func (r *regionRuntimeRegistry) incRetry(key regionRuntimeKey) regionRuntimeState { +func (r *regionRuntimeRegistry) markRetryPending( + key regionRuntimeKey, + err error, + retryTime time.Time, +) regionRuntimeState { return r.upsert(key, func(state *regionRuntimeState) { + if err == nil { + state.lastError = "" + } else { + state.lastError = err.Error() + } + state.lastErrorTime = retryTime state.retryCount++ + state.phase = regionPhaseRetryPending + state.phaseEnterTime = retryTime }) } @@ -252,48 +272,50 @@ func (r *regionRuntimeRegistry) setRequestEnqueueTime( }) } -func (r *regionRuntimeRegistry) setRangeLockAcquiredTime( +func (r *regionRuntimeRegistry) markQueued( key 
regionRuntimeKey, acquiredTime time.Time, + queuedTime time.Time, ) regionRuntimeState { return r.upsert(key, func(state *regionRuntimeState) { state.rangeLockAcquiredTime = acquiredTime + state.phase = regionPhaseQueued + state.phaseEnterTime = queuedTime }) } -func (r *regionRuntimeRegistry) setRPCReadyTime( +func (r *regionRuntimeRegistry) markRPCReady( key regionRuntimeKey, rpcReadyTime time.Time, ) regionRuntimeState { return r.upsert(key, func(state *regionRuntimeState) { state.requestRPCReadyTime = rpcReadyTime + state.phase = regionPhaseRPCReady + state.phaseEnterTime = rpcReadyTime }) } -func (r *regionRuntimeRegistry) setRequestSendTime( +func (r *regionRuntimeRegistry) markWaitInitialized( key regionRuntimeKey, + workerID uint64, sendTime time.Time, ) regionRuntimeState { return r.upsert(key, func(state *regionRuntimeState) { + state.workerID = workerID state.requestSendTime = sendTime + state.phase = regionPhaseWaitInitialized + state.phaseEnterTime = sendTime }) } -func (r *regionRuntimeRegistry) setInitializedTime( +func (r *regionRuntimeRegistry) markReplicating( key regionRuntimeKey, initializedTime time.Time, ) regionRuntimeState { return r.upsert(key, func(state *regionRuntimeState) { state.initializedTime = initializedTime - }) -} - -func (r *regionRuntimeRegistry) setReplicatingTime( - key regionRuntimeKey, - replicatingTime time.Time, -) regionRuntimeState { - return r.upsert(key, func(state *regionRuntimeState) { - state.replicatingTime = replicatingTime + state.phase = regionPhaseReplicating + state.phaseEnterTime = initializedTime }) } diff --git a/logservice/logpuller/region_runtime_test.go b/logservice/logpuller/region_runtime_test.go index d1bf0a900b..919d51e521 100644 --- a/logservice/logpuller/region_runtime_test.go +++ b/logservice/logpuller/region_runtime_test.go @@ -63,26 +63,28 @@ func TestRegionRuntimeRegistryUpdateAndSnapshot(t *testing.T) { subscribedSpan: subSpan, } - registry.updateRegionInfo(key, region) - 
registry.transition(key, regionPhaseQueued, now) - registry.updateWorker(key, 7) - registry.updateResolvedTs(key, 12345, now.Add(time.Second)) - registry.recordError(key, errors.New("store busy"), now.Add(2*time.Second)) - registry.incRetry(key) - registry.setRequestEnqueueTime(key, now.Add(3*time.Second)) + registry.registerRegion(key, region, now) + registry.setRequestEnqueueTime(key, now.Add(500*time.Millisecond)) + registry.markQueued(key, now.Add(time.Second), now.Add(2*time.Second)) + registry.markWaitInitialized(key, 7, now.Add(3*time.Second)) + registry.updateResolvedTs(key, 12345, now.Add(4*time.Second)) + registry.recordError(key, errors.New("store busy"), now.Add(5*time.Second)) state, ok := registry.get(key) require.True(t, ok) require.Equal(t, int64(42), state.tableID) - require.Equal(t, regionPhaseQueued, state.phase) + require.Equal(t, regionPhaseWaitInitialized, state.phase) require.Equal(t, uint64(22), state.leaderStoreID) require.Equal(t, uint64(11), state.leaderPeerID) require.Equal(t, "tikv-1:20160", state.storeAddr) require.Equal(t, uint64(7), state.workerID) require.Equal(t, uint64(12345), state.lastResolvedTs) require.Equal(t, "store busy", state.lastError) - require.Equal(t, 1, state.retryCount) - require.Equal(t, now.Add(3*time.Second), state.requestEnqueueTime) + require.Equal(t, 0, state.retryCount) + require.Equal(t, now.Add(time.Second), state.rangeLockAcquiredTime) + require.Equal(t, now.Add(3*time.Second), state.phaseEnterTime) + require.Equal(t, now.Add(3*time.Second), state.requestSendTime) + require.Equal(t, now.Add(500*time.Millisecond), state.requestEnqueueTime) snapshots := registry.snapshot() require.Len(t, snapshots, 1) @@ -94,6 +96,20 @@ func TestRegionRuntimeRegistryUpdateAndSnapshot(t *testing.T) { require.Equal(t, []byte("b"), updated.span.StartKey) } +func TestRegionRuntimeRegistryMarkReplicating(t *testing.T) { + registry := newRegionRuntimeRegistry() + key := registry.allocKey(1, 101) + now := time.Unix(1700000100, 0) + 
+ registry.markReplicating(key, now) + + state, ok := registry.get(key) + require.True(t, ok) + require.Equal(t, regionPhaseReplicating, state.phase) + require.Equal(t, now, state.initializedTime) + require.Equal(t, now, state.phaseEnterTime) +} + func TestRegionRuntimeRegistryRemoveBySubscription(t *testing.T) { registry := newRegionRuntimeRegistry() key1 := registry.allocKey(1, 101) diff --git a/logservice/logpuller/region_state.go b/logservice/logpuller/region_state.go index cb96d8777e..496e001fb5 100644 --- a/logservice/logpuller/region_state.go +++ b/logservice/logpuller/region_state.go @@ -215,9 +215,7 @@ func (s *regionFeedState) updateRuntimeLastEvent(now time.Time) { func (s *regionFeedState) markRuntimeReplicating(now time.Time) { if registry := s.runtimeRegistry(); registry != nil { - registry.setInitializedTime(s.region.runtimeKey, now) - registry.setReplicatingTime(s.region.runtimeKey, now) - registry.transition(s.region.runtimeKey, regionPhaseReplicating, now) + registry.markReplicating(s.region.runtimeKey, now) } } diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go index 97d17ea361..b91944ab85 100644 --- a/logservice/logpuller/subscription_client.go +++ b/logservice/logpuller/subscription_client.go @@ -238,8 +238,7 @@ func (s *subscriptionClient) ensureRegionRuntime(region *regionInfo, now time.Ti } if !region.runtimeKey.isValid() { region.runtimeKey = s.regionRuntimeRegistry.allocKey(region.subscribedSpan.subID, region.verID.GetID()) - s.regionRuntimeRegistry.updateRegionInfo(region.runtimeKey, *region) - s.regionRuntimeRegistry.transition(region.runtimeKey, regionPhaseDiscovered, now) + s.regionRuntimeRegistry.registerRegion(region.runtimeKey, *region, now) } } @@ -264,12 +263,45 @@ func (s *subscriptionClient) transitionRegionRuntime(region regionInfo, phase re } func (s *subscriptionClient) markRegionRetryPending(region regionInfo, err error, now time.Time) { + if s.regionRuntimeRegistry == nil 
|| !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.markRetryPending(region.runtimeKey, err, now) +} + +func (s *subscriptionClient) markRegionRuntimeRPCReady(region regionInfo, now time.Time) { + if s.regionRuntimeRegistry == nil || !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.markRPCReady(region.runtimeKey, now) +} + +func (s *subscriptionClient) markRegionRuntimeQueued(region regionInfo, acquiredTime, queuedTime time.Time) { + if s.regionRuntimeRegistry == nil || !region.runtimeKey.isValid() { + return + } + s.regionRuntimeRegistry.markQueued(region.runtimeKey, acquiredTime, queuedTime) +} + +func (s *subscriptionClient) recordRegionRuntimeError(region regionInfo, err error, now time.Time) { if s.regionRuntimeRegistry == nil || !region.runtimeKey.isValid() { return } s.regionRuntimeRegistry.recordError(region.runtimeKey, err, now) - s.regionRuntimeRegistry.incRetry(region.runtimeKey) - s.regionRuntimeRegistry.transition(region.runtimeKey, regionPhaseRetryPending, now) +} + +func (s *subscriptionClient) regionRuntimePhaseCounts() map[regionPhase]int { + if s.regionRuntimeRegistry == nil { + return nil + } + return s.regionRuntimeRegistry.phaseCounts() +} + +func (s *subscriptionClient) removeSubscriptionRuntime(subID SubscriptionID) { + if s.regionRuntimeRegistry == nil { + return + } + s.regionRuntimeRegistry.removeBySubscription(subID) } func (s *subscriptionClient) removeRegionRuntime(region regionInfo, now time.Time) { @@ -390,8 +422,7 @@ func (s *subscriptionClient) updateMetrics(ctx context.Context) error { metrics.SubscriptionClientRequestedRegionCount.WithLabelValues("pending").Set(float64(pendingRegionReqCount)) - if s.regionRuntimeRegistry != nil { - counts := s.regionRuntimeRegistry.phaseCounts() + if counts := s.regionRuntimePhaseCounts(); counts != nil { for _, phase := range regionRuntimePhases { metrics.SubscriptionClientRegionRuntimePhaseCount.WithLabelValues(string(phase)).Set(float64(counts[phase])) 
} @@ -569,9 +600,7 @@ func (s *subscriptionClient) onTableDrained(rt *subscribedSpan) { log.Info("subscription client stop span is finished", zap.Uint64("subscriptionID", uint64(rt.subID))) - if s.regionRuntimeRegistry != nil { - s.regionRuntimeRegistry.removeBySubscription(rt.subID) - } + s.removeSubscriptionRuntime(rt.subID) err := s.ds.RemovePath(rt.subID) if err != nil { @@ -586,9 +615,7 @@ func (s *subscriptionClient) onTableDrained(rt *subscribedSpan) { // Note: don't block the caller, otherwise there may be deadlock func (s *subscriptionClient) onRegionFail(errInfo regionErrorInfo) { - if s.regionRuntimeRegistry != nil && errInfo.runtimeKey.isValid() { - s.regionRuntimeRegistry.recordError(errInfo.runtimeKey, errInfo.err, time.Now()) - } + s.recordRegionRuntimeError(errInfo.regionInfo, errInfo.err, time.Now()) // unlock the range early to prevent blocking the range. if errInfo.subscribedSpan.rangeLock.UnlockRange( errInfo.span.StartKey, errInfo.span.EndKey, @@ -694,10 +721,7 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro continue } s.updateRegionRuntimeInfo(region) - if s.regionRuntimeRegistry != nil && region.runtimeKey.isValid() { - s.regionRuntimeRegistry.setRPCReadyTime(region.runtimeKey, time.Now()) - } - s.transitionRegionRuntime(region, regionPhaseRPCReady, time.Now()) + s.markRegionRuntimeRPCReady(region, time.Now()) store := getStore(region.rpcCtx.Addr) worker := store.getRequestWorker() @@ -885,24 +909,15 @@ func (s *subscriptionClient) scheduleRegionRequest(ctx context.Context, region r switch lockRangeResult.Status { case regionlock.LockRangeStatusSuccess: region.lockedRangeState = lockRangeResult.LockedRangeState - if s.regionRuntimeRegistry != nil && region.runtimeKey.isValid() { - s.regionRuntimeRegistry.setRangeLockAcquiredTime(region.runtimeKey, lockRangeResult.LockedRangeState.Created) - } - s.transitionRegionRuntime(region, regionPhaseQueued, time.Now()) + s.markRegionRuntimeQueued(region, 
lockRangeResult.LockedRangeState.Created, time.Now()) s.regionTaskQueue.Push(NewRegionPriorityTask(priority, region, s.pdClock.CurrentTS())) case regionlock.LockRangeStatusStale: - s.transitionRegionRuntime(region, regionPhaseRemoved, time.Now()) - if s.regionRuntimeRegistry != nil && region.runtimeKey.isValid() { - s.regionRuntimeRegistry.remove(region.runtimeKey) - } + s.removeRegionRuntime(region, time.Now()) for _, r := range lockRangeResult.RetryRanges { s.scheduleRangeRequest(ctx, r, region.subscribedSpan, region.filterLoop, priority) } case regionlock.LockRangeStatusCancel: - s.transitionRegionRuntime(region, regionPhaseRemoved, time.Now()) - if s.regionRuntimeRegistry != nil && region.runtimeKey.isValid() { - s.regionRuntimeRegistry.remove(region.runtimeKey) - } + s.removeRegionRuntime(region, time.Now()) default: return }