diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml deleted file mode 100644 index e2bf99c026f..00000000000 --- a/.github/workflows/check.yaml +++ /dev/null @@ -1,30 +0,0 @@ -name: Check PD -on: [push, pull_request] -concurrency: - group: ${{ github.ref }}-${{ github.workflow }} - cancel-in-progress: true -jobs: - statics: - runs-on: ubuntu-latest - timeout-minutes: 20 - steps: - - uses: actions/setup-go@v3 - with: - go-version: '1.21' - - name: Checkout code - uses: actions/checkout@v3 - - name: Restore cache - uses: actions/cache@v3 - with: - path: | - ~/go/pkg/mod - ~/.cache/go-build - **/.dashboard_download_cache - key: ${{ runner.os }}-golang-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-golang - - name: Make Check - run: | - SWAGGER=1 make build - make tools - make check diff --git a/.github/workflows/label.yaml b/.github/workflows/label.yaml deleted file mode 100644 index 5ff2b895528..00000000000 --- a/.github/workflows/label.yaml +++ /dev/null @@ -1,37 +0,0 @@ -name: Add Labels -on: - pull_request_target: - types: [opened, reopened] - -jobs: - add_labels: - runs-on: ubuntu-latest - steps: - - uses: actions/github-script@v4 - name: Add labels - with: - script: | - function doAddLabels(labels) { - console.log("Adding labels", labels); - return github.issues.addLabels({ - issue_number: context.issue.number, - owner: context.repo.owner, - repo: context.repo.repo, - labels: labels, - }); - } - - async function run() { - const { title } = context.payload.pull_request; - if (title.startsWith("Update TiDB Dashboard to")) { - await doAddLabels(["component/visualization", "require-LGT1"]); - return; - } - - console.log("Not matching any label rules, skip"); - } - - run().catch(e => { - // Do not fail on errors - console.error("Errors: ", e.stack); - }) diff --git a/.github/workflows/pd-docker-image.yaml b/.github/workflows/pd-docker-image.yaml deleted file mode 100644 index 2a04c030016..00000000000 --- 
a/.github/workflows/pd-docker-image.yaml +++ /dev/null @@ -1,24 +0,0 @@ -name: PD Docker Image -on: - push: - branches: - - master - pull_request: - branches: - - master -concurrency: - group: ${{ github.ref }}-${{ github.workflow }} - cancel-in-progress: true -jobs: - chunks: - runs-on: ubuntu-latest - strategy: - fail-fast: true - steps: - - uses: actions/setup-go@v3 - with: - go-version: '1.21' - - name: Checkout code - uses: actions/checkout@v3 - - name: Make - run: make docker-image diff --git a/.github/workflows/tso-consistency-test.yaml b/.github/workflows/tso-consistency-test.yaml deleted file mode 100644 index 570cbbc5da8..00000000000 --- a/.github/workflows/tso-consistency-test.yaml +++ /dev/null @@ -1,17 +0,0 @@ -name: TSO Consistency Test -on: - # Only run when the new code is merged into master. - push: - branches: - - master -jobs: - tso-consistency-test: - runs-on: ubuntu-latest - steps: - - uses: actions/setup-go@v3 - with: - go-version: '1.21' - - name: Checkout code - uses: actions/checkout@v3 - - name: Make TSO Consistency Test - run: make test-tso-consistency diff --git a/.github/workflows/tso-function-test.yaml b/.github/workflows/tso-function-test.yaml deleted file mode 100644 index ee7679602f5..00000000000 --- a/.github/workflows/tso-function-test.yaml +++ /dev/null @@ -1,28 +0,0 @@ -name: TSO Function Test -on: - push: - branches: - - master - - release-5.* - - release-6.* - - release-7.* - pull_request: - branches: - - master - - release-5.* - - release-6.* - - release-7.* -concurrency: - group: ${{ github.ref }}-${{ github.workflow }} - cancel-in-progress: true -jobs: - tso-function-test: - runs-on: ubuntu-latest - steps: - - uses: actions/setup-go@v3 - with: - go-version: '1.21' - - name: Checkout code - uses: actions/checkout@v3 - - name: Make TSO Function Test - run: make test-tso-function diff --git a/Makefile b/Makefile index 7c31cb7d684..09bed9ec805 100644 --- a/Makefile +++ b/Makefile @@ -242,7 +242,7 @@ basic-test: install-tools 
ci-test-job: install-tools dashboard-ui @$(FAILPOINT_ENABLE) - ./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX) || { $(FAILPOINT_DISABLE); exit 1; } + ./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX) || { $(FAILPOINT_DISABLE); exit 1; } @$(FAILPOINT_DISABLE) TSO_INTEGRATION_TEST_PKGS := $(PD_PKG)/tests/server/tso diff --git a/server/forward.go b/server/forward.go index 65750fcd4be..0806f8fcc80 100644 --- a/server/forward.go +++ b/server/forward.go @@ -406,7 +406,7 @@ func (s *GrpcServer) getGlobalTSO(ctx context.Context) (pdpb.Timestamp, error) { } request := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ - ClusterId: s.clusterID, + ClusterId: s.ClusterID(), KeyspaceId: utils.DefaultKeyspaceID, KeyspaceGroupId: utils.DefaultKeyspaceGroupID, }, diff --git a/server/grpc_service.go b/server/grpc_service.go index ef7020f7fee..a3bea9ad3b0 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -571,9 +571,10 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { if s.IsClosed() { return status.Errorf(codes.Unknown, "server not started") } - if request.GetHeader().GetClusterId() != s.clusterID { + clusterID := s.ClusterID() + if request.GetHeader().GetClusterId() != clusterID { return status.Errorf(codes.FailedPrecondition, - "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId()) + "mismatch cluster id, need %d but got %d", clusterID, request.GetHeader().GetClusterId()) } count := request.GetCount() ctx, task := trace.NewTask(ctx, "tso") @@ -2276,17 +2277,17 @@ func (s *GrpcServer) validateRoleInRequest(ctx context.Context, header *pdpb.Req } *allowFollower = true } - if header.GetClusterId() != s.clusterID { - return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, header.GetClusterId()) + if id := s.ClusterID(); header.GetClusterId() != id { + return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", id, header.GetClusterId()) } return nil } func (s
*GrpcServer) header() *pdpb.ResponseHeader { - if s.clusterID == 0 { + if s.ClusterID() == 0 { return s.wrapErrorToHeader(pdpb.ErrorType_NOT_BOOTSTRAPPED, "cluster id is not ready") } - return &pdpb.ResponseHeader{ClusterId: s.clusterID} + return &pdpb.ResponseHeader{ClusterId: s.ClusterID()} } func (s *GrpcServer) wrapErrorToHeader(errorType pdpb.ErrorType, message string) *pdpb.ResponseHeader { @@ -2298,7 +2299,7 @@ func (s *GrpcServer) wrapErrorToHeader(errorType pdpb.ErrorType, message string) func (s *GrpcServer) errorHeader(err *pdpb.Error) *pdpb.ResponseHeader { return &pdpb.ResponseHeader{ - ClusterId: s.clusterID, + ClusterId: s.ClusterID(), Error: err, } } diff --git a/server/server.go b/server/server.go index be886a56712..44e2bb79047 100644 --- a/server/server.go +++ b/server/server.go @@ -157,7 +157,7 @@ type Server struct { electionClient *clientv3.Client // http client httpClient *http.Client - clusterID uint64 // pd cluster id. + clusterID atomic.Uint64 rootPath string // Server services. @@ -425,17 +425,18 @@ func (s *Server) AddStartCallback(callbacks ...func()) { } func (s *Server) startServer(ctx context.Context) error { - var err error - if s.clusterID, err = etcdutil.InitClusterID(s.client, pdClusterIDPath); err != nil { + clusterID, err := etcdutil.InitClusterID(s.client, pdClusterIDPath) + if err != nil { log.Error("failed to init cluster id", errs.ZapError(err)) return err } - log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID)) + s.clusterID.Store(clusterID) + log.Info("init cluster id", zap.Uint64("cluster-id", clusterID)) // It may lose accuracy if use float64 to store uint64. So we store the cluster id in label. 
- metadataGauge.WithLabelValues(fmt.Sprintf("cluster%d", s.clusterID)).Set(0) + metadataGauge.WithLabelValues(fmt.Sprintf("cluster%d", clusterID)).Set(0) bs.ServerInfoGauge.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) - s.rootPath = endpoint.PDRootPath(s.clusterID) + s.rootPath = endpoint.PDRootPath(clusterID) s.member.InitMemberInfo(s.cfg.AdvertiseClientUrls, s.cfg.AdvertisePeerUrls, s.Name(), s.rootPath) s.member.SetMemberDeployPath(s.member.ID()) s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion) @@ -478,7 +479,7 @@ func (s *Server) startServer(ctx context.Context) error { s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg) s.basicCluster = core.NewBasicCluster() - s.cluster = cluster.NewRaftCluster(ctx, s.clusterID, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient) + s.cluster = cluster.NewRaftCluster(ctx, clusterID, s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient) keyspaceIDAllocator := id.NewAllocator(&id.AllocatorParams{ Client: s.client, RootPath: s.rootPath, @@ -488,11 +489,11 @@ func (s *Server) startServer(ctx context.Context) error { Step: keyspace.AllocStep, }) if s.IsAPIServiceMode() { - s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client, s.clusterID) + s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client, clusterID) } s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager) s.safePointV2Manager = gc.NewSafePointManagerV2(s.ctx, s.storage, s.storage, s.storage) - s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, "", s.cluster) + s.hbStreams = hbstream.NewHeartbeatStreams(ctx, clusterID, "", s.cluster) // initial hot_region_storage in here. 
s.hotRegionStorage, err = storage.NewHotRegionsStorage( @@ -685,7 +686,7 @@ func (s *Server) collectEtcdStateMetrics() { } func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapResponse, error) { - clusterID := s.clusterID + clusterID := s.ClusterID() log.Info("try to bootstrap raft cluster", zap.Uint64("cluster-id", clusterID), @@ -916,7 +917,7 @@ func (s *Server) Name() string { // ClusterID returns the cluster ID of this server. func (s *Server) ClusterID() uint64 { - return s.clusterID + return s.clusterID.Load() } // StartTimestamp returns the start timestamp of this server @@ -1409,7 +1410,7 @@ func (s *Server) DirectlyGetRaftCluster() *cluster.RaftCluster { // GetCluster gets cluster. func (s *Server) GetCluster() *metapb.Cluster { return &metapb.Cluster{ - Id: s.clusterID, + Id: s.ClusterID(), MaxPeerCount: uint32(s.persistOptions.GetMaxReplicas()), } } @@ -2010,7 +2011,7 @@ func (s *Server) SetServicePrimaryAddr(serviceName, addr string) { func (s *Server) initTSOPrimaryWatcher() { serviceName := mcs.TSOServiceName - tsoRootPath := endpoint.TSOSvcRootPath(s.clusterID) + tsoRootPath := endpoint.TSOSvcRootPath(s.ClusterID()) tsoServicePrimaryKey := endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, mcs.DefaultKeyspaceGroupID) s.tsoPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, tsoServicePrimaryKey) s.tsoPrimaryWatcher.StartWatchLoop() @@ -2018,7 +2019,7 @@ func (s *Server) initTSOPrimaryWatcher() { func (s *Server) initSchedulingPrimaryWatcher() { serviceName := mcs.SchedulingServiceName - primaryKey := endpoint.SchedulingPrimaryPath(s.clusterID) + primaryKey := endpoint.SchedulingPrimaryPath(s.ClusterID()) s.schedulingPrimaryWatcher = s.initServicePrimaryWatcher(serviceName, primaryKey) s.schedulingPrimaryWatcher.StartWatchLoop() }