diff --git a/phoenix-core-client/pom.xml b/phoenix-core-client/pom.xml
index a04148b0bf6..095b5dd5c74 100644
--- a/phoenix-core-client/pom.xml
+++ b/phoenix-core-client/pom.xml
@@ -253,7 +253,6 @@
org.bouncycastle
bcprov-jdk18on
- 1.79
diff --git a/phoenix-core-server/pom.xml b/phoenix-core-server/pom.xml
index bb236a47f61..8eb3c38413f 100644
--- a/phoenix-core-server/pom.xml
+++ b/phoenix-core-server/pom.xml
@@ -176,7 +176,6 @@
org.bouncycastle
bcprov-jdk18on
- 1.79
org.xerial.snappy
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
index c71e6ca5d1c..c49ce59232d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -38,7 +38,7 @@
public class PhoenixInputSplit extends InputSplit implements Writable {
private List<Scan> scans;
- private KeyRange keyRange;
+ private List<KeyRange> keyRanges;
private String regionLocation = null;
private long splitSize = 0;
@@ -68,13 +68,43 @@ public List getScans() {
return scans;
}
+ /**
+ * Returns the overall KeyRange spanning this split. For coalesced splits, spans from the first
+ * region's lower bound to the last region's upper bound. Computed on-demand from keyRanges.
+ * @return KeyRange spanning the entire split, or null if keyRanges is empty
+ */
public KeyRange getKeyRange() {
- return keyRange;
+ if (keyRanges == null || keyRanges.isEmpty()) {
+ return null;
+ }
+ return KeyRange.getKeyRange(keyRanges.get(0).getLowerRange(),
+ keyRanges.get(keyRanges.size() - 1).getUpperRange());
+ }
+
+ /**
+ * Returns all KeyRanges for this split. For coalesced splits, returns multiple KeyRanges (one per
+ * region). For non-coalesced splits, returns a single-element list.
+ * @return List of KeyRanges, never null
+ */
+ public List<KeyRange> getKeyRanges() {
+ return keyRanges;
+ }
+
+ /**
+ * Checks if this split is coalesced (contains multiple regions).
+ * @return true if split contains multiple regions
+ */
+ public boolean isCoalesced() {
+ return keyRanges.size() > 1;
}
private void init() {
- this.keyRange =
- KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size() - 1).getStopRow());
+ // Initialize keyRanges from scans
+ this.keyRanges = Lists.newArrayListWithExpectedSize(scans.size());
+ for (Scan scan : scans) {
+ KeyRange kr = KeyRange.getKeyRange(scan.getStartRow(), scan.getStopRow());
+ this.keyRanges.add(kr);
+ }
}
@Override
@@ -126,7 +156,8 @@ public String[] getLocations() throws IOException, InterruptedException {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + keyRange.hashCode();
+ KeyRange range = getKeyRange();
+ result = prime * result + (range == null ? 0 : range.hashCode());
return result;
}
@@ -142,11 +173,13 @@ public boolean equals(Object obj) {
return false;
}
PhoenixInputSplit other = (PhoenixInputSplit) obj;
- if (keyRange == null) {
- if (other.keyRange != null) {
+ KeyRange thisRange = getKeyRange();
+ KeyRange otherRange = other.getKeyRange();
+ if (thisRange == null) {
+ if (otherRange != null) {
return false;
}
- } else if (!keyRange.equals(other.keyRange)) {
+ } else if (!thisRange.equals(otherRange)) {
return false;
}
return true;
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java
index d717c3f7bbf..85117828171 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java
@@ -21,9 +21,13 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -31,8 +35,10 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,15 +100,31 @@ public List getSplits(JobContext context) throws IOException, Interr
} catch (SQLException e) {
throw new RuntimeException(e);
}
- if (completedRegions.isEmpty()) {
- LOGGER.info("No completed regions for table {} - processing all {} splits", tableName,
- allSplits.size());
- return allSplits;
- }
List<InputSplit> unprocessedSplits = filterCompletedSplits(allSplits, completedRegions);
LOGGER.info("Found {} completed mapper regions for table {}, {} unprocessed splits remaining",
completedRegions.size(), tableName, unprocessedSplits.size());
+
+ boolean enableSplitCoalescing =
+ conf.getBoolean(PhoenixSyncTableTool.PHOENIX_SYNC_TABLE_SPLIT_COALESCING,
+ PhoenixSyncTableTool.DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING);
+ LOGGER.info("Split coalescing enabled: {}, for table {}", enableSplitCoalescing, tableName);
+
+ if (enableSplitCoalescing && unprocessedSplits.size() > 1) {
+ try (Connection conn = ConnectionUtil.getInputConnection(conf)) {
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ byte[] physicalTableName = pConn.getTable(tableName).getPhysicalName().getBytes();
+ List<InputSplit> coalescedSplits =
+ coalesceSplits(unprocessedSplits, pConn.getQueryServices(), physicalTableName);
+ LOGGER.info("Split coalescing: {} unprocessed splits {} coalesced splits for table {}",
+ unprocessedSplits.size(), coalescedSplits.size(), tableName);
+ return coalescedSplits;
+ } catch (Exception e) {
+ throw new IOException(String.format("Failed to coalesce splits for table %s. "
+ + "Split coalescing is enabled but failed due to: %s.", tableName, e.getMessage()), e);
+ }
+ }
+
return unprocessedSplits;
}
@@ -133,6 +155,9 @@ private List queryCompletedMapperRegions(Configuration conf, String ta
*/
List filterCompletedSplits(List allSplits,
List completedRegions) {
+ if (completedRegions.isEmpty()) {
+ return allSplits;
+ }
allSplits.sort((s1, s2) -> {
PhoenixInputSplit ps1 = (PhoenixInputSplit) s1;
PhoenixInputSplit ps2 = (PhoenixInputSplit) s2;
@@ -211,4 +236,97 @@ List filterCompletedSplits(List allSplits,
}
return unprocessedSplits;
}
+
+ /**
+ * Coalesces multiple region splits from the same RegionServer into single InputSplits. All
+ * regions from the same server are coalesced into one split, regardless of count or size. This
+ * reduces mapper count and avoids hot spotting when many concurrent mappers hit the same server.
+ * @param unprocessedSplits Splits remaining after filtering completed regions
+ * @param queryServices ConnectionQueryServices for querying region locations
+ * @param physicalTableName Physical HBase table name
+ * @return Coalesced splits with all regions per server combined into one split
+ */
+ List<InputSplit> coalesceSplits(List<InputSplit> unprocessedSplits,
+ ConnectionQueryServices queryServices, byte[] physicalTableName)
+ throws IOException, InterruptedException, SQLException {
+ // Group splits by RegionServer location
+ Map<String, List<PhoenixInputSplit>> splitsByServer =
+ groupSplitsByServer(unprocessedSplits, queryServices, physicalTableName);
+
+ List<InputSplit> coalescedSplits = new ArrayList<>();
+
+ // For each RegionServer, create one coalesced split with ALL regions from that server
+ for (Map.Entry<String, List<PhoenixInputSplit>> entry : splitsByServer.entrySet()) {
+ String serverName = entry.getKey();
+ List<PhoenixInputSplit> serverSplits = entry.getValue();
+
+ // Sort splits by start key for sequential processing
+ serverSplits.sort((s1, s2) -> Bytes.compareTo(s1.getKeyRange().getLowerRange(),
+ s2.getKeyRange().getLowerRange()));
+ // Create single coalesced split with ALL regions from this server
+ coalescedSplits.add(createCoalescedSplit(serverSplits, serverName));
+ }
+
+ return coalescedSplits;
+ }
+
+ /**
+ * Groups splits by RegionServer location for locality-aware coalescing. Uses
+ * ConnectionQueryServices to determine which server hosts each region.
+ * @param splits List of splits to group
+ * @param queryServices ConnectionQueryServices for querying region locations
+ * @param physicalTableName Physical HBase table name
+ * @return Map of server name to list of splits hosted on that server
+ */
+ private Map<String, List<PhoenixInputSplit>> groupSplitsByServer(List<InputSplit> splits,
+ ConnectionQueryServices queryServices, byte[] physicalTableName)
+ throws IOException, SQLException {
+ Map<String, List<PhoenixInputSplit>> splitsByServer = new HashMap<>();
+ for (InputSplit split : splits) {
+ PhoenixInputSplit pSplit = (PhoenixInputSplit) split;
+ KeyRange keyRange = pSplit.getKeyRange();
+ HRegionLocation regionLocation =
+ queryServices.getTableRegionLocation(physicalTableName, keyRange.getLowerRange());
+ if (regionLocation == null) {
+ throw new IOException("Could not determine region location for key: "
+ + Bytes.toStringBinary(keyRange.getLowerRange()));
+ }
+ if (regionLocation.getServerName() == null) {
+ throw new IOException("Could not determine server name for region at key: "
+ + Bytes.toStringBinary(keyRange.getLowerRange()));
+ }
+ String serverName = regionLocation.getServerName().getAddress().toString();
+ splitsByServer.computeIfAbsent(serverName, k -> new ArrayList<>()).add(pSplit);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Split {} assigned to server {}",
+ Bytes.toStringBinary(keyRange.getLowerRange()), serverName);
+ }
+ }
+
+ return splitsByServer;
+ }
+
+ /**
+ * Creates a coalesced PhoenixInputSplit containing multiple regions. Combines scans and KeyRanges
+ * from individual splits into a single split.
+ * @param splits List of splits to coalesce (from same RegionServer)
+ * @param serverLocation RegionServer location for data locality
+ * @return Coalesced PhoenixInputSplit
+ */
+ private PhoenixInputSplit createCoalescedSplit(List<PhoenixInputSplit> splits,
+ String serverLocation) throws IOException, InterruptedException {
+
+ List<Scan> allScans = new ArrayList<>();
+ long totalSize = 0;
+ // Extract all scans from individual splits
+ for (PhoenixInputSplit split : splits) {
+ allScans.addAll(split.getScans());
+ totalSize += split.getLength();
+ }
+
+ LOGGER.info("Created coalesced split with {} regions, {} MB from server {}", splits.size(),
+ totalSize / (1024 * 1024), serverLocation);
+ // Create a new PhoenixInputSplit, keyRanges will be derived from scans in init()
+ return new PhoenixInputSplit(allScans, totalSize, serverLocation);
+ }
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java
index acaac01fc3c..30457641236 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java
@@ -31,7 +31,6 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -45,7 +44,6 @@
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -88,16 +86,13 @@ public enum SyncCounters {
private Connection globalConnection;
private PTable pTable;
private byte[] physicalTableName;
- private byte[] mapperRegionStart;
- private byte[] mapperRegionEnd;
+ private List<KeyRange> regionKeyRanges;
private PhoenixSyncTableOutputRepository syncTableOutputRepository;
- private Timestamp mapperStartTime;
@Override
protected void setup(Context context) throws InterruptedException {
try {
super.setup(context);
- mapperStartTime = new Timestamp(System.currentTimeMillis());
this.conf = context.getConfiguration();
tableName = PhoenixSyncTableTool.getPhoenixSyncTableName(conf);
targetZkQuorum = PhoenixSyncTableTool.getPhoenixSyncTableTargetZkQuorum(conf);
@@ -123,18 +118,25 @@ protected void setup(Context context) throws InterruptedException {
}
/**
- * Extracts mapper region boundaries from the PhoenixInputSplit
+ * Extracts region key ranges from the PhoenixInputSplit. Handles both single-region splits and
+ * coalesced splits with multiple regions.
*/
private void extractRegionBoundariesFromSplit(Context context) {
PhoenixInputSplit split = (PhoenixInputSplit) context.getInputSplit();
- KeyRange keyRange = split.getKeyRange();
- if (keyRange == null) {
+ regionKeyRanges = split.getKeyRanges();
+
+ if (regionKeyRanges == null || regionKeyRanges.isEmpty()) {
throw new IllegalStateException(String.format(
- "PhoenixInputSplit has no KeyRange for table: %s . Cannot determine region boundaries for sync operation.",
+ "PhoenixInputSplit has no KeyRanges for table: %s. Cannot determine region boundaries for sync operation.",
tableName));
}
- mapperRegionStart = keyRange.getLowerRange();
- mapperRegionEnd = keyRange.getUpperRange();
+
+ if (split.isCoalesced()) {
+ LOGGER.info("Mapper processing coalesced split with {} regions for table {}",
+ regionKeyRanges.size(), tableName);
+ } else {
+ LOGGER.info("Mapper processing single region split for table {}", tableName);
+ }
}
/**
@@ -156,67 +158,23 @@ private Connection createGlobalConnection(Configuration conf) throws SQLExceptio
}
/**
- * Processes a mapper region by comparing chunks between source and target clusters. Gets already
- * processed chunks from checkpoint table, resumes from check pointed progress and records final
- * status for chunks & mapper (VERIFIED/MISMATCHED).
+ * Processes mapper region(s) by comparing chunks between source and target clusters. For
+ * coalesced splits, processes each region sequentially. Gets already processed chunks from
+ * checkpoint table, resumes from checkpointed progress and records final status for chunks &
+ * mapper (VERIFIED/MISMATCHED).
*/
@Override
protected void map(NullWritable key, DBInputFormat.NullDBWritable value, Context context)
throws IOException, InterruptedException {
+ context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
try {
- List processedChunks =
- syncTableOutputRepository.getProcessedChunks(tableName, targetZkQuorum, fromTime, toTime,
- tenantId, mapperRegionStart, mapperRegionEnd);
- List unprocessedRanges =
- calculateUnprocessedRanges(mapperRegionStart, mapperRegionEnd, processedChunks);
- boolean isStartKeyInclusive = shouldStartKeyBeInclusive(mapperRegionStart, processedChunks);
- for (KeyRange range : unprocessedRanges) {
- processMapperRanges(range.getLowerRange(), range.getUpperRange(), isStartKeyInclusive,
- context);
- isStartKeyInclusive = false;
- }
-
- long chunksMismatched = context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue();
- long chunksVerified = context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue();
- long sourceRowsProcessed = context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue();
- long targetRowsProcessed = context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue();
- Timestamp mapperEndTime = new Timestamp(System.currentTimeMillis());
- String counters = PhoenixSyncTableCheckpointOutputRow.CounterFormatter
- .formatMapper(chunksVerified, chunksMismatched, sourceRowsProcessed, targetRowsProcessed);
- if (sourceRowsProcessed > 0) {
- if (chunksMismatched == 0) {
- context.getCounter(SyncCounters.MAPPERS_VERIFIED).increment(1);
- syncTableOutputRepository
- .checkpointSyncTableResult(new PhoenixSyncTableCheckpointOutputRow.Builder()
- .setTableName(tableName).setTargetCluster(targetZkQuorum)
- .setType(PhoenixSyncTableCheckpointOutputRow.Type.REGION).setFromTime(fromTime)
- .setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun)
- .setStartRowKey(mapperRegionStart).setEndRowKey(mapperRegionEnd)
- .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED)
- .setExecutionStartTime(mapperStartTime).setExecutionEndTime(mapperEndTime)
- .setCounters(counters).build());
- LOGGER.info(
- "PhoenixSyncTable mapper completed with verified: {} verified chunks, {} mismatched chunks",
- chunksVerified, chunksMismatched);
- } else {
- context.getCounter(SyncCounters.MAPPERS_MISMATCHED).increment(1);
- LOGGER.warn(
- "PhoenixSyncTable mapper completed with mismatch: {} verified chunks, {} mismatched chunks",
- chunksVerified, chunksMismatched);
- syncTableOutputRepository
- .checkpointSyncTableResult(new PhoenixSyncTableCheckpointOutputRow.Builder()
- .setTableName(tableName).setTargetCluster(targetZkQuorum)
- .setType(PhoenixSyncTableCheckpointOutputRow.Type.REGION).setFromTime(fromTime)
- .setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun)
- .setStartRowKey(mapperRegionStart).setEndRowKey(mapperRegionEnd)
- .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED)
- .setExecutionStartTime(mapperStartTime).setExecutionEndTime(mapperEndTime)
- .setCounters(counters).build());
- }
- } else {
- LOGGER.info(
- "No rows pending to process. All mapper region boundaries are covered for startKey:{}, endKey: {}",
- mapperRegionStart, mapperRegionEnd);
+ // Process each region in the split (one or multiple for coalesced splits)
+ for (KeyRange keyRange : regionKeyRanges) {
+ byte[] regionStart = keyRange.getLowerRange();
+ byte[] regionEnd = keyRange.getUpperRange();
+ LOGGER.info("Processing region [{}, {}) from split for table {}",
+ Bytes.toStringBinary(regionStart), Bytes.toStringBinary(regionEnd), tableName);
+ processRegion(regionStart, regionEnd, context);
}
} catch (SQLException e) {
tryClosingResources();
@@ -224,6 +182,124 @@ protected void map(NullWritable key, DBInputFormat.NullDBWritable value, Context
}
}
+ /**
+ * Processes a single region within a split (could be part of a coalesced split).
+ * @param regionStart Start key of the region
+ * @param regionEnd End key of the region
+ * @param context Mapper context
+ */
+ private void processRegion(byte[] regionStart, byte[] regionEnd, Context context)
+ throws SQLException, IOException, InterruptedException {
+
+ Timestamp regionStartTime = new Timestamp(System.currentTimeMillis());
+
+ // Get processed chunks for this specific region
+ List processedChunks =
+ syncTableOutputRepository.getProcessedChunks(tableName, targetZkQuorum, fromTime, toTime,
+ tenantId, regionStart, regionEnd);
+
+ // Calculate unprocessed ranges within this region
+ List<KeyRange> unprocessedRanges =
+ calculateUnprocessedRanges(regionStart, regionEnd, processedChunks);
+
+ // Track counters before processing this region
+ long verifiedBefore = context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue();
+ long mismatchedBefore = context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue();
+ long sourceRowsBefore = context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue();
+ long targetRowsBefore = context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue();
+
+ // Process all unprocessed ranges in this region
+ boolean isStartKeyInclusive = shouldStartKeyBeInclusive(regionStart, processedChunks);
+ for (KeyRange range : unprocessedRanges) {
+ processMapperRanges(range.getLowerRange(), range.getUpperRange(), isStartKeyInclusive,
+ context);
+ isStartKeyInclusive = false;
+ }
+
+ // Calculate counters for this region only
+ long verifiedChunks =
+ context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue() - verifiedBefore;
+ long mismatchedChunks =
+ context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue() - mismatchedBefore;
+ long sourceRowsProcessed =
+ context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue() - sourceRowsBefore;
+ long targetRowsProcessed =
+ context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue() - targetRowsBefore;
+
+ Timestamp regionEndTime = new Timestamp(System.currentTimeMillis());
+ String counters = PhoenixSyncTableCheckpointOutputRow.CounterFormatter
+ .formatMapper(verifiedChunks, mismatchedChunks, sourceRowsProcessed, targetRowsProcessed);
+ if (sourceRowsProcessed > 0) {
+ recordRegionCompletion(regionStart, regionEnd, regionStartTime, regionEndTime, verifiedChunks,
+ mismatchedChunks, counters, context);
+ } else {
+ LOGGER.info(
+ "No rows pending to process. All region boundaries are covered for startKey:{}, endKey: {}",
+ Bytes.toStringBinary(regionStart), Bytes.toStringBinary(regionEnd));
+ }
+ }
+
+ /**
+ * Records region completion by updating counters, recording checkpoint, and logging result.
+ * Consolidates all region completion logic to eliminate duplication.
+ * @param regionStart Region start key
+ * @param regionEnd Region end key
+ * @param regionStartTime Region processing start time
+ * @param regionEndTime Region processing end time
+ * @param verifiedChunks Number of verified chunks
+ * @param mismatchedChunks Number of mismatched chunks
+ * @param counters Formatted counter string
+ * @param context Mapper context
+ */
+ private void recordRegionCompletion(byte[] regionStart, byte[] regionEnd,
+ Timestamp regionStartTime, Timestamp regionEndTime, long verifiedChunks, long mismatchedChunks,
+ String counters, Context context) throws SQLException {
+
+ boolean isVerified = (mismatchedChunks == 0);
+ PhoenixSyncTableCheckpointOutputRow.Status status = isVerified
+ ? PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED
+ : PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED;
+
+ context.getCounter(isVerified ? SyncCounters.MAPPERS_VERIFIED : SyncCounters.MAPPERS_MISMATCHED)
+ .increment(1);
+
+ recordRegionCheckpoint(regionStart, regionEnd, status, regionStartTime, regionEndTime,
+ counters);
+
+ String logMessage = String.format(
+ "PhoenixSyncTable region [%s, %s) completed with %s: %d verified chunks, %d mismatched chunks",
+ Bytes.toStringBinary(regionStart), Bytes.toStringBinary(regionEnd),
+ isVerified ? "verified" : "mismatch", verifiedChunks, mismatchedChunks);
+
+ if (isVerified) {
+ LOGGER.info(logMessage);
+ } else {
+ LOGGER.warn(logMessage);
+ }
+ }
+
+ /**
+ * Records a region checkpoint to the checkpoint table.
+ * @param regionStart Region start key
+ * @param regionEnd Region end key
+ * @param status Status (VERIFIED or MISMATCHED)
+ * @param regionStartTime Region processing start time
+ * @param regionEndTime Region processing end time
+ * @param counters Formatted counter string
+ */
+ private void recordRegionCheckpoint(byte[] regionStart, byte[] regionEnd,
+ PhoenixSyncTableCheckpointOutputRow.Status status, Timestamp regionStartTime,
+ Timestamp regionEndTime, String counters) throws SQLException {
+
+ syncTableOutputRepository
+ .checkpointSyncTableResult(new PhoenixSyncTableCheckpointOutputRow.Builder()
+ .setTableName(tableName).setTargetCluster(targetZkQuorum)
+ .setType(PhoenixSyncTableCheckpointOutputRow.Type.REGION).setFromTime(fromTime)
+ .setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun).setStartRowKey(regionStart)
+ .setEndRowKey(regionEnd).setStatus(status).setExecutionStartTime(regionStartTime)
+ .setExecutionEndTime(regionEndTime).setCounters(counters).build());
+ }
+
/**
* Processes a chunk range by comparing source and target cluster data. Source chunking: Breaks
* data into size-based chunks within given mapper region boundary. Target chunking: Follows
@@ -408,13 +484,7 @@ private ChunkScannerContext createChunkScanner(Connection conn, byte[] startKey,
scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_SIZE_BYTES,
Bytes.toBytes(chunkSizeBytes));
}
- // Use the half of the HBase RPC timeout value as the server page size to make sure
- // that the HBase region server will be able to send a heartbeat message to the
- // client before the client times out.
- long syncTablePageTimeoutMs = (long) (conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
- QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT) * 0.5);
- scan.setAttribute(BaseScannerRegionObserverConstants.SERVER_PAGE_SIZE_MS,
- Bytes.toBytes(syncTablePageTimeoutMs));
+ ScanUtil.setScanAttributeForPaging(scan, phoenixConn);
ResultScanner scanner = hTable.getScanner(scan);
return new ChunkScannerContext(hTable, scanner);
}
@@ -453,14 +523,8 @@ private ChunkInfo parseChunkInfo(Result result) {
private void handleVerifiedChunk(ChunkInfo sourceChunk, Context context, String counters)
throws SQLException {
- syncTableOutputRepository.checkpointSyncTableResult(
- new PhoenixSyncTableCheckpointOutputRow.Builder().setTableName(tableName)
- .setTargetCluster(targetZkQuorum).setType(PhoenixSyncTableCheckpointOutputRow.Type.CHUNK)
- .setFromTime(fromTime).setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun)
- .setStartRowKey(sourceChunk.startKey).setEndRowKey(sourceChunk.endKey)
- .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED)
- .setExecutionStartTime(sourceChunk.executionStartTime)
- .setExecutionEndTime(sourceChunk.executionEndTime).setCounters(counters).build());
+ recordChunkCheckpoint(sourceChunk, PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED,
+ counters);
context.getCounter(SyncCounters.CHUNKS_VERIFIED).increment(1);
}
@@ -468,16 +532,27 @@ private void handleMismatchedChunk(ChunkInfo sourceChunk, Context context, Strin
throws SQLException {
LOGGER.warn("Chunk mismatch detected for table: {}, with startKey: {}, endKey {}", tableName,
Bytes.toStringBinary(sourceChunk.startKey), Bytes.toStringBinary(sourceChunk.endKey));
+ recordChunkCheckpoint(sourceChunk, PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED,
+ counters);
+ context.getCounter(SyncCounters.CHUNKS_MISMATCHED).increment(1);
+ }
+
+ /**
+ * Records a chunk checkpoint to the checkpoint table.
+ * @param sourceChunk Chunk information
+ * @param status Status (VERIFIED or MISMATCHED)
+ * @param counters Formatted counter string
+ */
+ private void recordChunkCheckpoint(ChunkInfo sourceChunk,
+ PhoenixSyncTableCheckpointOutputRow.Status status, String counters) throws SQLException {
+
syncTableOutputRepository.checkpointSyncTableResult(
new PhoenixSyncTableCheckpointOutputRow.Builder().setTableName(tableName)
.setTargetCluster(targetZkQuorum).setType(PhoenixSyncTableCheckpointOutputRow.Type.CHUNK)
.setFromTime(fromTime).setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun)
- .setStartRowKey(sourceChunk.startKey).setEndRowKey(sourceChunk.endKey)
- .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED)
+ .setStartRowKey(sourceChunk.startKey).setEndRowKey(sourceChunk.endKey).setStatus(status)
.setExecutionStartTime(sourceChunk.executionStartTime)
.setExecutionEndTime(sourceChunk.executionEndTime).setCounters(counters).build());
-
- context.getCounter(SyncCounters.CHUNKS_MISMATCHED).increment(1);
}
/**
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java
index 90963c37a33..80eacde25e7 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@@ -115,6 +116,8 @@ public class PhoenixSyncTableTool extends Configured implements Tool {
private static final Option READ_ALL_VERSIONS_OPTION = new Option("rav", "read-all-versions",
false,
"Enable reading all cell versions (optional, disabled by default, reads only latest version)");
+ private static final Option COALESCE_SPLIT_OPTION = new Option("coal", "coalesce-split", false,
+ "Enable split coalescing to reduce mapper count (optional, disabled by default)");
private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
public static final String PHOENIX_SYNC_TABLE_NAME = "phoenix.sync.table.table.name";
@@ -124,7 +127,10 @@ public class PhoenixSyncTableTool extends Configured implements Tool {
public static final String PHOENIX_SYNC_TABLE_DRY_RUN = "phoenix.sync.table.dry.run";
public static final String PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES =
"phoenix.sync.table.chunk.size.bytes";
- public static final long DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = 1024 * 1024 * 1024; // 1GB
+ public static final long DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = 1024 * 1024 * 1024; // 1 GB
+ public static final String PHOENIX_SYNC_TABLE_SPLIT_COALESCING =
+ "phoenix.sync.table.split.coalescing";
+ public static final boolean DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING = false;
public static final String PHOENIX_SYNC_TABLE_RAW_SCAN = "phoenix.sync.table.raw.scan";
public static final String PHOENIX_SYNC_TABLE_READ_ALL_VERSIONS =
"phoenix.sync.table.read.all.versions";
@@ -140,6 +146,7 @@ public class PhoenixSyncTableTool extends Configured implements Tool {
private String tenantId;
private boolean isRawScan = false;
private boolean isReadAllVersions = false;
+ private boolean isCoalesceSplit = false;
private String qTable;
private String qSchemaName;
@@ -218,6 +225,7 @@ private void setPhoenixSyncTableToolConfiguration(Configuration configuration) {
setPhoenixSyncTableDryRun(configuration, isDryRun);
setPhoenixSyncTableRawScan(configuration, isRawScan);
setPhoenixSyncTableReadAllVersions(configuration, isReadAllVersions);
+ setPhoenixSyncTableSplitCoalescing(configuration, isCoalesceSplit);
PhoenixConfigurationUtil.setSplitByStats(configuration, false);
if (chunkSizeBytes != null) {
setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes);
@@ -292,6 +300,7 @@ private Options getOptions() {
options.addOption(TENANT_ID_OPTION);
options.addOption(RAW_SCAN_OPTION);
options.addOption(READ_ALL_VERSIONS_OPTION);
+ options.addOption(COALESCE_SPLIT_OPTION);
options.addOption(HELP_OPTION);
return options;
}
@@ -307,7 +316,7 @@ private void printHelpAndExit(Options options, int exitCode) {
formatter.printHelp(cmdLineSyntax,
"Synchronize a Phoenix table between source and target clusters", options,
"\nExample:\n" + cmdLineSyntax + " \\\n" + " --table-name MY_TABLE \\\n"
- + " --target-cluster :2181 \\\n" + " --dry-run\n",
+ + " --target-cluster :2181 \\\n" + " --dry-run \\\n" + " --coalesce-split\n",
true);
System.exit(exitCode);
}
@@ -344,15 +353,16 @@ public void populateSyncTableToolAttributes(CommandLine cmdLine) {
isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
isRawScan = cmdLine.hasOption(RAW_SCAN_OPTION.getOpt());
isReadAllVersions = cmdLine.hasOption(READ_ALL_VERSIONS_OPTION.getOpt());
+ isCoalesceSplit = cmdLine.hasOption(COALESCE_SPLIT_OPTION.getOpt());
qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
LOGGER.info(
"PhoenixSyncTableTool configured - Table: {}, Schema: {}, Target: {}, "
+ "StartTime: {}, EndTime: {}, DryRun: {}, ChunkSize: {}, Foreground: {}, TenantId: {}, "
- + "RawScan: {}, ReadAllVersions: {}",
+ + "RawScan: {}, ReadAllVersions: {}, CoalesceSplit: {}",
qTable, qSchemaName, targetZkQuorum, startTime, endTime, isDryRun, chunkSizeBytes,
- isForeground, tenantId, isRawScan, isReadAllVersions);
+ isForeground, tenantId, isRawScan, isReadAllVersions, isCoalesceSplit);
}
/**
@@ -396,6 +406,7 @@ private boolean submitPhoenixSyncTableJob() throws Exception {
}
Counters counters = job.getCounters();
if (counters != null) {
+ long taskCreated = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
long verifiedMappers =
counters.findCounter(PhoenixSyncTableMapper.SyncCounters.MAPPERS_VERIFIED).getValue();
long mismatchedMappers =
@@ -409,11 +420,11 @@ private boolean submitPhoenixSyncTableJob() throws Exception {
long targetRowsProcessed =
counters.findCounter(PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED).getValue();
LOGGER.info(
- "PhoenixSyncTable job completed, gathered counters are \n Verified Mappers: {}, \n"
+ "PhoenixSyncTable job completed, gathered counters are \n Task Created: {}, \n Verified Mappers: {}, \n"
+ "Mismatched Mappers: {}, \n Chunks Verified: {}, \n"
+ "Chunks Mismatched: {}, \n Source Rows Processed: {}, \n Target Rows Processed: {}",
- verifiedMappers, mismatchedMappers, chunksVerified, chunksMismatched, sourceRowsProcessed,
- targetRowsProcessed);
+ taskCreated, verifiedMappers, mismatchedMappers, chunksVerified, chunksMismatched,
+ sourceRowsProcessed, targetRowsProcessed);
} else {
LOGGER.warn("Unable to retrieve job counters for table {} - job may have failed "
+ "during initialization", qTable);
@@ -535,6 +546,18 @@ public static boolean getPhoenixSyncTableReadAllVersions(Configuration conf) {
return conf.getBoolean(PHOENIX_SYNC_TABLE_READ_ALL_VERSIONS, false);
}
+ public static void setPhoenixSyncTableSplitCoalescing(Configuration conf,
+ boolean splitCoalescing) {
+ Preconditions.checkNotNull(conf);
+ conf.setBoolean(PHOENIX_SYNC_TABLE_SPLIT_COALESCING, splitCoalescing);
+ }
+
+ public static boolean getPhoenixSyncTableSplitCoalescing(Configuration conf) {
+ Preconditions.checkNotNull(conf);
+ return conf.getBoolean(PHOENIX_SYNC_TABLE_SPLIT_COALESCING,
+ DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING);
+ }
+
public Job getJob() {
return job;
}
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 8c242392170..6067ef4eb71 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -407,7 +407,6 @@
org.bouncycastle
bcprov-jdk18on
- 1.79
test
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java
index fc277131ab1..0661973dbe7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java
@@ -54,6 +54,7 @@
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDriver;
@@ -159,6 +160,7 @@ public void testSyncTableValidateWithDataDifference() throws Exception {
validateSyncCounters(counters, 10, 10, 1, 3);
validateMapperCounters(counters, 1, 3);
+ assertEquals("Expected 4 mapper tasks to be created", 4, counters.taskCreated);
List checkpointEntries =
queryCheckpointTable(sourceConnection, uniqueTableName, targetZkQuorum, null);
@@ -205,6 +207,8 @@ public void testSyncTableWithDeletedRowsOnTarget() throws Exception {
validateSyncCounters(counters, 10, 7, 7, 3);
validateMapperCounters(counters, 1, 3);
+ assertEquals("Expected 4 mapper tasks to be created without coalescing", 4, counters.taskCreated);
+
}
@Test
@@ -1244,10 +1248,8 @@ public void testSyncTableValidateWithPagingTimeout() throws Exception {
// Configure paging with aggressive timeouts to force mid-chunk timeouts
Configuration conf = new Configuration(CLUSTERS.getHBaseCluster1().getConfiguration());
-
- long aggressiveRpcTimeout = 2L;
- conf.setLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB, aggressiveRpcTimeout);
- conf.setLong(HConstants.HBASE_RPC_TIMEOUT_KEY, aggressiveRpcTimeout);
+ conf.setBoolean(QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB, true);
+ conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 1);
int chunkSize = 10240;
@@ -1307,15 +1309,10 @@ public void testSyncTableValidateWithPagingTimeoutWithSplits() throws Exception
// Configure paging with aggressive timeouts to force mid-chunk timeouts
Configuration conf = new Configuration(CLUSTERS.getHBaseCluster1().getConfiguration());
-
- // Enable server-side paging
conf.setBoolean(QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB, true);
- // Set extremely short rpc timeout to force frequent paging
- long aggressiveRpcTimeout = 1L; // 1ms RPC timeout
- conf.setLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB, aggressiveRpcTimeout);
- conf.setLong(HConstants.HBASE_RPC_TIMEOUT_KEY, aggressiveRpcTimeout);
+ conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 1);
- int chunkSize = 102400; // 100KB
+ int chunkSize = 10240;
// Create a thread that will perform splits on source cluster during sync
Thread sourceSplitThread = new Thread(() -> {
@@ -1751,6 +1748,29 @@ public void testSyncTableWithMultipleVersionAndCompactionOnTarget() throws Excep
validateMapperCounters(counters3, 3, 1);
}
+ @Test
+ public void testSyncTableValidateWithSplitCoalescing() throws Exception {
+ setupStandardTestWithReplication(uniqueTableName, 1, 10);
+
+ introduceAndVerifyTargetDifferences(uniqueTableName);
+
+ // Enable split coalescing via command-line parameter, all regions will be coalesced into one
+ // mapper
+ Job job = runSyncTool(uniqueTableName, "--coalesce-split");
+ SyncCountersResult counters = getSyncCounters(job);
+
+ assertEquals("Should have only 1 Mapper task created with coalescing", 1, counters.taskCreated);
+
+ validateSyncCounters(counters, 10, 10, 7, 3);
+ validateMapperCounters(counters, 1, 3);
+
+ // Verify checkpoint entries are created correctly
+ List checkpointEntries =
+ queryCheckpointTable(sourceConnection, uniqueTableName, targetZkQuorum, null);
+ validateCheckpointEntries(checkpointEntries, uniqueTableName, targetZkQuorum, 10, 10, 7, 3, 4,
+ 3, null);
+ }
+
/**
* Helper class to hold separated mapper and chunk entries.
*/
@@ -2327,6 +2347,7 @@ private void splitTableAt(Connection conn, String tableName, int splitId) {
LOGGER.info("Split completed for table {} at split point {} (bytes: {})", tableName, splitId,
Bytes.toStringBinary(splitPoint));
} catch (Exception e) {
+ // Ignore split failures - they don't affect the test's main goal
LOGGER.warn("Failed to split table {} at split point {}: {}", tableName, splitId,
e.getMessage());
}
@@ -2576,6 +2597,7 @@ private static class SyncCountersResult {
public final long chunksVerified;
public final long mappersVerified;
public final long mappersMismatched;
+ public final long taskCreated;
SyncCountersResult(Counters counters) {
this.sourceRowsProcessed =
@@ -2586,6 +2608,7 @@ private static class SyncCountersResult {
this.chunksVerified = counters.findCounter(SyncCounters.CHUNKS_VERIFIED).getValue();
this.mappersVerified = counters.findCounter(SyncCounters.MAPPERS_VERIFIED).getValue();
this.mappersMismatched = counters.findCounter(SyncCounters.MAPPERS_MISMATCHED).getValue();
+ this.taskCreated = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue(); // one input record per mapper (NoOp reader) — proxy for launched maps; TODO confirm it matches JobCounter.TOTAL_LAUNCHED_MAPS used by the tool
}
public void logCounters(String testName) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixInputSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixInputSplitTest.java
new file mode 100644
index 00000000000..d780e25b874
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixInputSplitTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.KeyRange;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PhoenixInputSplitTest {
+
+ /**
+ * Helper method to create a Scan with given row boundaries.
+ */
+ private Scan createScan(byte[] startRow, byte[] stopRow) {
+ Scan scan = new Scan();
+ scan.withStartRow(startRow, true);
+ scan.withStopRow(stopRow, false);
+ return scan;
+ }
+
+ /**
+ * Helper method to serialize and deserialize a PhoenixInputSplit.
+ */
+ private PhoenixInputSplit serializeAndDeserialize(PhoenixInputSplit split) throws IOException {
+ // Serialize
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutput out = new DataOutputStream(baos);
+ split.write(out);
+
+ // Deserialize
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ DataInput in = new DataInputStream(bais);
+ PhoenixInputSplit deserialized = new PhoenixInputSplit();
+ deserialized.readFields(in);
+
+ return deserialized;
+ }
+
+ @Test
+ public void testStandardConstructorWithSingleScan() {
+ List scans = new ArrayList<>();
+ scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+ PhoenixInputSplit split = new PhoenixInputSplit(scans);
+
+ assertFalse("Should not be coalesced with single scan", split.isCoalesced());
+ assertEquals("Should have 1 keyRange", 1, split.getKeyRanges().size());
+ assertNotNull("KeyRange should be initialized", split.getKeyRange());
+ assertTrue("KeyRange should span scan boundaries",
+ Bytes.equals(Bytes.toBytes("a"), split.getKeyRange().getLowerRange()));
+ assertTrue("KeyRange should span scan boundaries",
+ Bytes.equals(Bytes.toBytes("d"), split.getKeyRange().getUpperRange()));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testStandardConstructorWithNullScans() {
+ new PhoenixInputSplit(null);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testStandardConstructorWithEmptyScans() {
+ new PhoenixInputSplit(Collections.emptyList());
+ }
+
+ @Test
+ public void testCoalescingConstructorWithMultipleRegions()
+ throws IOException, InterruptedException {
+ List scans = new ArrayList<>();
+ scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+ scans.add(createScan(Bytes.toBytes("d"), Bytes.toBytes("g")));
+ scans.add(createScan(Bytes.toBytes("g"), Bytes.toBytes("j")));
+
+ long splitSize = 3072;
+ String regionLocation = "server1:16020";
+
+ // KeyRanges are now automatically derived from scans
+ PhoenixInputSplit split = new PhoenixInputSplit(scans, splitSize, regionLocation);
+
+ assertTrue("Should be coalesced with multiple regions", split.isCoalesced());
+ assertEquals("Should have 3 keyRanges (derived from scans)", 3, split.getKeyRanges().size());
+ assertEquals("Split size should match", splitSize, split.getLength());
+ assertArrayEquals("Region location should match", new String[] { regionLocation },
+ split.getLocations());
+
+ // Verify keyRanges were derived correctly from scans
+ List keyRanges = split.getKeyRanges();
+ assertTrue("First keyRange should match first scan",
+ Bytes.equals(Bytes.toBytes("a"), keyRanges.get(0).getLowerRange()));
+ assertTrue("First keyRange should match first scan",
+ Bytes.equals(Bytes.toBytes("d"), keyRanges.get(0).getUpperRange()));
+ assertTrue("Second keyRange should match second scan",
+ Bytes.equals(Bytes.toBytes("d"), keyRanges.get(1).getLowerRange()));
+ assertTrue("Second keyRange should match second scan",
+ Bytes.equals(Bytes.toBytes("g"), keyRanges.get(1).getUpperRange()));
+ assertTrue("Third keyRange should match third scan",
+ Bytes.equals(Bytes.toBytes("g"), keyRanges.get(2).getLowerRange()));
+ assertTrue("Third keyRange should match third scan",
+ Bytes.equals(Bytes.toBytes("j"), keyRanges.get(2).getUpperRange()));
+ }
+
+ @Test
+ public void testSerializationWithSingleRegion() throws IOException, InterruptedException {
+ List scans = new ArrayList<>();
+ scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+ PhoenixInputSplit original = new PhoenixInputSplit(scans, 1024, "server1:16020");
+
+ PhoenixInputSplit deserialized = serializeAndDeserialize(original);
+
+ assertFalse("Should not be coalesced after deserialization", deserialized.isCoalesced());
+ assertEquals("Should have 1 keyRange", 1, deserialized.getKeyRanges().size());
+ assertEquals("Split size should match", original.getLength(), deserialized.getLength());
+ assertArrayEquals("Region location should match", original.getLocations(),
+ deserialized.getLocations());
+ assertTrue("KeyRange should match", Bytes.equals(original.getKeyRange().getLowerRange(),
+ deserialized.getKeyRange().getLowerRange()));
+ assertTrue("KeyRange should match", Bytes.equals(original.getKeyRange().getUpperRange(),
+ deserialized.getKeyRange().getUpperRange()));
+ }
+
+ @Test
+ public void testSerializationWithCoalescedSplit() throws IOException, InterruptedException {
+ List scans = new ArrayList<>();
+ scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+ scans.add(createScan(Bytes.toBytes("d"), Bytes.toBytes("g")));
+ scans.add(createScan(Bytes.toBytes("g"), Bytes.toBytes("j")));
+
+ // KeyRanges are now automatically derived from scans
+ PhoenixInputSplit original = new PhoenixInputSplit(scans, 3072, "server1:16020");
+
+ PhoenixInputSplit deserialized = serializeAndDeserialize(original);
+
+ assertTrue("Should be coalesced after deserialization", deserialized.isCoalesced());
+ assertEquals("Should have 3 keyRanges", 3, deserialized.getKeyRanges().size());
+ assertEquals("Split size should match", original.getLength(), deserialized.getLength());
+ assertArrayEquals("Region location should match", original.getLocations(),
+ deserialized.getLocations());
+
+ // Verify all keyRanges are preserved (derived from scans)
+ List originalKeyRanges = original.getKeyRanges();
+ List deserializedKeyRanges = deserialized.getKeyRanges();
+ for (int i = 0; i < originalKeyRanges.size(); i++) {
+ assertTrue("KeyRange " + i + " should match", Bytes.equals(
+ originalKeyRanges.get(i).getLowerRange(), deserializedKeyRanges.get(i).getLowerRange()));
+ assertTrue("KeyRange " + i + " should match", Bytes.equals(
+ originalKeyRanges.get(i).getUpperRange(), deserializedKeyRanges.get(i).getUpperRange()));
+ }
+ }
+
+ @Test
+ public void testGetLocations() throws IOException, InterruptedException {
+ List scans =
+ Collections.singletonList(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+ // Test with null regionLocation
+ PhoenixInputSplit split1 = new PhoenixInputSplit(scans, 1024, null);
+ assertArrayEquals("Should return empty array for null location", new String[] {},
+ split1.getLocations());
+
+ // Test with valid regionLocation
+ PhoenixInputSplit split2 = new PhoenixInputSplit(scans, 1024, "server1:16020");
+ assertArrayEquals("Should return array with server location", new String[] { "server1:16020" },
+ split2.getLocations());
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java
index 15e643feaf0..8039ee57aef 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java
@@ -18,27 +18,47 @@
package org.apache.phoenix.mapreduce;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
+import org.junit.Before;
import org.junit.Test;
/**
* Unit tests for PhoenixSyncTableInputFormat. Tests various scenarios of filtering completed splits
+ * and split coalescing functionality.
*/
public class PhoenixSyncTableInputFormatTest {
private PhoenixSyncTableInputFormat inputFormat = new PhoenixSyncTableInputFormat();
+ private ConnectionQueryServices mockQueryServices;
+ private byte[] physicalTableName = Bytes.toBytes("TEST_TABLE");
+
+ @Before
+ public void setup() throws Exception {
+ mockQueryServices = mock(ConnectionQueryServices.class);
+ }
+
/**
* Helper method to create a PhoenixInputSplit with given key range boundaries.
*/
@@ -280,4 +300,211 @@ public void testCreateRecordReaderReturnsNoOpReader() {
assertTrue("Should return a PhoenixNoOpSingleRecordReader",
reader instanceof PhoenixNoOpSingleRecordReader);
}
+
+ @Test
+ public void testCoalesceSplitsWithSingleServer() throws Exception {
+ // Create 3 PhoenixInputSplits all on same server
+ List splits = new ArrayList<>();
+ splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+ splits.add(createSplit(Bytes.toBytes("d"), Bytes.toBytes("g")));
+ splits.add(createSplit(Bytes.toBytes("g"), Bytes.toBytes("j")));
+
+ // Create mock region location - all splits on server1
+ HRegionLocation mockRegion = createMockRegionLocation("server1:16020", Bytes.toBytes("a"));
+
+ // Mock ConnectionQueryServices: all splits → server1
+ when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class)))
+ .thenReturn(mockRegion);
+
+ // Call coalesceSplits()
+ List result =
+ inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+
+ // Verify: 1 coalesced split (all on same server)
+ assertEquals("Should have 1 coalesced split (all on same server)", 1, result.size());
+
+ // Verify: Split is coalesced and contains 3 KeyRanges
+ PhoenixInputSplit coalescedSplit = (PhoenixInputSplit) result.get(0);
+
+ assertTrue("Split should be coalesced", coalescedSplit.isCoalesced());
+ assertEquals("Split should have 3 KeyRanges", 3, coalescedSplit.getKeyRanges().size());
+
+ // Verify: KeyRanges are sorted
+ List keyRanges = coalescedSplit.getKeyRanges();
+ assertTrue("First KeyRange should start with 'a'",
+ Bytes.equals(Bytes.toBytes("a"), keyRanges.get(0).getLowerRange()));
+ assertTrue("Second KeyRange should start with 'd'",
+ Bytes.equals(Bytes.toBytes("d"), keyRanges.get(1).getLowerRange()));
+ assertTrue("Third KeyRange should start with 'g'",
+ Bytes.equals(Bytes.toBytes("g"), keyRanges.get(2).getLowerRange()));
+ }
+
+ @Test
+ public void testCoalesceSplitsWithMultipleServers() throws Exception {
+ // Create 6 PhoenixInputSplits
+ List splits = new ArrayList<>();
+ splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("c")));
+ splits.add(createSplit(Bytes.toBytes("c"), Bytes.toBytes("e")));
+ splits.add(createSplit(Bytes.toBytes("e"), Bytes.toBytes("g")));
+ splits.add(createSplit(Bytes.toBytes("g"), Bytes.toBytes("i")));
+ splits.add(createSplit(Bytes.toBytes("i"), Bytes.toBytes("k")));
+ splits.add(createSplit(Bytes.toBytes("k"), Bytes.toBytes("m")));
+
+ // Create mock region locations BEFORE stubbing to avoid nested stubbing issues
+ HRegionLocation mockRegionA = createMockRegionLocation("server1:16020", Bytes.toBytes("a"));
+ HRegionLocation mockRegionC = createMockRegionLocation("server1:16020", Bytes.toBytes("c"));
+ HRegionLocation mockRegionE = createMockRegionLocation("server1:16020", Bytes.toBytes("e"));
+ HRegionLocation mockRegionG = createMockRegionLocation("server2:16020", Bytes.toBytes("g"));
+ HRegionLocation mockRegionI = createMockRegionLocation("server2:16020", Bytes.toBytes("i"));
+ HRegionLocation mockRegionK = createMockRegionLocation("server2:16020", Bytes.toBytes("k"));
+
+ // Mock ConnectionQueryServices: first 3 splits → server1, last 3 splits → server2
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("a")))
+ .thenReturn(mockRegionA);
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("c")))
+ .thenReturn(mockRegionC);
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("e")))
+ .thenReturn(mockRegionE);
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("g")))
+ .thenReturn(mockRegionG);
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("i")))
+ .thenReturn(mockRegionI);
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("k")))
+ .thenReturn(mockRegionK);
+
+ // Call coalesceSplits()
+ List result =
+ inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+
+ // Verify: 2 coalesced splits (one per server)
+ assertEquals("Should have 2 coalesced splits (one per server)", 2, result.size());
+
+ // Verify: Each split is coalesced and contains 3 KeyRanges
+ PhoenixInputSplit split1 = (PhoenixInputSplit) result.get(0);
+ PhoenixInputSplit split2 = (PhoenixInputSplit) result.get(1);
+
+ assertTrue("Split 1 should be coalesced", split1.isCoalesced());
+ assertTrue("Split 2 should be coalesced", split2.isCoalesced());
+
+ assertEquals("Split 1 should have 3 KeyRanges", 3, split1.getKeyRanges().size());
+ assertEquals("Split 2 should have 3 KeyRanges", 3, split2.getKeyRanges().size());
+
+ // Verify: Splits are sorted by start key within each server group
+ List keyRanges1 = split1.getKeyRanges();
+ List keyRanges2 = split2.getKeyRanges();
+
+ // Check that KeyRanges are sorted (each should be less than next)
+ for (int i = 0; i < keyRanges1.size() - 1; i++) {
+ assertTrue("KeyRanges in split 1 should be sorted",
+ Bytes.compareTo(keyRanges1.get(i).getLowerRange(), keyRanges1.get(i + 1).getLowerRange())
+ < 0);
+ }
+ for (int i = 0; i < keyRanges2.size() - 1; i++) {
+ assertTrue("KeyRanges in split 2 should be sorted",
+ Bytes.compareTo(keyRanges2.get(i).getLowerRange(), keyRanges2.get(i + 1).getLowerRange())
+ < 0);
+ }
+ }
+
+ @Test
+ public void testCoalesceSplitsWithEmptyList() throws Exception {
+ // Test edge case: empty input list
+ List splits = new ArrayList<>();
+
+ List result =
+ inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+
+ assertEquals("Should return empty list for empty input", 0, result.size());
+ }
+
+ @Test
+ public void testCoalesceSplitsWithSingleSplit() throws Exception {
+ // Test edge case: single split (no coalescing needed)
+ List splits = new ArrayList<>();
+ splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+ HRegionLocation mockRegion = createMockRegionLocation("server1:16020", Bytes.toBytes("a"));
+ when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class)))
+ .thenReturn(mockRegion);
+
+ List result =
+ inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+
+ assertEquals("Should return 1 split", 1, result.size());
+ PhoenixInputSplit resultSplit = (PhoenixInputSplit) result.get(0);
+ assertFalse("Single split should not be marked as coalesced", resultSplit.isCoalesced());
+ assertEquals("Should have 1 KeyRange", 1, resultSplit.getKeyRanges().size());
+ }
+
+ @Test(expected = IOException.class)
+ public void testCoalesceSplitsWithNullRegionLocation() throws Exception {
+ // ConnectionQueryServices returns null
+ List splits = new ArrayList<>();
+ splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+ // Mock ConnectionQueryServices to return null (region not found)
+ when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class)))
+ .thenReturn(null);
+
+ // Should throw IOException with message about null region location
+ inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+ }
+
+ @Test(expected = IOException.class)
+ public void testCoalesceSplitsWithNullServerName() throws Exception {
+ // RegionLocation has null ServerName
+ List splits = new ArrayList<>();
+ splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+ // Create mock region location with null ServerName
+ HRegionLocation mockRegionLocation = mock(HRegionLocation.class);
+ when(mockRegionLocation.getServerName()).thenReturn(null);
+
+ when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class)))
+ .thenReturn(mockRegionLocation);
+
+ // Should throw IOException with message about null server name
+ inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+ }
+
+ @Test
+ public void testCoalesceSplitsFailureThrowsIOExceptionWithMessage() throws Exception {
+ // coalescing failures throw SQLException directly when called from tests
+ List splits = new ArrayList<>();
+ splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+ // Mock ConnectionQueryServices to throw SQLException (simulating cluster issue)
+ SQLException simulatedFailure =
+ new SQLException("Simulated RegionServer communication failure");
+ when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class)))
+ .thenThrow(simulatedFailure);
+
+ try {
+ inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+ fail("Expected SQLException to be thrown for coalescing failure");
+ } catch (SQLException e) {
+ // Verify exception message is informative
+ assertTrue("Exception message should mention coalescing failure",
+ e.getMessage().contains("Simulated RegionServer communication failure"));
+ }
+ }
+
+ /**
+ * Helper method to create a mock HRegionLocation with the given server address and start key.
+ */
+ private HRegionLocation createMockRegionLocation(String serverAddress, byte[] startKey) {
+ HRegionLocation mockRegionLocation = mock(HRegionLocation.class);
+ ServerName mockServerName = mock(ServerName.class);
+ // Create a real Address object instead of mocking it, since toString() is final
+ // Parse the serverAddress string to extract hostname and port
+ String[] parts = serverAddress.split(":");
+ String hostname = parts[0];
+ int port = parts.length > 1 ? Integer.parseInt(parts[1]) : 16020;
+ org.apache.hadoop.hbase.net.Address address =
+ org.apache.hadoop.hbase.net.Address.fromParts(hostname, port);
+
+ when(mockServerName.getAddress()).thenReturn(address);
+ when(mockRegionLocation.getServerName()).thenReturn(mockServerName);
+ return mockRegionLocation;
+ }
}
diff --git a/pom.xml b/pom.xml
index ae2d2c6d795..cc9925c627f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,6 +136,7 @@
4.13.1
2.1.12
1.15.11
+ 1.79