newReplicas = planForBucket.getNewReplicas();
+ ReplicaReassignment reassignment =
+ ReplicaReassignment.build(
+ coordinatorContext.getAssignment(tableBucket), newReplicas);
+
+ if (planForBucket.isLeaderAction() && !reassignment.isBeingReassigned()) {
+ // buckets only need to change leader like leader replica rebalance.
+ LOG.info("trigger leader election for tableBucket {}.", tableBucket);
+ tableBucketStateMachine.handleStateChange(
+ Collections.singleton(tableBucket),
+ OnlineBucket,
+ new ReassignmentLeaderElection(newReplicas));
+ rebalanceManager.finishRebalanceTask(tableBucket, RebalanceStatus.COMPLETED);
+ } else {
+ try {
+ LOG.info(
+ "Try to processing bucket reassignment for tableBucket {} with assignment: {}.",
+ tableBucket,
+ reassignment);
+ onBucketReassignment(tableBucket, reassignment);
+ } catch (Exception e) {
+ LOG.error("Error when processing bucket reassignment.", e);
+ rebalanceManager.finishRebalanceTask(tableBucket, RebalanceStatus.FAILED);
+ }
+ }
+ }
+
+ /** try to finish rebalance tasks after receive notify leader and isr response. */
+ private void tryToCompleteRebalanceTask(TableBucket tableBucket) {
+ RebalancePlanForBucket planForBucket =
+ rebalanceManager.getRebalancePlanForBucket(tableBucket);
+ if (planForBucket != null) {
+ ReplicaReassignment reassignment =
+ ReplicaReassignment.build(
+ coordinatorContext.getAssignment(tableBucket),
+ planForBucket.getNewReplicas());
+ try {
+ if (planForBucket.isLeaderAction() && !reassignment.isBeingReassigned()) {
+ LeaderAndIsr leaderAndIsr = zooKeeperClient.getLeaderAndIsr(tableBucket).get();
+ int currentLeader = leaderAndIsr.leader();
+ if (currentLeader == planForBucket.getNewLeader()) {
+ // leader action finish.
+ rebalanceManager.finishRebalanceTask(
+ tableBucket, RebalanceStatus.COMPLETED);
+ }
+ } else {
+ boolean isReassignmentComplete =
+ isReassignmentComplete(tableBucket, reassignment);
+ if (isReassignmentComplete) {
+ LOG.info(
+ "Target replicas {} have all caught up with the leader for reassigning bucket {}",
+ reassignment.getTargetReplicas(),
+ tableBucket);
+ onBucketReassignment(tableBucket, reassignment);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error(
+ "Failed to complete the reassignment for table bucket {}", tableBucket, e);
+ rebalanceManager.finishRebalanceTask(tableBucket, RebalanceStatus.FAILED);
+ }
+ }
+ }
+
+ /**
+ * Reassigning replicas for a tableBucket goes through a few steps listed in the code.
+ *
+ *
+ * - RS = current assigned replica set
+ *
- ORS = original assigned replica set
+ *
- TRS = target replica set
+ *
- AR = the replicas we are adding as part of this reassignment
+ *
- RR = the replicas we are removing as part of this reassignment
+ *
+ *
+ * A reassignment may have up to two phases, each with its own steps:
+ *
+ *
To complete the reassignment, we need to bring the new replicas into sync, so depending on
+ * the state of the ISR, we will execute one of the following steps.
+ *
+ *
Phase A (when TRS != ISR): The reassignment is not yet complete
+ *
+ *
+ * - A1. Bump the bucket epoch for the bucket and send LeaderAndIsr updates to ORS + TRS.
+ *
- A2. Start new replicas AR by moving replicas in AR to NewReplica state.
+ *
- A3. Send the start replica request to the tabletSevers in the reassigned replicas list
+ * that are not in the assigned
+ *
+ *
+ * Phase B (when TRS = ISR): The reassignment is complete
+ *
+ *
+ * - B1. Move all replicas in AR to OnlineReplica state.
+ *
- B2. Send a LeaderAndIsr request with RS = TRS. This will prevent the leader from adding
+ * any replica in TRS - ORS back in the isr. If the current leader is not in TRS or isn't
+ * alive, we move the leader to a new replica in TRS. We may send the LeaderAndIsr to more
+ * than the TRS replicas due to the way the partition state machine works (it reads
+ * replicas from ZK)
+ *
- B3. Move all replicas in RR to OfflineReplica state. As part of OfflineReplica state
+ * change, we shrink the isr to remove RR in ZooKeeper and send a LeaderAndIsr ONLY to the
+ * Leader to notify it of the shrunk isr. After that, we send a StopReplica (delete =
+ * false) to the replicas in RR.
+ *
- B4. Move all replicas in RR to NonExistentReplica state. This will send a StopReplica
+ * (delete = true) to he replicas in RR to physically delete the replicas on disk.
+ *
- B5. Set RS = TRS, AR = [], RR = [] in memory.
+ *
- B6. Update ZK with RS=TRS, AR=[], RR=[].
+ *
- B7. After electing leader, the replicas and isr information changes. So resend the
+ * update metadata request to every tabletServer.
+ *
- B8. Mark the ongoing rebalance task to finish.
+ *
+ *
+ * In general, there are two goals we want to aim for:
+ *
+ *
+ * - 1. Every replica present in the replica set of a LeaderAndIsrRequest gets the request
+ * sent to it
+ *
- 2. Replicas that are removed from a bucket's assignment get StopReplica sent to them
+ *
+ *
+ * For example, if ORS = {1,2,3} and TRS = {4,5,6}, the values in the table and leader/isr
+ * paths in ZK may go through the following transitions.
+ *
+ *
+ * | RS | AR | RR | leader | isr | step |
+ * | {1,2,3} | {} | {} | 1 | {1,2,3} | (initial state) |
+ * | {4,5,6,1,2,3} | {4,5,6} | {1,2,3} | 1 | {1,2,3} | (step A2) |
+ * | {4,5,6,1,2,3} | {4,5,6} | {1,2,3} | 1 | {1,2,3,4,5,6} | (phase B) |
+ * | {4,5,6,1,2,3} | {4,5,6} | {1,2,3} | 4 | {1,2,3,4,5,6} | (step B3) |
+ * | {4,5,6,1,2,3} | {4,5,6} | {1,2,3} | 4 | {4,5,6} | (step B4) |
+ * | {4,5,6} | {} | {} | 4 | {4,5,6} | (step B6) |
+ *
+ *
+ * Note that we have to update RS in ZK with TRS last since it's the only place where we
+ * store ORS persistently. This way, if the coordinatorServer crashes before that step, we can
+ * still recover.
+ */
+ private void onBucketReassignment(TableBucket tableBucket, ReplicaReassignment reassignment)
+ throws Exception {
+ List addingReplicas = reassignment.addingReplicas;
+ List removingReplicas = reassignment.removingReplicas;
+
+ if (!isReassignmentComplete(tableBucket, reassignment)) {
+ // A1. Send LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and
+ // RR).
+ updateBucketEpochAndSendRequest(tableBucket, reassignment);
+
+ // A2. Set RS = TRS, AR = [], RR = [] in memory.
+ coordinatorContext.updateBucketReplicaAssignment(tableBucket, reassignment.replicas);
+ updateReplicaAssignmentForBucket(tableBucket, reassignment.replicas);
+
+ // A3. replicas in AR -> NewReplica
+ // send the start replica request to the tabletSevers in the reassigned replicas list
+ // that are not in the assigned
+ addingReplicas.forEach(
+ replica ->
+ replicaStateMachine.handleStateChanges(
+ Collections.singleton(
+ new TableBucketReplica(tableBucket, replica)),
+ NewReplica));
+ } else {
+ // B1. replicas in AR -> OnlineReplica
+ addingReplicas.forEach(
+ replica ->
+ replicaStateMachine.handleStateChanges(
+ Collections.singleton(
+ new TableBucketReplica(tableBucket, replica)),
+ OnlineReplica));
+ List targetReplicas = reassignment.getTargetReplicas();
+ // B2. Send LeaderAndIsr request with a potential new leader (if current leader not in
+ // TRS) and a new RS (using TRS) and same isr to every tabletServer in ORS + TRS or TRS
+ maybeReassignedBucketLeaderIfRequired(tableBucket, targetReplicas);
+ // B3. replicas in RR -> Offline (force those replicas out of isr)
+ // B4. replicas in RR -> NonExistentReplica (force those replicas to be deleted)
+ stopRemovedReplicasOfReassignedBucket(tableBucket, removingReplicas);
+ // B5. Set RS = TRS, AR = [], RR = [] in memory.
+ coordinatorContext.updateBucketReplicaAssignment(
+ tableBucket, reassignment.getTargetReplicas());
+ // B6. Update ZK with RS = TRS, AR = [], RR = [].
+ updateReplicaAssignmentForBucket(tableBucket, targetReplicas);
+ // B7. After electing a leader in B3, the replicas and isr information changes, so
+ // resend the update metadata request to every tabletServer.
+ updateTabletServerMetadataCache(
+ new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+ null,
+ null,
+ Collections.singleton(tableBucket));
+ // B8. Mark the ongoing rebalance task to finish.
+ rebalanceManager.finishRebalanceTask(tableBucket, RebalanceStatus.COMPLETED);
+ }
+ }
+
+ private boolean isReassignmentComplete(
+ TableBucket tableBucket, ReplicaReassignment reassignment) throws Exception {
+ LeaderAndIsr leaderAndIsr = zooKeeperClient.getLeaderAndIsr(tableBucket).get();
+ List isr = leaderAndIsr.isr();
+ List targetReplicas = reassignment.getTargetReplicas();
+ return targetReplicas.isEmpty() || new HashSet<>(isr).containsAll(targetReplicas);
+ }
+
+ private void maybeReassignedBucketLeaderIfRequired(
+ TableBucket tableBucket, List targetReplicas) {
+ LeaderAndIsr leaderAndIsr = coordinatorContext.getBucketLeaderAndIsr(tableBucket).get();
+ int currentLeader = leaderAndIsr.leader();
+ if (currentLeader != targetReplicas.get(0)) {
+ LOG.info(
+ "Leader {} for tableBucket {} being reassigned. Re-electing leader to {}",
+ currentLeader,
+ tableBucket,
+ targetReplicas.get(0));
+ tableBucketStateMachine.handleStateChange(
+ Collections.singleton(tableBucket),
+ OnlineBucket,
+ new ReassignmentLeaderElection(targetReplicas));
+ }
+ }
+
+ private void stopRemovedReplicasOfReassignedBucket(
+ TableBucket tableBucket, List removingReplicas) {
+ Set replicasToBeDeleted = new HashSet<>();
+ removingReplicas.forEach(
+ replica -> replicasToBeDeleted.add(new TableBucketReplica(tableBucket, replica)));
+ replicaStateMachine.handleStateChanges(replicasToBeDeleted, OfflineReplica);
+ // send stop replica command to the old replicas.
+ replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionStarted);
+ }
+
+ private void updateReplicaAssignmentForBucket(
+ TableBucket tableBucket, List targetReplicas) throws Exception {
+ long tableId = tableBucket.getTableId();
+ @Nullable Long partitionId = tableBucket.getPartitionId();
+ if (partitionId == null) {
+ Map> tableAssignment =
+ coordinatorContext.getTableAssignment(tableId);
+ tableAssignment.put(tableBucket.getBucket(), targetReplicas);
+ Map newTableAssignment = new HashMap<>();
+ tableAssignment.forEach(
+ (bucket, replicas) ->
+ newTableAssignment.put(bucket, new BucketAssignment(replicas)));
+ zooKeeperClient.updateTableAssignment(tableId, new TableAssignment(newTableAssignment));
+ } else {
+ Map> partitionAssignment =
+ coordinatorContext.getPartitionAssignment(
+ new TablePartition(tableId, partitionId));
+ partitionAssignment.put(tableBucket.getBucket(), targetReplicas);
+ Map newPartitionAssignment = new HashMap<>();
+ partitionAssignment.forEach(
+ (bucket, replicas) ->
+ newPartitionAssignment.put(bucket, new BucketAssignment(replicas)));
+ zooKeeperClient.updatePartitionAssignment(
+ partitionId, new PartitionAssignment(tableId, newPartitionAssignment));
+ }
+ }
+
private List tryProcessAdjustIsr(
Map leaderAndIsrList) {
// TODO verify leader epoch.
@@ -1164,6 +1532,9 @@ private List tryProcessAdjustIsr(
// update coordinator leader and isr cache.
newLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
+ // First, try to judge whether the bucket is in rebalance task when isr change.
+ newLeaderAndIsrList.keySet().forEach(this::tryToCompleteRebalanceTask);
+
// TODO update metadata for all alive tablet servers.
return result;
@@ -1217,6 +1588,13 @@ private void validateLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr newLeade
throw new IneligibleReplicaException(errorMsg);
}
}
+
+ List isr = newLeaderAndIsr.isr();
+ Set assignment = new HashSet<>(coordinatorContext.getAssignment(tableBucket));
+ if (!assignment.containsAll(isr)) {
+ throw new FencedLeaderEpochException(
+ "The request isr in adjust isr request is not in assignment.");
+ }
}
}
@@ -1526,7 +1904,7 @@ private ControlledShutdownResponse tryProcessControlledShutdown(
}
tableBucketStateMachine.handleStateChange(
- bucketsLedByServer, OnlineBucket, CONTROLLED_SHUTDOWN_ELECTION);
+ bucketsLedByServer, OnlineBucket, new ControlledShutdownLeaderElection());
// TODO need send stop request to the leader?
@@ -1637,8 +2015,114 @@ private void updateTabletServerMetadataCache(
coordinatorRequestBatch.sendUpdateMetadataRequest();
}
+ private void updateBucketEpochAndSendRequest(
+ TableBucket tableBucket, ReplicaReassignment reassignment) throws Exception {
+ Optional leaderAndIsrOpt = zooKeeperClient.getLeaderAndIsr(tableBucket);
+ if (!leaderAndIsrOpt.isPresent()) {
+ return;
+ }
+ LeaderAndIsr leaderAndIsr = leaderAndIsrOpt.get();
+
+ String partitionName = null;
+ if (tableBucket.getPartitionId() != null) {
+ partitionName = coordinatorContext.getPartitionName(tableBucket.getPartitionId());
+ if (partitionName == null) {
+ LOG.error("Can't find partition name for partition: {}.", tableBucket.getBucket());
+ return;
+ }
+ }
+
+ List newReplicas = reassignment.replicas;
+ // pass the original isr not include the new replicas.
+ LeaderAndIsr newLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(leaderAndIsr.isr());
+
+ coordinatorContext.putBucketLeaderAndIsr(tableBucket, newLeaderAndIsr);
+ zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
+
+ coordinatorRequestBatch.newBatch();
+ coordinatorRequestBatch.addNotifyLeaderRequestForTabletServers(
+ new HashSet<>(newReplicas),
+ PhysicalTablePath.of(
+ coordinatorContext.getTablePathById(tableBucket.getTableId()),
+ partitionName),
+ tableBucket,
+ newReplicas,
+ newLeaderAndIsr);
+ coordinatorRequestBatch.sendRequestToTabletServers(
+ coordinatorContext.getCoordinatorEpoch());
+ }
+
@VisibleForTesting
CompletedSnapshotStoreManager completedSnapshotStoreManager() {
return completedSnapshotStoreManager;
}
+
+ private static final class ReplicaReassignment {
+ private final List replicas;
+ private final List addingReplicas;
+ private final List removingReplicas;
+
+ private ReplicaReassignment(
+ List replicas,
+ List addingReplicas,
+ List removingReplicas) {
+ this.replicas = Collections.unmodifiableList(replicas);
+ this.addingReplicas = Collections.unmodifiableList(addingReplicas);
+ this.removingReplicas = Collections.unmodifiableList(removingReplicas);
+ }
+
+ private static ReplicaReassignment build(
+ List originReplicas, List targetReplicas) {
+ // targetReplicas behind originReplicas in full set.
+ List fullReplicaSet = new ArrayList<>(targetReplicas);
+ fullReplicaSet.addAll(originReplicas);
+ fullReplicaSet = fullReplicaSet.stream().distinct().collect(Collectors.toList());
+
+ List newAddingReplicas = new ArrayList<>(fullReplicaSet);
+ newAddingReplicas.removeAll(originReplicas);
+
+ List newRemovingReplicas = new ArrayList<>(originReplicas);
+ newRemovingReplicas.removeAll(targetReplicas);
+
+ return new ReplicaReassignment(fullReplicaSet, newAddingReplicas, newRemovingReplicas);
+ }
+
+ private List getTargetReplicas() {
+ List computed = new ArrayList<>(replicas);
+ computed.removeAll(removingReplicas);
+ return Collections.unmodifiableList(computed);
+ }
+
+ private boolean isBeingReassigned() {
+ return !addingReplicas.isEmpty() || !removingReplicas.isEmpty();
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "ReplicaAssignment(replicas=%s, addingReplicas=%s, removingReplicas=%s)",
+ replicas, addingReplicas, removingReplicas);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ReplicaReassignment that = (ReplicaReassignment) o;
+ return Objects.equals(replicas, that.replicas)
+ && Objects.equals(addingReplicas, that.addingReplicas)
+ && Objects.equals(removingReplicas, that.removingReplicas);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(replicas, addingReplicas, removingReplicas);
+ }
+ }
}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
index cd546f4479..76a3fbd68c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
@@ -33,6 +33,7 @@
import org.apache.fluss.server.ServerBase;
import org.apache.fluss.server.authorizer.Authorizer;
import org.apache.fluss.server.authorizer.AuthorizerLoader;
+import org.apache.fluss.server.coordinator.rebalance.RebalanceManager;
import org.apache.fluss.server.metadata.CoordinatorMetadataCache;
import org.apache.fluss.server.metadata.ServerMetadataCache;
import org.apache.fluss.server.metrics.ServerMetricUtils;
@@ -508,6 +509,11 @@ public DynamicConfigManager getDynamicConfigManager() {
return dynamicConfigManager;
}
+ @VisibleForTesting
+ public RebalanceManager getRebalanceManager() {
+ return coordinatorEventProcessor.getRebalanceManager();
+ }
+
private static void validateConfigs(Configuration conf) {
if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) {
throw new IllegalConfigurationException(
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 7fc53b96f8..2ef7d04af6 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -20,6 +20,7 @@
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.cluster.TabletServerInfo;
+import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
@@ -113,12 +114,16 @@
import org.apache.fluss.server.coordinator.event.AccessContextEvent;
import org.apache.fluss.server.coordinator.event.AddServerTagEvent;
import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
+import org.apache.fluss.server.coordinator.event.CancelRebalanceEvent;
import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent;
import org.apache.fluss.server.coordinator.event.EventManager;
+import org.apache.fluss.server.coordinator.event.ListRebalanceProgressEvent;
+import org.apache.fluss.server.coordinator.event.RebalanceEvent;
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
+import org.apache.fluss.server.coordinator.rebalance.goal.Goal;
import org.apache.fluss.server.entity.CommitKvSnapshotData;
import org.apache.fluss.server.entity.LakeTieringTableInfo;
import org.apache.fluss.server.entity.TablePropertyChanges;
@@ -140,7 +145,9 @@
import javax.annotation.Nullable;
import java.io.UncheckedIOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -153,6 +160,7 @@
import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
+import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.getGoalByType;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getAdjustIsrData;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getCommitLakeTableSnapshotData;
@@ -170,7 +178,6 @@
/** An RPC Gateway service for coordinator server. */
public final class CoordinatorService extends RpcServiceBase implements CoordinatorGateway {
-
private final int defaultBucketNumber;
private final int defaultReplicationFactor;
private final boolean logTableAllowCreation;
@@ -309,12 +316,29 @@ public CompletableFuture createTable(CreateTableRequest req
// first, generate the assignment
TableAssignment tableAssignment = null;
+ Map properties = tableDescriptor.getProperties();
+ boolean generateUnbalanceAssignment;
+ if (properties.containsKey(ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key())) {
+ generateUnbalanceAssignment =
+ Boolean.parseBoolean(
+ properties.get(
+ ConfigOptions.TABLE_GENERATE_UNBALANCE_TABLE_ASSIGNMENT.key()));
+ } else {
+ generateUnbalanceAssignment = false;
+ }
// only when it's no partitioned table do we generate the assignment for it
if (!tableDescriptor.isPartitioned()) {
// the replication factor must be set now
int replicaFactor = tableDescriptor.getReplicationFactor();
TabletServerInfo[] servers = metadataCache.getLiveServers();
- tableAssignment = generateAssignment(bucketCount, replicaFactor, servers);
+ if (generateUnbalanceAssignment) {
+ // this branch is only used for testing.
+ tableAssignment =
+ new TableAssignment(
+ generateUnBalanceAssignment(bucketCount, replicaFactor));
+ } else {
+ tableAssignment = generateAssignment(bucketCount, replicaFactor, servers);
+ }
}
// before create table in fluss, we may create in lake
@@ -528,9 +552,18 @@ public CompletableFuture createPartition(
// second, generate the PartitionAssignment.
int replicaFactor = table.getTableConfig().getReplicationFactor();
TabletServerInfo[] servers = metadataCache.getLiveServers();
- Map bucketAssignments =
- generateAssignment(table.bucketCount, replicaFactor, servers)
- .getBucketAssignments();
+ Map bucketAssignments;
+
+ boolean generateUnbalanceAssignment = table.getTableConfig().generateUnbalanceAssignment();
+ if (generateUnbalanceAssignment) {
+ // This branch is only used for testing.
+ bucketAssignments = generateUnBalanceAssignment(table.bucketCount, replicaFactor);
+ } else {
+ bucketAssignments =
+ generateAssignment(table.bucketCount, replicaFactor, servers)
+ .getBucketAssignments();
+ }
+
PartitionAssignment partitionAssignment =
new PartitionAssignment(table.tableId, bucketAssignments);
@@ -577,16 +610,15 @@ public CompletableFuture metadata(MetadataRequest request) {
AccessContextEvent metadataResponseAccessContextEvent =
new AccessContextEvent<>(
- ctx -> {
- return processMetadataRequest(
- request,
- listenerName,
- session,
- authorizer,
- metadataCache,
- new CoordinatorMetadataProvider(
- zkClient, metadataManager, ctx));
- });
+ ctx ->
+ processMetadataRequest(
+ request,
+ listenerName,
+ session,
+ authorizer,
+ metadataCache,
+ new CoordinatorMetadataProvider(
+ zkClient, metadataManager, ctx)));
eventManagerSupplier.get().put(metadataResponseAccessContextEvent);
return metadataResponseAccessContextEvent.getResultFuture();
}
@@ -851,19 +883,40 @@ public CompletableFuture removeServerTag(
@Override
public CompletableFuture rebalance(RebalanceRequest request) {
- throw new UnsupportedOperationException("Support soon!");
+ List goalsByPriority = new ArrayList<>();
+ Arrays.stream(request.getGoals())
+ .forEach(goal -> goalsByPriority.add(getGoalByType(GoalType.valueOf(goal))));
+ boolean isDryRun = request.isDryRun();
+
+ CompletableFuture response = new CompletableFuture<>();
+ eventManagerSupplier.get().put(new RebalanceEvent(goalsByPriority, isDryRun, response));
+ return response;
}
@Override
public CompletableFuture listRebalanceProgress(
ListRebalanceProgressRequest request) {
- throw new UnsupportedOperationException("Support soon!");
+ CompletableFuture response = new CompletableFuture<>();
+ eventManagerSupplier
+ .get()
+ .put(
+ new ListRebalanceProgressEvent(
+ request.hasRebalanceId() ? request.getRebalanceId() : null,
+ response));
+ return response;
}
@Override
public CompletableFuture cancelRebalance(
CancelRebalanceRequest request) {
- throw new UnsupportedOperationException("Support soon!");
+ CompletableFuture response = new CompletableFuture<>();
+ eventManagerSupplier
+ .get()
+ .put(
+ new CancelRebalanceEvent(
+ request.hasRebalanceId() ? request.getRebalanceId() : null,
+ response));
+ return response;
}
@VisibleForTesting
@@ -909,6 +962,24 @@ private void validateTableCreationPermission(
}
}
+ private Map generateUnBalanceAssignment(
+ int nBuckets, int replicationFactor) {
+ Map assignments = new HashMap<>();
+ for (int i = 0; i < nBuckets; i++) {
+ if (replicationFactor == 1) {
+ assignments.put(i, new BucketAssignment(Collections.singletonList(0)));
+ } else if (replicationFactor == 2) {
+ assignments.put(i, new BucketAssignment(Arrays.asList(0, 1)));
+ } else if (replicationFactor == 3) {
+ assignments.put(i, new BucketAssignment(Arrays.asList(0, 1, 2)));
+ } else {
+ throw new IllegalArgumentException(
+ "replicationFactor must be 1, 2 or 3 for unbalance assignment.");
+ }
+ }
+ return assignments;
+ }
+
static class DefaultLakeCatalogContext implements LakeCatalog.Context {
private final boolean isCreatingFlussTable;
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CancelRebalanceEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CancelRebalanceEvent.java
new file mode 100644
index 0000000000..8261a25df7
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CancelRebalanceEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event;
+
+import org.apache.fluss.rpc.messages.CancelRebalanceResponse;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/** The event for canceling rebalance. */
+public class CancelRebalanceEvent implements CoordinatorEvent {
+
+ private final @Nullable String rebalanceId;
+ private final CompletableFuture respCallback;
+
+ public CancelRebalanceEvent(
+ @Nullable String rebalanceId, CompletableFuture respCallback) {
+ this.respCallback = respCallback;
+ this.rebalanceId = rebalanceId;
+ }
+
+ public @Nullable String getRabalanceId() {
+ return rebalanceId;
+ }
+
+ public CompletableFuture getRespCallback() {
+ return respCallback;
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/ListRebalanceProgressEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/ListRebalanceProgressEvent.java
new file mode 100644
index 0000000000..c0d944bcd2
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/ListRebalanceProgressEvent.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event;
+
+import org.apache.fluss.rpc.messages.ListRebalanceProgressResponse;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/** The event for listing rebalance progress. */
+public class ListRebalanceProgressEvent implements CoordinatorEvent {
+
+ private final @Nullable String rebalanceId;
+ private final CompletableFuture respCallback;
+
+ public ListRebalanceProgressEvent(
+ @Nullable String rebalanceId,
+ CompletableFuture respCallback) {
+ this.rebalanceId = rebalanceId;
+ this.respCallback = respCallback;
+ }
+
+ public @Nullable String getRabalanceId() {
+ return rebalanceId;
+ }
+
+ public CompletableFuture getRespCallback() {
+ return respCallback;
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RebalanceEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RebalanceEvent.java
new file mode 100644
index 0000000000..63cb2fe9e4
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RebalanceEvent.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event;
+
+import org.apache.fluss.rpc.messages.RebalanceResponse;
+import org.apache.fluss.server.coordinator.rebalance.goal.Goal;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** The event for rebalance. */
+public class RebalanceEvent implements CoordinatorEvent {
+
+ private final List goalsByPriority;
+ private final boolean isDryRun;
+ private final CompletableFuture respCallback;
+
+ public RebalanceEvent(
+ List goalsByPriority,
+ boolean isDryRun,
+ CompletableFuture respCallback) {
+ this.goalsByPriority = goalsByPriority;
+ this.isDryRun = isDryRun;
+ this.respCallback = respCallback;
+ }
+
+ public List getGoalsByPriority() {
+ return goalsByPriority;
+ }
+
+ public boolean isDryRun() {
+ return isDryRun;
+ }
+
+ public CompletableFuture getRespCallback() {
+ return respCallback;
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ActionAcceptance.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ActionAcceptance.java
new file mode 100644
index 0000000000..5e815e9c34
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ActionAcceptance.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance;
+
+/** Flags to indicate if an action is acceptable by the goal(s). */
+public enum ActionAcceptance {
+ /** Action is acceptable -- i.e. it does not violate goal constraints. */
+ ACCEPT,
+ /**
+ * Action is rejected in replica-level. But, the destination tabletServer may potentially accept
+ * actions of the same {@link ActionType} from the source tabletServer specified in the given
+ * action.
+ */
+ REPLICA_REJECT,
+
+ /**
+ * Action is rejected in server-level. hence, the destination tabletServer does not accept
+ * actions of the same {@link ActionType} from the source tabletServer specified in the given
+ * action.
+ */
+ SERVER_REJECT
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ActionType.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ActionType.java
new file mode 100644
index 0000000000..a24bf8acb6
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ActionType.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance;
+
+/** Flags to indicate the type of action. */
+public enum ActionType {
+ /** Move a replica from a source tabletServer to a destination tabletServer. */
+ REPLICA_MOVEMENT,
+
+ /**
+ * Move leadership of a leader from a source tabletServer to a follower of the same replica
+ * residing in a destination tabletServer.
+ */
+ LEADERSHIP_MOVEMENT;
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReBalancingAction.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReBalancingAction.java
new file mode 100644
index 0000000000..bd62f82c2a
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReBalancingAction.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance;
+
+import org.apache.fluss.metadata.TableBucket;
+
+/** Represents the load rebalancing operation over a replica for Fluss Load GoalOptimizer. */
+public class ReBalancingAction {
+ private final TableBucket tableBucket;
+ private final Integer sourceServerId;
+ private final Integer destinationServerId;
+ private final ActionType actionType;
+
+ public ReBalancingAction(
+ TableBucket tableBucket,
+ Integer sourceServerId,
+ Integer destinationServerId,
+ ActionType actionType) {
+ this.tableBucket = tableBucket;
+ this.sourceServerId = sourceServerId;
+ this.destinationServerId = destinationServerId;
+ this.actionType = actionType;
+ }
+
+ public TableBucket getTableBucket() {
+ return tableBucket;
+ }
+
+ public Integer getSourceServerId() {
+ return sourceServerId;
+ }
+
+ public Integer getDestinationServerId() {
+ return destinationServerId;
+ }
+
+ public ActionType getActionType() {
+ return actionType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ReBalancingAction that = (ReBalancingAction) o;
+
+ if (!tableBucket.equals(that.tableBucket)) {
+ return false;
+ }
+ if (!sourceServerId.equals(that.sourceServerId)) {
+ return false;
+ }
+ if (!destinationServerId.equals(that.destinationServerId)) {
+ return false;
+ }
+ return actionType == that.actionType;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = tableBucket.hashCode();
+ result = 31 * result + sourceServerId.hashCode();
+ result = 31 * result + destinationServerId.hashCode();
+ result = 31 * result + actionType.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "ReBalancingAction{"
+ + "tableBucket="
+ + tableBucket
+ + ", sourceServerId="
+ + sourceServerId
+ + ", destinationServerId="
+ + destinationServerId
+ + ", actionType="
+ + actionType
+ + '}';
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
new file mode 100644
index 0000000000..c251906355
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceProgress;
+import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
+import org.apache.fluss.cluster.rebalance.RebalanceStatus;
+import org.apache.fluss.cluster.rebalance.ServerTag;
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.CoordinatorContext;
+import org.apache.fluss.server.coordinator.CoordinatorEventProcessor;
+import org.apache.fluss.server.coordinator.rebalance.goal.Goal;
+import org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizer;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.RackModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+import org.apache.fluss.server.metadata.ServerInfo;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+import org.apache.fluss.server.zk.data.RebalancePlan;
+import org.apache.fluss.utils.MapUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.CANCELED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.COMPLETED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NOT_STARTED;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.NO_TASK;
+import static org.apache.fluss.cluster.rebalance.RebalanceStatus.REBALANCING;
+import static org.apache.fluss.cluster.rebalance.RebalanceUtils.FINAL_STATUSES;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
+
+/**
+ * A rebalance manager to generate rebalance plan, and execution rebalance plan.
+ *
+ * This manager can only be used in {@link CoordinatorEventProcessor} as a single threaded model.
+ */
+@ThreadSafe
+public class RebalanceManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RebalanceManager.class);
+
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private final Lock lock = new ReentrantLock();
+ private final ZooKeeperClient zkClient;
+ private final CoordinatorEventProcessor eventProcessor;
+
+ @GuardedBy("lock")
+ private final Queue ongoingRebalanceTasksQueue = new ArrayDeque<>();
+
+ /** A mapping from table bucket to rebalance status of pending and running tasks. */
+ @GuardedBy("lock")
+ private final Map ongoingRebalanceTasks =
+ MapUtils.newConcurrentHashMap();
+
+ /** A mapping from table bucket to rebalance status of failed or completed tasks. */
+ @GuardedBy("lock")
+ private final Map finishedRebalanceTasks =
+ MapUtils.newConcurrentHashMap();
+
+ @GuardedBy("lock")
+ private final GoalOptimizer goalOptimizer;
+
+ @GuardedBy("lock")
+ private long registerTime;
+
+ @GuardedBy("lock")
+ private volatile RebalanceStatus rebalanceStatus = NO_TASK;
+
+ @GuardedBy("lock")
+ private volatile @Nullable String currentRebalanceId;
+
+ public RebalanceManager(CoordinatorEventProcessor eventProcessor, ZooKeeperClient zkClient) {
+ this.eventProcessor = eventProcessor;
+ this.zkClient = zkClient;
+ this.goalOptimizer = new GoalOptimizer();
+ }
+
+ public void startup() {
+ LOG.info("Start up rebalance manager.");
+ initialize();
+ }
+
+ private void initialize() {
+ try {
+ zkClient.getRebalancePlan()
+ .ifPresent(
+ rebalancePlan ->
+ registerRebalance(
+ rebalancePlan.getRebalanceId(),
+ rebalancePlan.getExecutePlan()));
+ } catch (Exception e) {
+ LOG.error(
+ "Failed to get rebalance plan from zookeeper, it will be treated as no"
+ + "rebalance tasks.",
+ e);
+ }
+ }
+
+ public void registerRebalance(
+ String rebalanceId, Map rebalancePlan) {
+ checkNotClosed();
+ registerTime = System.currentTimeMillis();
+ // Register to zookeeper first.
+ try {
+ // first clear all exists tasks.
+ ongoingRebalanceTasks.clear();
+ ongoingRebalanceTasksQueue.clear();
+ finishedRebalanceTasks.clear();
+
+ Optional existPlanOpt = zkClient.getRebalancePlan();
+ RebalanceStatus newStatus = rebalancePlan.isEmpty() ? COMPLETED : NOT_STARTED;
+ if (!existPlanOpt.isPresent()) {
+ zkClient.registerRebalancePlan(
+ new RebalancePlan(rebalanceId, newStatus, rebalancePlan));
+ } else {
+ RebalancePlan existPlan = existPlanOpt.get();
+ if (FINAL_STATUSES.contains(existPlan.getRebalanceStatus())) {
+ zkClient.updateRebalancePlan(
+ new RebalancePlan(rebalanceId, newStatus, rebalancePlan));
+ } else {
+ throw new RebalanceFailureException(
+ "Rebalance task already exists. Please wait for it to finish or cancel it first.");
+ }
+ }
+
+ currentRebalanceId = rebalanceId;
+ rebalanceStatus = newStatus;
+ } catch (Exception e) {
+ LOG.error("Error when register rebalance plan to zookeeper.", e);
+ throw new RebalanceFailureException(
+ "Error when register rebalance plan to zookeeper.", e);
+ }
+
+ inLock(
+ lock,
+ () -> {
+ // Then, register to ongoingRebalanceTasks.
+ rebalancePlan.forEach(
+ ((tableBucket, rebalancePlanForBucket) -> {
+ ongoingRebalanceTasksQueue.add(tableBucket);
+ ongoingRebalanceTasks.put(
+ tableBucket,
+ RebalanceResultForBucket.of(
+ rebalancePlanForBucket, NOT_STARTED));
+ }));
+
+ // Trigger one rebalance task to execute.
+ rebalanceStatus = rebalancePlan.isEmpty() ? COMPLETED : REBALANCING;
+ processNewRebalanceTask();
+ });
+ }
+
+ public void finishRebalanceTask(TableBucket tableBucket, RebalanceStatus statusForBucket) {
+ checkNotClosed();
+ inLock(
+ lock,
+ () -> {
+ if (ongoingRebalanceTasksQueue.contains(tableBucket)) {
+ ongoingRebalanceTasksQueue.remove(tableBucket);
+ RebalanceResultForBucket resultForBucket =
+ ongoingRebalanceTasks.remove(tableBucket);
+ checkNotNull(resultForBucket, "RebalanceResultForBucket is null.");
+ finishedRebalanceTasks.put(
+ tableBucket,
+ RebalanceResultForBucket.of(
+ resultForBucket.plan(), statusForBucket));
+ LOG.info(
+ "Rebalance task {} in progress: {} tasks pending, {} completed.",
+ currentRebalanceId,
+ ongoingRebalanceTasksQueue.size(),
+ finishedRebalanceTasks.size());
+
+ if (ongoingRebalanceTasksQueue.isEmpty()) {
+ // All rebalance tasks are completed.
+ rebalanceStatus = COMPLETED;
+ completeRebalance();
+ } else {
+ // Trigger one rebalance task to execute.
+ processNewRebalanceTask();
+ }
+ }
+ });
+ }
+
+ public RebalanceProgress listRebalanceProgress(@Nullable String rebalanceId) {
+ checkNotClosed();
+ return inLock(
+ lock,
+ () -> {
+ if (rebalanceId != null
+ && currentRebalanceId != null
+ && !rebalanceId.equals(currentRebalanceId)) {
+ LOG.warn(
+ "Ignore the list rebalance task because it is not the current"
+ + " rebalance task.");
+ return new RebalanceProgress(
+ currentRebalanceId, NO_TASK, 0.0, Collections.emptyMap());
+ }
+
+ Map progressForBucketMap =
+ new HashMap<>();
+ progressForBucketMap.putAll(ongoingRebalanceTasks);
+ progressForBucketMap.putAll(finishedRebalanceTasks);
+ // the progress will be set at client.
+ return new RebalanceProgress(
+ currentRebalanceId, rebalanceStatus, 0.0, progressForBucketMap);
+ });
+ }
+
+ public void cancelRebalance(@Nullable String rebalanceId) {
+ checkNotClosed();
+ inLock(
+ lock,
+ () -> {
+ try {
+ if (rebalanceId != null
+ && currentRebalanceId != null
+ && !rebalanceId.equals(currentRebalanceId)) {
+ // do nothing.
+ LOG.warn(
+ "Ignore the cancel rebalance task because it is not the current"
+ + " rebalance task.");
+ return;
+ }
+
+ Optional rebalancePlanOpt = zkClient.getRebalancePlan();
+ if (rebalancePlanOpt.isPresent()) {
+ RebalancePlan rebalancePlan = rebalancePlanOpt.get();
+ zkClient.updateRebalancePlan(
+ new RebalancePlan(
+ rebalancePlan.getRebalanceId(),
+ CANCELED,
+ rebalancePlan.getExecutePlan()));
+ }
+ } catch (Exception e) {
+ LOG.error("Error when delete rebalance plan from zookeeper.", e);
+ }
+
+ rebalanceStatus = CANCELED;
+ ongoingRebalanceTasksQueue.clear();
+ ongoingRebalanceTasks.clear();
+
+ // Here, it will not clear finishedRebalanceTasks, because it will be used by
+ // listRebalanceProgress. It will be cleared when next register.
+
+ LOG.info("Cancel rebalance task success.");
+ });
+ }
+
+ public boolean hasOngoingRebalance() {
+ checkNotClosed();
+ return inLock(
+ lock,
+ () -> !ongoingRebalanceTasks.isEmpty() || !ongoingRebalanceTasksQueue.isEmpty());
+ }
+
+ public RebalancePlan generateRebalancePlan(List goalsByPriority) {
+ checkNotClosed();
+ List rebalancePlanForBuckets;
+ try {
+ // Generate the latest cluster model.
+ ClusterModel clusterModel = buildClusterModel(eventProcessor.getCoordinatorContext());
+
+ // do optimize.
+ rebalancePlanForBuckets = goalOptimizer.doOptimizeOnce(clusterModel, goalsByPriority);
+ } catch (Exception e) {
+ LOG.error("Failed to generate rebalance plan.", e);
+ throw e;
+ }
+
+ // group by tableId and partitionId to generate rebalance plan.
+ return buildRebalancePlan(rebalancePlanForBuckets);
+ }
+
+ public @Nullable RebalancePlanForBucket getRebalancePlanForBucket(TableBucket tableBucket) {
+ checkNotClosed();
+ return inLock(
+ lock,
+ () -> {
+ RebalanceResultForBucket resultForBucket =
+ ongoingRebalanceTasks.get(tableBucket);
+ if (resultForBucket != null) {
+ return resultForBucket.plan();
+ }
+ return null;
+ });
+ }
+
+ private void processNewRebalanceTask() {
+ TableBucket tableBucket = ongoingRebalanceTasksQueue.peek();
+ if (tableBucket != null && ongoingRebalanceTasks.containsKey(tableBucket)) {
+ RebalanceResultForBucket resultForBucket = ongoingRebalanceTasks.get(tableBucket);
+ RebalanceResultForBucket rebalanceResultForBucket =
+ RebalanceResultForBucket.of(resultForBucket.plan(), REBALANCING);
+ eventProcessor.tryToExecuteRebalanceTask(rebalanceResultForBucket.plan());
+ }
+ }
+
+ private void completeRebalance() {
+ checkNotClosed();
+ inLock(
+ lock,
+ () -> {
+ try {
+ Optional rebalancePlanOpt = zkClient.getRebalancePlan();
+ if (rebalancePlanOpt.isPresent()) {
+ RebalancePlan rebalancePlan = rebalancePlanOpt.get();
+ zkClient.updateRebalancePlan(
+ new RebalancePlan(
+ rebalancePlan.getRebalanceId(),
+ COMPLETED,
+ rebalancePlan.getExecutePlan()));
+ }
+ } catch (Exception e) {
+ LOG.error("Error when update rebalance plan from zookeeper.", e);
+ }
+
+ ongoingRebalanceTasks.clear();
+ ongoingRebalanceTasksQueue.clear();
+
+ // Here, it will not clear finishedRebalanceTasks, because it will be used by
+ // listRebalanceProgress. It will be cleared when next register.
+
+ LOG.info(
+ "Rebalance complete with {} ms.",
+ System.currentTimeMillis() - registerTime);
+ });
+ }
+
+ private ClusterModel buildClusterModel(CoordinatorContext coordinatorContext) {
+ Map liveTabletServers = coordinatorContext.getLiveTabletServers();
+ Map serverTags = coordinatorContext.getServerTags();
+
+ Map serverModelMap = new HashMap<>();
+ for (ServerInfo serverInfo : liveTabletServers.values()) {
+ Integer id = serverInfo.id();
+ String rack = serverInfo.rack() == null ? RackModel.DEFAULT_RACK : serverInfo.rack();
+ if (serverTags.containsKey(id)) {
+ serverModelMap.put(
+ id, new ServerModel(id, rack, !isServerOffline(serverTags.get(id))));
+ } else {
+ serverModelMap.put(id, new ServerModel(id, rack, true));
+ }
+ }
+
+ ClusterModel clusterModel = initialClusterModel(serverModelMap);
+
+ // Try to update the cluster model with the latest bucket states.
+ Set allBuckets = coordinatorContext.getAllBuckets();
+ for (TableBucket tableBucket : allBuckets) {
+ List assignment = coordinatorContext.getAssignment(tableBucket);
+ Optional bucketLeaderAndIsrOpt =
+ coordinatorContext.getBucketLeaderAndIsr(tableBucket);
+ checkArgument(bucketLeaderAndIsrOpt.isPresent(), "Bucket leader and isr is empty.");
+ LeaderAndIsr isr = bucketLeaderAndIsrOpt.get();
+ int leader = isr.leader();
+ for (int i = 0; i < assignment.size(); i++) {
+ int replica = assignment.get(i);
+ clusterModel.createReplica(replica, tableBucket, i, leader == replica);
+ }
+ }
+ return clusterModel;
+ }
+
+ private RebalancePlan buildRebalancePlan(List rebalancePlanForBuckets) {
+ Map bucketPlan = new HashMap<>();
+ for (RebalancePlanForBucket rebalancePlanForBucket : rebalancePlanForBuckets) {
+ bucketPlan.put(rebalancePlanForBucket.getTableBucket(), rebalancePlanForBucket);
+ }
+ return new RebalancePlan(UUID.randomUUID().toString(), NOT_STARTED, bucketPlan);
+ }
+
+ private boolean isServerOffline(ServerTag serverTag) {
+ return serverTag == ServerTag.PERMANENT_OFFLINE || serverTag == ServerTag.TEMPORARY_OFFLINE;
+ }
+
+ private ClusterModel initialClusterModel(Map serverModelMap) {
+ SortedSet servers = new TreeSet<>(serverModelMap.values());
+ return new ClusterModel(servers);
+ }
+
+ private void checkNotClosed() {
+ if (isClosed.get()) {
+ throw new IllegalStateException("RebalanceManager is already closed.");
+ }
+ }
+
+ public void close() {
+ isClosed.compareAndSet(false, true);
+ }
+
+ @VisibleForTesting
+ public ClusterModel buildClusterModel() {
+ return buildClusterModel(eventProcessor.getCoordinatorContext());
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReplicaReassignment.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReplicaReassignment.java
new file mode 100644
index 0000000000..1e4ff17ea2
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/ReplicaReassignment.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** Replica reassignment. */
+public class ReplicaReassignment {
+ private final List replicas;
+ private final List addingReplicas;
+ private final List removingReplicas;
+
+ private ReplicaReassignment(
+ List replicas, List addingReplicas, List removingReplicas) {
+ this.replicas = Collections.unmodifiableList(replicas);
+ this.addingReplicas = Collections.unmodifiableList(addingReplicas);
+ this.removingReplicas = Collections.unmodifiableList(removingReplicas);
+ }
+
+ private static ReplicaReassignment build(
+ List originReplicas, List targetReplicas) {
+ // targetReplicas behind originReplicas in full set.
+ List fullReplicaSet = new ArrayList<>(targetReplicas);
+ fullReplicaSet.addAll(originReplicas);
+ fullReplicaSet = fullReplicaSet.stream().distinct().collect(Collectors.toList());
+
+ List newAddingReplicas = new ArrayList<>(fullReplicaSet);
+ newAddingReplicas.removeAll(originReplicas);
+
+ List newRemovingReplicas = new ArrayList<>(originReplicas);
+ newRemovingReplicas.removeAll(targetReplicas);
+
+ return new ReplicaReassignment(fullReplicaSet, newAddingReplicas, newRemovingReplicas);
+ }
+
+ private List getTargetReplicas() {
+ List computed = new ArrayList<>(replicas);
+ computed.removeAll(removingReplicas);
+ return Collections.unmodifiableList(computed);
+ }
+
+ private List getOriginReplicas() {
+ List computed = new ArrayList<>(replicas);
+ computed.removeAll(addingReplicas);
+ return Collections.unmodifiableList(computed);
+ }
+
+ private boolean isBeingReassigned() {
+ return !addingReplicas.isEmpty() || !removingReplicas.isEmpty();
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "ReplicaAssignment(replicas=%s, addingReplicas=%s, removingReplicas=%s)",
+ replicas, addingReplicas, removingReplicas);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ReplicaReassignment that = (ReplicaReassignment) o;
+ return Objects.equals(replicas, that.replicas)
+ && Objects.equals(addingReplicas, that.addingReplicas)
+ && Objects.equals(removingReplicas, that.removingReplicas);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(replicas, addingReplicas, removingReplicas);
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/AbstractGoal.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/AbstractGoal.java
new file mode 100644
index 0000000000..513d7209db
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/AbstractGoal.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
+import org.apache.fluss.server.coordinator.rebalance.ActionType;
+import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+
+import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT;
+import static org.apache.fluss.server.coordinator.rebalance.ActionType.LEADERSHIP_MOVEMENT;
+import static org.apache.fluss.server.coordinator.rebalance.ActionType.REPLICA_MOVEMENT;
+import static org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizerUtils.isProposalAcceptableForOptimizedGoals;
+import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.legitMove;
+
+/** An abstract class for goals. */
+public abstract class AbstractGoal implements Goal {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractGoal.class);
+ protected boolean finished;
+ protected boolean succeeded;
+
+ public AbstractGoal() {
+ finished = false;
+ succeeded = true;
+ }
+
+ @Override
+ public void optimize(ClusterModel clusterModel, Set optimizedGoals) {
+ LOG.debug("Starting Optimizing for goal {}", name());
+ // Initialize pre-optimized stats.
+ ClusterModelStats statsBeforeOptimization = clusterModel.getClusterStats();
+ LOG.trace("[PRE - {}] {}", name(), statsBeforeOptimization);
+ finished = false;
+ long goalStartTime = System.currentTimeMillis();
+ initGoalState(clusterModel);
+ SortedSet offlineServers = clusterModel.offlineServers();
+
+ while (!finished) {
+ for (ServerModel server : serversToBalance(clusterModel)) {
+ rebalanceForServer(server, clusterModel, optimizedGoals);
+ }
+ updateGoalState(clusterModel);
+ }
+
+ ClusterModelStats statsAfterOptimization = clusterModel.getClusterStats();
+ LOG.trace("[POST - {}] {}", name(), statsAfterOptimization);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Finished optimization for {} in {}ms.",
+ name(),
+ System.currentTimeMillis() - goalStartTime);
+ }
+ LOG.trace("Cluster after optimization is {}", clusterModel);
+ // The optimization cannot make stats worse unless the cluster has (1) offline servers for
+ // replica move with replicas.
+ if (offlineServers.isEmpty()) {
+ ClusterModelStatsComparator comparator = clusterModelStatsComparator();
+ // Throw exception when the stats before optimization is preferred.
+ if (comparator.compare(statsAfterOptimization, statsBeforeOptimization) < 0) {
+ // If a goal provides worse stats after optimization, that indicates an
+ // implementation error with the goal.
+ throw new IllegalStateException(
+ String.format(
+ "Optimization for goal %s failed because the optimized result is worse than before."
+ + " Reason: %s.",
+ name(), comparator.explainLastComparison()));
+ }
+ }
+ }
+
+ @Override
+ public void finish() {
+ finished = true;
+ }
+
+ @Override
+ public String name() {
+ return this.getClass().getSimpleName();
+ }
+
+ /**
+ * Get sorted tabletServers that the rebalance process will go over to apply balancing actions
+ * to replicas they contain.
+ */
+ protected SortedSet serversToBalance(ClusterModel clusterModel) {
+ return clusterModel.servers();
+ }
+
+ /**
+ * Initialize states that this goal requires. E.g. run sanity checks regarding hard goals
+ * requirements.
+ */
+ protected abstract void initGoalState(ClusterModel clusterModel)
+ throws RebalanceFailureException;
+
+ /**
+ * Rebalance the given tabletServers without violating the constraints of the current goal and
+ * optimized goals.
+ */
+ protected abstract void rebalanceForServer(
+ ServerModel server, ClusterModel clusterModel, Set optimizedGoals)
+ throws RebalanceFailureException;
+
+ /** Update goal state after one round of rebalance. */
+ protected abstract void updateGoalState(ClusterModel clusterModel)
+ throws RebalanceFailureException;
+
+ /**
+ * Check if requirements of this goal are not violated if this action is applied to the given
+ * cluster state, {@code false} otherwise.
+ */
+ protected abstract boolean selfSatisfied(ClusterModel clusterModel, ReBalancingAction action);
+
+ /**
+ * Attempt to apply the given balancing action to the given replica in the given cluster. The
+ * application considers the candidate tabletServers as the potential destination tabletServers
+ * for replica movement or the location of followers for leadership transfer. If the movement
+ * attempt succeeds, the function returns the server id of the destination, otherwise the
+ * function returns null.
+ */
+ protected ServerModel maybeApplyBalancingAction(
+ ClusterModel clusterModel,
+ ReplicaModel replica,
+ Collection candidateServers,
+ ActionType action,
+ Set optimizedGoals) {
+ List eligibleServers = new ArrayList<>(candidateServers);
+ TableBucket tableBucket = replica.tableBucket();
+ for (ServerModel server : eligibleServers) {
+ ReBalancingAction proposal =
+ new ReBalancingAction(tableBucket, replica.server().id(), server.id(), action);
+ // A replica should be moved if:
+ // 0. The move is legit.
+ // 1. The goal requirements are not violated if this action is applied to the given
+ // cluster state.
+ // 2. The movement is acceptable by the previously optimized goals.
+
+ if (!legitMove(replica, server, clusterModel, action)) {
+ LOG.trace("Replica move to server is not legit for {}.", proposal);
+ continue;
+ }
+
+ if (!selfSatisfied(clusterModel, proposal)) {
+ LOG.trace("Unable to self-satisfy proposal {}.", proposal);
+ continue;
+ }
+
+ ActionAcceptance acceptance =
+ isProposalAcceptableForOptimizedGoals(optimizedGoals, proposal, clusterModel);
+ LOG.trace(
+ "Trying to apply legit and self-satisfied action {}, actionAcceptance = {}",
+ proposal,
+ acceptance);
+ if (acceptance == ACCEPT) {
+ if (action == LEADERSHIP_MOVEMENT) {
+ clusterModel.relocateLeadership(
+ tableBucket, replica.server().id(), server.id());
+ } else if (action == REPLICA_MOVEMENT) {
+ clusterModel.relocateReplica(tableBucket, replica.server().id(), server.id());
+ }
+ return server;
+ }
+ }
+ return null;
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/Goal.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/Goal.java
new file mode 100644
index 0000000000..e4ab551c53
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/Goal.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
+import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.Set;
+
+/** This is the interface of the optimization goals used for rebalance. */
+public interface Goal {
+ Logger LOG = LoggerFactory.getLogger(Goal.class);
+
+ /**
+ * Optimize the given cluster model as needed for this goal.
+ *
+ * The method will be given a cluster model. The goal can try to optimize the cluster model
+ * by performing some admin operations(e.g. move replicas or leader of tableBuckets).
+ *
+ *
During the optimization, the implementation should make sure that all the previously
+ * optimized goals are still satisfied after this method completes its execution. The
+ * implementation can use {@link #actionAcceptance(ReBalancingAction, ClusterModel)} to check
+ * whether an admin operation is allowed by a previously optimized goals.
+ *
+ *
The implementation of a soft goal should return a boolean indicating whether the goal has
+ * been met after the optimization or not.
+ *
+ *
The implementation of a hard goal should throw an {@link RebalanceFailureException} when
+ * the goal cannot be met. This will then fail the entire optimization attempt.
+ */
+ void optimize(ClusterModel clusterModel, Set optimizedGoals);
+
+ /**
+ * Check whether the given action is acceptable by this goal in the given state of the cluster.
+ * An action is (1) accepted by a goal if it satisfies requirements of the goal, or (2) rejected
+ * by a goal if it violates its requirements. The return value indicates whether the action is
+ * accepted or why it is rejected.
+ */
+ ActionAcceptance actionAcceptance(ReBalancingAction action, ClusterModel clusterModel);
+
+ /**
+ * Get an instance of {@link ClusterModelStatsComparator} for this goal.
+ *
+ * The {@link ClusterModelStatsComparator#compare(ClusterModelStats, ClusterModelStats)}
+ * method should give a preference between two {@link ClusterModelStats}.
+ *
+ *
The returned value must not be null.
+ *
+ * @return An instance of {@link ClusterModelStatsComparator} for this goal.
+ */
+ ClusterModelStatsComparator clusterModelStatsComparator();
+
+ /**
+ * Signal for finishing the process for rebalance. It is intended to mark the goal optimization
+ * as finished and perform the memory clean up after the goal optimization.
+ */
+ void finish();
+
+ /**
+ * @return {@code true} if this is a hard goal, {@code false} otherwise.
+ */
+ boolean isHardGoal();
+
+ /**
+ * @return The name of this goal. Name of a goal provides an identification for the goal in
+ * human-readable format.
+ */
+ String name();
+
+ /**
+ * A comparator that compares two cluster model stats.
+ *
+ *
Note: this comparator imposes orderings that are inconsistent with equals.
+ */
+ interface ClusterModelStatsComparator extends Comparator, Serializable {
+
+ /**
+ * Compare two cluster model stats and determine which stats is preferred.
+ *
+ * @param stats1 the first stats
+ * @param stats2 the second stats
+ * @return Positive value if stats1 is preferred, 0 if the two stats are equally preferred,
+ * negative value if stats2 is preferred.
+ */
+ @Override
+ int compare(ClusterModelStats stats1, ClusterModelStats stats2);
+
+ /**
+ * This is a method to get the reason for the last comparison. The implementation should at
+ * least provide a reason when the last comparison returns negative value.
+ *
+ * @return A string that explains the result of last comparison.
+ */
+ String explainLastComparison();
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizer.java
new file mode 100644
index 0000000000..6a34c0122a
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizerUtils.getDiff;
+import static org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizerUtils.hasDiff;
+
+/** A class for optimizing goals in the given order of priority. */
+public class GoalOptimizer {
+ private static final Logger LOG = LoggerFactory.getLogger(GoalOptimizer.class);
+
+ public List doOptimizeOnce(
+ ClusterModel clusterModel, List goalsByPriority) {
+ LOG.trace("Cluster before optimization is {}", clusterModel);
+ Map> initReplicaDistribution =
+ clusterModel.getReplicaDistribution();
+ Map initLeaderDistribution = clusterModel.getLeaderDistribution();
+
+ // Set of balancing proposals that will be applied to the given cluster state to satisfy
+ // goals (leadership transfer AFTER bucket transfer.)
+ Set optimizedGoals = new HashSet<>();
+ Map> preOptimizedReplicaDistribution = null;
+ Map preOptimizedLeaderDistribution = null;
+ for (Goal goal : goalsByPriority) {
+ preOptimizedReplicaDistribution =
+ preOptimizedReplicaDistribution == null
+ ? initReplicaDistribution
+ : clusterModel.getReplicaDistribution();
+ preOptimizedLeaderDistribution =
+ preOptimizedLeaderDistribution == null
+ ? initLeaderDistribution
+ : clusterModel.getLeaderDistribution();
+
+ // executing the goal optimization.
+ goal.optimize(clusterModel, optimizedGoals);
+ optimizedGoals.add(goal);
+
+ boolean hasDiff =
+ hasDiff(
+ preOptimizedReplicaDistribution,
+ preOptimizedLeaderDistribution,
+ clusterModel);
+ LOG.info(
+ "[{}/{}] Generated {} proposals for {}",
+ optimizedGoals.size(),
+ goalsByPriority.size(),
+ hasDiff ? "some" : "no",
+ goal.name());
+ }
+
+ return getDiff(initReplicaDistribution, initLeaderDistribution, clusterModel);
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java
new file mode 100644
index 0000000000..dadde867b2
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalOptimizerUtils.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
+import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.BucketModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/** An util class for {@link GoalOptimizer}. */
+public class GoalOptimizerUtils {
+
+ public static final double EPSILON = 1E-5;
+
+ /** Check whether the given proposal is acceptable for all the given optimized goals. */
+ public static ActionAcceptance isProposalAcceptableForOptimizedGoals(
+ Set optimizedGoals, ReBalancingAction action, ClusterModel cluster) {
+ for (Goal goal : optimizedGoals) {
+ ActionAcceptance acceptance = goal.actionAcceptance(action, cluster);
+ if (acceptance != ACCEPT) {
+ return acceptance;
+ }
+ }
+ return ACCEPT;
+ }
+
+ /**
+ * Compare the given values.
+ *
+ *
+ * 1. Return 1 if first
+ * 2. -1 if first
+ * 3. 0 otherwise.
+ *
+ */
+ public static int compare(double d1, double d2, double epsilon) {
+ if (d2 - d1 > epsilon) {
+ // Second value is larger than the first value.
+ return -1;
+ }
+ if (d1 - d2 > epsilon) {
+ // First value is larger than the second value.
+ return 1;
+ }
+ // Given values are approximately equal.
+ return 0;
+ }
+
+ /**
+ * Get whether there is any diff represented by a set of rebalance plan to move from the initial
+ * to final distribution.
+ */
+ public static boolean hasDiff(
+ Map> initialReplicaDistribution,
+ Map initialLeaderDistribution,
+ ClusterModel optimizedCluster) {
+ Map> finalReplicaDistribution =
+ optimizedCluster.getReplicaDistribution();
+ sanityCheckReplicaDistribution(initialReplicaDistribution, finalReplicaDistribution);
+
+ boolean hasDiff = false;
+ for (Map.Entry> entry : initialReplicaDistribution.entrySet()) {
+ TableBucket tableBucket = entry.getKey();
+ List initialReplicas = entry.getValue();
+ List finalReplicas = finalReplicaDistribution.get(tableBucket);
+
+ if (!finalReplicas.equals(initialReplicas)) {
+ hasDiff = true;
+ break;
+ } else {
+ BucketModel bucket = optimizedCluster.bucket(tableBucket);
+ checkNotNull(bucket, "Bucket is not in the cluster.");
+ ReplicaModel finalLeaderReplica = bucket.leader();
+ checkNotNull(finalLeaderReplica, "Leader replica is not in the bucket.");
+ Integer finalLeader = finalLeaderReplica.server().id();
+ if (!initialLeaderDistribution.get(tableBucket).equals(finalLeader)) {
+ hasDiff = true;
+ break;
+ }
+ // The bucket has no change.
+ }
+ }
+ return hasDiff;
+ }
+
+ /**
+ * Get the diff represented by the set of rebalance plan for bucket to move from initial to
+ * final distribution.
+ */
+ public static List getDiff(
+ Map> initialReplicaDistribution,
+ Map initialLeaderDistribution,
+ ClusterModel optimizedCluster) {
+ Map> finalReplicaDistribution =
+ optimizedCluster.getReplicaDistribution();
+ sanityCheckReplicaDistribution(initialReplicaDistribution, finalReplicaDistribution);
+
+ // Generate a set of rebalance plans to represent the diff between initial and final
+ // distribution.
+ List diff = new ArrayList<>();
+ for (Map.Entry> entry : initialReplicaDistribution.entrySet()) {
+ TableBucket tableBucket = entry.getKey();
+ List initialReplicas = entry.getValue();
+ List finalReplicas = finalReplicaDistribution.get(tableBucket);
+ BucketModel bucket = optimizedCluster.bucket(tableBucket);
+ checkNotNull(bucket, "Bucket is not in the cluster.");
+ ReplicaModel finalLeaderReplica = bucket.leader();
+ checkNotNull(finalLeaderReplica, "Leader replica is not in the bucket.");
+ int finalLeader = finalLeaderReplica.server().id();
+ // The bucket has no change.
+ if (finalReplicas.equals(initialReplicas)
+ && initialLeaderDistribution.get(tableBucket).equals(finalLeader)) {
+ continue;
+ }
+ // We need to adjust the final server list order to ensure the final leader is the first
+ // replica.
+ if (finalLeader != finalReplicas.get(0)) {
+ int leaderPos = finalReplicas.indexOf(finalLeader);
+ finalReplicas.set(leaderPos, finalReplicas.get(0));
+ finalReplicas.set(0, finalLeader);
+ }
+ diff.add(
+ new RebalancePlanForBucket(
+ tableBucket,
+ initialLeaderDistribution.get(tableBucket),
+ finalLeader,
+ initialReplicas,
+ finalReplicas));
+ }
+ return diff;
+ }
+
+ /**
+ * Sanity check to ensure that initial and final replica distribution have exactly the same
+ * buckets.
+ */
+ private static void sanityCheckReplicaDistribution(
+ Map> initialReplicaDistribution,
+ Map> finalReplicaDistribution) {
+ // Sanity check to make sure that given distributions contain the same replicas.
+ if (!initialReplicaDistribution.keySet().equals(finalReplicaDistribution.keySet())) {
+ throw new IllegalArgumentException(
+ "Initial and final replica distributions do not contain the same buckets.");
+ }
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java
new file mode 100644
index 0000000000..81d137602b
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/GoalUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.cluster.rebalance.GoalType;
+import org.apache.fluss.server.coordinator.rebalance.ActionType;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** An util class for {@link Goal}. */
+public class GoalUtils {
+
+ public static Goal getGoalByType(GoalType goalType) {
+ switch (goalType) {
+ case REPLICA_DISTRIBUTION_GOAL:
+ return new ReplicaDistributionGoal();
+ case LEADER_DISTRIBUTION_GOAL:
+ return new LeaderReplicaDistributionGoal();
+ default:
+ throw new IllegalArgumentException("Unsupported goal type " + goalType);
+ }
+ }
+
+ /**
+ * Check whether the proposed action is legit. An action is legit if it is:
+ *
+ *
+ * - 1. a replica movement across tabletServers, the dest server does not have a replica of
+ * the same bucket and is allowed to have a replica from the bucket
+ *
- a leadership movement, the replica is a leader and the dest server has a follower of
+ * the same bucket
+ *
+ */
+ public static boolean legitMove(
+ ReplicaModel replica,
+ ServerModel destServer,
+ ClusterModel cluster,
+ ActionType actionType) {
+ switch (actionType) {
+ case REPLICA_MOVEMENT:
+ return cluster.bucket(replica.tableBucket()).canAssignReplicaToServer(destServer)
+ && destServer.replica(replica.tableBucket()) == null;
+ case LEADERSHIP_MOVEMENT:
+ return replica.isLeader() && destServer.replica(replica.tableBucket()) != null;
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Retrieve alive servers ids that are not excluded for replica moves. Returns a set to provide
+ * constant time lookup guaranteed by a HashSet.
+ */
+ public static Set aliveServersNotExcludeForReplicaMove(ClusterModel cluster) {
+ return cluster.aliveServers().stream()
+ .map(ServerModel::id)
+ .collect(Collectors.toCollection(HashSet::new));
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/LeaderReplicaDistributionGoal.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/LeaderReplicaDistributionGoal.java
new file mode 100644
index 0000000000..8b5faea263
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/LeaderReplicaDistributionGoal.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
+import org.apache.fluss.server.coordinator.rebalance.ActionType;
+import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.BucketModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+import org.apache.fluss.server.coordinator.rebalance.model.Statistic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT;
+import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.REPLICA_REJECT;
+import static org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizerUtils.EPSILON;
+import static org.apache.fluss.server.coordinator.rebalance.goal.ReplicaDistributionAbstractGoal.ChangeType.ADD;
+import static org.apache.fluss.server.coordinator.rebalance.goal.ReplicaDistributionAbstractGoal.ChangeType.REMOVE;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Soft goal to generate leadership movement and leader replica movement proposals to ensure that
+ * the number of leader replicas on each server is.
+ *
+ *
+ * - Under: (the average number of leader replicas per server) * (1 + leader replica count
+ * balance percentage)
+ *
- Above: (the average number of leader replicas per server) * Math.max(0, 1 - leader replica
+ * count balance percentage)
+ *
+ */
+public class LeaderReplicaDistributionGoal extends ReplicaDistributionAbstractGoal {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LeaderReplicaDistributionGoal.class);
+
+ /**
+ * The maximum allowed extent of unbalance for leader replica distribution. For example, 1.10
+ * means the highest leader replica count of a server should not be 1.10x of average leader
+ * replica count of all alive tabletServers.
+ */
+ private static final Double LEADER_REPLICA_COUNT_REBALANCE_THRESHOLD = 1.10d;
+
+ @Override
+ public ActionAcceptance actionAcceptance(ReBalancingAction action, ClusterModel clusterModel) {
+ ServerModel sourceServer = clusterModel.server(action.getSourceServerId());
+ checkNotNull(
+ sourceServer, "Source server " + action.getSourceServerId() + " is not found.");
+ ReplicaModel sourceReplica = sourceServer.replica(action.getTableBucket());
+ checkNotNull(sourceReplica, "Source replica " + action.getTableBucket() + " is not found.");
+ ServerModel destServer = clusterModel.server(action.getDestinationServerId());
+ switch (action.getActionType()) {
+ case LEADERSHIP_MOVEMENT:
+ return isLeaderMovementSatisfiable(sourceServer, destServer);
+ case REPLICA_MOVEMENT:
+ if (sourceReplica.isLeader()) {
+ return isLeaderMovementSatisfiable(sourceServer, destServer);
+ }
+ return ACCEPT;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported action type " + action.getActionType());
+ }
+ }
+
+ @Override
+ protected void rebalanceForServer(
+ ServerModel server, ClusterModel clusterModel, Set optimizedGoals)
+ throws RebalanceFailureException {
+ LOG.debug(
+ "Rebalancing server {} [limits] lower: {} upper: {}.",
+ server.id(),
+ rebalanceLowerLimit,
+ rebalanceUpperLimit);
+ int numLeaderReplicas = server.leaderReplicas().size();
+ boolean isExcludedForReplicaMove = isExcludedForReplicaMove(server);
+ boolean requireLessLeaderReplicas =
+ numLeaderReplicas > (isExcludedForReplicaMove ? 0 : rebalanceUpperLimit)
+ || !server.isAlive();
+ boolean requireMoreLeaderReplicas =
+ !isExcludedForReplicaMove
+ && server.isAlive()
+ && numLeaderReplicas < rebalanceLowerLimit;
+ // Update server ids over the balance limit for logging purposes.
+ if (((requireLessLeaderReplicas
+ && rebalanceByMovingLeadershipOut(server, clusterModel, optimizedGoals)))
+ && rebalanceByMovingReplicasOut(server, clusterModel, optimizedGoals)) {
+ serverIdsAboveRebalanceUpperLimit.add(server.id());
+ LOG.debug(
+ "Failed to sufficiently decrease leader replica count in server {}. Leader replicas: {}.",
+ server.id(),
+ server.leaderReplicas().size());
+ } else if (requireMoreLeaderReplicas
+ && rebalanceByMovingLeadershipIn(server, clusterModel, optimizedGoals)
+ && rebalanceByMovingLeaderReplicasIn(server, clusterModel, optimizedGoals)) {
+ serverIdsBelowRebalanceLowerLimit.add(server.id());
+ LOG.debug(
+ "Failed to sufficiently increase leader replica count in server {}. Leader replicas: {}.",
+ server.id(),
+ server.leaderReplicas().size());
+ }
+ }
+
+ @Override
+ public ClusterModelStatsComparator clusterModelStatsComparator() {
+ return new LeaderReplicaDistributionGoalStatsComparator();
+ }
+
+ @Override
+ int numInterestedReplicas(ClusterModel clusterModel) {
+ return clusterModel.numLeaderReplicas();
+ }
+
+ @Override
+ double balancePercentage() {
+ return LEADER_REPLICA_COUNT_REBALANCE_THRESHOLD;
+ }
+
+ private ActionAcceptance isLeaderMovementSatisfiable(
+ ServerModel sourceServer, ServerModel destServer) {
+ return (isReplicaCountUnderBalanceUpperLimitAfterChange(
+ destServer, destServer.leaderReplicas().size(), ADD)
+ && (isExcludedForReplicaMove(sourceServer)
+ || isReplicaCountAboveBalanceLowerLimitAfterChange(
+ sourceServer,
+ sourceServer.leaderReplicas().size(),
+ REMOVE)))
+ ? ACCEPT
+ : REPLICA_REJECT;
+ }
+
+ private boolean rebalanceByMovingLeadershipOut(
+ ServerModel server, ClusterModel cluster, Set optimizedGoals) {
+ // If the source server is excluded for replica move, set its upper limit to 0.
+ int balanceUpperLimitForSourceServer =
+ isExcludedForReplicaMove(server) ? 0 : rebalanceUpperLimit;
+ int numLeaderReplicas = server.leaderReplicas().size();
+ for (ReplicaModel leader : new HashSet<>(server.leaderReplicas())) {
+ BucketModel bucketModel = cluster.bucket(leader.tableBucket());
+ checkNotNull(bucketModel, "Bucket " + leader.tableBucket() + " is not found.");
+ Set candidateServers =
+ bucketModel.bucketServers().stream()
+ .filter(b -> b != server)
+ .collect(Collectors.toSet());
+ ServerModel b =
+ maybeApplyBalancingAction(
+ cluster,
+ leader,
+ candidateServers,
+ ActionType.LEADERSHIP_MOVEMENT,
+ optimizedGoals);
+ // Only check if we successfully moved something.
+ if (b != null) {
+ if (--numLeaderReplicas <= balanceUpperLimitForSourceServer) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private boolean rebalanceByMovingLeadershipIn(
+ ServerModel server, ClusterModel cluster, Set optimizedGoals) {
+ int numLeaderReplicas = server.leaderReplicas().size();
+ Set candidateServers = Collections.singleton(server);
+ for (ReplicaModel replica : server.replicas()) {
+ if (replica.isLeader()) {
+ continue;
+ }
+
+ BucketModel bucket = cluster.bucket(replica.tableBucket());
+ checkNotNull(bucket, "Bucket " + replica.tableBucket() + " is not found.");
+ ServerModel b =
+ maybeApplyBalancingAction(
+ cluster,
+ Objects.requireNonNull(bucket.leader()),
+ candidateServers,
+ ActionType.LEADERSHIP_MOVEMENT,
+ optimizedGoals);
+ // Only check if we successfully moved something.
+ if (b != null) {
+ if (++numLeaderReplicas >= rebalanceLowerLimit) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private boolean rebalanceByMovingReplicasOut(
+ ServerModel server, ClusterModel cluster, Set optimizedGoals) {
+ // Get the eligible servers.
+ SortedSet candidateServers;
+ candidateServers =
+ new TreeSet<>(
+ Comparator.comparingInt((ServerModel b) -> b.leaderReplicas().size())
+ .thenComparingInt(ServerModel::id));
+ candidateServers.addAll(
+ cluster.aliveServers().stream()
+ .filter(b -> b.leaderReplicas().size() < rebalanceUpperLimit)
+ .collect(Collectors.toSet()));
+
+ int balanceUpperLimit = rebalanceUpperLimit;
+ int numReplicas = server.replicas().size();
+ for (ReplicaModel replica : server.replicas()) {
+ ServerModel b =
+ maybeApplyBalancingAction(
+ cluster,
+ replica,
+ candidateServers,
+ ActionType.REPLICA_MOVEMENT,
+ optimizedGoals);
+ // Only check if we successfully moved something.
+ if (b != null) {
+ if (--numReplicas <= balanceUpperLimit) {
+ return false;
+ }
+ // Remove and reinsert the server so the order is correct.
+ candidateServers.remove(b);
+ if (b.leaderReplicas().size() < rebalanceUpperLimit) {
+ candidateServers.add(b);
+ }
+ }
+ }
+ return true;
+ }
+
+ private boolean rebalanceByMovingLeaderReplicasIn(
+ ServerModel server, ClusterModel clusterModel, Set optimizedGoals) {
+ PriorityQueue eligibleServers =
+ new PriorityQueue<>(
+ (b1, b2) -> {
+ int result =
+ Integer.compare(
+ b2.leaderReplicas().size(), b1.leaderReplicas().size());
+ return result == 0 ? Integer.compare(b1.id(), b2.id()) : result;
+ });
+
+ for (ServerModel aliveServer : clusterModel.aliveServers()) {
+ if (aliveServer.leaderReplicas().size() > rebalanceLowerLimit) {
+ eligibleServers.add(aliveServer);
+ }
+ }
+ List candidateServers = Collections.singletonList(server);
+ int numLeaderReplicas = server.leaderReplicas().size();
+ while (!eligibleServers.isEmpty()) {
+ ServerModel sourceServer = eligibleServers.poll();
+ for (ReplicaModel replica : sourceServer.replicas()) {
+ ServerModel b =
+ maybeApplyBalancingAction(
+ clusterModel,
+ replica,
+ candidateServers,
+ ActionType.REPLICA_MOVEMENT,
+ optimizedGoals);
+ // Only need to check status if the action is taken. This will also handle the case
+ // that the source server has nothing to move in. In that case we will never
+ // reenqueue that source server.
+ if (b != null) {
+ if (++numLeaderReplicas >= rebalanceLowerLimit) {
+ return false;
+ }
+ // If the source server has a lower number of leader replicas than the next
+ // server in the eligible server queue, we reenqueue the source server and
+ // switch to the next server.
+ if (!eligibleServers.isEmpty()
+ && sourceServer.leaderReplicas().size()
+ < eligibleServers.peek().leaderReplicas().size()) {
+ eligibleServers.add(sourceServer);
+ break;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ private class LeaderReplicaDistributionGoalStatsComparator
+ implements ClusterModelStatsComparator {
+ private String reasonForLastNegativeResult;
+
+ @Override
+ public int compare(ClusterModelStats stats1, ClusterModelStats stats2) {
+ // Standard deviation of number of leader replicas over alive servers in the current
+ // must be less than the pre-optimized stats.
+ double stDev1 = stats1.leaderReplicaStats().get(Statistic.ST_DEV).doubleValue();
+ double stDev2 = stats2.leaderReplicaStats().get(Statistic.ST_DEV).doubleValue();
+ int result = GoalOptimizerUtils.compare(stDev2, stDev1, EPSILON);
+ if (result < 0) {
+ reasonForLastNegativeResult =
+ String.format(
+ "Violated %s. [Std Deviation of Leader Replica Distribution] post-"
+ + "optimization:%.3f pre-optimization:%.3f",
+ name(), stDev1, stDev2);
+ }
+ return result;
+ }
+
+ @Override
+ public String explainLastComparison() {
+ return reasonForLastNegativeResult;
+ }
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionAbstractGoal.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionAbstractGoal.java
new file mode 100644
index 0000000000..cbd55305c5
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionAbstractGoal.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT;
+import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.aliveServersNotExcludeForReplicaMove;
+
+/** An abstract class for goals that are based on the distribution of replicas. */
+public abstract class ReplicaDistributionAbstractGoal extends AbstractGoal {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ReplicaDistributionAbstractGoal.class);
+ private static final double BALANCE_MARGIN = 0.9;
+ protected final Set serverIdsAboveRebalanceUpperLimit;
+ protected final Set serverIdsBelowRebalanceLowerLimit;
+ protected double avgReplicasOnAliveServer;
+ protected int rebalanceUpperLimit;
+ protected int rebalanceLowerLimit;
+ // This is used to identify servers not excluded for replica moves.
+ protected Set serversAllowedReplicaRemove;
+
+ public ReplicaDistributionAbstractGoal() {
+ serverIdsAboveRebalanceUpperLimit = new HashSet<>();
+ serverIdsBelowRebalanceLowerLimit = new HashSet<>();
+ }
+
+ private int rebalanceUpperLimit(double balancePercentage) {
+ return (int)
+ Math.ceil(
+ avgReplicasOnAliveServer
+ * (1 + adjustedRebalancePercentage(balancePercentage)));
+ }
+
+ private int rebalanceLowerLimit(double balancePercentage) {
+ return (int)
+ Math.floor(
+ avgReplicasOnAliveServer
+ * Math.max(
+ 0, (1 - adjustedRebalancePercentage(balancePercentage))));
+ }
+
+ private double adjustedRebalancePercentage(double rebalancePercentage) {
+ return (rebalancePercentage - 1) * BALANCE_MARGIN;
+ }
+
+ boolean isReplicaCountUnderBalanceUpperLimitAfterChange(
+ ServerModel server, int currentReplicaCount, ChangeType changeType) {
+ int serverBalanceUpperLimit = server.isAlive() ? rebalanceUpperLimit : 0;
+
+ return changeType == ChangeType.ADD
+ ? currentReplicaCount + 1 <= serverBalanceUpperLimit
+ : currentReplicaCount - 1 <= serverBalanceUpperLimit;
+ }
+
+ boolean isReplicaCountAboveBalanceLowerLimitAfterChange(
+ ServerModel server, int currentReplicaCount, ChangeType changeType) {
+ int serverBalanceLowerLimit = server.isAlive() ? rebalanceLowerLimit : 0;
+
+ return changeType == ChangeType.ADD
+ ? currentReplicaCount + 1 >= serverBalanceLowerLimit
+ : currentReplicaCount - 1 >= serverBalanceLowerLimit;
+ }
+
+ @Override
+ public boolean isHardGoal() {
+ return false;
+ }
+
+ @Override
+ protected void initGoalState(ClusterModel clusterModel) throws RebalanceFailureException {
+ serversAllowedReplicaRemove = aliveServersNotExcludeForReplicaMove(clusterModel);
+ if (serversAllowedReplicaRemove.isEmpty()) {
+ throw new RebalanceFailureException(
+ String.format(
+ "[%s] All alive tabletServers are excluded from replica moves.",
+ name()));
+ }
+
+ // Initialize the average replicas on an alive server.
+ avgReplicasOnAliveServer =
+ numInterestedReplicas(clusterModel) / (double) serversAllowedReplicaRemove.size();
+
+ rebalanceUpperLimit = rebalanceUpperLimit(balancePercentage());
+ rebalanceLowerLimit = rebalanceLowerLimit(balancePercentage());
+ }
+
+ @Override
+ protected boolean selfSatisfied(ClusterModel clusterModel, ReBalancingAction action) {
+ // Check that destination and source would not become unbalanced.
+ return actionAcceptance(action, clusterModel) == ACCEPT;
+ }
+
+ @Override
+ protected void updateGoalState(ClusterModel clusterModel) throws RebalanceFailureException {
+ if (!serverIdsAboveRebalanceUpperLimit.isEmpty()) {
+ LOG.debug(
+ "Replicas count on server ids:{} {} above the balance limit of {} after rebalance.",
+ serverIdsAboveRebalanceUpperLimit,
+ (serverIdsAboveRebalanceUpperLimit.size() > 1) ? "are" : "is",
+ rebalanceUpperLimit);
+ serverIdsAboveRebalanceUpperLimit.clear();
+ succeeded = false;
+ }
+
+ if (!serverIdsBelowRebalanceLowerLimit.isEmpty()) {
+ LOG.debug(
+ "Replicas count on server ids:{} {} below the balance limit of {} after rebalance.",
+ serverIdsBelowRebalanceLowerLimit,
+ (serverIdsBelowRebalanceLowerLimit.size() > 1) ? "are" : "is",
+ rebalanceLowerLimit);
+ serverIdsBelowRebalanceLowerLimit.clear();
+ succeeded = false;
+ }
+
+ // TODO maybe need check offline server.
+
+ finish();
+ }
+
+ abstract int numInterestedReplicas(ClusterModel clusterModel);
+
+ /**
+ * @return The requested balance threshold.
+ */
+ abstract double balancePercentage();
+
+ protected boolean isExcludedForReplicaMove(ServerModel server) {
+ return !serversAllowedReplicaRemove.contains(server.id());
+ }
+
+ /** Whether bring replica in or out. */
+ protected enum ChangeType {
+ ADD,
+ REMOVE
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoal.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoal.java
new file mode 100644
index 0000000000..b70c5ca622
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/goal/ReplicaDistributionGoal.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.goal;
+
+import org.apache.fluss.exception.RebalanceFailureException;
+import org.apache.fluss.server.coordinator.rebalance.ActionAcceptance;
+import org.apache.fluss.server.coordinator.rebalance.ActionType;
+import org.apache.fluss.server.coordinator.rebalance.ReBalancingAction;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ClusterModelStats;
+import org.apache.fluss.server.coordinator.rebalance.model.ReplicaModel;
+import org.apache.fluss.server.coordinator.rebalance.model.ServerModel;
+import org.apache.fluss.server.coordinator.rebalance.model.Statistic;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.ACCEPT;
+import static org.apache.fluss.server.coordinator.rebalance.ActionAcceptance.REPLICA_REJECT;
+import static org.apache.fluss.server.coordinator.rebalance.goal.GoalOptimizerUtils.EPSILON;
+import static org.apache.fluss.server.coordinator.rebalance.goal.ReplicaDistributionAbstractGoal.ChangeType.ADD;
+import static org.apache.fluss.server.coordinator.rebalance.goal.ReplicaDistributionAbstractGoal.ChangeType.REMOVE;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * Soft goal to generate replica movement proposals to ensure that the number of replicas on each
+ * server is.
+ *
+ *
+ * - Under: (the average number of replicas per server) * (1 + replica count balance percentage)
+ *
- Above: (the average number of replicas per server) * Math.max(0, 1 - replica count balance
+ * percentage)
+ *
+ */
+public class ReplicaDistributionGoal extends ReplicaDistributionAbstractGoal {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicaDistributionGoal.class);
+
+ // TODO configurable.
+ /**
+ * The maximum allowed extent of unbalance for replica leader replica distribution. For example,
+ * 1.10 means the highest leader replica count of a server should not be 1.10x of average leader
+ * replica count of all alive tabletServers.
+ */
+ private static final Double REPLICA_COUNT_REBALANCE_THRESHOLD = 1.10d;
+
+ @Override
+ public ActionAcceptance actionAcceptance(ReBalancingAction action, ClusterModel clusterModel) {
+ switch (action.getActionType()) {
+ case LEADERSHIP_MOVEMENT:
+ return ACCEPT;
+ case REPLICA_MOVEMENT:
+ ServerModel sourceServer = clusterModel.server(action.getSourceServerId());
+ ServerModel destServer = clusterModel.server(action.getDestinationServerId());
+
+ checkNotNull(
+ sourceServer,
+ "Source server " + action.getSourceServerId() + " is not found.");
+ checkNotNull(
+ destServer,
+ "Destination server " + action.getDestinationServerId() + " is not found.");
+
+ // Check that destination and source would not become unbalanced.
+ return (isReplicaCountUnderBalanceUpperLimitAfterChange(
+ destServer, destServer.replicas().size(), ADD))
+ && (isExcludedForReplicaMove(sourceServer)
+ || isReplicaCountAboveBalanceLowerLimitAfterChange(
+ sourceServer,
+ sourceServer.replicas().size(),
+ REMOVE))
+ ? ACCEPT
+ : REPLICA_REJECT;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported balancing action " + action.getActionType() + " is provided.");
+ }
+ }
+
+ @Override
+ protected void rebalanceForServer(
+ ServerModel server, ClusterModel clusterModel, Set optimizedGoals)
+ throws RebalanceFailureException {
+ LOG.debug(
+ "Rebalancing server {} [limits] lower: {} upper: {}.",
+ server.id(),
+ rebalanceLowerLimit,
+ rebalanceUpperLimit);
+ int numReplicas = server.replicas().size();
+ boolean isExcludeForReplicaMove = isExcludedForReplicaMove(server);
+
+ boolean requireLessReplicas =
+ numReplicas > rebalanceUpperLimit || isExcludeForReplicaMove || !server.isAlive();
+ boolean requireMoreReplicas =
+ !isExcludeForReplicaMove && server.isAlive() && numReplicas < rebalanceLowerLimit;
+ if (!requireMoreReplicas && !requireLessReplicas) {
+ // return if the server is already within the limit.
+ return;
+ }
+
+ if (requireLessReplicas
+ && rebalanceByMovingReplicasOut(server, clusterModel, optimizedGoals)) {
+ serverIdsAboveRebalanceUpperLimit.add(server.id());
+ LOG.debug(
+ "Failed to sufficiently decrease replica count in server {} with replica movements. "
+ + "Replicas number after remove: {}.",
+ server.id(),
+ server.replicas().size());
+ }
+
+ if (requireMoreReplicas
+ && rebalanceByMovingReplicasIn(server, clusterModel, optimizedGoals)) {
+ serverIdsBelowRebalanceLowerLimit.add(server.id());
+ LOG.debug(
+ "Failed to sufficiently increase replica count in server {} with replica movements. "
+ + "Replicas number after remove: {}.",
+ server.id(),
+ server.replicas().size());
+ }
+
+ if (!serverIdsAboveRebalanceUpperLimit.contains(server.id())
+ && !serverIdsBelowRebalanceLowerLimit.contains(server.id())) {
+ LOG.debug(
+ "Successfully balanced replica count for server {} by moving replicas. "
+ + "Replicas number after remove: {}",
+ server.id(),
+ server.replicas().size());
+ }
+ }
+
+ @Override
+ public ClusterModelStatsComparator clusterModelStatsComparator() {
+ return new ReplicaDistributionGoalStatsComparator();
+ }
+
+ @Override
+ int numInterestedReplicas(ClusterModel clusterModel) {
+ return clusterModel.numReplicas();
+ }
+
+ @Override
+ double balancePercentage() {
+ return REPLICA_COUNT_REBALANCE_THRESHOLD;
+ }
+
+ private boolean rebalanceByMovingReplicasOut(
+ ServerModel server, ClusterModel cluster, Set optimizedGoals) {
+ SortedSet candidateServers =
+ new TreeSet<>(
+ Comparator.comparingInt((ServerModel b) -> b.replicas().size())
+ .thenComparingInt(ServerModel::id));
+
+ candidateServers.addAll(
+ cluster.aliveServers().stream()
+ .filter(b -> b.replicas().size() < rebalanceUpperLimit)
+ .collect(Collectors.toSet()));
+ int balanceUpperLimitForSourceServer =
+ isExcludedForReplicaMove(server) ? 0 : rebalanceUpperLimit;
+
+ // Now let's do the replica out operation.
+ // TODO maybe use a sorted replicas set
+ for (ReplicaModel replica : server.replicas()) {
+ ServerModel b =
+ maybeApplyBalancingAction(
+ cluster,
+ replica,
+ candidateServers,
+ ActionType.REPLICA_MOVEMENT,
+ optimizedGoals);
+ // Only check if we successfully moved something.
+ if (b != null) {
+ if (server.replicas().size() <= balanceUpperLimitForSourceServer) {
+ return false;
+ }
+
+ // Remove and reinsert the server so the order is correct.
+ candidateServers.remove(b);
+ if (b.replicas().size() < rebalanceUpperLimit) {
+ candidateServers.add(b);
+ }
+ }
+ }
+
+ return !server.replicas().isEmpty();
+ }
+
+ private boolean rebalanceByMovingReplicasIn(
+ ServerModel aliveDestServer, ClusterModel cluster, Set optimizedGoals) {
+ PriorityQueue eligibleServers =
+ new PriorityQueue<>(
+ (b1, b2) -> {
+ // Servers are sorted by (1) all replica count then (2) server id.
+ int resultByAllReplicas =
+ Integer.compare(b2.replicas().size(), b1.replicas().size());
+ return resultByAllReplicas == 0
+ ? Integer.compare(b1.id(), b2.id())
+ : resultByAllReplicas;
+ });
+
+ // Source server can be offline, alive.
+ for (ServerModel sourceServer : cluster.servers()) {
+ if (sourceServer.replicas().size() > rebalanceLowerLimit
+ || isExcludedForReplicaMove(sourceServer)) {
+ eligibleServers.add(sourceServer);
+ }
+ }
+
+ List candidateServers = Collections.singletonList(aliveDestServer);
+ while (!eligibleServers.isEmpty()) {
+ ServerModel sourceServer = eligibleServers.poll();
+ // TODO maybe use a sorted replicas set
+ for (ReplicaModel replica : sourceServer.replicas()) {
+ ServerModel b =
+ maybeApplyBalancingAction(
+ cluster,
+ replica,
+ candidateServers,
+ ActionType.REPLICA_MOVEMENT,
+ optimizedGoals);
+ // Only need to check status if the action is taken. This will also handle the case
+ // that the source server has nothing to move in. In that case we will never
+ // re-enqueue that source server.
+ if (b != null) {
+ if (aliveDestServer.replicas().size() >= rebalanceLowerLimit) {
+ // Note that the server passed to this method is always alive; hence, there
+ // is no need to check if it is dead.
+ return false;
+ }
+
+ if (!eligibleServers.isEmpty()) {
+ if (sourceServer.replicas().size()
+ < eligibleServers.peek().replicas().size()) {
+ eligibleServers.add(sourceServer);
+ break;
+ }
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ private class ReplicaDistributionGoalStatsComparator implements ClusterModelStatsComparator {
+ private String reasonForLastNegativeResult;
+
+ @Override
+ public int compare(ClusterModelStats stats1, ClusterModelStats stats2) {
+ // Standard deviation of number of replicas over servers not excluded for replica moves
+ // must be less than the
+ // pre-optimized stats.
+ double stDev1 = stats1.replicaStats().get(Statistic.ST_DEV).doubleValue();
+ double stDev2 = stats2.replicaStats().get(Statistic.ST_DEV).doubleValue();
+ int result = GoalOptimizerUtils.compare(stDev2, stDev1, EPSILON);
+ if (result < 0) {
+ reasonForLastNegativeResult =
+ String.format(
+ "Violated %s. [Std Deviation of Replica Distribution] post-"
+ + "optimization:%.3f pre-optimization:%.3f",
+ name(), stDev1, stDev2);
+ }
+ return result;
+ }
+
+ @Override
+ public String explainLastComparison() {
+ return reasonForLastNegativeResult;
+ }
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java
new file mode 100644
index 0000000000..9aff7b0b80
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/BucketModel.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.model;
+
+import org.apache.fluss.metadata.TableBucket;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** A class that holds the information of the {@link TableBucket} for rebalance. */
+public class BucketModel {
+ private final TableBucket tableBucket;
+ private final List replicas;
+ private @Nullable ReplicaModel leader;
+ // Set of server which are unable to host replica of this replica (such as: the server are
+ // offline).
+ private final Set ineligibleServers;
+
+ public BucketModel(TableBucket tableBucket, Set ineligibleServers) {
+ this.tableBucket = tableBucket;
+ this.replicas = new ArrayList<>();
+ this.leader = null;
+ this.ineligibleServers = ineligibleServers;
+ }
+
+ public TableBucket tableBucket() {
+ return tableBucket;
+ }
+
+ public @Nullable ReplicaModel leader() {
+ return leader;
+ }
+
+ public List replicas() {
+ return replicas;
+ }
+
+ public Set bucketServers() {
+ Set bucketServers = new HashSet<>();
+ replicas.forEach(replica -> bucketServers.add(replica.server()));
+ return bucketServers;
+ }
+
+ public boolean canAssignReplicaToServer(ServerModel candidateServer) {
+ return !ineligibleServers.contains(candidateServer);
+ }
+
+ public ReplicaModel replica(long serverId) {
+ for (ReplicaModel replica : replicas) {
+ if (replica.server().id() == serverId) {
+ return replica;
+ }
+ }
+
+ throw new IllegalArgumentException(
+ "Requested replica " + serverId + " is not a replica of bucket " + tableBucket);
+ }
+
+ public void addLeader(ReplicaModel leader, int index) {
+ if (this.leader != null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Bucket %s already has a leader replica %s. Cannot add a new leader replica %s.",
+ tableBucket, this.leader, leader));
+ }
+
+ if (!leader.isLeader()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Inconsistent leadership information. Trying to set %s as the leader for bucket %s while "
+ + "the replica is not marked as a leader",
+ leader, tableBucket));
+ }
+
+ this.leader = leader;
+ replicas.add(index, leader);
+ }
+
+ public void addFollower(ReplicaModel follower, int index) {
+ if (follower.isLeader()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Inconsistent leadership information. Trying to set %s as the follower for bucket %s while "
+ + "the replica is marked as a leader",
+ follower, tableBucket));
+ }
+
+ if (!follower.tableBucket().equals(this.tableBucket)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Inconsistent table bucket. Trying to add follower replica %s to tableBucket %s",
+ follower, tableBucket));
+ }
+
+ // Add follower to list of followers
+ replicas.add(index, follower);
+ }
+
+ void relocateLeadership(ReplicaModel prospectiveLeader) {
+ int leaderPos = replicas.indexOf(prospectiveLeader);
+ swapReplicaPositions(0, leaderPos);
+ leader = prospectiveLeader;
+ }
+
+ private void swapReplicaPositions(int index1, int index2) {
+ ReplicaModel replica1 = replicas.get(index1);
+ ReplicaModel replica2 = replicas.get(index2);
+
+ replicas.set(index2, replica1);
+ replicas.set(index1, replica2);
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java
new file mode 100644
index 0000000000..884beafbed
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModel.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.model;
+
+import org.apache.fluss.metadata.TableBucket;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * A class that holds the information of the cluster for rebalance.The information including live
+ * tabletServers, bucket distribution, tabletServer tag etc.
+ *
+ * Currently, the clusterModel can only be created by a rebalance request. It's used as the input
+ * of the GoalOptimizer to generate the rebalance plan for load rebalance.
+ */
+public class ClusterModel {
+ // TODO ClusterModel can be implemented in incremental mode, dynamically modified when there are
+ // events such as table create, table delete, server offline, etc. Currently designed to read
+ // coordinatorContext and generate it directly
+
+ private final Map racksById;
+ private final Map serverIdToRack;
+ private final Set aliveServers;
+ private final SortedSet offlineServers;
+ private final SortedSet servers;
+ private final Map bucketsByTableBucket;
+
+ public ClusterModel(SortedSet servers) {
+ this.servers = servers;
+ this.bucketsByTableBucket = new HashMap<>();
+
+ this.aliveServers = new HashSet<>();
+ this.offlineServers = new TreeSet<>();
+ for (ServerModel serverModel : servers) {
+ if (serverModel.isAlive()) {
+ aliveServers.add(serverModel);
+ } else {
+ offlineServers.add(serverModel);
+ }
+ }
+
+ this.racksById = new HashMap<>();
+ this.serverIdToRack = new HashMap<>();
+ for (ServerModel serverModel : servers) {
+ RackModel rackModel = racksById.computeIfAbsent(serverModel.rack(), RackModel::new);
+ rackModel.addServer(serverModel);
+ serverIdToRack.put(serverModel.id(), rackModel);
+ }
+ }
+
+ public SortedSet offlineServers() {
+ return offlineServers;
+ }
+
+ public SortedSet servers() {
+ return servers;
+ }
+
+ public Set aliveServers() {
+ return Collections.unmodifiableSet(aliveServers);
+ }
+
+ public @Nullable BucketModel bucket(TableBucket tableBucket) {
+ return bucketsByTableBucket.get(tableBucket);
+ }
+
+ public RackModel rack(String rack) {
+ return racksById.get(rack);
+ }
+
+ public @Nullable ServerModel server(int serverId) {
+ RackModel rack = serverIdToRack.get(serverId);
+ return rack == null ? null : rack.server(serverId);
+ }
+
+ /** Populate the analysis stats with this cluster. */
+ public ClusterModelStats getClusterStats() {
+ return (new ClusterModelStats()).populate(this);
+ }
+
+ public int numReplicas() {
+ return bucketsByTableBucket.values().stream().mapToInt(p -> p.replicas().size()).sum();
+ }
+
+ public int numLeaderReplicas() {
+ return bucketsByTableBucket.size();
+ }
+
+ public SortedMap> getBucketsByTable() {
+ SortedMap> bucketsByTable = new TreeMap<>();
+ for (Long tableId : tables()) {
+ bucketsByTable.put(tableId, new ArrayList<>());
+ }
+ for (Map.Entry entry : bucketsByTableBucket.entrySet()) {
+ bucketsByTable.get(entry.getKey().getTableId()).add(entry.getValue());
+ }
+ return bucketsByTable;
+ }
+
+ public Set tables() {
+ Set tables = new HashSet<>();
+
+ for (RackModel rack : racksById.values()) {
+ tables.addAll(rack.tables());
+ }
+ return tables;
+ }
+
+ /**
+ * Get the distribution of replicas in the cluster at the point of call.
+ *
+ * @return A map from tableBucket to the list of replicas. the first element is the leader, the
+ * rest are followers.
+ */
+ public Map> getReplicaDistribution() {
+ Map> replicaDistribution = new HashMap<>();
+ for (Map.Entry entry : bucketsByTableBucket.entrySet()) {
+ TableBucket tableBucket = entry.getKey();
+ BucketModel bucket = entry.getValue();
+ List replicaIds =
+ bucket.replicas().stream()
+ .map(r -> r.server().id())
+ .collect(Collectors.toList());
+ replicaDistribution.put(tableBucket, replicaIds);
+ }
+ return replicaDistribution;
+ }
+
+ public Map getLeaderDistribution() {
+ Map leaderDistribution = new HashMap<>();
+ for (Map.Entry entry : bucketsByTableBucket.entrySet()) {
+ TableBucket tableBucket = entry.getKey();
+ BucketModel bucket = entry.getValue();
+
+ ReplicaModel replicaModel = bucket.leader();
+ if (replicaModel == null) {
+ continue;
+ }
+
+ leaderDistribution.put(tableBucket, replicaModel.server().id());
+ }
+ return leaderDistribution;
+ }
+
+ public void createReplica(int serverId, TableBucket tableBucket, int index, boolean isLeader) {
+ ServerModel server = server(serverId);
+ if (server == null) {
+ throw new IllegalArgumentException("Server is not in the cluster.");
+ }
+
+ ReplicaModel replica = new ReplicaModel(tableBucket, server, isLeader);
+ server.putReplica(tableBucket, replica);
+
+ if (!bucketsByTableBucket.containsKey(tableBucket)) {
+ bucketsByTableBucket.put(tableBucket, new BucketModel(tableBucket, offlineServers()));
+ }
+
+ BucketModel bucket = bucketsByTableBucket.get(tableBucket);
+ if (isLeader) {
+ bucket.addLeader(replica, index);
+ } else {
+ bucket.addFollower(replica, index);
+ }
+ }
+
+ /**
+ * Relocate leadership from source server to destination server.
+ *
+ *
+ * - 1. Removes leadership from source replica.
+ *
- 2. Adds this leadership to the destination replica.
+ *
- 3. Updates the leader and list of followers of the bucket.
+ *
+ */
+ public boolean relocateLeadership(
+ TableBucket tableBucket, int sourceServerId, int desServerId) {
+ // Sanity check to see if the source replica is the leader.
+ BucketModel bucket = bucketsByTableBucket.get(tableBucket);
+ ReplicaModel sourceReplica = bucket.replica(sourceServerId);
+ if (!sourceReplica.isLeader()) {
+ return false;
+ }
+
+ // Sanity check to see if the destination replica is a follower.
+ ReplicaModel desReplica = bucket.replica(desServerId);
+ if (desReplica.isLeader()) {
+ throw new IllegalArgumentException(
+ "Cannot relocate leadership of bucket "
+ + tableBucket
+ + " from server "
+ + sourceServerId
+ + " to server "
+ + desServerId
+ + " because the destination replica is a leader.");
+ }
+
+ ServerModel sourceServer = server(sourceServerId);
+ if (sourceServer == null) {
+ throw new IllegalArgumentException("Source server is not in the cluster.");
+ }
+ sourceServer.makeFollower(tableBucket);
+
+ ServerModel destServer = server(desServerId);
+ if (destServer == null) {
+ throw new IllegalArgumentException("Destination server is not in the cluster.");
+ }
+ destServer.makeLeader(tableBucket);
+
+ // Update the leader and list of followers of the bucket.
+ bucket.relocateLeadership(desReplica);
+ return true;
+ }
+
+ /**
+ * Relocate replica from source server to destination server.
+ *
+ *
+ * - 1. Removes the replica from source server.
+ *
- 2. Set the server of the removed replica as the dest server
+ *
- 3. Add this replica to the dest server.
+ *
+ */
+ public void relocateReplica(TableBucket tableBucket, int sourceServerId, int destServerId) {
+ // Removes the replica from the source server.
+ ReplicaModel replica = removeReplica(sourceServerId, tableBucket);
+ if (replica == null) {
+ throw new IllegalArgumentException("Replica is not in the cluster.");
+ }
+
+ // Updates the tabletServer of the removed replicas with dest server.
+ replica.setServer(server(destServerId));
+
+ // Add this replica back to destination rack and server.
+ String rack = replica.server().rack();
+ rack(rack).addReplica(replica);
+ }
+
+ private @Nullable ReplicaModel removeReplica(int serverId, TableBucket tableBucket) {
+ for (RackModel rack : racksById.values()) {
+ ReplicaModel removedReplica = rack.removeReplica(serverId, tableBucket);
+ if (removedReplica != null) {
+ return removedReplica;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "ClusterModel[serverCount=%s,bucketCount=%s,aliveServerCount=%s]",
+ servers.size(), bucketsByTableBucket.size(), aliveServers.size());
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelStats.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelStats.java
new file mode 100644
index 0000000000..16bd9f29b0
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ClusterModelStats.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.model;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.function.Function;
+
+/** A class that holds the statistics of the cluster for rebalance. */
+public class ClusterModelStats {
+ private final Map replicaStats;
+ private final Map leaderReplicaStats;
+ private int numServers;
+ private int numReplicasInCluster;
+
+ public ClusterModelStats() {
+ replicaStats = new HashMap<>();
+ leaderReplicaStats = new HashMap<>();
+
+ numServers = 0;
+ numReplicasInCluster = 0;
+ }
+
+ ClusterModelStats populate(ClusterModel clusterModel) {
+ final SortedSet servers = clusterModel.servers();
+ final Set aliveServers = clusterModel.aliveServers();
+ this.numServers = servers.size();
+ numForReplicas(clusterModel, servers, aliveServers);
+ numForLeaderReplicas(servers, aliveServers);
+ return this;
+ }
+
+ /** Generate statistics for replicas in the given cluster. */
+ private void numForReplicas(
+ ClusterModel clusterModel,
+ SortedSet servers,
+ Set aliveServers) {
+ populateReplicaStats(
+ serverModel -> serverModel.replicas().size(), replicaStats, servers, aliveServers);
+ numReplicasInCluster = clusterModel.numReplicas();
+ }
+
+ /** Generate statistics for leader replicas in the given cluster. */
+ private void numForLeaderReplicas(
+ SortedSet servers, Set aliveServers) {
+ populateReplicaStats(
+ serverModel -> serverModel.leaderReplicas().size(),
+ leaderReplicaStats,
+ servers,
+ aliveServers);
+ }
+
+ private void populateReplicaStats(
+ Function numInterestedReplicasFunc,
+ Map interestedReplicaStats,
+ SortedSet servers,
+ Set aliveServers) {
+ // Average, minimum, and maximum number of replicas of interest in servers.
+ int maxInterestedReplicasInServer = 0;
+ int minInterestedReplicasInServer = Integer.MAX_VALUE;
+ int numInterestedReplicasInCluster = 0;
+ for (ServerModel server : servers) {
+ int numInterestedReplicasInServer = numInterestedReplicasFunc.apply(server);
+ numInterestedReplicasInCluster += numInterestedReplicasInServer;
+ maxInterestedReplicasInServer =
+ Math.max(maxInterestedReplicasInServer, numInterestedReplicasInServer);
+ minInterestedReplicasInServer =
+ Math.min(minInterestedReplicasInServer, numInterestedReplicasInServer);
+ }
+ double avgInterestedReplicas =
+ ((double) numInterestedReplicasInCluster) / aliveServers.size();
+
+ // Standard deviation of replicas of interest in alive servers.
+ double variance = 0.0;
+ for (ServerModel broker : aliveServers) {
+ variance +=
+ (Math.pow(
+ (double) numInterestedReplicasFunc.apply(broker)
+ - avgInterestedReplicas,
+ 2)
+ / aliveServers.size());
+ }
+
+ interestedReplicaStats.put(Statistic.AVG, avgInterestedReplicas);
+ interestedReplicaStats.put(Statistic.MAX, maxInterestedReplicasInServer);
+ interestedReplicaStats.put(Statistic.MIN, minInterestedReplicasInServer);
+ interestedReplicaStats.put(Statistic.ST_DEV, Math.sqrt(variance));
+ }
+
+ public Map replicaStats() {
+ return Collections.unmodifiableMap(replicaStats);
+ }
+
+ public Map leaderReplicaStats() {
+ return Collections.unmodifiableMap(leaderReplicaStats);
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java
new file mode 100644
index 0000000000..fdf9cbad80
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/RackModel.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.model;
+
+import org.apache.fluss.metadata.TableBucket;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A class that holds the information of the rack, including its liveness tabletServers and
+ * replicas. A rack object is created as part of a cluster structure.
+ */
+public class RackModel {
+ public static final String DEFAULT_RACK = "default_rack";
+
+ private final String rack;
+ private final Map servers;
+
+ public RackModel(String rack) {
+ this.rack = rack;
+ this.servers = new HashMap<>();
+ }
+
+ @Nullable
+ ReplicaModel removeReplica(int serverId, TableBucket tableBucket) {
+ ServerModel server = servers.get(serverId);
+ if (server != null) {
+ return server.removeReplica(tableBucket);
+ }
+
+ return null;
+ }
+
+ void addReplica(ReplicaModel replica) {
+ replica.server().putReplica(replica.tableBucket(), replica);
+ }
+
+ public String rack() {
+ return rack;
+ }
+
+ @Nullable
+ ServerModel server(int serverId) {
+ return servers.get(serverId);
+ }
+
+ public void addServer(ServerModel server) {
+ servers.put(server.id(), server);
+ }
+
+ public Set tables() {
+ Set tables = new HashSet<>();
+
+ for (ServerModel server : servers.values()) {
+ tables.addAll(server.tables());
+ }
+ return tables;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("RackModel[rack=%s,servers=%s]", rack, servers.size());
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ReplicaModel.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ReplicaModel.java
new file mode 100644
index 0000000000..e67d9bd733
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ReplicaModel.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.model;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.replica.Replica;
+
+import java.util.Objects;
+
+/** A class that holds the information of the {@link Replica} for rebalance. */
+public class ReplicaModel {
+ private final TableBucket tableBucket;
+ private final ServerModel originalServer;
+ private ServerModel server;
+ private boolean isLeader;
+
+ public ReplicaModel(TableBucket tableBucket, ServerModel server, boolean isLeader) {
+ this.tableBucket = tableBucket;
+ this.server = server;
+ this.isLeader = isLeader;
+ this.originalServer = server;
+ }
+
+ public TableBucket tableBucket() {
+ return tableBucket;
+ }
+
+ public ServerModel originalServer() {
+ return originalServer;
+ }
+
+ public ServerModel server() {
+ return server;
+ }
+
+ public int serverId() {
+ return server.id();
+ }
+
+ public boolean isLeader() {
+ return isLeader;
+ }
+
+ public void makeFollower() {
+ setLeadership(false);
+ }
+
+ public void makeLeader() {
+ setLeadership(true);
+ }
+
+ void setLeadership(boolean leader) {
+ isLeader = leader;
+ }
+
+ public void setServer(ServerModel server) {
+ this.server = server;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "ReplicaModel[TableBucket=%s,isLeader=%s,rack=%s,server=%s,originalServer=%s]",
+ tableBucket, isLeader, server.rack(), server.id(), originalServer.id());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReplicaModel that = (ReplicaModel) o;
+ return Objects.equals(tableBucket, that.tableBucket)
+ && originalServer.id() == that.originalServer.id();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableBucket, originalServer.id());
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java
new file mode 100644
index 0000000000..a57bc85b30
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/model/ServerModel.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.rebalance.model;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePartition;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** A class that holds the information of the tabletServer for rebalance. */
+public class ServerModel implements Comparable {
+
+ private final int serverId;
+ private final boolean isAlive;
+ private final String rack;
+ private final Set replicas;
+ private final Set leaderReplicas;
+ /** A map for tracking (tableId) -> (BucketId -> replica) for none-partitioned table. */
+ private final Map> tableReplicas;
+
+ /** A map for tracking (tableId, partitionId) -> (BucketId -> replica) for partitioned table. */
+ private final Map