diff --git a/server/module_election.go b/server/module_election.go index 6b0df0e266..06d0776dc5 100644 --- a/server/module_election.go +++ b/server/module_election.go @@ -134,14 +134,15 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { log.Info("campaign coordinator successfully", zap.String("nodeID", nodeID), zap.Int64("coordinatorVersion", coordinatorVersion)) + maxTaskConcurrency, checkBalanceInterval := coordinatorSchedulerSettings() co := coordinator.New( e.svr.info, e.svr.pdClient, changefeed.NewEtcdBackend(e.svr.EtcdClient), e.svr.EtcdClient.GetGCServiceID(), coordinatorVersion, - 10000, - time.Minute, + maxTaskConcurrency, + checkBalanceInterval, ) e.svr.setCoordinator(co) err = co.Run(ctx) @@ -191,6 +192,11 @@ func (e *elector) campaignCoordinator(ctx context.Context) error { } } +func coordinatorSchedulerSettings() (int, time.Duration) { + schedulerCfg := config.GetGlobalServerConfig().Debug.Scheduler + return schedulerCfg.MaxTaskConcurrency, time.Duration(schedulerCfg.CheckBalanceInterval) +} + func (e *elector) campaignLogCoordinator(ctx context.Context) error { // Limit the frequency of elections to avoid putting too much pressure on the etcd server rl := rate.NewLimiter(rate.Every(time.Second), 1 /* burst */) diff --git a/server/module_election_test.go b/server/module_election_test.go new file mode 100644 index 0000000000..29026943ee --- /dev/null +++ b/server/module_election_test.go @@ -0,0 +1,41 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "testing" + "time" + + "github.com/pingcap/ticdc/pkg/config" + "github.com/stretchr/testify/require" +) + +func TestCoordinatorSchedulerSettingsUsesGlobalConfig(t *testing.T) { + // Scenario: the coordinator should honor the validated server scheduler config. + // Steps: install a temporary global config, read the coordinator settings, and + // verify both the concurrency limit and balance interval match the config. + original := config.GetGlobalServerConfig() + t.Cleanup(func() { + config.StoreGlobalServerConfig(original) + }) + + cfg := config.GetDefaultServerConfig() + cfg.Debug.Scheduler.MaxTaskConcurrency = 3 + cfg.Debug.Scheduler.CheckBalanceInterval = config.TomlDuration(22 * time.Second) + config.StoreGlobalServerConfig(cfg) + + maxTaskConcurrency, checkBalanceInterval := coordinatorSchedulerSettings() + require.Equal(t, 3, maxTaskConcurrency) + require.Equal(t, 22*time.Second, checkBalanceInterval) +}