From fe4bcccc8a78f109ac4c7bcce86af9e00460121e Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 31 Mar 2026 20:13:53 +0800 Subject: [PATCH 1/3] schemastore: update GC safepoints with dedicated keeper service IDs --- logservice/schemastore/gc_keeper.go | 81 ++++++++++++------------ logservice/schemastore/gc_keeper_test.go | 26 +++++++- 2 files changed, 64 insertions(+), 43 deletions(-) diff --git a/logservice/schemastore/gc_keeper.go b/logservice/schemastore/gc_keeper.go index 9860136d87..5e30ee1466 100644 --- a/logservice/schemastore/gc_keeper.go +++ b/logservice/schemastore/gc_keeper.go @@ -23,34 +23,36 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/config/kerneltype" + cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/txnutil/gc" "go.uber.org/zap" ) const ( - schemaStoreGCRefreshInterval = 10 * time.Second + schemaStoreGCRefreshInterval = time.Minute schemaStoreGCServiceKeeperTag = "-keeper-" ) type schemaStoreGCKeeper struct { pdCli gc.GCServiceClient keyspaceMeta common.KeyspaceMeta - // gcServiceIDTag separates schema store GC services from user changefeeds. - gcServiceIDTag string - // gcServiceIDParts carries the keyspace/name pair used by the existing GC - // helper API to build a stable internal service ID. - gcServiceIDParts common.ChangeFeedID + gcServiceID string } func newSchemaStoreGCKeeper(pdCli gc.GCServiceClient, keyspaceMeta common.KeyspaceMeta) *schemaStoreGCKeeper { + serviceID := fmt.Sprintf( + "%s%s%s_node_%s_keyspace_%d", + defaultSchemaStoreGcServiceID, + schemaStoreGCServiceKeeperTag, + keyspaceMeta.Name, + sanitizeSchemaStoreNodeID(config.GetGlobalServerConfig().AdvertiseAddr), + keyspaceMeta.ID, + ) return &schemaStoreGCKeeper{ - pdCli: pdCli, - keyspaceMeta: keyspaceMeta, - gcServiceIDTag: defaultSchemaStoreGcServiceID + schemaStoreGCServiceKeeperTag, - gcServiceIDParts: common.NewChangeFeedIDWithName( - fmt.Sprintf("node_%s_keyspace_%d", sanitizeSchemaStoreNodeID(config.GetGlobalServerConfig().AdvertiseAddr), keyspaceMeta.ID), - keyspaceMeta.Name, - ), + pdCli: pdCli, + keyspaceMeta: keyspaceMeta, + gcServiceID: serviceID, } } @@ -63,36 +65,35 @@ func (k *schemaStoreGCKeeper) refresh(ctx context.Context, resolvedTs uint64) er } func (k *schemaStoreGCKeeper) refreshWithTs(ctx context.Context, ts uint64) error { - // EnsureChangefeedStartTsSafety is defined in terms of changefeed startTs: it - // keeps "startTs - 1" readable, not startTs itself. - // - // Schema store needs the snapshot at ts to stay readable, and it pulls - // incremental DDLs starting from ts. So ts itself must not be - // collected yet. To express that requirement with the helper's startTs - // convention, schema store passes ts + 1 here. - startTs := ts - if startTs != math.MaxUint64 { - startTs++ + if kerneltype.IsClassic() { + minServiceGCTs, err := gc.SetServiceGCSafepoint(ctx, k.pdCli, k.gcServiceID, defaultGcServiceTTL, ts) + if err != nil { + return err + } + if minServiceGCTs != math.MaxUint64 && ts < minServiceGCTs { + return cerror.ErrSnapshotLostByGC.GenWithStackByArgs(ts+1, minServiceGCTs) + } + return nil } - return gc.EnsureChangefeedStartTsSafety( - ctx, - k.pdCli, - k.gcServiceIDTag, - k.keyspaceMeta.ID, - k.gcServiceIDParts, - defaultGcServiceTTL, - startTs, - ) + + gcCli := k.pdCli.GetGCStatesClient(k.keyspaceMeta.ID) + _, err := gc.SetGCBarrier(ctx, gcCli, k.gcServiceID, ts, time.Duration(defaultGcServiceTTL)*time.Second) + if err == nil { + return nil + } + if !cerror.IsGCBarrierTSBehindTxnSafePointError(err) { + return cerror.WrapError(cerror.ErrUpdateGCBarrierFailed, err) + } + + minBarrierTS, barrierErr := gc.UnifyGetServiceGCSafepoint(ctx, k.pdCli, k.keyspaceMeta.ID, k.gcServiceID) + if barrierErr != nil { + return barrierErr + } + return cerror.ErrSnapshotLostByGC.GenWithStackByArgs(ts+1, minBarrierTS) } func (k *schemaStoreGCKeeper) close(ctx context.Context) error { - return gc.UndoEnsureChangefeedStartTsSafety( - ctx, - k.pdCli, - k.keyspaceMeta.ID, - k.gcServiceIDTag, - k.gcServiceIDParts, - ) + return gc.UnifyDeleteGcSafepoint(ctx, k.pdCli, k.keyspaceMeta.ID, k.gcServiceID) } func (k *schemaStoreGCKeeper) run(ctx context.Context, resolvedTsGetter func() uint64) { @@ -117,7 +118,7 @@ func (k *schemaStoreGCKeeper) run(ctx context.Context, resolvedTsGetter func() u // serviceID returns the exact PD GC service ID used by this schema store keeper. func (k *schemaStoreGCKeeper) serviceID() string { - return k.gcServiceIDTag + k.gcServiceIDParts.Keyspace() + "_" + k.gcServiceIDParts.Name() + return k.gcServiceID } // sanitizeSchemaStoreNodeID normalizes the node identity before embedding it in diff --git a/logservice/schemastore/gc_keeper_test.go b/logservice/schemastore/gc_keeper_test.go index 756944f4a6..acc5469387 100644 --- a/logservice/schemastore/gc_keeper_test.go +++ b/logservice/schemastore/gc_keeper_test.go @@ -44,10 +44,10 @@ func TestSchemaStoreGCKeeperLifecycle(t *testing.T) { ctx := context.Background() require.NoError(t, keeper.initialize(ctx, 100)) - assertSchemaStoreBarrierTS(t, state, serviceID, 101) + assertSchemaStoreBarrierTS(t, state, serviceID, 100) require.NoError(t, keeper.refresh(ctx, 130)) - assertSchemaStoreBarrierTS(t, state, serviceID, 131) + assertSchemaStoreBarrierTS(t, state, serviceID, 130) require.NoError(t, keeper.close(ctx)) if kerneltype.IsClassic() { @@ -71,7 +71,7 @@ func TestCloseSchemaStoreGCKeeperUsesFreshContext(t *testing.T) { ctx := context.Background() require.NoError(t, keeper.initialize(ctx, 100)) - assertSchemaStoreBarrierTS(t, state, serviceID, 101) + assertSchemaStoreBarrierTS(t, state, serviceID, 100) canceledCtx, cancel := context.WithCancel(context.Background()) cancel() @@ -86,6 +86,26 @@ func TestCloseSchemaStoreGCKeeperUsesFreshContext(t *testing.T) { require.False(t, ok) } +func TestSchemaStoreGCKeeperRefreshReturnsErrorWhenTsIsStale(t *testing.T) { + originalConfig := config.GetGlobalServerConfig() + cfg := originalConfig.Clone() + cfg.AdvertiseAddr = "127.0.0.1:8300" + config.StoreGlobalServerConfig(cfg) + defer config.StoreGlobalServerConfig(originalConfig) + + pdCli, state := newMockGCServiceClientForSchemaStoreGC(t) + keeper := newSchemaStoreGCKeeper(pdCli, common.DefaultKeyspace) + + if kerneltype.IsClassic() { + state.serviceSafePoint["other"] = 120 + } else { + state.txnSafePoint = 120 + } + + err := keeper.refresh(context.Background(), 100) + require.Error(t, err) +} + func TestSanitizeSchemaStoreNodeID(t *testing.T) { testCases := []struct { name string From c3e87a4f29c183dd840dd3bcb8d928b19e338de2 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 1 Apr 2026 07:35:19 +0800 Subject: [PATCH 2/3] f --- logservice/schemastore/gc_keeper.go | 29 +-------- pkg/txnutil/gc/gc_service.go | 93 +++++++++++++++++++++++++++-- 2 files changed, 89 insertions(+), 33 deletions(-) diff --git a/logservice/schemastore/gc_keeper.go b/logservice/schemastore/gc_keeper.go index 5e30ee1466..2b23832ae6 100644 --- a/logservice/schemastore/gc_keeper.go +++ b/logservice/schemastore/gc_keeper.go @@ -16,15 +16,12 @@ package schemastore import ( "context" "fmt" - "math" "strings" "time" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" - "github.com/pingcap/ticdc/pkg/config/kerneltype" - cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/txnutil/gc" "go.uber.org/zap" ) @@ -65,31 +62,7 @@ func (k *schemaStoreGCKeeper) refresh(ctx context.Context, resolvedTs uint64) er } func (k *schemaStoreGCKeeper) refreshWithTs(ctx context.Context, ts uint64) error { - if kerneltype.IsClassic() { - minServiceGCTs, err := gc.SetServiceGCSafepoint(ctx, k.pdCli, k.gcServiceID, defaultGcServiceTTL, ts) - if err != nil { - return err - } - if minServiceGCTs != math.MaxUint64 && ts < minServiceGCTs { - return cerror.ErrSnapshotLostByGC.GenWithStackByArgs(ts+1, minServiceGCTs) - } - return nil - } - - gcCli := k.pdCli.GetGCStatesClient(k.keyspaceMeta.ID) - _, err := gc.SetGCBarrier(ctx, gcCli, k.gcServiceID, ts, time.Duration(defaultGcServiceTTL)*time.Second) - if err == nil { - return nil - } - if !cerror.IsGCBarrierTSBehindTxnSafePointError(err) { - return cerror.WrapError(cerror.ErrUpdateGCBarrierFailed, err) - } - - minBarrierTS, barrierErr := gc.UnifyGetServiceGCSafepoint(ctx, k.pdCli, k.keyspaceMeta.ID, k.gcServiceID) - if barrierErr != nil { - return barrierErr - } - return cerror.ErrSnapshotLostByGC.GenWithStackByArgs(ts+1, minBarrierTS) + return gc.EnsureServiceTsSafety(ctx, k.pdCli, k.gcServiceID, k.keyspaceMeta.ID, defaultGcServiceTTL, ts) } func (k *schemaStoreGCKeeper) close(ctx context.Context) error { diff --git a/pkg/txnutil/gc/gc_service.go b/pkg/txnutil/gc/gc_service.go index d434cab128..99a2b98ac0 100644 --- a/pkg/txnutil/gc/gc_service.go +++ b/pkg/txnutil/gc/gc_service.go @@ -36,6 +36,13 @@ const ( EnsureGCServiceInitializing = "-initializing-" ) +type serviceTsSafetyMode int + +const ( + serviceTsSafetyStartTs serviceTsSafetyMode = iota + serviceTsSafetyExactTs +) + // EnsureChangefeedStartTsSafety checks if the startTs less than the minimum of // service GC safepoint and this function will update the service GC to startTs func EnsureChangefeedStartTsSafety( @@ -53,8 +60,7 @@ func EnsureChangefeedStartTsSafety( } func ensureChangefeedStartTsSafetyClassic(ctx context.Context, pdCli GCServiceClient, gcServiceID string, ttl int64, startTs uint64) error { - // set gc safepoint for the changefeed gc service - minServiceGCTs, err := SetServiceGCSafepoint(ctx, pdCli, gcServiceID, ttl, startTs) + minServiceGCTs, err := ensureServiceGCSafetyClassic(ctx, pdCli, gcServiceID, ttl, startTs, serviceTsSafetyStartTs) if err != nil { return errors.Trace(err) } @@ -74,10 +80,87 @@ func ensureChangefeedStartTsSafetyClassic(ctx context.Context, pdCli GCServiceCl } func ensureChangefeedStartTsSafetyNextGen(ctx context.Context, pdCli GCServiceClient, gcServiceID string, keyspaceID uint32, ttl int64, startTs uint64) error { - gcCli := pdCli.GetGCStatesClient(keyspaceID) - _, err := SetGCBarrier(ctx, gcCli, gcServiceID, startTs, time.Duration(ttl)*time.Second) + return ensureServiceGCSafetyNextGen(ctx, pdCli, gcServiceID, keyspaceID, ttl, startTs, serviceTsSafetyStartTs) +} + +// EnsureServiceTsSafety ensures the exact ts remains readable by the specified +// GC service ID. It is intended for internal callers like schema store that +// maintain their own dedicated service safepoints / barriers. +func EnsureServiceTsSafety( + ctx context.Context, pdCli GCServiceClient, + serviceID string, + keyspaceID uint32, + ttl int64, + ts uint64, +) error { + if kerneltype.IsClassic() { + _, err := ensureServiceGCSafetyClassic(ctx, pdCli, serviceID, ttl, ts, serviceTsSafetyExactTs) + return err + } + return ensureServiceGCSafetyNextGen(ctx, pdCli, serviceID, keyspaceID, ttl, ts, serviceTsSafetyExactTs) +} + +func ensureServiceGCSafetyClassic( + ctx context.Context, + pdCli GCServiceClient, + serviceID string, + ttl int64, + ts uint64, + mode serviceTsSafetyMode, +) (uint64, error) { + minServiceGCTs, err := SetServiceGCSafepoint(ctx, pdCli, serviceID, ttl, ts) if err != nil { - return errors.ErrStartTsBeforeGC.GenWithStackByArgs(startTs) + return 0, errors.Trace(err) + } + if err := checkServiceTsSafety(mode, ts, minServiceGCTs); err != nil { + return minServiceGCTs, err + } + return minServiceGCTs, nil +} + +func ensureServiceGCSafetyNextGen( + ctx context.Context, + pdCli GCServiceClient, + serviceID string, + keyspaceID uint32, + ttl int64, + ts uint64, + mode serviceTsSafetyMode, +) error { + gcCli := pdCli.GetGCStatesClient(keyspaceID) + _, err := SetGCBarrier(ctx, gcCli, serviceID, ts, time.Duration(ttl)*time.Second) + if err == nil { + return nil + } + if mode == serviceTsSafetyStartTs { + return errors.ErrStartTsBeforeGC.GenWithStackByArgs(ts) + } + if !errors.IsGCBarrierTSBehindTxnSafePointError(err) { + return errors.WrapError(errors.ErrUpdateGCBarrierFailed, err) + } + + minBarrierTS, barrierErr := UnifyGetServiceGCSafepoint(ctx, pdCli, keyspaceID, serviceID) + if barrierErr != nil { + return barrierErr + } + return checkServiceTsSafety(mode, ts, minBarrierTS) +} + +func checkServiceTsSafety(mode serviceTsSafetyMode, ts uint64, lowerBound uint64) error { + switch mode { + case serviceTsSafetyStartTs: + // startTs should be greater than or equal to minServiceGCTs + 1, otherwise gcManager + // would return a ErrSnapshotLostByGC even though the changefeed would appear to be successfully + // created/resumed. See issue #6350 for more detail. + if ts > 0 && ts < lowerBound+1 { + return errors.ErrStartTsBeforeGC.GenWithStackByArgs(ts, lowerBound) + } + case serviceTsSafetyExactTs: + if lowerBound != math.MaxUint64 && ts < lowerBound { + return errors.ErrSnapshotLostByGC.GenWithStackByArgs(ts+1, lowerBound) + } + default: + log.Panic("unknown service ts safety mode", zap.Int("mode", int(mode))) } return nil } From cebf5b0fd46b12d80c1789e9eb6b8700b65c3afe Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 1 Apr 2026 15:22:32 +0800 Subject: [PATCH 3/3] fix --- pkg/txnutil/gc/gc_service.go | 60 +++++++++++------------------------- 1 file changed, 18 insertions(+), 42 deletions(-) diff --git a/pkg/txnutil/gc/gc_service.go b/pkg/txnutil/gc/gc_service.go index 99a2b98ac0..cb2aba713f 100644 --- a/pkg/txnutil/gc/gc_service.go +++ b/pkg/txnutil/gc/gc_service.go @@ -36,13 +36,6 @@ const ( EnsureGCServiceInitializing = "-initializing-" ) -type serviceTsSafetyMode int - -const ( - serviceTsSafetyStartTs serviceTsSafetyMode = iota - serviceTsSafetyExactTs -) - // EnsureChangefeedStartTsSafety checks if the startTs less than the minimum of // service GC safepoint and this function will update the service GC to startTs func EnsureChangefeedStartTsSafety( @@ -60,7 +53,7 @@ func EnsureChangefeedStartTsSafety( } func ensureChangefeedStartTsSafetyClassic(ctx context.Context, pdCli GCServiceClient, gcServiceID string, ttl int64, startTs uint64) error { - minServiceGCTs, err := ensureServiceGCSafetyClassic(ctx, pdCli, gcServiceID, ttl, startTs, serviceTsSafetyStartTs) + minServiceGCTs, err := SetServiceGCSafepoint(ctx, pdCli, gcServiceID, ttl, startTs) if err != nil { return errors.Trace(err) } @@ -80,7 +73,12 @@ func ensureChangefeedStartTsSafetyClassic(ctx context.Context, pdCli GCServiceCl } func ensureChangefeedStartTsSafetyNextGen(ctx context.Context, pdCli GCServiceClient, gcServiceID string, keyspaceID uint32, ttl int64, startTs uint64) error { - return ensureServiceGCSafetyNextGen(ctx, pdCli, gcServiceID, keyspaceID, ttl, startTs, serviceTsSafetyStartTs) + gcCli := pdCli.GetGCStatesClient(keyspaceID) + _, err := SetGCBarrier(ctx, gcCli, gcServiceID, startTs, time.Duration(ttl)*time.Second) + if err != nil { + return errors.ErrStartTsBeforeGC.GenWithStackByArgs(startTs) + } + return nil } // EnsureServiceTsSafety ensures the exact ts remains readable by the specified @@ -94,47 +92,41 @@ func EnsureServiceTsSafety( ts uint64, ) error { if kerneltype.IsClassic() { - _, err := ensureServiceGCSafetyClassic(ctx, pdCli, serviceID, ttl, ts, serviceTsSafetyExactTs) - return err + return ensureServiceTsSafetyClassic(ctx, pdCli, serviceID, ttl, ts) } - return ensureServiceGCSafetyNextGen(ctx, pdCli, serviceID, keyspaceID, ttl, ts, serviceTsSafetyExactTs) + return ensureServiceTsSafetyNextGen(ctx, pdCli, serviceID, keyspaceID, ttl, ts) } -func ensureServiceGCSafetyClassic( +func ensureServiceTsSafetyClassic( ctx context.Context, pdCli GCServiceClient, serviceID string, ttl int64, ts uint64, - mode serviceTsSafetyMode, -) (uint64, error) { +) error { minServiceGCTs, err := SetServiceGCSafepoint(ctx, pdCli, serviceID, ttl, ts) if err != nil { - return 0, errors.Trace(err) + return errors.Trace(err) } - if err := checkServiceTsSafety(mode, ts, minServiceGCTs); err != nil { - return minServiceGCTs, err + if minServiceGCTs != math.MaxUint64 && ts < minServiceGCTs { + return errors.ErrSnapshotLostByGC.GenWithStackByArgs(ts+1, minServiceGCTs) } - return minServiceGCTs, nil + return nil } -func ensureServiceGCSafetyNextGen( +func ensureServiceTsSafetyNextGen( ctx context.Context, pdCli GCServiceClient, serviceID string, keyspaceID uint32, ttl int64, ts uint64, - mode serviceTsSafetyMode, ) error { gcCli := pdCli.GetGCStatesClient(keyspaceID) _, err := SetGCBarrier(ctx, gcCli, serviceID, ts, time.Duration(ttl)*time.Second) if err == nil { return nil } - if mode == serviceTsSafetyStartTs { - return errors.ErrStartTsBeforeGC.GenWithStackByArgs(ts) - } if !errors.IsGCBarrierTSBehindTxnSafePointError(err) { return errors.WrapError(errors.ErrUpdateGCBarrierFailed, err) } @@ -143,24 +135,8 @@ func ensureServiceGCSafetyNextGen( if barrierErr != nil { return barrierErr } - return checkServiceTsSafety(mode, ts, minBarrierTS) -} - -func checkServiceTsSafety(mode serviceTsSafetyMode, ts uint64, lowerBound uint64) error { - switch mode { - case serviceTsSafetyStartTs: - // startTs should be greater than or equal to minServiceGCTs + 1, otherwise gcManager - // would return a ErrSnapshotLostByGC even though the changefeed would appear to be successfully - // created/resumed. See issue #6350 for more detail. - if ts > 0 && ts < lowerBound+1 { - return errors.ErrStartTsBeforeGC.GenWithStackByArgs(ts, lowerBound) - } - case serviceTsSafetyExactTs: - if lowerBound != math.MaxUint64 && ts < lowerBound { - return errors.ErrSnapshotLostByGC.GenWithStackByArgs(ts+1, lowerBound) - } - default: - log.Panic("unknown service ts safety mode", zap.Int("mode", int(mode))) + if minBarrierTS != math.MaxUint64 && ts < minBarrierTS { + return errors.ErrSnapshotLostByGC.GenWithStackByArgs(ts+1, minBarrierTS) } return nil }