diff --git a/logservice/schemastore/gc_keeper.go b/logservice/schemastore/gc_keeper.go index 9860136d87..2b23832ae6 100644 --- a/logservice/schemastore/gc_keeper.go +++ b/logservice/schemastore/gc_keeper.go @@ -16,7 +16,6 @@ package schemastore import ( "context" "fmt" - "math" "strings" "time" @@ -28,29 +27,29 @@ import ( ) const ( - schemaStoreGCRefreshInterval = 10 * time.Second + schemaStoreGCRefreshInterval = time.Minute schemaStoreGCServiceKeeperTag = "-keeper-" ) type schemaStoreGCKeeper struct { pdCli gc.GCServiceClient keyspaceMeta common.KeyspaceMeta - // gcServiceIDTag separates schema store GC services from user changefeeds. - gcServiceIDTag string - // gcServiceIDParts carries the keyspace/name pair used by the existing GC - // helper API to build a stable internal service ID. - gcServiceIDParts common.ChangeFeedID + gcServiceID string } func newSchemaStoreGCKeeper(pdCli gc.GCServiceClient, keyspaceMeta common.KeyspaceMeta) *schemaStoreGCKeeper { + serviceID := fmt.Sprintf( + "%s%s%s_node_%s_keyspace_%d", + defaultSchemaStoreGcServiceID, + schemaStoreGCServiceKeeperTag, + keyspaceMeta.Name, + sanitizeSchemaStoreNodeID(config.GetGlobalServerConfig().AdvertiseAddr), + keyspaceMeta.ID, + ) return &schemaStoreGCKeeper{ - pdCli: pdCli, - keyspaceMeta: keyspaceMeta, - gcServiceIDTag: defaultSchemaStoreGcServiceID + schemaStoreGCServiceKeeperTag, - gcServiceIDParts: common.NewChangeFeedIDWithName( - fmt.Sprintf("node_%s_keyspace_%d", sanitizeSchemaStoreNodeID(config.GetGlobalServerConfig().AdvertiseAddr), keyspaceMeta.ID), - keyspaceMeta.Name, - ), + pdCli: pdCli, + keyspaceMeta: keyspaceMeta, + gcServiceID: serviceID, } } @@ -63,36 +62,11 @@ func (k *schemaStoreGCKeeper) refresh(ctx context.Context, resolvedTs uint64) er } func (k *schemaStoreGCKeeper) refreshWithTs(ctx context.Context, ts uint64) error { - // EnsureChangefeedStartTsSafety is defined in terms of changefeed startTs: it - // keeps "startTs - 1" readable, not startTs itself. - // - // Schema store needs the snapshot at ts to stay readable, and it pulls - // incremental DDLs starting from ts. So ts itself must not be - // collected yet. To express that requirement with the helper's startTs - // convention, schema store passes ts + 1 here. - startTs := ts - if startTs != math.MaxUint64 { - startTs++ - } - return gc.EnsureChangefeedStartTsSafety( - ctx, - k.pdCli, - k.gcServiceIDTag, - k.keyspaceMeta.ID, - k.gcServiceIDParts, - defaultGcServiceTTL, - startTs, - ) + return gc.EnsureServiceTsSafety(ctx, k.pdCli, k.gcServiceID, k.keyspaceMeta.ID, defaultGcServiceTTL, ts) } func (k *schemaStoreGCKeeper) close(ctx context.Context) error { - return gc.UndoEnsureChangefeedStartTsSafety( - ctx, - k.pdCli, - k.keyspaceMeta.ID, - k.gcServiceIDTag, - k.gcServiceIDParts, - ) + return gc.UnifyDeleteGcSafepoint(ctx, k.pdCli, k.keyspaceMeta.ID, k.gcServiceID) } func (k *schemaStoreGCKeeper) run(ctx context.Context, resolvedTsGetter func() uint64) { @@ -117,7 +91,7 @@ func (k *schemaStoreGCKeeper) run(ctx context.Context, resolvedTsGetter func() u // serviceID returns the exact PD GC service ID used by this schema store keeper. func (k *schemaStoreGCKeeper) serviceID() string { - return k.gcServiceIDTag + k.gcServiceIDParts.Keyspace() + "_" + k.gcServiceIDParts.Name() + return k.gcServiceID } // sanitizeSchemaStoreNodeID normalizes the node identity before embedding it in diff --git a/logservice/schemastore/gc_keeper_test.go b/logservice/schemastore/gc_keeper_test.go index 756944f4a6..acc5469387 100644 --- a/logservice/schemastore/gc_keeper_test.go +++ b/logservice/schemastore/gc_keeper_test.go @@ -44,10 +44,10 @@ func TestSchemaStoreGCKeeperLifecycle(t *testing.T) { ctx := context.Background() require.NoError(t, keeper.initialize(ctx, 100)) - assertSchemaStoreBarrierTS(t, state, serviceID, 101) + assertSchemaStoreBarrierTS(t, state, serviceID, 100) require.NoError(t, keeper.refresh(ctx, 130)) - assertSchemaStoreBarrierTS(t, state, serviceID, 131) + assertSchemaStoreBarrierTS(t, state, serviceID, 130) require.NoError(t, keeper.close(ctx)) if kerneltype.IsClassic() { @@ -71,7 +71,7 @@ func TestCloseSchemaStoreGCKeeperUsesFreshContext(t *testing.T) { ctx := context.Background() require.NoError(t, keeper.initialize(ctx, 100)) - assertSchemaStoreBarrierTS(t, state, serviceID, 101) + assertSchemaStoreBarrierTS(t, state, serviceID, 100) canceledCtx, cancel := context.WithCancel(context.Background()) cancel() @@ -86,6 +86,26 @@ func TestCloseSchemaStoreGCKeeperUsesFreshContext(t *testing.T) { require.False(t, ok) } +func TestSchemaStoreGCKeeperRefreshReturnsErrorWhenTsIsStale(t *testing.T) { + originalConfig := config.GetGlobalServerConfig() + cfg := originalConfig.Clone() + cfg.AdvertiseAddr = "127.0.0.1:8300" + config.StoreGlobalServerConfig(cfg) + defer config.StoreGlobalServerConfig(originalConfig) + + pdCli, state := newMockGCServiceClientForSchemaStoreGC(t) + keeper := newSchemaStoreGCKeeper(pdCli, common.DefaultKeyspace) + + if kerneltype.IsClassic() { + state.serviceSafePoint["other"] = 120 + } else { + state.txnSafePoint = 120 + } + + err := keeper.refresh(context.Background(), 100) + require.Error(t, err) +} + func TestSanitizeSchemaStoreNodeID(t *testing.T) { testCases := []struct { name string diff --git a/pkg/txnutil/gc/gc_service.go b/pkg/txnutil/gc/gc_service.go index d434cab128..cb2aba713f 100644 --- a/pkg/txnutil/gc/gc_service.go +++ b/pkg/txnutil/gc/gc_service.go @@ -53,7 +53,6 @@ func EnsureChangefeedStartTsSafety( } func ensureChangefeedStartTsSafetyClassic(ctx context.Context, pdCli GCServiceClient, gcServiceID string, ttl int64, startTs uint64) error { - // set gc safepoint for the changefeed gc service minServiceGCTs, err := SetServiceGCSafepoint(ctx, pdCli, gcServiceID, ttl, startTs) if err != nil { return errors.Trace(err) @@ -82,6 +81,66 @@ func ensureChangefeedStartTsSafetyNextGen(ctx context.Context, pdCli GCServiceCl return nil } +// EnsureServiceTsSafety ensures the exact ts remains readable by the specified +// GC service ID. It is intended for internal callers like schema store that +// maintain their own dedicated service safepoints / barriers. +func EnsureServiceTsSafety( + ctx context.Context, pdCli GCServiceClient, + serviceID string, + keyspaceID uint32, + ttl int64, + ts uint64, +) error { + if kerneltype.IsClassic() { + return ensureServiceTsSafetyClassic(ctx, pdCli, serviceID, ttl, ts) + } + return ensureServiceTsSafetyNextGen(ctx, pdCli, serviceID, keyspaceID, ttl, ts) +} + +func ensureServiceTsSafetyClassic( + ctx context.Context, + pdCli GCServiceClient, + serviceID string, + ttl int64, + ts uint64, +) error { + minServiceGCTs, err := SetServiceGCSafepoint(ctx, pdCli, serviceID, ttl, ts) + if err != nil { + return errors.Trace(err) + } + if minServiceGCTs != math.MaxUint64 && ts < minServiceGCTs { + return errors.ErrSnapshotLostByGC.GenWithStackByArgs(ts+1, minServiceGCTs) + } + return nil +} + +func ensureServiceTsSafetyNextGen( + ctx context.Context, + pdCli GCServiceClient, + serviceID string, + keyspaceID uint32, + ttl int64, + ts uint64, +) error { + gcCli := pdCli.GetGCStatesClient(keyspaceID) + _, err := SetGCBarrier(ctx, gcCli, serviceID, ts, time.Duration(ttl)*time.Second) + if err == nil { + return nil + } + if !errors.IsGCBarrierTSBehindTxnSafePointError(err) { + return errors.WrapError(errors.ErrUpdateGCBarrierFailed, err) + } + + minBarrierTS, barrierErr := UnifyGetServiceGCSafepoint(ctx, pdCli, keyspaceID, serviceID) + if barrierErr != nil { + return barrierErr + } + if minBarrierTS != math.MaxUint64 && ts < minBarrierTS { + return errors.ErrSnapshotLostByGC.GenWithStackByArgs(ts+1, minBarrierTS) + } + return nil +} + // UndoEnsureChangefeedStartTsSafety cleans the service GC safepoint of a changefeed // if something goes wrong after successfully calling EnsureChangefeedStartTsSafety(). func UndoEnsureChangefeedStartTsSafety(