Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 16 additions & 42 deletions logservice/schemastore/gc_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package schemastore
import (
"context"
"fmt"
"math"
"strings"
"time"

Expand All @@ -28,29 +27,29 @@ import (
)

const (
schemaStoreGCRefreshInterval = 10 * time.Second
schemaStoreGCRefreshInterval = time.Minute
schemaStoreGCServiceKeeperTag = "-keeper-"
)

type schemaStoreGCKeeper struct {
pdCli gc.GCServiceClient
keyspaceMeta common.KeyspaceMeta
// gcServiceIDTag separates schema store GC services from user changefeeds.
gcServiceIDTag string
// gcServiceIDParts carries the keyspace/name pair used by the existing GC
// helper API to build a stable internal service ID.
gcServiceIDParts common.ChangeFeedID
gcServiceID string
}

func newSchemaStoreGCKeeper(pdCli gc.GCServiceClient, keyspaceMeta common.KeyspaceMeta) *schemaStoreGCKeeper {
serviceID := fmt.Sprintf(
"%s%s%s_node_%s_keyspace_%d",
defaultSchemaStoreGcServiceID,
schemaStoreGCServiceKeeperTag,
keyspaceMeta.Name,
sanitizeSchemaStoreNodeID(config.GetGlobalServerConfig().AdvertiseAddr),
keyspaceMeta.ID,
)
return &schemaStoreGCKeeper{
pdCli: pdCli,
keyspaceMeta: keyspaceMeta,
gcServiceIDTag: defaultSchemaStoreGcServiceID + schemaStoreGCServiceKeeperTag,
gcServiceIDParts: common.NewChangeFeedIDWithName(
fmt.Sprintf("node_%s_keyspace_%d", sanitizeSchemaStoreNodeID(config.GetGlobalServerConfig().AdvertiseAddr), keyspaceMeta.ID),
keyspaceMeta.Name,
),
pdCli: pdCli,
keyspaceMeta: keyspaceMeta,
gcServiceID: serviceID,
}
}

Expand All @@ -63,36 +62,11 @@ func (k *schemaStoreGCKeeper) refresh(ctx context.Context, resolvedTs uint64) er
}

func (k *schemaStoreGCKeeper) refreshWithTs(ctx context.Context, ts uint64) error {
// EnsureChangefeedStartTsSafety is defined in terms of changefeed startTs: it
// keeps "startTs - 1" readable, not startTs itself.
//
// Schema store needs the snapshot at ts to stay readable, and it pulls
// incremental DDLs starting from ts. So ts itself must not be
// collected yet. To express that requirement with the helper's startTs
// convention, schema store passes ts + 1 here.
startTs := ts
if startTs != math.MaxUint64 {
startTs++
}
return gc.EnsureChangefeedStartTsSafety(
ctx,
k.pdCli,
k.gcServiceIDTag,
k.keyspaceMeta.ID,
k.gcServiceIDParts,
defaultGcServiceTTL,
startTs,
)
return gc.EnsureServiceTsSafety(ctx, k.pdCli, k.gcServiceID, k.keyspaceMeta.ID, defaultGcServiceTTL, ts)
}

func (k *schemaStoreGCKeeper) close(ctx context.Context) error {
return gc.UndoEnsureChangefeedStartTsSafety(
ctx,
k.pdCli,
k.keyspaceMeta.ID,
k.gcServiceIDTag,
k.gcServiceIDParts,
)
return gc.UnifyDeleteGcSafepoint(ctx, k.pdCli, k.keyspaceMeta.ID, k.gcServiceID)
}

func (k *schemaStoreGCKeeper) run(ctx context.Context, resolvedTsGetter func() uint64) {
Expand All @@ -117,7 +91,7 @@ func (k *schemaStoreGCKeeper) run(ctx context.Context, resolvedTsGetter func() u

// serviceID returns the exact PD GC service ID used by this schema store keeper.
func (k *schemaStoreGCKeeper) serviceID() string {
return k.gcServiceIDTag + k.gcServiceIDParts.Keyspace() + "_" + k.gcServiceIDParts.Name()
return k.gcServiceID
}

// sanitizeSchemaStoreNodeID normalizes the node identity before embedding it in
Expand Down
26 changes: 23 additions & 3 deletions logservice/schemastore/gc_keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ func TestSchemaStoreGCKeeperLifecycle(t *testing.T) {

ctx := context.Background()
require.NoError(t, keeper.initialize(ctx, 100))
assertSchemaStoreBarrierTS(t, state, serviceID, 101)
assertSchemaStoreBarrierTS(t, state, serviceID, 100)

require.NoError(t, keeper.refresh(ctx, 130))
assertSchemaStoreBarrierTS(t, state, serviceID, 131)
assertSchemaStoreBarrierTS(t, state, serviceID, 130)

require.NoError(t, keeper.close(ctx))
if kerneltype.IsClassic() {
Expand All @@ -71,7 +71,7 @@ func TestCloseSchemaStoreGCKeeperUsesFreshContext(t *testing.T) {

ctx := context.Background()
require.NoError(t, keeper.initialize(ctx, 100))
assertSchemaStoreBarrierTS(t, state, serviceID, 101)
assertSchemaStoreBarrierTS(t, state, serviceID, 100)

canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
Expand All @@ -86,6 +86,26 @@ func TestCloseSchemaStoreGCKeeperUsesFreshContext(t *testing.T) {
require.False(t, ok)
}

func TestSchemaStoreGCKeeperRefreshReturnsErrorWhenTsIsStale(t *testing.T) {
originalConfig := config.GetGlobalServerConfig()
cfg := originalConfig.Clone()
cfg.AdvertiseAddr = "127.0.0.1:8300"
config.StoreGlobalServerConfig(cfg)
defer config.StoreGlobalServerConfig(originalConfig)

pdCli, state := newMockGCServiceClientForSchemaStoreGC(t)
keeper := newSchemaStoreGCKeeper(pdCli, common.DefaultKeyspace)

if kerneltype.IsClassic() {
state.serviceSafePoint["other"] = 120
} else {
state.txnSafePoint = 120
}

err := keeper.refresh(context.Background(), 100)
require.Error(t, err)
}

func TestSanitizeSchemaStoreNodeID(t *testing.T) {
testCases := []struct {
name string
Expand Down
61 changes: 60 additions & 1 deletion pkg/txnutil/gc/gc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func EnsureChangefeedStartTsSafety(
}

func ensureChangefeedStartTsSafetyClassic(ctx context.Context, pdCli GCServiceClient, gcServiceID string, ttl int64, startTs uint64) error {
// set gc safepoint for the changefeed gc service
minServiceGCTs, err := SetServiceGCSafepoint(ctx, pdCli, gcServiceID, ttl, startTs)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -82,6 +81,66 @@ func ensureChangefeedStartTsSafetyNextGen(ctx context.Context, pdCli GCServiceCl
return nil
}

// EnsureServiceTsSafety ensures the exact ts remains readable by the specified
// GC service ID. It is intended for internal callers like schema store that
// maintain their own dedicated service safepoints / barriers.
func EnsureServiceTsSafety(
ctx context.Context, pdCli GCServiceClient,
serviceID string,
keyspaceID uint32,
ttl int64,
ts uint64,
) error {
if kerneltype.IsClassic() {
return ensureServiceTsSafetyClassic(ctx, pdCli, serviceID, ttl, ts)
}
return ensureServiceTsSafetyNextGen(ctx, pdCli, serviceID, keyspaceID, ttl, ts)
}

func ensureServiceTsSafetyClassic(
ctx context.Context,
pdCli GCServiceClient,
serviceID string,
ttl int64,
ts uint64,
) error {
minServiceGCTs, err := SetServiceGCSafepoint(ctx, pdCli, serviceID, ttl, ts)
if err != nil {
return errors.Trace(err)
}
if minServiceGCTs != math.MaxUint64 && ts < minServiceGCTs {
return errors.ErrSnapshotLostByGC.GenWithStackByArgs(ts+1, minServiceGCTs)
}
return nil
}

func ensureServiceTsSafetyNextGen(
ctx context.Context,
pdCli GCServiceClient,
serviceID string,
keyspaceID uint32,
ttl int64,
ts uint64,
) error {
gcCli := pdCli.GetGCStatesClient(keyspaceID)
_, err := SetGCBarrier(ctx, gcCli, serviceID, ts, time.Duration(ttl)*time.Second)
if err == nil {
return nil
}
if !errors.IsGCBarrierTSBehindTxnSafePointError(err) {
return errors.WrapError(errors.ErrUpdateGCBarrierFailed, err)
}

minBarrierTS, barrierErr := UnifyGetServiceGCSafepoint(ctx, pdCli, keyspaceID, serviceID)
if barrierErr != nil {
return barrierErr
}
if minBarrierTS != math.MaxUint64 && ts < minBarrierTS {
return errors.ErrSnapshotLostByGC.GenWithStackByArgs(ts+1, minBarrierTS)
}
return nil
}

// UndoEnsureChangefeedStartTsSafety cleans the service GC safepoint of a changefeed
// if something goes wrong after successfully calling EnsureChangefeedStartTsSafety().
func UndoEnsureChangefeedStartTsSafety(
Expand Down
Loading