diff --git a/logservice/logpuller/range_reload_aggregator.go b/logservice/logpuller/range_reload_aggregator.go
new file mode 100644
index 0000000000..27761b92c6
--- /dev/null
+++ b/logservice/logpuller/range_reload_aggregator.go
@@ -0,0 +1,164 @@
+// Copyright 2026 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logpuller
+
+import (
+	"context"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/ticdc/heartbeatpb"
+	"github.com/pingcap/ticdc/pkg/common"
+	"go.uber.org/zap"
+)
+
+type rangeReloadTaskSet struct {
+	subscribedSpan *subscribedSpan
+	filterLoop     bool
+	priority       TaskType
+	spans          []heartbeatpb.TableSpan
+	flushScheduled bool
+}
+
+type rangeReloadAggregator struct {
+	client         *subscriptionClient
+	debounce       time.Duration
+	flushThreshold int
+
+	mu    sync.Mutex
+	tasks map[SubscriptionID]*rangeReloadTaskSet
+}
+
+func newRangeReloadAggregator(client *subscriptionClient, debounce time.Duration, flushThreshold int) *rangeReloadAggregator {
+	if flushThreshold <= 0 {
+		flushThreshold = rangeReloadImmediateFlushMax
+	}
+	return &rangeReloadAggregator{
+		client:         client,
+		debounce:       debounce,
+		flushThreshold: flushThreshold,
+		tasks:          make(map[SubscriptionID]*rangeReloadTaskSet),
+	}
+}
+
+func (a *rangeReloadAggregator) add(
+	span heartbeatpb.TableSpan,
+	subscribedSpan *subscribedSpan,
+	filterLoop bool,
+	priority TaskType,
+) {
+	if subscribedSpan == nil || common.IsEmptySpan(span) {
+		return
+	}
+	var immediateFlush bool
+	a.mu.Lock()
+	set := a.tasks[subscribedSpan.subID]
+	if set == nil {
+		set = &rangeReloadTaskSet{
+			subscribedSpan: subscribedSpan,
+			filterLoop:     filterLoop,
+			priority:       priority,
+			spans:          make([]heartbeatpb.TableSpan, 0, 4),
+		}
+		a.tasks[subscribedSpan.subID] = set
+	}
+	if priority < set.priority {
+		set.priority = priority
+	}
+	set.spans = append(set.spans, span)
+	if len(set.spans) >= a.flushThreshold {
+		immediateFlush = true
+		set.flushScheduled = false
+	} else if !set.flushScheduled {
+		// Debounce reloads so a split/merge storm can be collapsed into a
+		// smaller batch of merged spans before hitting region metadata APIs.
+		set.flushScheduled = true
+		a.client.retryScheduler.schedule(a.debounce, func(_ context.Context) {
+			a.flush(subscribedSpan.subID)
+		})
+	}
+	a.mu.Unlock()
+
+	if immediateFlush {
+		a.flush(subscribedSpan.subID)
+	}
+}
+
+func (a *rangeReloadAggregator) flush(subID SubscriptionID) {
+	var (
+		set         *rangeReloadTaskSet
+		mergedSpans []heartbeatpb.TableSpan
+	)
+	a.mu.Lock()
+	set = a.tasks[subID]
+	if set == nil {
+		a.mu.Unlock()
+		return
+	}
+	delete(a.tasks, subID)
+	set.flushScheduled = false
+	mergedSpans = mergeTableSpans(set.spans)
+	a.mu.Unlock()
+
+	if set.subscribedSpan.stopped.Load() {
+		return
+	}
+	log.Debug("subscription client flush aggregated reload spans",
+		zap.Uint64("subscriptionID", uint64(subID)),
+		zap.Int("inputSpanCount", len(set.spans)),
+		zap.Int("mergedSpanCount", len(mergedSpans)))
+	for _, span := range mergedSpans {
+		a.client.scheduleRangeTaskAfter(rangeTask{
+			span:           span,
+			subscribedSpan: set.subscribedSpan,
+			filterLoop:     set.filterLoop,
+			priority:       set.priority,
+		}, 0)
+	}
+}
+
+func mergeTableSpans(spans []heartbeatpb.TableSpan) []heartbeatpb.TableSpan {
+	if len(spans) <= 1 {
+		return append([]heartbeatpb.TableSpan(nil), spans...)
+	}
+	copied := append([]heartbeatpb.TableSpan(nil), spans...)
+	sort.Slice(copied, func(i, j int) bool {
+		return common.StartCompare(copied[i].StartKey, copied[j].StartKey) < 0
+	})
+
+	merged := make([]heartbeatpb.TableSpan, 0, len(copied))
+	for _, span := range copied {
+		if common.IsEmptySpan(span) {
+			continue
+		}
+		if len(merged) == 0 {
+			merged = append(merged, span)
+			continue
+		}
+		last := merged[len(merged)-1]
+		if common.EndCompare(last.EndKey, span.StartKey) >= 0 {
+			// Adjacent retry spans are merged too, because they can be resolved
+			// by a single BatchLoadRegionsWithKeyRange call later.
+			if common.EndCompare(span.EndKey, last.EndKey) > 0 {
+				last.EndKey = span.EndKey
+			}
+			merged[len(merged)-1] = last
+			continue
+		}
+		merged = append(merged, span)
+	}
+	return merged
+}
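The merge step above treats overlapping or touching spans as a single reload unit. A minimal standalone sketch of the same idea, using plain bytes.Compare instead of the package's common.StartCompare/EndCompare helpers and ignoring the empty-EndKey-means-unbounded convention, so it compiles outside this package:

package main

import (
	"bytes"
	"fmt"
	"sort"
)

type span struct{ start, end []byte }

// mergeSpans mirrors mergeTableSpans: sort by start key, then fold any span
// whose start key is <= the previous end key into the previous span.
func mergeSpans(spans []span) []span {
	sort.Slice(spans, func(i, j int) bool { return bytes.Compare(spans[i].start, spans[j].start) < 0 })
	var merged []span
	for _, s := range spans {
		if len(merged) > 0 && bytes.Compare(merged[len(merged)-1].end, s.start) >= 0 {
			if bytes.Compare(s.end, merged[len(merged)-1].end) > 0 {
				merged[len(merged)-1].end = s.end
			}
			continue
		}
		merged = append(merged, s)
	}
	return merged
}

func main() {
	out := mergeSpans([]span{{[]byte("d"), []byte("e")}, {[]byte("a"), []byte("b")}, {[]byte("b"), []byte("c")}})
	fmt.Println(len(out)) // 2: ["a","c") and ["d","e")
}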
diff --git a/logservice/logpuller/region_event_handler.go b/logservice/logpuller/region_event_handler.go
index 0de2306c47..b71b221ae9 100644
--- a/logservice/logpuller/region_event_handler.go
+++ b/logservice/logpuller/region_event_handler.go
@@ -277,6 +277,9 @@ func handleEventEntries(span *subscribedSpan, state *regionFeedState, entries *c
 		switch entry.Type {
 		case cdcpb.Event_INITIALIZED:
 			state.setInitialized()
+			if state.worker != nil {
+				state.worker.client.resetRetryBackoff(SubscriptionID(state.requestID), regionID)
+			}
 			log.Debug("region is initialized",
 				zap.Int64("tableID", span.span.TableID),
 				zap.Uint64("regionID", regionID),
@@ -364,6 +367,9 @@ func handleResolvedTs(span *subscribedSpan, state *regionFeedState, resolvedTs u
 	}
 
 	state.updateResolvedTs(resolvedTs)
+	if state.worker != nil {
+		state.worker.client.resetRetryBackoff(SubscriptionID(state.requestID), regionID)
+	}
 
 	ts := uint64(0)
 	shouldAdvance := false
diff --git a/logservice/logpuller/region_request_worker.go b/logservice/logpuller/region_request_worker.go
index fafe7fcade..2091a57f44 100644
--- a/logservice/logpuller/region_request_worker.go
+++ b/logservice/logpuller/region_request_worker.go
@@ -123,6 +123,7 @@ func newRegionRequestWorker(
 			}
 			regionErr = &sendRequestToStoreErr{}
 		}
+		worker.client.markStoreFailure(worker.store.storeAddr)
 		for subID, m := range worker.clearRegionStates() {
 			for _, state := range m {
 				state.markStopped(regionErr)
@@ -186,6 +187,7 @@ func (s *regionRequestWorker) run(ctx context.Context, credential *security.Cred
 	defer func() {
 		_ = conn.Conn.Close()
 	}()
+	s.client.markStoreSuccess(s.store.storeAddr)
 
 	g.Go(func() error {
 		return s.receiveAndDispatchChangeEvents(conn)
@@ -221,6 +223,7 @@ func (s *regionRequestWorker) receiveAndDispatchChangeEvents(conn *ConnAndClient
 			}
 			return errors.Trace(err)
 		}
+		s.client.markStoreSuccess(s.store.storeAddr)
 		if len(changeEvent.Events) > 0 {
 			s.dispatchRegionChangeEvents(changeEvent.Events)
 		}
@@ -345,6 +348,7 @@ func (s *regionRequestWorker) processRegionSendTask(
 					zap.Error(err))
 				return errors.Trace(err)
 			}
+			s.client.markStoreSuccess(s.store.storeAddr)
 			// TODO: add a metric?
 			return nil
 		}
@@ -410,6 +414,7 @@ func (s *regionRequestWorker) processRegionSendTask(
 				s.requestCache.markDone()
 				return err
 			}
+			s.client.resetRetryBackoff(subID, region.verID.GetID())
 			s.requestCache.markSent(regionReq)
 		}
 		regionReq, err = fetchMoreReq()
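The worker and event-handler changes above all follow one rule: any sign of progress (an INITIALIZED event, a resolved-ts advance, a successfully sent request or received batch) clears the accumulated backoff for that region or store, so only genuinely stuck regions keep backing off. A minimal sketch of that reset-on-progress contract, with illustrative names rather than the package's types:

package main

import "fmt"

// backoff counts consecutive failures per region and is cleared on any sign of progress.
type backoff struct{ attempts map[uint64]int }

func (b *backoff) fail(region uint64) int { b.attempts[region]++; return b.attempts[region] }

// progress would be called on an INITIALIZED event or a resolved-ts advance.
func (b *backoff) progress(region uint64) { delete(b.attempts, region) }

func main() {
	b := &backoff{attempts: map[uint64]int{}}
	b.fail(11)
	b.fail(11)
	b.progress(11)          // region 11 starts from a clean slate on its next error
	fmt.Println(b.fail(11)) // prints 1
}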
diff --git a/logservice/logpuller/retry_scheduler.go b/logservice/logpuller/retry_scheduler.go
new file mode 100644
index 0000000000..80ba205bde
--- /dev/null
+++ b/logservice/logpuller/retry_scheduler.go
@@ -0,0 +1,387 @@
+// Copyright 2026 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logpuller
+
+import (
+	"container/heap"
+	"context"
+	"sync"
+	"time"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/ticdc/heartbeatpb"
+	"go.uber.org/zap"
+)
+
+const (
+	notLeaderRetryBaseDelay      = 50 * time.Millisecond
+	rpcCtxRetryBaseDelay         = 100 * time.Millisecond
+	busyRetryBaseDelay           = 100 * time.Millisecond
+	storeFailureRetryBaseDelay   = 200 * time.Millisecond
+	shortRetryMaxDelay           = 1 * time.Second
+	busyRetryMaxDelay            = 3 * time.Second
+	storeFailureRetryMaxDelay    = 5 * time.Second
+	rangeReloadDebounceInterval  = 25 * time.Millisecond
+	rangeReloadImmediateFlushMax = 128
+)
+
+type delayedAction func(context.Context)
+
+type delayedTask struct {
+	run delayedAction
+	at  time.Time
+}
+
+type delayedTaskHeap []delayedTask
+
+func (h delayedTaskHeap) Len() int { return len(h) }
+
+func (h delayedTaskHeap) Less(i, j int) bool {
+	return h[i].at.Before(h[j].at)
+}
+
+func (h delayedTaskHeap) Swap(i, j int) {
+	h[i], h[j] = h[j], h[i]
+}
+
+func (h *delayedTaskHeap) Push(x any) {
+	*h = append(*h, x.(delayedTask))
+}
+
+func (h *delayedTaskHeap) Pop() any {
+	old := *h
+	n := len(old)
+	item := old[n-1]
+	*h = old[:n-1]
+	return item
+}
+
+type retryScheduler struct {
+	client *subscriptionClient
+	addCh  chan delayedTask
+}
+
+func newRetryScheduler(client *subscriptionClient) *retryScheduler {
+	return &retryScheduler{
+		client: client,
+		addCh:  make(chan delayedTask, 4096),
+	}
+}
+
+func (s *retryScheduler) schedule(delay time.Duration, action delayedAction) {
+	if delay <= 0 {
+		action(s.client.ctx)
+		return
+	}
+
+	task := delayedTask{run: action, at: time.Now().Add(delay)}
+	select {
+	case <-s.client.ctx.Done():
+	case s.addCh <- task:
+	}
+}
+
+func (s *retryScheduler) run(ctx context.Context) error {
+	var (
+		taskHeap delayedTaskHeap
+		timer    *time.Timer
+		timerCh  <-chan time.Time
+	)
+	heap.Init(&taskHeap)
+	resetTimer := func(d time.Duration) {
+		if d < 0 {
+			d = 0
+		}
+		if timer == nil {
+			timer = time.NewTimer(d)
+		} else {
+			if !timer.Stop() {
+				select {
+				case <-timer.C:
+				default:
+				}
+			}
+			timer.Reset(d)
+		}
+		timerCh = timer.C
+	}
+	disableTimer := func() {
+		if timer != nil {
+			if !timer.Stop() {
+				select {
+				case <-timer.C:
+				default:
+				}
+			}
+		}
+		timerCh = nil
+	}
+	for {
+		// Keep only the nearest deadline armed so delayed retries do not spin
+		// when the heap contains many future tasks.
+		if taskHeap.Len() > 0 {
+			resetTimer(time.Until(taskHeap[0].at))
+		} else {
+			disableTimer()
+		}
+
+		select {
+		case <-ctx.Done():
+			disableTimer()
+			return ctx.Err()
+		case task := <-s.addCh:
+			heap.Push(&taskHeap, task)
+		case <-timerCh:
+			now := time.Now()
+			for taskHeap.Len() > 0 {
+				task := taskHeap[0]
+				if task.at.After(now) {
+					break
+				}
+				heap.Pop(&taskHeap)
+				task.run(ctx)
+			}
+		}
+	}
+}
+
+type retryBackoffReason string
+
+const (
+	retryBackoffNotLeader    retryBackoffReason = "not-leader"
+	retryBackoffRPCContext   retryBackoffReason = "rpc-context"
+	retryBackoffBusy         retryBackoffReason = "busy"
+	retryBackoffStoreFailure retryBackoffReason = "store-failure"
+)
+
+type retryBackoffKey struct {
+	subID    SubscriptionID
+	regionID uint64
+	reason   retryBackoffReason
+}
+
+type retryBackoffManager struct {
+	mu       sync.Mutex
+	attempts map[retryBackoffKey]int
+}
+
+func newRetryBackoffManager() *retryBackoffManager {
+	return &retryBackoffManager{attempts: make(map[retryBackoffKey]int)}
+}
+
+func (m *retryBackoffManager) nextDelay(key retryBackoffKey, base, max time.Duration) time.Duration {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	attempt := m.attempts[key]
+	if attempt < 0 {
+		attempt = 0
+	}
+	m.attempts[key] = attempt + 1
+
+	delay := base
+	for i := 0; i < attempt; i++ {
+		if delay >= max/2 {
+			return max
+		}
+		delay *= 2
+	}
+	if delay > max {
+		return max
+	}
+	return delay
+}
+
+func (m *retryBackoffManager) resetRegion(subID SubscriptionID, regionID uint64) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	for key := range m.attempts {
+		if key.subID == subID && key.regionID == regionID {
+			delete(m.attempts, key)
+		}
+	}
+}
+
+type storeBackoffState struct {
+	consecutiveFailures int
+	cooldownUntil       time.Time
+}
+
+type storeBackoffManager struct {
+	mu     sync.Mutex
+	states map[string]storeBackoffState
+}
+
+func newStoreBackoffManager() *storeBackoffManager {
+	return &storeBackoffManager{states: make(map[string]storeBackoffState)}
+}
+
+func (m *storeBackoffManager) markFailure(storeAddr string) time.Duration {
+	if storeAddr == "" {
+		return 0
+	}
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	state := m.states[storeAddr]
+	state.consecutiveFailures++
+	delay := storeFailureRetryBaseDelay
+	for i := 1; i < state.consecutiveFailures; i++ {
+		if delay >= storeFailureRetryMaxDelay/2 {
+			delay = storeFailureRetryMaxDelay
+			break
+		}
+		delay *= 2
+	}
+	if delay > storeFailureRetryMaxDelay {
+		delay = storeFailureRetryMaxDelay
+	}
+	state.cooldownUntil = time.Now().Add(delay)
+	m.states[storeAddr] = state
+	return delay
+}
+
+func (m *storeBackoffManager) markSuccess(storeAddr string) {
+	if storeAddr == "" {
+		return
+	}
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	delete(m.states, storeAddr)
+}
+
+func (m *storeBackoffManager) cooldownRemaining(storeAddr string) time.Duration {
+	if storeAddr == "" {
+		return 0
+	}
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	state, ok := m.states[storeAddr]
+	if !ok {
+		return 0
+	}
+	remaining := time.Until(state.cooldownUntil)
+	if remaining <= 0 {
+		delete(m.states, storeAddr)
+		return 0
+	}
+	return remaining
+}
+
+func (s *subscriptionClient) schedulePriorityTaskAfter(task PriorityTask, delay time.Duration) {
+	s.retryScheduler.schedule(delay, func(ctx context.Context) {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+		s.regionTaskQueue.Push(task)
+	})
+}
+
+func (s *subscriptionClient) scheduleRegionRequestAfter(
+	ctx context.Context,
+	region regionInfo,
+	priority TaskType,
+	delay time.Duration,
+) {
+	s.retryScheduler.schedule(delay, func(taskCtx context.Context) {
+		// Re-enter the normal scheduling path after the backoff expires so
+		// range locking and worker selection still use the same code path.
+		s.scheduleRegionRequest(taskCtx, region, priority)
+	})
+}
+
+func (s *subscriptionClient) scheduleRangeTaskAfter(task rangeTask, delay time.Duration) {
+	s.retryScheduler.schedule(delay, func(ctx context.Context) {
+		select {
+		case <-ctx.Done():
+		case s.rangeTaskCh <- task:
+		}
+	})
+}
+
+func (s *subscriptionClient) addReloadRangeTask(
+	span heartbeatpb.TableSpan,
+	subscribedSpan *subscribedSpan,
+	filterLoop bool,
+	priority TaskType,
+) {
+	if s.rangeReloadAggregator == nil {
+		s.scheduleRangeTaskAfter(rangeTask{
+			span:           span,
+			subscribedSpan: subscribedSpan,
+			filterLoop:     filterLoop,
+			priority:       priority,
+		}, 0)
+		return
+	}
+	s.rangeReloadAggregator.add(span, subscribedSpan, filterLoop, priority)
+}
+
+func (s *subscriptionClient) getRetryBackoffDelay(
+	subID SubscriptionID,
+	regionID uint64,
+	reason retryBackoffReason,
+) time.Duration {
+	// Backoff is tracked per region and reason so a successful resolved-ts
+	// update on one path can reset retries without affecting unrelated errors.
+	key := retryBackoffKey{subID: subID, regionID: regionID, reason: reason}
+	switch reason {
+	case retryBackoffNotLeader:
+		return s.retryBackoff.nextDelay(key, notLeaderRetryBaseDelay, shortRetryMaxDelay)
+	case retryBackoffRPCContext:
+		return s.retryBackoff.nextDelay(key, rpcCtxRetryBaseDelay, shortRetryMaxDelay)
+	case retryBackoffBusy:
+		return s.retryBackoff.nextDelay(key, busyRetryBaseDelay, busyRetryMaxDelay)
+	case retryBackoffStoreFailure:
+		return s.retryBackoff.nextDelay(key, storeFailureRetryBaseDelay, storeFailureRetryMaxDelay)
+	default:
+		return busyRetryBaseDelay
+	}
+}
+
+func (s *subscriptionClient) resetRetryBackoff(subID SubscriptionID, regionID uint64) {
+	if s == nil || s.retryBackoff == nil {
+		return
+	}
+	s.retryBackoff.resetRegion(subID, regionID)
+}
+
+func (s *subscriptionClient) markStoreFailure(storeAddr string) time.Duration {
+	if s == nil || s.storeBackoff == nil {
+		return 0
+	}
+	delay := s.storeBackoff.markFailure(storeAddr)
+	if delay > 0 {
+		// The store-level cooldown prevents a reconnect storm from immediately
+		// pushing the same batch of regions back to a store that just failed.
+		log.Info("subscription client mark store cooldown",
+			zap.String("addr", storeAddr),
+			zap.Duration("delay", delay))
+	}
+	return delay
+}
+
+func (s *subscriptionClient) markStoreSuccess(storeAddr string) {
+	if s == nil || s.storeBackoff == nil {
+		return
+	}
+	s.storeBackoff.markSuccess(storeAddr)
+}
+
+func (s *subscriptionClient) getStoreCooldown(storeAddr string) time.Duration {
+	if s == nil || s.storeBackoff == nil {
+		return 0
+	}
+	return s.storeBackoff.cooldownRemaining(storeAddr)
+}
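As a sanity check on the delay curve introduced above: the per-region backoff doubles from its base and saturates once the doubled value would pass half the cap, so for the not-leader reason (base 50ms, cap 1s) the successive delays are 50ms, 100ms, 200ms, 400ms, 800ms, then 1s thereafter. A standalone reimplementation of the same arithmetic (just the formula, not the package's code) that prints that sequence:

package main

import (
	"fmt"
	"time"
)

// nextDelay mirrors retryBackoffManager.nextDelay for a single key:
// start at base, double once per previous attempt, clamp at max.
func nextDelay(attempt int, base, max time.Duration) time.Duration {
	delay := base
	for i := 0; i < attempt; i++ {
		if delay >= max/2 {
			return max
		}
		delay *= 2
	}
	if delay > max {
		return max
	}
	return delay
}

func main() {
	for attempt := 0; attempt < 7; attempt++ {
		fmt.Println(nextDelay(attempt, 50*time.Millisecond, time.Second))
	}
	// Output: 50ms 100ms 200ms 400ms 800ms 1s 1s
}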
diff --git a/logservice/logpuller/retry_scheduler_test.go b/logservice/logpuller/retry_scheduler_test.go
new file mode 100644
index 0000000000..eeac980f4b
--- /dev/null
+++ b/logservice/logpuller/retry_scheduler_test.go
@@ -0,0 +1,148 @@
+// Copyright 2026 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package logpuller
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/pingcap/ticdc/heartbeatpb"
+	"github.com/pingcap/ticdc/pkg/common"
+	"github.com/pingcap/ticdc/pkg/pdutil"
+	"github.com/stretchr/testify/require"
+	"github.com/tikv/client-go/v2/tikv"
+)
+
+func TestScheduleRegionRequestAfter(t *testing.T) {
+	client := &subscriptionClient{
+		rangeTaskCh:     make(chan rangeTask, 8),
+		regionTaskQueue: NewPriorityQueue(),
+	}
+	client.ctx, client.cancel = context.WithCancel(context.Background())
+	defer client.cancel()
+	client.pdClock = pdutil.NewClock4Test()
+	client.retryScheduler = newRetryScheduler(client)
+	client.retryBackoff = newRetryBackoffManager()
+	client.storeBackoff = newStoreBackoffManager()
+
+	done := make(chan error, 1)
+	go func() {
+		done <- client.retryScheduler.run(client.ctx)
+	}()
+
+	consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false }
+	advanceResolvedTs := func(ts uint64) {}
+	span := heartbeatpb.TableSpan{TableID: 1, StartKey: []byte("a"), EndKey: []byte("z")}
+	rt := client.newSubscribedSpan(SubscriptionID(1), span, 100, consumeKVEvents, advanceResolvedTs, 0, false)
+	region := newRegionInfo(tikv.NewRegionVerID(11, 1, 1), heartbeatpb.TableSpan{TableID: 1, StartKey: []byte("b"), EndKey: []byte("c")}, nil, rt, false)
+
+	client.scheduleRegionRequestAfter(client.ctx, region, TaskHighPrior, 50*time.Millisecond)
+
+	time.Sleep(20 * time.Millisecond)
+	require.Equal(t, 0, client.regionTaskQueue.Len())
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+	task, err := client.regionTaskQueue.Pop(ctx)
+	require.NoError(t, err)
+	regionInfo := task.GetRegionInfo()
+	require.Equal(t, uint64(11), regionInfo.verID.GetID())
+
+	client.cancel()
+	select {
+	case err := <-done:
+		require.Equal(t, context.Canceled, err)
+	case <-time.After(time.Second):
+		t.Fatal("retry scheduler should exit after cancel")
+	}
+}
+
+func TestRangeReloadAggregatorMergesSpans(t *testing.T) {
+	client := &subscriptionClient{
+		rangeTaskCh: make(chan rangeTask, 8),
+	}
+	client.ctx, client.cancel = context.WithCancel(context.Background())
+	defer client.cancel()
+	client.retryScheduler = newRetryScheduler(client)
+	client.rangeReloadAggregator = newRangeReloadAggregator(client, 10*time.Millisecond, 16)
+
+	done := make(chan error, 1)
+	go func() {
+		done <- client.retryScheduler.run(client.ctx)
+	}()
+
+	consumeKVEvents := func(_ []common.RawKVEntry, _ func()) bool { return false }
+	advanceResolvedTs := func(ts uint64) {}
+	rawSpan := heartbeatpb.TableSpan{TableID: 1, StartKey: []byte("a"), EndKey: []byte("z")}
+	rt := client.newSubscribedSpan(SubscriptionID(1), rawSpan, 100, consumeKVEvents, advanceResolvedTs, 0, false)
+
+	client.addReloadRangeTask(heartbeatpb.TableSpan{TableID: 1, StartKey: []byte("a"), EndKey: []byte("b")}, rt, false, TaskHighPrior)
+	client.addReloadRangeTask(heartbeatpb.TableSpan{TableID: 1, StartKey: []byte("b"), EndKey: []byte("c")}, rt, false, TaskLowPrior)
+	client.addReloadRangeTask(heartbeatpb.TableSpan{TableID: 1, StartKey: []byte("d"), EndKey: []byte("e")}, rt, false, TaskHighPrior)
+
+	var tasks []rangeTask
+	deadline := time.After(time.Second)
+	for len(tasks) < 2 {
+		select {
+		case task := <-client.rangeTaskCh:
+			tasks = append(tasks, task)
+		case <-deadline:
+			t.Fatal("timed out waiting for merged range reload tasks")
+		}
+	}
+	require.Len(t, tasks, 2)
+	require.Equal(t, []byte("a"), tasks[0].span.StartKey)
+	require.Equal(t, []byte("c"), tasks[0].span.EndKey)
+	require.Equal(t, TaskHighPrior, tasks[0].priority)
+	require.Equal(t, []byte("d"), tasks[1].span.StartKey)
+	require.Equal(t, []byte("e"), tasks[1].span.EndKey)
+
+	select {
+	case extra := <-client.rangeTaskCh:
+		t.Fatalf("unexpected extra range task: %+v", extra)
+	case <-time.After(30 * time.Millisecond):
+	}
+
+	client.cancel()
+	select {
+	case err := <-done:
+		require.Equal(t, context.Canceled, err)
+	case <-time.After(time.Second):
+		t.Fatal("retry scheduler should exit after cancel")
+	}
+}
+
+func TestMergeTableSpans(t *testing.T) {
+	spans := []heartbeatpb.TableSpan{
+		{StartKey: []byte("d"), EndKey: []byte("e")},
+		{StartKey: []byte("a"), EndKey: []byte("b")},
+		{StartKey: []byte("b"), EndKey: []byte("c")},
+		{StartKey: []byte("c"), EndKey: []byte("d")},
+	}
+	merged := mergeTableSpans(spans)
+	require.Len(t, merged, 1)
+	require.Equal(t, []byte("a"), merged[0].StartKey)
+	require.Equal(t, []byte("e"), merged[0].EndKey)
+}
+
+func TestStoreBackoffManager(t *testing.T) {
+	m := newStoreBackoffManager()
+	d1 := m.markFailure("store-1")
+	d2 := m.markFailure("store-1")
+	require.Greater(t, d2, d1)
+	require.Greater(t, m.cooldownRemaining("store-1"), time.Duration(0))
+	m.markSuccess("store-1")
+	require.Equal(t, time.Duration(0), m.cooldownRemaining("store-1"))
+}
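The timing assertions in these tests rely on the scheduler's single-timer design: run keeps only the earliest deadline armed and, when the timer fires, drains every task whose deadline has passed. A stripped-down sketch of that loop, with hypothetical names and none of the subscriptionClient wiring:

package main

import (
	"container/heap"
	"fmt"
	"time"
)

type item struct {
	at time.Time
	fn func()
}

type pq []item

func (p pq) Len() int           { return len(p) }
func (p pq) Less(i, j int) bool { return p[i].at.Before(p[j].at) }
func (p pq) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
func (p *pq) Push(x any)        { *p = append(*p, x.(item)) }
func (p *pq) Pop() any {
	old := *p
	it := old[len(old)-1]
	*p = old[:len(old)-1]
	return it
}

func main() {
	var h pq
	heap.Init(&h)
	now := time.Now()
	heap.Push(&h, item{at: now.Add(30 * time.Millisecond), fn: func() { fmt.Println("second") }})
	heap.Push(&h, item{at: now.Add(10 * time.Millisecond), fn: func() { fmt.Println("first") }})
	for h.Len() > 0 {
		// Sleep only until the nearest deadline, then run everything that is due.
		time.Sleep(time.Until(h[0].at))
		for h.Len() > 0 && !h[0].at.After(time.Now()) {
			heap.Pop(&h).(item).fn()
		}
	}
}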
diff --git a/logservice/logpuller/subscription_client.go b/logservice/logpuller/subscription_client.go
index 52552fb72d..393bbb0826 100644
--- a/logservice/logpuller/subscription_client.go
+++ b/logservice/logpuller/subscription_client.go
@@ -222,6 +222,11 @@ type subscriptionClient struct {
 	// errCh is used to receive region errors.
 	// The errors will be handled in `handleErrors` goroutine.
 	errCache *errCache
+
+	retryScheduler        *retryScheduler
+	rangeReloadAggregator *rangeReloadAggregator
+	retryBackoff          *retryBackoffManager
+	storeBackoff          *storeBackoffManager
 }
 
 // NewSubscriptionClient creates a client.
@@ -248,6 +253,10 @@ func NewSubscriptionClient(
 		errCache:     newErrCache(),
 	}
 	subClient.ctx, subClient.cancel = context.WithCancel(context.Background())
+	subClient.retryScheduler = newRetryScheduler(subClient)
+	subClient.rangeReloadAggregator = newRangeReloadAggregator(subClient, rangeReloadDebounceInterval, rangeReloadImmediateFlushMax)
+	subClient.retryBackoff = newRetryBackoffManager()
+	subClient.storeBackoff = newStoreBackoffManager()
 	subClient.totalSpans.spanMap = make(map[SubscriptionID]*subscribedSpan)
 
 	option := dynstream.NewOption()
@@ -463,6 +472,7 @@ func (s *subscriptionClient) Run(ctx context.Context) error {
 	g.Go(func() error { return s.handleRangeTasks(ctx) })
 	g.Go(func() error { return s.handleRegions(ctx, g) })
 	g.Go(func() error { return s.handleErrors(ctx) })
+	g.Go(func() error { return s.retryScheduler.run(ctx) })
 	g.Go(func() error { return s.runResolveLockChecker(ctx) })
 	g.Go(func() error { return s.handleResolveLockTasks(ctx) })
 	g.Go(func() error { return s.logSlowRegions(ctx) })
@@ -622,6 +632,14 @@ func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Gro
 			continue
 		}
 
+		cooldown := s.getStoreCooldown(region.rpcCtx.Addr)
+		if cooldown > 0 {
+			// Reschedule the same priority task instead of busy-looping through
+			// attachRPCContext and worker selection while the store is cooling down.
+			s.schedulePriorityTaskAfter(regionTask, cooldown)
+			continue
+		}
+
 		store := getStore(region.rpcCtx.Addr)
 		worker := store.getRequestWorker()
 		force := regionTask.Priority() <= forcedPriorityBase
@@ -822,10 +840,7 @@ func (s *subscriptionClient) scheduleRangeRequest(
 	filterLoop bool,
 	priority TaskType,
 ) {
-	select {
-	case <-ctx.Done():
-	case s.rangeTaskCh <- rangeTask{span: span, subscribedSpan: subscribedSpan, filterLoop: filterLoop, priority: priority}:
-	}
+	s.scheduleRangeTaskAfter(rangeTask{span: span, subscribedSpan: subscribedSpan, filterLoop: filterLoop, priority: priority}, 0)
 }
 
 func (s *subscriptionClient) handleErrors(ctx context.Context) error {
@@ -855,27 +870,40 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr
 		if notLeader := innerErr.GetNotLeader(); notLeader != nil {
 			metricFeedNotLeaderCounter.Inc()
 			s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx)
-			s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskHighPrior)
+			delay := s.getRetryBackoffDelay(errInfo.subscribedSpan.subID, errInfo.verID.GetID(), retryBackoffNotLeader)
+			s.scheduleRegionRequestAfter(ctx, errInfo.regionInfo, TaskHighPrior, delay)
 			return nil
 		}
 		if innerErr.GetEpochNotMatch() != nil {
 			metricFeedEpochNotMatchCounter.Inc()
-			s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior)
+			s.addReloadRangeTask(errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior)
 			return nil
 		}
 		if innerErr.GetRegionNotFound() != nil {
 			metricFeedRegionNotFoundCounter.Inc()
-			s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior)
+			s.addReloadRangeTask(errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior)
 			return nil
 		}
 		if innerErr.GetCongested() != nil {
 			metricKvCongestedCounter.Inc()
-			s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskLowPrior)
+			delay := s.getRetryBackoffDelay(errInfo.subscribedSpan.subID, errInfo.verID.GetID(), retryBackoffBusy)
+			if errInfo.rpcCtx != nil {
+				if storeDelay := s.markStoreFailure(errInfo.rpcCtx.Addr); storeDelay > delay {
+					delay = storeDelay
+				}
+			}
+			s.scheduleRegionRequestAfter(ctx, errInfo.regionInfo, TaskLowPrior, delay)
 			return nil
 		}
 		if innerErr.GetServerIsBusy() != nil {
 			metricKvIsBusyCounter.Inc()
-			s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskLowPrior)
+			delay := s.getRetryBackoffDelay(errInfo.subscribedSpan.subID, errInfo.verID.GetID(), retryBackoffBusy)
+			if errInfo.rpcCtx != nil {
+				if storeDelay := s.markStoreFailure(errInfo.rpcCtx.Addr); storeDelay > delay {
+					delay = storeDelay
+				}
+			}
+			s.scheduleRegionRequestAfter(ctx, errInfo.regionInfo, TaskLowPrior, delay)
 			return nil
 		}
 		if duplicated := innerErr.GetDuplicateRequest(); duplicated != nil {
@@ -894,24 +922,40 @@ func (s *subscriptionClient) doHandleError(ctx context.Context, errInfo regionEr
 			zap.Uint64("subscriptionID", uint64(errInfo.subscribedSpan.subID)),
 			zap.Stringer("error", innerErr))
 		metricFeedUnknownErrorCounter.Inc()
-		s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskHighPrior)
+		delay := s.getRetryBackoffDelay(errInfo.subscribedSpan.subID, errInfo.verID.GetID(), retryBackoffStoreFailure)
+		s.scheduleRegionRequestAfter(ctx, errInfo.regionInfo, TaskHighPrior, delay)
 		return nil
 	case *rpcCtxUnavailableErr:
 		metricFeedRPCCtxUnavailable.Inc()
-		s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior)
+		delay := s.getRetryBackoffDelay(errInfo.subscribedSpan.subID, errInfo.verID.GetID(), retryBackoffRPCContext)
+		s.retryScheduler.schedule(delay, func(context.Context) {
+			s.addReloadRangeTask(errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior)
+		})
 		return nil
	case *getStoreErr:
 		metricGetStoreErr.Inc()
 		bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
 		// cannot get the store the region belongs to, so we need to reload the region.
 		s.regionCache.OnSendFail(bo, errInfo.rpcCtx, true, err)
-		s.scheduleRangeRequest(ctx, errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior)
+		delay := time.Duration(0)
+		if errInfo.rpcCtx != nil {
+			delay = s.markStoreFailure(errInfo.rpcCtx.Addr)
+		}
+		s.retryScheduler.schedule(delay, func(context.Context) {
+			s.addReloadRangeTask(errInfo.span, errInfo.subscribedSpan, errInfo.filterLoop, TaskHighPrior)
+		})
 		return nil
 	case *sendRequestToStoreErr:
 		metricStoreSendRequestErr.Inc()
 		bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
 		s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err)
-		s.scheduleRegionRequest(ctx, errInfo.regionInfo, TaskHighPrior)
+		delay := s.getRetryBackoffDelay(errInfo.subscribedSpan.subID, errInfo.verID.GetID(), retryBackoffStoreFailure)
+		if errInfo.rpcCtx != nil {
+			if storeDelay := s.markStoreFailure(errInfo.rpcCtx.Addr); storeDelay > delay {
+				delay = storeDelay
+			}
+		}
+		s.scheduleRegionRequestAfter(ctx, errInfo.regionInfo, TaskHighPrior, delay)
 		return nil
 	case *requestCancelledErr:
 		// the corresponding subscription has been unsubscribed, just ignore.
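The busy and store-failure branches above all pick their retry delay the same way: start from the per-region backoff and widen it to the store cooldown when the store-level penalty is larger, so neither signal is lost. A small illustrative sketch of that selection rule (not the package's code, just the pattern):

package main

import (
	"fmt"
	"time"
)

// pickDelay mirrors the delay selection used in the busy/store-failure branches:
// take the per-region backoff, then raise it to the store cooldown if that is larger.
func pickDelay(regionBackoff, storeCooldown time.Duration) time.Duration {
	delay := regionBackoff
	if storeCooldown > delay {
		delay = storeCooldown
	}
	return delay
}

func main() {
	fmt.Println(pickDelay(100*time.Millisecond, 400*time.Millisecond)) // 400ms: store cooldown dominates
	fmt.Println(pickDelay(800*time.Millisecond, 200*time.Millisecond)) // 800ms: region backoff dominates
}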