diff --git a/phoenix-core-client/pom.xml b/phoenix-core-client/pom.xml index a04148b0bf6..095b5dd5c74 100644 --- a/phoenix-core-client/pom.xml +++ b/phoenix-core-client/pom.xml @@ -253,7 +253,6 @@ org.bouncycastle bcprov-jdk18on - 1.79 diff --git a/phoenix-core-server/pom.xml b/phoenix-core-server/pom.xml index bb236a47f61..8eb3c38413f 100644 --- a/phoenix-core-server/pom.xml +++ b/phoenix-core-server/pom.xml @@ -176,7 +176,6 @@ org.bouncycastle bcprov-jdk18on - 1.79 org.xerial.snappy diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java index c71e6ca5d1c..c49ce59232d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java @@ -38,7 +38,7 @@ public class PhoenixInputSplit extends InputSplit implements Writable { private List scans; - private KeyRange keyRange; + private List keyRanges; private String regionLocation = null; private long splitSize = 0; @@ -68,13 +68,43 @@ public List getScans() { return scans; } + /** + * Returns the overall KeyRange spanning this split. For coalesced splits, spans from the first + * region's lower bound to the last region's upper bound. Computed on-demand from keyRanges. + * @return KeyRange spanning the entire split, or null if keyRanges is empty + */ public KeyRange getKeyRange() { - return keyRange; + if (keyRanges == null || keyRanges.isEmpty()) { + return null; + } + return KeyRange.getKeyRange(keyRanges.get(0).getLowerRange(), + keyRanges.get(keyRanges.size() - 1).getUpperRange()); + } + + /** + * Returns all KeyRanges for this split. For coalesced splits, returns multiple KeyRanges (one per + * region). For non-coalesced splits, returns a single-element list. 
+ * @return List of KeyRanges, never null + */ + public List getKeyRanges() { + return keyRanges; + } + + /** + * Checks if this split is coalesced (contains multiple regions). + * @return true if split contains multiple regions + */ + public boolean isCoalesced() { + return keyRanges.size() > 1; } private void init() { - this.keyRange = - KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size() - 1).getStopRow()); + // Initialize keyRanges from scans + this.keyRanges = Lists.newArrayListWithExpectedSize(scans.size()); + for (Scan scan : scans) { + KeyRange kr = KeyRange.getKeyRange(scan.getStartRow(), scan.getStopRow()); + this.keyRanges.add(kr); + } } @Override @@ -126,7 +156,8 @@ public String[] getLocations() throws IOException, InterruptedException { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + keyRange.hashCode(); + KeyRange range = getKeyRange(); + result = prime * result + (range == null ? 0 : range.hashCode()); return result; } @@ -142,11 +173,13 @@ public boolean equals(Object obj) { return false; } PhoenixInputSplit other = (PhoenixInputSplit) obj; - if (keyRange == null) { - if (other.keyRange != null) { + KeyRange thisRange = getKeyRange(); + KeyRange otherRange = other.getKeyRange(); + if (thisRange == null) { + if (otherRange != null) { return false; } - } else if (!keyRange.equals(other.keyRange)) { + } else if (!thisRange.equals(otherRange)) { return false; } return true; diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java index d717c3f7bbf..85117828171 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java @@ -21,9 +21,13 @@ import java.sql.Connection; import java.sql.SQLException; 
import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; @@ -31,8 +35,10 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,15 +100,31 @@ public List getSplits(JobContext context) throws IOException, Interr } catch (SQLException e) { throw new RuntimeException(e); } - if (completedRegions.isEmpty()) { - LOGGER.info("No completed regions for table {} - processing all {} splits", tableName, - allSplits.size()); - return allSplits; - } List unprocessedSplits = filterCompletedSplits(allSplits, completedRegions); LOGGER.info("Found {} completed mapper regions for table {}, {} unprocessed splits remaining", completedRegions.size(), tableName, unprocessedSplits.size()); + + boolean enableSplitCoalescing = + conf.getBoolean(PhoenixSyncTableTool.PHOENIX_SYNC_TABLE_SPLIT_COALESCING, + PhoenixSyncTableTool.DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING); + LOGGER.info("Split coalescing enabled: {}, for table {}", enableSplitCoalescing, tableName); + + if (enableSplitCoalescing && unprocessedSplits.size() > 1) { + try (Connection conn = ConnectionUtil.getInputConnection(conf)) { + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + byte[] physicalTableName = 
pConn.getTable(tableName).getPhysicalName().getBytes(); + List coalescedSplits = + coalesceSplits(unprocessedSplits, pConn.getQueryServices(), physicalTableName); + LOGGER.info("Split coalescing: {} unprocessed splits {} coalesced splits for table {}", + unprocessedSplits.size(), coalescedSplits.size(), tableName); + return coalescedSplits; + } catch (Exception e) { + throw new IOException(String.format("Failed to coalesce splits for table %s. " + + "Split coalescing is enabled but failed due to: %s.", tableName, e.getMessage()), e); + } + } + return unprocessedSplits; } @@ -133,6 +155,9 @@ private List queryCompletedMapperRegions(Configuration conf, String ta */ List filterCompletedSplits(List allSplits, List completedRegions) { + if (completedRegions.isEmpty()) { + return allSplits; + } allSplits.sort((s1, s2) -> { PhoenixInputSplit ps1 = (PhoenixInputSplit) s1; PhoenixInputSplit ps2 = (PhoenixInputSplit) s2; @@ -211,4 +236,97 @@ List filterCompletedSplits(List allSplits, } return unprocessedSplits; } + + /** + * Coalesces multiple region splits from the same RegionServer into single InputSplits. All + * regions from the same server are coalesced into one split, regardless of count or size. This + * reduces mapper count and avoids hot spotting when many concurrent mappers hit the same server. 
+ * @param unprocessedSplits Splits remaining after filtering completed regions + * @param queryServices ConnectionQueryServices for querying region locations + * @param physicalTableName Physical HBase table name + * @return Coalesced splits with all regions per server combined into one split + */ + List coalesceSplits(List unprocessedSplits, + ConnectionQueryServices queryServices, byte[] physicalTableName) + throws IOException, InterruptedException, SQLException { + // Group splits by RegionServer location + Map> splitsByServer = + groupSplitsByServer(unprocessedSplits, queryServices, physicalTableName); + + List coalescedSplits = new ArrayList<>(); + + // For each RegionServer, create one coalesced split with ALL regions from that server + for (Map.Entry> entry : splitsByServer.entrySet()) { + String serverName = entry.getKey(); + List serverSplits = entry.getValue(); + + // Sort splits by start key for sequential processing + serverSplits.sort((s1, s2) -> Bytes.compareTo(s1.getKeyRange().getLowerRange(), + s2.getKeyRange().getLowerRange())); + // Create single coalesced split with ALL regions from this server + coalescedSplits.add(createCoalescedSplit(serverSplits, serverName)); + } + + return coalescedSplits; + } + + /** + * Groups splits by RegionServer location for locality-aware coalescing. Uses + * ConnectionQueryServices to determine which server hosts each region. 
+ * @param splits List of splits to group + * @param queryServices ConnectionQueryServices for querying region locations + * @param physicalTableName Physical HBase table name + * @return Map of server name to list of splits hosted on that server + */ + private Map> groupSplitsByServer(List splits, + ConnectionQueryServices queryServices, byte[] physicalTableName) + throws IOException, SQLException { + Map> splitsByServer = new HashMap<>(); + for (InputSplit split : splits) { + PhoenixInputSplit pSplit = (PhoenixInputSplit) split; + KeyRange keyRange = pSplit.getKeyRange(); + HRegionLocation regionLocation = + queryServices.getTableRegionLocation(physicalTableName, keyRange.getLowerRange()); + if (regionLocation == null) { + throw new IOException("Could not determine region location for key: " + + Bytes.toStringBinary(keyRange.getLowerRange())); + } + if (regionLocation.getServerName() == null) { + throw new IOException("Could not determine server name for region at key: " + + Bytes.toStringBinary(keyRange.getLowerRange())); + } + String serverName = regionLocation.getServerName().getAddress().toString(); + splitsByServer.computeIfAbsent(serverName, k -> new ArrayList<>()).add(pSplit); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Split {} assigned to server {}", + Bytes.toStringBinary(keyRange.getLowerRange()), serverName); + } + } + + return splitsByServer; + } + + /** + * Creates a coalesced PhoenixInputSplit containing multiple regions. Combines scans and KeyRanges + * from individual splits into a single split. 
+ * @param splits List of splits to coalesce (from same RegionServer) + * @param serverLocation RegionServer location for data locality + * @return Coalesced PhoenixInputSplit + */ + private PhoenixInputSplit createCoalescedSplit(List splits, + String serverLocation) throws IOException, InterruptedException { + + List allScans = new ArrayList<>(); + long totalSize = 0; + // Extract all scans from individual splits + for (PhoenixInputSplit split : splits) { + allScans.addAll(split.getScans()); + totalSize += split.getLength(); + } + + LOGGER.info("Created coalesced split with {} regions, {} MB from server {}", splits.size(), + totalSize / (1024 * 1024), serverLocation); + // Create a new PhoenixInputSplit, keyRanges will be derived from scans in init() + return new PhoenixInputSplit(allScans, totalSize, serverLocation); + } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java index acaac01fc3c..30457641236 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -45,7 +44,6 @@ import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -88,16 +86,13 @@ public 
enum SyncCounters { private Connection globalConnection; private PTable pTable; private byte[] physicalTableName; - private byte[] mapperRegionStart; - private byte[] mapperRegionEnd; + private List regionKeyRanges; private PhoenixSyncTableOutputRepository syncTableOutputRepository; - private Timestamp mapperStartTime; @Override protected void setup(Context context) throws InterruptedException { try { super.setup(context); - mapperStartTime = new Timestamp(System.currentTimeMillis()); this.conf = context.getConfiguration(); tableName = PhoenixSyncTableTool.getPhoenixSyncTableName(conf); targetZkQuorum = PhoenixSyncTableTool.getPhoenixSyncTableTargetZkQuorum(conf); @@ -123,18 +118,25 @@ protected void setup(Context context) throws InterruptedException { } /** - * Extracts mapper region boundaries from the PhoenixInputSplit + * Extracts region key ranges from the PhoenixInputSplit. Handles both single-region splits and + * coalesced splits with multiple regions. */ private void extractRegionBoundariesFromSplit(Context context) { PhoenixInputSplit split = (PhoenixInputSplit) context.getInputSplit(); - KeyRange keyRange = split.getKeyRange(); - if (keyRange == null) { + regionKeyRanges = split.getKeyRanges(); + + if (regionKeyRanges == null || regionKeyRanges.isEmpty()) { throw new IllegalStateException(String.format( - "PhoenixInputSplit has no KeyRange for table: %s . Cannot determine region boundaries for sync operation.", + "PhoenixInputSplit has no KeyRanges for table: %s. 
Cannot determine region boundaries for sync operation.", tableName)); } - mapperRegionStart = keyRange.getLowerRange(); - mapperRegionEnd = keyRange.getUpperRange(); + + if (split.isCoalesced()) { + LOGGER.info("Mapper processing coalesced split with {} regions for table {}", + regionKeyRanges.size(), tableName); + } else { + LOGGER.info("Mapper processing single region split for table {}", tableName); + } } /** @@ -156,67 +158,23 @@ private Connection createGlobalConnection(Configuration conf) throws SQLExceptio } /** - * Processes a mapper region by comparing chunks between source and target clusters. Gets already - * processed chunks from checkpoint table, resumes from check pointed progress and records final - * status for chunks & mapper (VERIFIED/MISMATCHED). + * Processes mapper region(s) by comparing chunks between source and target clusters. For + * coalesced splits, processes each region sequentially. Gets already processed chunks from + * checkpoint table, resumes from check pointed progress and records final status for chunks & + * mapper (VERIFIED/MISMATCHED). 
*/ @Override protected void map(NullWritable key, DBInputFormat.NullDBWritable value, Context context) throws IOException, InterruptedException { + context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); try { - List processedChunks = - syncTableOutputRepository.getProcessedChunks(tableName, targetZkQuorum, fromTime, toTime, - tenantId, mapperRegionStart, mapperRegionEnd); - List unprocessedRanges = - calculateUnprocessedRanges(mapperRegionStart, mapperRegionEnd, processedChunks); - boolean isStartKeyInclusive = shouldStartKeyBeInclusive(mapperRegionStart, processedChunks); - for (KeyRange range : unprocessedRanges) { - processMapperRanges(range.getLowerRange(), range.getUpperRange(), isStartKeyInclusive, - context); - isStartKeyInclusive = false; - } - - long chunksMismatched = context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue(); - long chunksVerified = context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue(); - long sourceRowsProcessed = context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue(); - long targetRowsProcessed = context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue(); - Timestamp mapperEndTime = new Timestamp(System.currentTimeMillis()); - String counters = PhoenixSyncTableCheckpointOutputRow.CounterFormatter - .formatMapper(chunksVerified, chunksMismatched, sourceRowsProcessed, targetRowsProcessed); - if (sourceRowsProcessed > 0) { - if (chunksMismatched == 0) { - context.getCounter(SyncCounters.MAPPERS_VERIFIED).increment(1); - syncTableOutputRepository - .checkpointSyncTableResult(new PhoenixSyncTableCheckpointOutputRow.Builder() - .setTableName(tableName).setTargetCluster(targetZkQuorum) - .setType(PhoenixSyncTableCheckpointOutputRow.Type.REGION).setFromTime(fromTime) - .setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun) - .setStartRowKey(mapperRegionStart).setEndRowKey(mapperRegionEnd) - .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED) - 
.setExecutionStartTime(mapperStartTime).setExecutionEndTime(mapperEndTime) - .setCounters(counters).build()); - LOGGER.info( - "PhoenixSyncTable mapper completed with verified: {} verified chunks, {} mismatched chunks", - chunksVerified, chunksMismatched); - } else { - context.getCounter(SyncCounters.MAPPERS_MISMATCHED).increment(1); - LOGGER.warn( - "PhoenixSyncTable mapper completed with mismatch: {} verified chunks, {} mismatched chunks", - chunksVerified, chunksMismatched); - syncTableOutputRepository - .checkpointSyncTableResult(new PhoenixSyncTableCheckpointOutputRow.Builder() - .setTableName(tableName).setTargetCluster(targetZkQuorum) - .setType(PhoenixSyncTableCheckpointOutputRow.Type.REGION).setFromTime(fromTime) - .setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun) - .setStartRowKey(mapperRegionStart).setEndRowKey(mapperRegionEnd) - .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED) - .setExecutionStartTime(mapperStartTime).setExecutionEndTime(mapperEndTime) - .setCounters(counters).build()); - } - } else { - LOGGER.info( - "No rows pending to process. All mapper region boundaries are covered for startKey:{}, endKey: {}", - mapperRegionStart, mapperRegionEnd); + // Process each region in the split (one or multiple for coalesced splits) + for (KeyRange keyRange : regionKeyRanges) { + byte[] regionStart = keyRange.getLowerRange(); + byte[] regionEnd = keyRange.getUpperRange(); + LOGGER.info("Processing region [{}, {}) from split for table {}", + Bytes.toStringBinary(regionStart), Bytes.toStringBinary(regionEnd), tableName); + processRegion(regionStart, regionEnd, context); } } catch (SQLException e) { tryClosingResources(); @@ -224,6 +182,124 @@ protected void map(NullWritable key, DBInputFormat.NullDBWritable value, Context } } + /** + * Processes a single region within a split (could be part of a coalesced split). 
+ * @param regionStart Start key of the region + * @param regionEnd End key of the region + * @param context Mapper context + */ + private void processRegion(byte[] regionStart, byte[] regionEnd, Context context) + throws SQLException, IOException, InterruptedException { + + Timestamp regionStartTime = new Timestamp(System.currentTimeMillis()); + + // Get processed chunks for this specific region + List processedChunks = + syncTableOutputRepository.getProcessedChunks(tableName, targetZkQuorum, fromTime, toTime, + tenantId, regionStart, regionEnd); + + // Calculate unprocessed ranges within this region + List unprocessedRanges = + calculateUnprocessedRanges(regionStart, regionEnd, processedChunks); + + // Track counters before processing this region + long verifiedBefore = context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue(); + long mismatchedBefore = context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue(); + long sourceRowsBefore = context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue(); + long targetRowsBefore = context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue(); + + // Process all unprocessed ranges in this region + boolean isStartKeyInclusive = shouldStartKeyBeInclusive(regionStart, processedChunks); + for (KeyRange range : unprocessedRanges) { + processMapperRanges(range.getLowerRange(), range.getUpperRange(), isStartKeyInclusive, + context); + isStartKeyInclusive = false; + } + + // Calculate counters for this region only + long verifiedChunks = + context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue() - verifiedBefore; + long mismatchedChunks = + context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue() - mismatchedBefore; + long sourceRowsProcessed = + context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue() - sourceRowsBefore; + long targetRowsProcessed = + context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue() - targetRowsBefore; + + Timestamp regionEndTime = new 
Timestamp(System.currentTimeMillis()); + String counters = PhoenixSyncTableCheckpointOutputRow.CounterFormatter + .formatMapper(verifiedChunks, mismatchedChunks, sourceRowsProcessed, targetRowsProcessed); + if (sourceRowsProcessed > 0) { + recordRegionCompletion(regionStart, regionEnd, regionStartTime, regionEndTime, verifiedChunks, + mismatchedChunks, counters, context); + } else { + LOGGER.info( + "No rows pending to process. All region boundaries are covered for startKey:{}, endKey: {}", + Bytes.toStringBinary(regionStart), Bytes.toStringBinary(regionEnd)); + } + } + + /** + * Records region completion by updating counters, recording checkpoint, and logging result. + * Consolidates all region completion logic to eliminate duplication. + * @param regionStart Region start key + * @param regionEnd Region end key + * @param regionStartTime Region processing start time + * @param regionEndTime Region processing end time + * @param verifiedChunks Number of verified chunks + * @param mismatchedChunks Number of mismatched chunks + * @param counters Formatted counter string + * @param context Mapper context + */ + private void recordRegionCompletion(byte[] regionStart, byte[] regionEnd, + Timestamp regionStartTime, Timestamp regionEndTime, long verifiedChunks, long mismatchedChunks, + String counters, Context context) throws SQLException { + + boolean isVerified = (mismatchedChunks == 0); + PhoenixSyncTableCheckpointOutputRow.Status status = isVerified + ? PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED + : PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED; + + context.getCounter(isVerified ? 
SyncCounters.MAPPERS_VERIFIED : SyncCounters.MAPPERS_MISMATCHED) + .increment(1); + + recordRegionCheckpoint(regionStart, regionEnd, status, regionStartTime, regionEndTime, + counters); + + String logMessage = String.format( + "PhoenixSyncTable region [%s, %s) completed with %s: %d verified chunks, %d mismatched chunks", + Bytes.toStringBinary(regionStart), Bytes.toStringBinary(regionEnd), + isVerified ? "verified" : "mismatch", verifiedChunks, mismatchedChunks); + + if (isVerified) { + LOGGER.info(logMessage); + } else { + LOGGER.warn(logMessage); + } + } + + /** + * Records a region checkpoint to the checkpoint table. + * @param regionStart Region start key + * @param regionEnd Region end key + * @param status Status (VERIFIED or MISMATCHED) + * @param regionStartTime Region processing start time + * @param regionEndTime Region processing end time + * @param counters Formatted counter string + */ + private void recordRegionCheckpoint(byte[] regionStart, byte[] regionEnd, + PhoenixSyncTableCheckpointOutputRow.Status status, Timestamp regionStartTime, + Timestamp regionEndTime, String counters) throws SQLException { + + syncTableOutputRepository + .checkpointSyncTableResult(new PhoenixSyncTableCheckpointOutputRow.Builder() + .setTableName(tableName).setTargetCluster(targetZkQuorum) + .setType(PhoenixSyncTableCheckpointOutputRow.Type.REGION).setFromTime(fromTime) + .setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun).setStartRowKey(regionStart) + .setEndRowKey(regionEnd).setStatus(status).setExecutionStartTime(regionStartTime) + .setExecutionEndTime(regionEndTime).setCounters(counters).build()); + } + /** * Processes a chunk range by comparing source and target cluster data. Source chunking: Breaks * data into size-based chunks within given mapper region boundary. 
Target chunking: Follows @@ -408,13 +484,7 @@ private ChunkScannerContext createChunkScanner(Connection conn, byte[] startKey, scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_SIZE_BYTES, Bytes.toBytes(chunkSizeBytes)); } - // Use the half of the HBase RPC timeout value as the server page size to make sure - // that the HBase region server will be able to send a heartbeat message to the - // client before the client times out. - long syncTablePageTimeoutMs = (long) (conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, - QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT) * 0.5); - scan.setAttribute(BaseScannerRegionObserverConstants.SERVER_PAGE_SIZE_MS, - Bytes.toBytes(syncTablePageTimeoutMs)); + ScanUtil.setScanAttributeForPaging(scan, phoenixConn); ResultScanner scanner = hTable.getScanner(scan); return new ChunkScannerContext(hTable, scanner); } @@ -453,14 +523,8 @@ private ChunkInfo parseChunkInfo(Result result) { private void handleVerifiedChunk(ChunkInfo sourceChunk, Context context, String counters) throws SQLException { - syncTableOutputRepository.checkpointSyncTableResult( - new PhoenixSyncTableCheckpointOutputRow.Builder().setTableName(tableName) - .setTargetCluster(targetZkQuorum).setType(PhoenixSyncTableCheckpointOutputRow.Type.CHUNK) - .setFromTime(fromTime).setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun) - .setStartRowKey(sourceChunk.startKey).setEndRowKey(sourceChunk.endKey) - .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED) - .setExecutionStartTime(sourceChunk.executionStartTime) - .setExecutionEndTime(sourceChunk.executionEndTime).setCounters(counters).build()); + recordChunkCheckpoint(sourceChunk, PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED, + counters); context.getCounter(SyncCounters.CHUNKS_VERIFIED).increment(1); } @@ -468,16 +532,27 @@ private void handleMismatchedChunk(ChunkInfo sourceChunk, Context context, Strin throws SQLException { LOGGER.warn("Chunk mismatch detected for table: {}, 
with startKey: {}, endKey {}", tableName, Bytes.toStringBinary(sourceChunk.startKey), Bytes.toStringBinary(sourceChunk.endKey)); + recordChunkCheckpoint(sourceChunk, PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED, + counters); + context.getCounter(SyncCounters.CHUNKS_MISMATCHED).increment(1); + } + + /** + * Records a chunk checkpoint to the checkpoint table. + * @param sourceChunk Chunk information + * @param status Status (VERIFIED or MISMATCHED) + * @param counters Formatted counter string + */ + private void recordChunkCheckpoint(ChunkInfo sourceChunk, + PhoenixSyncTableCheckpointOutputRow.Status status, String counters) throws SQLException { + syncTableOutputRepository.checkpointSyncTableResult( new PhoenixSyncTableCheckpointOutputRow.Builder().setTableName(tableName) .setTargetCluster(targetZkQuorum).setType(PhoenixSyncTableCheckpointOutputRow.Type.CHUNK) .setFromTime(fromTime).setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun) - .setStartRowKey(sourceChunk.startKey).setEndRowKey(sourceChunk.endKey) - .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED) + .setStartRowKey(sourceChunk.startKey).setEndRowKey(sourceChunk.endKey).setStatus(status) .setExecutionStartTime(sourceChunk.executionStartTime) .setExecutionEndTime(sourceChunk.executionEndTime).setCounters(counters).build()); - - context.getCounter(SyncCounters.CHUNKS_MISMATCHED).increment(1); } /** diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java index 90963c37a33..80eacde25e7 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; +import 
org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; @@ -115,6 +116,8 @@ public class PhoenixSyncTableTool extends Configured implements Tool { private static final Option READ_ALL_VERSIONS_OPTION = new Option("rav", "read-all-versions", false, "Enable reading all cell versions (optional, disabled by default, reads only latest version)"); + private static final Option COALESCE_SPLIT_OPTION = new Option("coal", "coalesce-split", false, + "Enable split coalescing to reduce mapper count (optional, disabled by default)"); private static final Option HELP_OPTION = new Option("h", "help", false, "Help"); public static final String PHOENIX_SYNC_TABLE_NAME = "phoenix.sync.table.table.name"; @@ -124,7 +127,10 @@ public class PhoenixSyncTableTool extends Configured implements Tool { public static final String PHOENIX_SYNC_TABLE_DRY_RUN = "phoenix.sync.table.dry.run"; public static final String PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = "phoenix.sync.table.chunk.size.bytes"; - public static final long DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = 1024 * 1024 * 1024; // 1GB + public static final long DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = 1024 * 1024 * 1024; // 1 GB + public static final String PHOENIX_SYNC_TABLE_SPLIT_COALESCING = + "phoenix.sync.table.split.coalescing"; + public static final boolean DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING = false; public static final String PHOENIX_SYNC_TABLE_RAW_SCAN = "phoenix.sync.table.raw.scan"; public static final String PHOENIX_SYNC_TABLE_READ_ALL_VERSIONS = "phoenix.sync.table.read.all.versions"; @@ -140,6 +146,7 @@ public class PhoenixSyncTableTool extends Configured implements Tool { private String tenantId; private boolean isRawScan = false; private boolean isReadAllVersions = false; + private boolean isCoalesceSplit = false; private String qTable; private String 
qSchemaName; @@ -218,6 +225,7 @@ private void setPhoenixSyncTableToolConfiguration(Configuration configuration) { setPhoenixSyncTableDryRun(configuration, isDryRun); setPhoenixSyncTableRawScan(configuration, isRawScan); setPhoenixSyncTableReadAllVersions(configuration, isReadAllVersions); + setPhoenixSyncTableSplitCoalescing(configuration, isCoalesceSplit); PhoenixConfigurationUtil.setSplitByStats(configuration, false); if (chunkSizeBytes != null) { setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes); @@ -292,6 +300,7 @@ private Options getOptions() { options.addOption(TENANT_ID_OPTION); options.addOption(RAW_SCAN_OPTION); options.addOption(READ_ALL_VERSIONS_OPTION); + options.addOption(COALESCE_SPLIT_OPTION); options.addOption(HELP_OPTION); return options; } @@ -307,7 +316,7 @@ private void printHelpAndExit(Options options, int exitCode) { formatter.printHelp(cmdLineSyntax, "Synchronize a Phoenix table between source and target clusters", options, "\nExample:\n" + cmdLineSyntax + " \\\n" + " --table-name MY_TABLE \\\n" - + " --target-cluster :2181 \\\n" + " --dry-run\n", + + " --target-cluster :2181 \\\n" + " --dry-run \\\n" + " --coalesce-split\n", true); System.exit(exitCode); } @@ -344,15 +353,16 @@ public void populateSyncTableToolAttributes(CommandLine cmdLine) { isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); isRawScan = cmdLine.hasOption(RAW_SCAN_OPTION.getOpt()); isReadAllVersions = cmdLine.hasOption(READ_ALL_VERSIONS_OPTION.getOpt()); + isCoalesceSplit = cmdLine.hasOption(COALESCE_SPLIT_OPTION.getOpt()); qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName); qSchemaName = SchemaUtil.normalizeIdentifier(schemaName); PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable); LOGGER.info( "PhoenixSyncTableTool configured - Table: {}, Schema: {}, Target: {}, " + "StartTime: {}, EndTime: {}, DryRun: {}, ChunkSize: {}, Foreground: {}, TenantId: {}, " - + "RawScan: {}, ReadAllVersions: {}", + + "RawScan: {}, 
ReadAllVersions: {}, CoalesceSplit: {}", qTable, qSchemaName, targetZkQuorum, startTime, endTime, isDryRun, chunkSizeBytes, - isForeground, tenantId, isRawScan, isReadAllVersions); + isForeground, tenantId, isRawScan, isReadAllVersions, isCoalesceSplit); } /** @@ -396,6 +406,7 @@ private boolean submitPhoenixSyncTableJob() throws Exception { } Counters counters = job.getCounters(); if (counters != null) { + long taskCreated = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue(); long verifiedMappers = counters.findCounter(PhoenixSyncTableMapper.SyncCounters.MAPPERS_VERIFIED).getValue(); long mismatchedMappers = @@ -409,11 +420,11 @@ private boolean submitPhoenixSyncTableJob() throws Exception { long targetRowsProcessed = counters.findCounter(PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED).getValue(); LOGGER.info( - "PhoenixSyncTable job completed, gathered counters are \n Verified Mappers: {}, \n" + "PhoenixSyncTable job completed, gathered counters are \n Task Created: {}, \n Verified Mappers: {}, \n" + "Mismatched Mappers: {}, \n Chunks Verified: {}, \n" + "Chunks Mismatched: {}, \n Source Rows Processed: {}, \n Target Rows Processed: {}", - verifiedMappers, mismatchedMappers, chunksVerified, chunksMismatched, sourceRowsProcessed, - targetRowsProcessed); + taskCreated, verifiedMappers, mismatchedMappers, chunksVerified, chunksMismatched, + sourceRowsProcessed, targetRowsProcessed); } else { LOGGER.warn("Unable to retrieve job counters for table {} - job may have failed " + "during initialization", qTable); @@ -535,6 +546,18 @@ public static boolean getPhoenixSyncTableReadAllVersions(Configuration conf) { return conf.getBoolean(PHOENIX_SYNC_TABLE_READ_ALL_VERSIONS, false); } + public static void setPhoenixSyncTableSplitCoalescing(Configuration conf, + boolean splitCoalescing) { + Preconditions.checkNotNull(conf); + conf.setBoolean(PHOENIX_SYNC_TABLE_SPLIT_COALESCING, splitCoalescing); + } + + public static boolean 
getPhoenixSyncTableSplitCoalescing(Configuration conf) { + Preconditions.checkNotNull(conf); + return conf.getBoolean(PHOENIX_SYNC_TABLE_SPLIT_COALESCING, + DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING); + } + public Job getJob() { return job; } diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index 8c242392170..6067ef4eb71 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -407,7 +407,6 @@ org.bouncycastle bcprov-jdk18on - 1.79 test diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java index fc277131ab1..0661973dbe7 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java @@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDriver; @@ -159,6 +160,7 @@ public void testSyncTableValidateWithDataDifference() throws Exception { validateSyncCounters(counters, 10, 10, 1, 3); validateMapperCounters(counters, 1, 3); + assertEquals("Expected 4 mapper tasks to be created", 4, counters.taskCreated); List checkpointEntries = queryCheckpointTable(sourceConnection, uniqueTableName, targetZkQuorum, null); @@ -205,6 +207,8 @@ public void testSyncTableWithDeletedRowsOnTarget() throws Exception { validateSyncCounters(counters, 10, 7, 7, 3); validateMapperCounters(counters, 1, 3); + assertEquals("Expected 4 mapper tasks to be created without coalescing", 4, counters.taskCreated); + } @Test @@ -1244,10 +1248,8 @@ public void testSyncTableValidateWithPagingTimeout() throws Exception { // Configure paging with 
aggressive timeouts to force mid-chunk timeouts Configuration conf = new Configuration(CLUSTERS.getHBaseCluster1().getConfiguration()); - - long aggressiveRpcTimeout = 2L; - conf.setLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB, aggressiveRpcTimeout); - conf.setLong(HConstants.HBASE_RPC_TIMEOUT_KEY, aggressiveRpcTimeout); + conf.setBoolean(QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB, true); + conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 1); int chunkSize = 10240; @@ -1307,15 +1309,10 @@ public void testSyncTableValidateWithPagingTimeoutWithSplits() throws Exception // Configure paging with aggressive timeouts to force mid-chunk timeouts Configuration conf = new Configuration(CLUSTERS.getHBaseCluster1().getConfiguration()); - - // Enable server-side paging conf.setBoolean(QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB, true); - // Set extremely short rpc timeout to force frequent paging - long aggressiveRpcTimeout = 1L; // 1ms RPC timeout - conf.setLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB, aggressiveRpcTimeout); - conf.setLong(HConstants.HBASE_RPC_TIMEOUT_KEY, aggressiveRpcTimeout); + conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 1); - int chunkSize = 102400; // 100KB + int chunkSize = 10240; // Create a thread that will perform splits on source cluster during sync Thread sourceSplitThread = new Thread(() -> { @@ -1751,6 +1748,29 @@ public void testSyncTableWithMultipleVersionAndCompactionOnTarget() throws Excep validateMapperCounters(counters3, 3, 1); } + @Test + public void testSyncTableValidateWithSplitCoalescing() throws Exception { + setupStandardTestWithReplication(uniqueTableName, 1, 10); + + introduceAndVerifyTargetDifferences(uniqueTableName); + + // Enable split coalescing via command-line parameter, all regions will be coalesced into one + // mapper + Job job = runSyncTool(uniqueTableName, "--coalesce-split"); + SyncCountersResult counters = getSyncCounters(job); + + assertEquals("Should have only 1 Mapper task 
created with coalescing", 1, counters.taskCreated); + + validateSyncCounters(counters, 10, 10, 7, 3); + validateMapperCounters(counters, 1, 3); + + // Verify checkpoint entries are created correctly + List checkpointEntries = + queryCheckpointTable(sourceConnection, uniqueTableName, targetZkQuorum, null); + validateCheckpointEntries(checkpointEntries, uniqueTableName, targetZkQuorum, 10, 10, 7, 3, 4, + 3, null); + } + /** * Helper class to hold separated mapper and chunk entries. */ @@ -2327,6 +2347,7 @@ private void splitTableAt(Connection conn, String tableName, int splitId) { LOGGER.info("Split completed for table {} at split point {} (bytes: {})", tableName, splitId, Bytes.toStringBinary(splitPoint)); } catch (Exception e) { + // Ignore split failures - they don't affect the test's main goal LOGGER.warn("Failed to split table {} at split point {}: {}", tableName, splitId, e.getMessage()); } @@ -2576,6 +2597,7 @@ private static class SyncCountersResult { public final long chunksVerified; public final long mappersVerified; public final long mappersMismatched; + public final long taskCreated; SyncCountersResult(Counters counters) { this.sourceRowsProcessed = @@ -2586,6 +2608,7 @@ private static class SyncCountersResult { this.chunksVerified = counters.findCounter(SyncCounters.CHUNKS_VERIFIED).getValue(); this.mappersVerified = counters.findCounter(SyncCounters.MAPPERS_VERIFIED).getValue(); this.mappersMismatched = counters.findCounter(SyncCounters.MAPPERS_MISMATCHED).getValue(); + this.taskCreated = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue(); } public void logCounters(String testName) { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixInputSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixInputSplitTest.java new file mode 100644 index 00000000000..d780e25b874 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixInputSplitTest.java @@ -0,0 +1,200 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.query.KeyRange; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class PhoenixInputSplitTest { + + /** + * Helper method to create a Scan with given row boundaries. 
+ */ + private Scan createScan(byte[] startRow, byte[] stopRow) { + Scan scan = new Scan(); + scan.withStartRow(startRow, true); + scan.withStopRow(stopRow, false); + return scan; + } + + /** + * Helper method to serialize and deserialize a PhoenixInputSplit. + */ + private PhoenixInputSplit serializeAndDeserialize(PhoenixInputSplit split) throws IOException { + // Serialize + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(baos); + split.write(out); + + // Deserialize + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInput in = new DataInputStream(bais); + PhoenixInputSplit deserialized = new PhoenixInputSplit(); + deserialized.readFields(in); + + return deserialized; + } + + @Test + public void testStandardConstructorWithSingleScan() { + List scans = new ArrayList<>(); + scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d"))); + + PhoenixInputSplit split = new PhoenixInputSplit(scans); + + assertFalse("Should not be coalesced with single scan", split.isCoalesced()); + assertEquals("Should have 1 keyRange", 1, split.getKeyRanges().size()); + assertNotNull("KeyRange should be initialized", split.getKeyRange()); + assertTrue("KeyRange should span scan boundaries", + Bytes.equals(Bytes.toBytes("a"), split.getKeyRange().getLowerRange())); + assertTrue("KeyRange should span scan boundaries", + Bytes.equals(Bytes.toBytes("d"), split.getKeyRange().getUpperRange())); + } + + @Test(expected = NullPointerException.class) + public void testStandardConstructorWithNullScans() { + new PhoenixInputSplit(null); + } + + @Test(expected = IllegalStateException.class) + public void testStandardConstructorWithEmptyScans() { + new PhoenixInputSplit(Collections.emptyList()); + } + + @Test + public void testCoalescingConstructorWithMultipleRegions() + throws IOException, InterruptedException { + List scans = new ArrayList<>(); + scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d"))); + 
scans.add(createScan(Bytes.toBytes("d"), Bytes.toBytes("g"))); + scans.add(createScan(Bytes.toBytes("g"), Bytes.toBytes("j"))); + + long splitSize = 3072; + String regionLocation = "server1:16020"; + + // KeyRanges are now automatically derived from scans + PhoenixInputSplit split = new PhoenixInputSplit(scans, splitSize, regionLocation); + + assertTrue("Should be coalesced with multiple regions", split.isCoalesced()); + assertEquals("Should have 3 keyRanges (derived from scans)", 3, split.getKeyRanges().size()); + assertEquals("Split size should match", splitSize, split.getLength()); + assertArrayEquals("Region location should match", new String[] { regionLocation }, + split.getLocations()); + + // Verify keyRanges were derived correctly from scans + List keyRanges = split.getKeyRanges(); + assertTrue("First keyRange should match first scan", + Bytes.equals(Bytes.toBytes("a"), keyRanges.get(0).getLowerRange())); + assertTrue("First keyRange should match first scan", + Bytes.equals(Bytes.toBytes("d"), keyRanges.get(0).getUpperRange())); + assertTrue("Second keyRange should match second scan", + Bytes.equals(Bytes.toBytes("d"), keyRanges.get(1).getLowerRange())); + assertTrue("Second keyRange should match second scan", + Bytes.equals(Bytes.toBytes("g"), keyRanges.get(1).getUpperRange())); + assertTrue("Third keyRange should match third scan", + Bytes.equals(Bytes.toBytes("g"), keyRanges.get(2).getLowerRange())); + assertTrue("Third keyRange should match third scan", + Bytes.equals(Bytes.toBytes("j"), keyRanges.get(2).getUpperRange())); + } + + @Test + public void testSerializationWithSingleRegion() throws IOException, InterruptedException { + List scans = new ArrayList<>(); + scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d"))); + + PhoenixInputSplit original = new PhoenixInputSplit(scans, 1024, "server1:16020"); + + PhoenixInputSplit deserialized = serializeAndDeserialize(original); + + assertFalse("Should not be coalesced after deserialization", 
deserialized.isCoalesced()); + assertEquals("Should have 1 keyRange", 1, deserialized.getKeyRanges().size()); + assertEquals("Split size should match", original.getLength(), deserialized.getLength()); + assertArrayEquals("Region location should match", original.getLocations(), + deserialized.getLocations()); + assertTrue("KeyRange should match", Bytes.equals(original.getKeyRange().getLowerRange(), + deserialized.getKeyRange().getLowerRange())); + assertTrue("KeyRange should match", Bytes.equals(original.getKeyRange().getUpperRange(), + deserialized.getKeyRange().getUpperRange())); + } + + @Test + public void testSerializationWithCoalescedSplit() throws IOException, InterruptedException { + List scans = new ArrayList<>(); + scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d"))); + scans.add(createScan(Bytes.toBytes("d"), Bytes.toBytes("g"))); + scans.add(createScan(Bytes.toBytes("g"), Bytes.toBytes("j"))); + + // KeyRanges are now automatically derived from scans + PhoenixInputSplit original = new PhoenixInputSplit(scans, 3072, "server1:16020"); + + PhoenixInputSplit deserialized = serializeAndDeserialize(original); + + assertTrue("Should be coalesced after deserialization", deserialized.isCoalesced()); + assertEquals("Should have 3 keyRanges", 3, deserialized.getKeyRanges().size()); + assertEquals("Split size should match", original.getLength(), deserialized.getLength()); + assertArrayEquals("Region location should match", original.getLocations(), + deserialized.getLocations()); + + // Verify all keyRanges are preserved (derived from scans) + List originalKeyRanges = original.getKeyRanges(); + List deserializedKeyRanges = deserialized.getKeyRanges(); + for (int i = 0; i < originalKeyRanges.size(); i++) { + assertTrue("KeyRange " + i + " should match", Bytes.equals( + originalKeyRanges.get(i).getLowerRange(), deserializedKeyRanges.get(i).getLowerRange())); + assertTrue("KeyRange " + i + " should match", Bytes.equals( + 
originalKeyRanges.get(i).getUpperRange(), deserializedKeyRanges.get(i).getUpperRange())); + } + } + + @Test + public void testGetLocations() throws IOException, InterruptedException { + List scans = + Collections.singletonList(createScan(Bytes.toBytes("a"), Bytes.toBytes("d"))); + + // Test with null regionLocation + PhoenixInputSplit split1 = new PhoenixInputSplit(scans, 1024, null); + assertArrayEquals("Should return empty array for null location", new String[] {}, + split1.getLocations()); + + // Test with valid regionLocation + PhoenixInputSplit split2 = new PhoenixInputSplit(scans, 1024, "server1:16020"); + assertArrayEquals("Should return array with server location", new String[] { "server1:16020" }, + split2.getLocations()); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java index 15e643feaf0..8039ee57aef 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java @@ -18,27 +18,47 @@ package org.apache.phoenix.mapreduce; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.io.IOException; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import 
org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; +import org.junit.Before; import org.junit.Test; /** * Unit tests for PhoenixSyncTableInputFormat. Tests various scenarios of filtering completed splits + * and split coalescing functionality. */ public class PhoenixSyncTableInputFormatTest { private PhoenixSyncTableInputFormat inputFormat = new PhoenixSyncTableInputFormat(); + private ConnectionQueryServices mockQueryServices; + private byte[] physicalTableName = Bytes.toBytes("TEST_TABLE"); + + @Before + public void setup() throws Exception { + mockQueryServices = mock(ConnectionQueryServices.class); + } + /** * Helper method to create a PhoenixInputSplit with given key range boundaries. */ @@ -280,4 +300,211 @@ public void testCreateRecordReaderReturnsNoOpReader() { assertTrue("Should return a PhoenixNoOpSingleRecordReader", reader instanceof PhoenixNoOpSingleRecordReader); } + + @Test + public void testCoalesceSplitsWithSingleServer() throws Exception { + // Create 3 PhoenixInputSplits all on same server + List splits = new ArrayList<>(); + splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d"))); + splits.add(createSplit(Bytes.toBytes("d"), Bytes.toBytes("g"))); + splits.add(createSplit(Bytes.toBytes("g"), Bytes.toBytes("j"))); + + // Create mock region location - all splits on server1 + HRegionLocation mockRegion = createMockRegionLocation("server1:16020", Bytes.toBytes("a")); + + // Mock ConnectionQueryServices: all splits → server1 + when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class))) + .thenReturn(mockRegion); + + // Call coalesceSplits() + List result = + inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName); + + // Verify: 1 coalesced split (all on same server) + assertEquals("Should have 1 coalesced split (all on same server)", 1, result.size()); + + // 
Verify: Split is coalesced and contains 3 KeyRanges + PhoenixInputSplit coalescedSplit = (PhoenixInputSplit) result.get(0); + + assertTrue("Split should be coalesced", coalescedSplit.isCoalesced()); + assertEquals("Split should have 3 KeyRanges", 3, coalescedSplit.getKeyRanges().size()); + + // Verify: KeyRanges are sorted + List keyRanges = coalescedSplit.getKeyRanges(); + assertTrue("First KeyRange should start with 'a'", + Bytes.equals(Bytes.toBytes("a"), keyRanges.get(0).getLowerRange())); + assertTrue("Second KeyRange should start with 'd'", + Bytes.equals(Bytes.toBytes("d"), keyRanges.get(1).getLowerRange())); + assertTrue("Third KeyRange should start with 'g'", + Bytes.equals(Bytes.toBytes("g"), keyRanges.get(2).getLowerRange())); + } + + @Test + public void testCoalesceSplitsWithMultipleServers() throws Exception { + // Create 6 PhoenixInputSplits + List splits = new ArrayList<>(); + splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("c"))); + splits.add(createSplit(Bytes.toBytes("c"), Bytes.toBytes("e"))); + splits.add(createSplit(Bytes.toBytes("e"), Bytes.toBytes("g"))); + splits.add(createSplit(Bytes.toBytes("g"), Bytes.toBytes("i"))); + splits.add(createSplit(Bytes.toBytes("i"), Bytes.toBytes("k"))); + splits.add(createSplit(Bytes.toBytes("k"), Bytes.toBytes("m"))); + + // Create mock region locations BEFORE stubbing to avoid nested stubbing issues + HRegionLocation mockRegionA = createMockRegionLocation("server1:16020", Bytes.toBytes("a")); + HRegionLocation mockRegionC = createMockRegionLocation("server1:16020", Bytes.toBytes("c")); + HRegionLocation mockRegionE = createMockRegionLocation("server1:16020", Bytes.toBytes("e")); + HRegionLocation mockRegionG = createMockRegionLocation("server2:16020", Bytes.toBytes("g")); + HRegionLocation mockRegionI = createMockRegionLocation("server2:16020", Bytes.toBytes("i")); + HRegionLocation mockRegionK = createMockRegionLocation("server2:16020", Bytes.toBytes("k")); + + // Mock ConnectionQueryServices: 
first 3 splits → server1, last 3 splits → server2 + when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("a"))) + .thenReturn(mockRegionA); + when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("c"))) + .thenReturn(mockRegionC); + when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("e"))) + .thenReturn(mockRegionE); + when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("g"))) + .thenReturn(mockRegionG); + when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("i"))) + .thenReturn(mockRegionI); + when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("k"))) + .thenReturn(mockRegionK); + + // Call coalesceSplits() + List result = + inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName); + + // Verify: 2 coalesced splits (one per server) + assertEquals("Should have 2 coalesced splits (one per server)", 2, result.size()); + + // Verify: Each split is coalesced and contains 3 KeyRanges + PhoenixInputSplit split1 = (PhoenixInputSplit) result.get(0); + PhoenixInputSplit split2 = (PhoenixInputSplit) result.get(1); + + assertTrue("Split 1 should be coalesced", split1.isCoalesced()); + assertTrue("Split 2 should be coalesced", split2.isCoalesced()); + + assertEquals("Split 1 should have 3 KeyRanges", 3, split1.getKeyRanges().size()); + assertEquals("Split 2 should have 3 KeyRanges", 3, split2.getKeyRanges().size()); + + // Verify: Splits are sorted by start key within each server group + List keyRanges1 = split1.getKeyRanges(); + List keyRanges2 = split2.getKeyRanges(); + + // Check that KeyRanges are sorted (each should be less than next) + for (int i = 0; i < keyRanges1.size() - 1; i++) { + assertTrue("KeyRanges in split 1 should be sorted", + Bytes.compareTo(keyRanges1.get(i).getLowerRange(), keyRanges1.get(i + 1).getLowerRange()) + < 0); + } + for (int i = 0; i < keyRanges2.size() - 1; i++) { + 
assertTrue("KeyRanges in split 2 should be sorted", + Bytes.compareTo(keyRanges2.get(i).getLowerRange(), keyRanges2.get(i + 1).getLowerRange()) + < 0); + } + } + + @Test + public void testCoalesceSplitsWithEmptyList() throws Exception { + // Test edge case: empty input list + List splits = new ArrayList<>(); + + List result = + inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName); + + assertEquals("Should return empty list for empty input", 0, result.size()); + } + + @Test + public void testCoalesceSplitsWithSingleSplit() throws Exception { + // Test edge case: single split (no coalescing needed) + List splits = new ArrayList<>(); + splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d"))); + + HRegionLocation mockRegion = createMockRegionLocation("server1:16020", Bytes.toBytes("a")); + when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class))) + .thenReturn(mockRegion); + + List result = + inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName); + + assertEquals("Should return 1 split", 1, result.size()); + PhoenixInputSplit resultSplit = (PhoenixInputSplit) result.get(0); + assertFalse("Single split should not be marked as coalesced", resultSplit.isCoalesced()); + assertEquals("Should have 1 KeyRange", 1, resultSplit.getKeyRanges().size()); + } + + @Test(expected = IOException.class) + public void testCoalesceSplitsWithNullRegionLocation() throws Exception { + // ConnectionQueryServices returns null + List splits = new ArrayList<>(); + splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d"))); + + // Mock ConnectionQueryServices to return null (region not found) + when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class))) + .thenReturn(null); + + // Should throw IOException with message about null region location + inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName); + } + + @Test(expected = IOException.class) + public void 
testCoalesceSplitsWithNullServerName() throws Exception { + // RegionLocation has null ServerName + List splits = new ArrayList<>(); + splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d"))); + + // Create mock region location with null ServerName + HRegionLocation mockRegionLocation = mock(HRegionLocation.class); + when(mockRegionLocation.getServerName()).thenReturn(null); + + when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class))) + .thenReturn(mockRegionLocation); + + // Should throw IOException with message about null server name + inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName); + } + + @Test + public void testCoalesceSplitsFailureThrowsSQLExceptionWithMessage() throws Exception { + // coalescing failures throw SQLException directly when called from tests + List splits = new ArrayList<>(); + splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d"))); + + // Mock ConnectionQueryServices to throw SQLException (simulating cluster issue) + SQLException simulatedFailure = + new SQLException("Simulated RegionServer communication failure"); + when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class))) + .thenThrow(simulatedFailure); + + try { + inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName); + fail("Expected SQLException to be thrown for coalescing failure"); + } catch (SQLException e) { + // Verify exception message is informative + assertTrue("Exception message should contain the underlying failure message", + e.getMessage().contains("Simulated RegionServer communication failure")); + } + } + + /** + * Helper method to create a mock HRegionLocation with the given server address and start key. 
+ */ + private HRegionLocation createMockRegionLocation(String serverAddress, byte[] startKey) { + HRegionLocation mockRegionLocation = mock(HRegionLocation.class); + ServerName mockServerName = mock(ServerName.class); + // Create a real Address object instead of mocking it, since toString() is final + // Parse the serverAddress string to extract hostname and port + String[] parts = serverAddress.split(":"); + String hostname = parts[0]; + int port = parts.length > 1 ? Integer.parseInt(parts[1]) : 16020; + org.apache.hadoop.hbase.net.Address address = + org.apache.hadoop.hbase.net.Address.fromParts(hostname, port); + + when(mockServerName.getAddress()).thenReturn(address); + when(mockRegionLocation.getServerName()).thenReturn(mockServerName); + return mockRegionLocation; + } } diff --git a/pom.xml b/pom.xml index ae2d2c6d795..cc9925c627f 100644 --- a/pom.xml +++ b/pom.xml @@ -136,6 +136,7 @@ 4.13.1 2.1.12 1.15.11 + 1.79