Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,22 @@ func NewController(
batchSize,
oc,
changefeedDB,
drainController,
),
scheduler.DrainScheduler: coscheduler.NewDrainScheduler(
selfNode.ID.String(),
batchSize,
oc,
changefeedDB,
drainController,
),
scheduler.BalanceScheduler: coscheduler.NewBalanceScheduler(
selfNode.ID.String(),
batchSize,
oc,
changefeedDB,
balanceInterval,
drainController,
),
}),
eventCh: eventCh,
Expand Down
27 changes: 27 additions & 0 deletions coordinator/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,33 @@ func (oc *Controller) OperatorSize() int {
return len(oc.operators)
}

// CountMoveMaintainerOperatorsFromNodes returns the number of in-flight move
// operators whose origin is one of the given nodes.
//
// It only counts operators of concrete type *MoveMaintainerOperator; all
// other operator kinds in the controller are ignored.
func (oc *Controller) CountMoveMaintainerOperatorsFromNodes(origins []node.ID) int {
	// Nothing can match an empty origin list; bail out before taking the lock.
	if len(origins) == 0 {
		return 0
	}
	// Build the membership set outside the critical section so the read lock
	// is held only while scanning the operator table.
	originSet := make(map[node.ID]struct{}, len(origins))
	for _, origin := range origins {
		originSet[origin] = struct{}{}
	}

	oc.mu.RLock()
	defer oc.mu.RUnlock()

	count := 0
	for _, op := range oc.operators {
		moveOp, ok := op.OP.(*MoveMaintainerOperator)
		if !ok {
			continue
		}
		// OriginNode takes the operator's own lock; safe while holding
		// oc.mu.RLock since the operator never acquires oc.mu itself.
		if _, ok := originSet[moveOp.OriginNode()]; ok {
			count++
		}
	}
	return count
}

// HasOperatorInvolvingNode returns true if any in-flight operator affects n.
func (oc *Controller) HasOperatorInvolvingNode(n node.ID) bool {
oc.mu.RLock()
Expand Down
22 changes: 22 additions & 0 deletions coordinator/operator/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,28 @@ func TestController_HasOperatorInvolvingNode(t *testing.T) {
require.False(t, oc.HasOperatorInvolvingNode("n3"))
}

func TestController_CountMoveMaintainerOperatorsFromNodes(t *testing.T) {
	db := changefeed.NewChangefeedDB(1216)
	mockCtrl := gomock.NewController(t)
	backend := mock_changefeed.NewMockBackend(mockCtrl)
	oc, self, nodeManager := newOperatorControllerForTest(t, db, backend)

	// Register a second alive node to act as the move destination.
	dest := node.NewInfo("localhost:8301", "")
	nodeManager.GetAliveNodes()[dest.ID] = dest

	// Host one replicating changefeed on the controller's own node.
	cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
	info := &config.ChangeFeedInfo{
		ChangefeedID: cfID,
		Config:       config.GetDefaultReplicaConfig(),
		SinkURI:      "mysql://127.0.0.1:3306",
	}
	cf := changefeed.NewChangefeed(cfID, info, 1, true)
	db.AddReplicatingMaintainer(cf, self.ID)

	// A move away from self must be counted for self, and only for self.
	require.True(t, oc.AddOperator(NewMoveMaintainerOperator(db, cf, self.ID, dest.ID)))
	require.Equal(t, 1, oc.CountMoveMaintainerOperatorsFromNodes([]node.ID{self.ID}))
	require.Equal(t, 0, oc.CountMoveMaintainerOperatorsFromNodes([]node.ID{"n3"}))
}

func TestController_StopChangefeedDuringAddOperator(t *testing.T) {
// Setup test environment
changefeedDB := changefeed.NewChangefeedDB(1216)
Expand Down
8 changes: 8 additions & 0 deletions coordinator/operator/operator_move.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ func (m *MoveMaintainerOperator) AffectedNodes() []node.ID {
return []node.ID{m.origin, m.dest}
}

// OriginNode returns the node the maintainer is being moved away from.
func (m *MoveMaintainerOperator) OriginNode() node.ID {
	m.lck.Lock()
	origin := m.origin
	m.lck.Unlock()
	return origin
}

// ID returns the changefeed ID this operator acts on.
// NOTE(review): reads m.changefeed without taking m.lck, unlike OriginNode —
// presumably the changefeed reference is immutable for the operator's
// lifetime; confirm against the operator's other mutators.
func (m *MoveMaintainerOperator) ID() common.ChangeFeedID {
	return m.changefeed.ID
}
Expand Down
33 changes: 30 additions & 3 deletions coordinator/scheduler/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/pingcap/ticdc/coordinator/changefeed"
"github.com/pingcap/ticdc/coordinator/drain"
"github.com/pingcap/ticdc/coordinator/operator"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/node"
Expand All @@ -33,6 +34,7 @@ type balanceScheduler struct {
operatorController *operator.Controller
changefeedDB *changefeed.ChangefeedDB
nodeManager *watcher.NodeManager
liveness *drain.Controller

random *rand.Rand
lastRebalanceTime time.Time
Expand All @@ -44,13 +46,16 @@ type balanceScheduler struct {
// `Schedule`.
// It speeds up rebalance.
forceBalance bool

drainBalanceBlockedUntil time.Time
}

func NewBalanceScheduler(
id string, batchSize int,
oc *operator.Controller,
changefeedDB *changefeed.ChangefeedDB,
balanceInterval time.Duration,
liveness *drain.Controller,
) *balanceScheduler {
return &balanceScheduler{
id: id,
Expand All @@ -61,28 +66,43 @@ func NewBalanceScheduler(
nodeManager: appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName),
checkBalanceInterval: balanceInterval,
lastRebalanceTime: time.Now(),
liveness: liveness,
}
}

func (s *balanceScheduler) Execute() time.Time {
now := time.Now()
if hasDrainingOrStoppingNode(s.liveness) {
// Pause regular balance scheduling while any node is observed draining or
// stopping. Each observation extends the block window by one balance
// interval so regular rebalance does not race with evacuation progress.
s.drainBalanceBlockedUntil = now.Add(s.drainCooldown())
return now.Add(s.checkBalanceInterval)
}
if now.Before(s.drainBalanceBlockedUntil) {
// If drain disappears before the previously extended block window expires,
// keep skipping regular rebalance until that window elapses.
return now.Add(s.checkBalanceInterval)
}
if !s.forceBalance && time.Since(s.lastRebalanceTime) < s.checkBalanceInterval {
return s.lastRebalanceTime.Add(s.checkBalanceInterval)
}
now := time.Now()

if s.operatorController.OperatorSize() > 0 || s.changefeedDB.GetAbsentSize() > 0 {
// not in stable schedule state, skip balance
return now.Add(s.checkBalanceInterval)
}

// check the balance status
moveSize := pkgScheduler.CheckBalanceStatus(s.changefeedDB.GetTaskSizePerNode(), s.nodeManager.GetAliveNodes())
activeNodes := s.nodeManager.GetAliveNodes()
activeNodes = filterSchedulableAliveNodes(activeNodes, s.liveness)
moveSize := pkgScheduler.CheckBalanceStatus(s.changefeedDB.GetTaskSizePerNode(), activeNodes)
if moveSize <= 0 {
// fast check the balance status, no need to do the balance,skip
return now.Add(s.checkBalanceInterval)
}
// balance changefeeds among the active nodes
movedSize := pkgScheduler.Balance(s.batchSize, s.random, s.nodeManager.GetAliveNodes(), s.changefeedDB.GetReplicating(),
movedSize := pkgScheduler.Balance(s.batchSize, s.random, activeNodes, s.changefeedDB.GetReplicating(),
func(cf *changefeed.Changefeed, nodeID node.ID) bool {
return s.operatorController.AddOperator(operator.NewMoveMaintainerOperator(s.changefeedDB, cf, cf.GetNodeID(), nodeID))
})
Expand All @@ -92,6 +112,13 @@ func (s *balanceScheduler) Execute() time.Time {
return now.Add(s.checkBalanceInterval)
}

// drainCooldown returns how long regular rebalancing stays blocked after a
// drain observation: the configured balance interval when it is positive,
// otherwise a one-second fallback.
func (s *balanceScheduler) drainCooldown() time.Duration {
	if s.checkBalanceInterval <= 0 {
		return time.Second
	}
	return s.checkBalanceInterval
}

// Name returns the identifier of this scheduler.
func (s *balanceScheduler) Name() string {
	const schedulerName = "balance-scheduler"
	return schedulerName
}
Loading
Loading