Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,19 @@ private InstanceAssignmentConfigUtils() {
/**
* Returns whether the COMPLETED segments should be relocated (offloaded from CONSUMING instances to COMPLETED
* instances) for a LLC real-time table based on the given table config.
* <p>COMPLETED segments should be relocated iff the COMPLETED instance assignment is configured or (for
* backward-compatibility) COMPLETED server tag is overridden to be different from the CONSUMING server tag.
* <p>COMPLETED segments should be relocated iff:
* <ul>
* <li>The COMPLETED instance assignment is configured via {@code instanceAssignmentConfigMap}, or</li>
* <li>The COMPLETED instance partitions are pre-configured via {@code instancePartitionsMap} (import), or</li>
* <li>(For backward-compatibility) COMPLETED server tag is overridden to be different from the CONSUMING
* server tag.</li>
* </ul>
*/
public static boolean shouldRelocateCompletedSegments(TableConfig tableConfig) {
Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = tableConfig.getInstanceAssignmentConfigMap();
return (instanceAssignmentConfigMap != null
&& instanceAssignmentConfigMap.get(InstancePartitionsType.COMPLETED.toString()) != null)
|| InstancePartitionsUtils.hasPreConfiguredInstancePartitions(tableConfig, InstancePartitionsType.COMPLETED)
|| TagNameUtils.isRelocateCompletedSegments(tableConfig.getTenantConfig());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.restlet.resources.RebalanceConfig;
import org.apache.pinot.common.utils.LLCSegmentName;
Expand Down Expand Up @@ -622,6 +624,232 @@ public void testSubsetPartitionAssignment() {
"Partitions 0 and 7 should map to different instance sets to avoid hotspots");
}

/**
* Regression for imported COMPLETED instance partitions during rebalance.
*
* <p><b>Prod scenario:</b> Table B imports Table A's CONSUMING and COMPLETED instance partitions via
* {@code instancePartitionsMap} (no {@code instanceAssignmentConfigMap} for COMPLETED). During rebalance,
* {@link org.apache.pinot.controller.helix.core.rebalance.TableRebalancer#getInstancePartitionsMap} only
* loads COMPLETED IPs when {@link InstanceAssignmentConfigUtils#shouldRelocateCompletedSegments} returns true.
*
* <p><b>Bug:</b> {@code shouldRelocateCompletedSegments} ignored {@code instancePartitionsMap}, so rebalance
* passed only CONSUMING IPs to segment assignment and completed segments stayed on one server per partition.
*
* <p>This test mirrors {@code TableRebalancer}: the rebalance {@code instancePartitionsMap} is built from
* {@code shouldRelocateCompletedSegments(tableConfig)}, not by always passing COMPLETED IPs directly.
*/
@Test
public void testImportedInstancePartitionsWithMultipleServersPerPartition() {
int numReplicas = 2;
int numReplicaGroups = numReplicas;
int numServers = 64;
int numInstancePartitions = 8;
int numServersPerPartitionPerRG = numServers / numReplicaGroups / numInstancePartitions; // 4
int numStreamPartitions = 8; // exact prod case: 1:1 mapping with instance partitions
int numSegmentsPerPartition = 20; // 19 completed + 1 consuming per stream partition
String serverPrefix = "Server_";

List<String> allServers = SegmentAssignmentTestUtils.getNameList(serverPrefix, numServers);

// Table B: imports CONSUMING/COMPLETED IPs from Table A; no instanceAssignmentConfigMap for COMPLETED.
Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
Map<InstancePartitionsType, String> importedInstancePartitions = new HashMap<>();
importedInstancePartitions.put(InstancePartitionsType.CONSUMING, "sourceTable_CONSUMING");
importedInstancePartitions.put(InstancePartitionsType.COMPLETED, "sourceTable_COMPLETED");
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(numReplicas)
.setStreamConfigs(streamConfigs)
.setInstancePartitionsMap(importedInstancePartitions)
.setSegmentAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.COMPLETED.toString(),
new SegmentAssignmentConfig(AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY)))
.build();
assertTrue(InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig),
"Imported COMPLETED instance partitions must trigger completed-segment relocation during rebalance");
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(createHelixManager(), tableConfig, null);

// Both CONSUMING and COMPLETED instance partitions use the same 64 servers
// (imported from the same source table). 8 explicit partitions, 2 RGs, 4 servers per partition.
InstancePartitions consumingInstancePartitions = new InstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME);
InstancePartitions completedInstancePartitions = new InstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME);
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
for (int partitionId = 0; partitionId < numInstancePartitions; partitionId++) {
List<String> serversInPartition = new ArrayList<>(numServersPerPartitionPerRG);
int baseIndex = replicaGroupId * (numServers / numReplicaGroups)
+ partitionId * numServersPerPartitionPerRG;
for (int i = 0; i < numServersPerPartitionPerRG; i++) {
serversInPartition.add(allServers.get(baseIndex + i));
}
consumingInstancePartitions.setInstances(partitionId, replicaGroupId, serversInPartition);
completedInstancePartitions.setInstances(partitionId, replicaGroupId,
new ArrayList<>(serversInPartition));
}
}

// Create segments: 8 stream partitions × 20 segments each = 160 total
List<String> segments = new ArrayList<>();
for (int partitionId = 0; partitionId < numStreamPartitions; partitionId++) {
for (int seqNum = 0; seqNum < numSegmentsPerPartition; seqNum++) {
segments.add(new LLCSegmentName(RAW_TABLE_NAME, partitionId, seqNum,
System.currentTimeMillis()).getSegmentName());
}
}

// Build currentAssignment: all segments initially pinned to first server in each partition
// (simulates the state produced by the buggy path where COMPLETED IPs were not loaded).
Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
for (int partitionId = 0; partitionId < numStreamPartitions; partitionId++) {
for (int seqNum = 0; seqNum < numSegmentsPerPartition; seqNum++) {
String segmentName = segments.get(partitionId * numSegmentsPerPartition + seqNum);
int instancePartitionId = partitionId % numInstancePartitions;
List<String> instancesForSegment = new ArrayList<>(numReplicaGroups);
for (int rg = 0; rg < numReplicaGroups; rg++) {
instancesForSegment.add(
consumingInstancePartitions.getInstances(instancePartitionId, rg).get(0));
}
boolean isConsuming = (seqNum == numSegmentsPerPartition - 1);
String state = isConsuming ? SegmentStateModel.CONSUMING : SegmentStateModel.ONLINE;
currentAssignment.put(segmentName,
SegmentAssignmentUtils.getInstanceStateMap(instancesForSegment, state));
}
}

// Mirror TableRebalancer.getInstancePartitionsMap(): CONSUMING always; COMPLETED only when relocation applies.
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
buildRebalanceInstancePartitionsMap(tableConfig, consumingInstancePartitions, completedInstancePartitions);

// Simulate pre-fix rebalance: only CONSUMING IPs in the map (COMPLETED omitted even though import is configured).
// Completed segments stay pinned to one server per stream partition.
Map<InstancePartitionsType, InstancePartitions> consumingOnlyPartitionsMap = new TreeMap<>();
consumingOnlyPartitionsMap.put(InstancePartitionsType.CONSUMING, consumingInstancePartitions);
RebalanceConfig rebalanceConfig = new RebalanceConfig();
rebalanceConfig.setIncludeConsuming(true);
Map<String, Map<String, String>> consumingOnlyAssignment =
segmentAssignment.rebalanceTable(currentAssignment, consumingOnlyPartitionsMap, null, null,
rebalanceConfig);
int expectedCompletedSegmentServerCountWhenCompletedIpsOmitted = numStreamPartitions * numReplicaGroups;
assertEquals(countServersWithCompletedSegments(consumingOnlyAssignment),
expectedCompletedSegmentServerCountWhenCompletedIpsOmitted,
"Without COMPLETED instance partitions (buggy rebalance path), completed segments should stay on "
+ expectedCompletedSegmentServerCountWhenCompletedIpsOmitted + " servers");

assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED),
"Rebalance must include COMPLETED instance partitions when imported via instancePartitionsMap");
Map<String, Map<String, String>> newAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, null, null, rebalanceConfig);

// COMPLETED segments should be distributed across all 64 servers
HashSet<String> completedServers = collectServersWithCompletedSegments(newAssignment);
assertEquals(completedServers.size(), numServers,
"All " + numServers + " servers should have COMPLETED segments, but only "
+ completedServers.size() + " were used.");

// Verify per-partition spread for COMPLETED segments
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
for (int instPartId = 0; instPartId < numInstancePartitions; instPartId++) {
List<String> partitionServers = completedInstancePartitions.getInstances(instPartId, replicaGroupId);
HashSet<String> usedInPartition = new HashSet<>();
for (String server : partitionServers) {
if (completedServers.contains(server)) {
usedInPartition.add(server);
}
}
assertEquals(usedInPartition.size(), numServersPerPartitionPerRG,
"COMPLETED: instance partition " + instPartId + " in RG " + replicaGroupId + " should use all "
+ numServersPerPartitionPerRG + " servers, but only " + usedInPartition.size() + " were used");
}
}

// Verify per-stream-partition coverage: for each stream partition, the set of servers across all
// of its completed segments should equal the full set of servers in that instance partition (per RG).
for (int partitionId = 0; partitionId < numStreamPartitions; partitionId++) {
int instancePartitionId = partitionId % numInstancePartitions;
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
List<String> expectedServers = completedInstancePartitions.getInstances(instancePartitionId, replicaGroupId);
HashSet<String> actualServers = new HashSet<>();
for (int seqNum = 0; seqNum < numSegmentsPerPartition - 1; seqNum++) {
String segmentName = segments.get(partitionId * numSegmentsPerPartition + seqNum);
Map<String, String> instanceStateMap = newAssignment.get(segmentName);
for (String server : instanceStateMap.keySet()) {
if (expectedServers.contains(server)) {
actualServers.add(server);
}
}
}
assertEquals(actualServers, new HashSet<>(expectedServers),
"Stream partition " + partitionId + " in RG " + replicaGroupId
+ " should have completed segments on all " + numServersPerPartitionPerRG
+ " servers in instance partition " + instancePartitionId
+ ", but only used: " + actualServers);
}
}

// --- Bootstrap variant ---
RebalanceConfig bootstrapConfig = new RebalanceConfig();
bootstrapConfig.setIncludeConsuming(true);
bootstrapConfig.setBootstrap(true);
Map<String, Map<String, String>> bootstrapAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, null, null, bootstrapConfig);

HashSet<String> bootstrapCompletedServers = collectServersWithCompletedSegments(bootstrapAssignment);
assertEquals(bootstrapCompletedServers.size(), numServers,
"Bootstrap: all " + numServers + " servers should have COMPLETED segments, but only "
+ bootstrapCompletedServers.size() + " were used.");

// Same per-stream-partition check for bootstrap
for (int partitionId = 0; partitionId < numStreamPartitions; partitionId++) {
int instancePartitionId = partitionId % numInstancePartitions;
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
List<String> expectedServers = completedInstancePartitions.getInstances(instancePartitionId, replicaGroupId);
HashSet<String> actualServers = new HashSet<>();
for (int seqNum = 0; seqNum < numSegmentsPerPartition - 1; seqNum++) {
String segmentName = segments.get(partitionId * numSegmentsPerPartition + seqNum);
Map<String, String> instanceStateMap = bootstrapAssignment.get(segmentName);
for (String server : instanceStateMap.keySet()) {
if (expectedServers.contains(server)) {
actualServers.add(server);
}
}
}
assertEquals(actualServers, new HashSet<>(expectedServers),
"Bootstrap: stream partition " + partitionId + " in RG " + replicaGroupId
+ " should have completed segments on all " + numServersPerPartitionPerRG
+ " servers in instance partition " + instancePartitionId
+ ", but only used: " + actualServers);
}
}
}

/**
* Builds the instance-partitions map the same way as
* {@link org.apache.pinot.controller.helix.core.rebalance.TableRebalancer#getInstancePartitionsMap}.
*/
private static Map<InstancePartitionsType, InstancePartitions> buildRebalanceInstancePartitionsMap(
TableConfig tableConfig, InstancePartitions consumingInstancePartitions,
InstancePartitions completedInstancePartitions) {
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap = new TreeMap<>();
instancePartitionsMap.put(InstancePartitionsType.CONSUMING, consumingInstancePartitions);
if (InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) {
instancePartitionsMap.put(InstancePartitionsType.COMPLETED, completedInstancePartitions);
}
return instancePartitionsMap;
}

private static HashSet<String> collectServersWithCompletedSegments(
Map<String, Map<String, String>> assignment) {
HashSet<String> completedServers = new HashSet<>();
for (Map.Entry<String, Map<String, String>> entry : assignment.entrySet()) {
if (entry.getValue().containsValue(SegmentStateModel.ONLINE)) {
completedServers.addAll(entry.getValue().keySet());
}
}
return completedServers;
}

private static int countServersWithCompletedSegments(Map<String, Map<String, String>> assignment) {
return collectServersWithCompletedSegments(assignment).size();
}

private HelixManager createHelixManager() {
HelixManager helixManager = mock(HelixManager.class);
ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
Expand Down
Loading