Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
3c54c86
connection creation time
Aug 1, 2025
c97f7e0
Revert "connection creation time"
Aug 1, 2025
53e9a3b
Revert "Revert "connection creation time""
Aug 1, 2025
6b75fec
Merge remote-tracking branch 'upstream/master'
Sep 22, 2025
6f40ab4
Merge remote-tracking branch 'upstream/master'
Sep 30, 2025
7328f93
Merge remote-tracking branch 'upstream/master'
Oct 12, 2025
fd46404
ITs changes
Jan 6, 2026
58ef6a9
Revert "ITs changes"
Jan 6, 2026
6f226f6
Merge remote-tracking branch 'upstream/master'
Feb 18, 2026
1ccf4b6
PHOENIX-7751 : [SyncTable Tool] Feature to validate table data using …
Feb 18, 2026
e75c6c1
revert other changes
Feb 18, 2026
a5060ab
checkstyle fix
Feb 18, 2026
cffd2e6
checkstyle fix
Feb 18, 2026
2ef30e6
checkstyle fix
Feb 18, 2026
2dc4722
batching of splits, generated code
Feb 20, 2026
dd18dae
adding more ITs
Feb 23, 2026
326e792
adding more ITs
Feb 23, 2026
b7127cc
misc fix
Feb 24, 2026
f588291
code comment
Feb 24, 2026
f81aa56
code comment formatting
Feb 25, 2026
d60104f
Adding all UT/ITs
Mar 11, 2026
359f345
Fix tests
Mar 12, 2026
1bcd693
Fix tests
Mar 12, 2026
7904c50
Merge remote-tracking branch 'upstream/master' into PHOENIX-7751
Mar 12, 2026
b9dfd3c
PhoenixConfigurationUtilTest
Mar 12, 2026
6c50f95
Fix build issues
Mar 13, 2026
b8c00e4
Some More UTs
Mar 17, 2026
8d6357e
Rebase
Mar 18, 2026
4be0405
remove configs
Mar 21, 2026
d54f970
Address review comments
Mar 27, 2026
a951251
Address review comments
Apr 1, 2026
6daafa2
address suggestion
Apr 9, 2026
05cf3da
Address review comments
Apr 9, 2026
9aed71c
Fix tests
Apr 9, 2026
050e3ed
spotless apply
Apr 9, 2026
0dccd70
remove max size check
Apr 9, 2026
a0014a9
fix timeout
Apr 10, 2026
e9c0c35
add column encoding
Apr 10, 2026
9517708
add compression and TTL attribute
Apr 13, 2026
0a9c2ab
fix checkstyle
Apr 13, 2026
3e29320
rebase
Apr 13, 2026
59ac033
rebase
Apr 13, 2026
3abd1cb
add tests
Apr 15, 2026
a7d49f2
rebase
Apr 15, 2026
112c1ba
spotless apply
Apr 15, 2026
6fb8c13
misc fix
Apr 16, 2026
63c5fd1
test fix
Apr 16, 2026
65a3fe9
spotless apply
Apr 16, 2026
66d90c0
add test
Apr 16, 2026
ecc9cb0
fix syntax
Apr 16, 2026
b00c658
address comment
Apr 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion phoenix-core-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.79</version>
</dependency>
</dependencies>

Expand Down
1 change: 0 additions & 1 deletion phoenix-core-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.79</version>
</dependency>
</dependencies>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
public class PhoenixInputSplit extends InputSplit implements Writable {

private List<Scan> scans;
private KeyRange keyRange;
private List<KeyRange> keyRanges;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keyRange is redundant now. It's just the span from keyRanges.get(0).getLowerRange() to keyRanges.get(last).getUpperRange(). You can keep only keyRanges and derive the overall bounds on demand:

Suggested change
private List<KeyRange> keyRanges;
public KeyRange getKeyRange() {
return KeyRange.getKeyRange(
keyRanges.get(0).getLowerRange(),
keyRanges.get(keyRanges.size() - 1).getUpperRange());
}

private String regionLocation = null;
private long splitSize = 0;

Expand Down Expand Up @@ -68,13 +68,43 @@ public List<Scan> getScans() {
return scans;
}

/**
* Returns the overall KeyRange spanning this split. For coalesced splits, spans from the first
* region's lower bound to the last region's upper bound. Computed on-demand from keyRanges.
* @return KeyRange spanning the entire split, or null if keyRanges is empty
*/
public KeyRange getKeyRange() {
return keyRange;
if (keyRanges == null || keyRanges.isEmpty()) {
return null;
}
return KeyRange.getKeyRange(keyRanges.get(0).getLowerRange(),
keyRanges.get(keyRanges.size() - 1).getUpperRange());
}

/**
* Returns all KeyRanges for this split. For coalesced splits, returns multiple KeyRanges (one per
* region). For non-coalesced splits, returns a single-element list.
* @return List of KeyRanges, never null
*/
public List<KeyRange> getKeyRanges() {
return keyRanges;
}

/**
* Checks if this split is coalesced (contains multiple regions).
* @return true if split contains multiple regions
*/
public boolean isCoalesced() {
return keyRanges.size() > 1;
}

private void init() {
this.keyRange =
KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size() - 1).getStopRow());
// Initialize keyRanges from scans
this.keyRanges = Lists.newArrayListWithExpectedSize(scans.size());
for (Scan scan : scans) {
KeyRange kr = KeyRange.getKeyRange(scan.getStartRow(), scan.getStopRow());
this.keyRanges.add(kr);
}
}

@Override
Expand Down Expand Up @@ -126,7 +156,8 @@ public String[] getLocations() throws IOException, InterruptedException {
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + keyRange.hashCode();
KeyRange range = getKeyRange();
result = prime * result + (range == null ? 0 : range.hashCode());
return result;
}

Expand All @@ -142,11 +173,13 @@ public boolean equals(Object obj) {
return false;
}
PhoenixInputSplit other = (PhoenixInputSplit) obj;
if (keyRange == null) {
if (other.keyRange != null) {
KeyRange thisRange = getKeyRange();
KeyRange otherRange = other.getKeyRange();
if (thisRange == null) {
if (otherRange != null) {
return false;
}
} else if (!keyRange.equals(other.keyRange)) {
} else if (!thisRange.equals(otherRange)) {
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -94,15 +100,31 @@ public List<InputSplit> getSplits(JobContext context) throws IOException, Interr
} catch (SQLException e) {
throw new RuntimeException(e);
}
if (completedRegions.isEmpty()) {
LOGGER.info("No completed regions for table {} - processing all {} splits", tableName,
allSplits.size());
return allSplits;
}

List<InputSplit> unprocessedSplits = filterCompletedSplits(allSplits, completedRegions);
LOGGER.info("Found {} completed mapper regions for table {}, {} unprocessed splits remaining",
completedRegions.size(), tableName, unprocessedSplits.size());

boolean enableSplitCoalescing =
conf.getBoolean(PhoenixSyncTableTool.PHOENIX_SYNC_TABLE_SPLIT_COALESCING,
PhoenixSyncTableTool.DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING);
LOGGER.info("Split coalescing enabled: {}, for table {}", enableSplitCoalescing, tableName);

if (enableSplitCoalescing && unprocessedSplits.size() > 1) {
try (Connection conn = ConnectionUtil.getInputConnection(conf)) {
PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
byte[] physicalTableName = pConn.getTable(tableName).getPhysicalName().getBytes();
List<InputSplit> coalescedSplits =
coalesceSplits(unprocessedSplits, pConn.getQueryServices(), physicalTableName);
LOGGER.info("Split coalescing: {} unprocessed splits {} coalesced splits for table {}",
unprocessedSplits.size(), coalescedSplits.size(), tableName);
return coalescedSplits;
} catch (Exception e) {
throw new IOException(String.format("Failed to coalesce splits for table %s. "
+ "Split coalescing is enabled but failed due to: %s.", tableName, e.getMessage()), e);
}
}

return unprocessedSplits;
}

Expand Down Expand Up @@ -133,6 +155,9 @@ private List<KeyRange> queryCompletedMapperRegions(Configuration conf, String ta
*/
List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits,
List<KeyRange> completedRegions) {
if (completedRegions.isEmpty()) {
return allSplits;
}
allSplits.sort((s1, s2) -> {
PhoenixInputSplit ps1 = (PhoenixInputSplit) s1;
PhoenixInputSplit ps2 = (PhoenixInputSplit) s2;
Expand Down Expand Up @@ -211,4 +236,97 @@ List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits,
}
return unprocessedSplits;
}

/**
* Coalesces multiple region splits from the same RegionServer into single InputSplits. All
* regions from the same server are coalesced into one split, regardless of count or size. This
* reduces mapper count and avoids hot spotting when many concurrent mappers hit the same server.
* @param unprocessedSplits Splits remaining after filtering completed regions
* @param queryServices ConnectionQueryServices for querying region locations
* @param physicalTableName Physical HBase table name
* @return Coalesced splits with all regions per server combined into one split
*/
List<InputSplit> coalesceSplits(List<InputSplit> unprocessedSplits,
ConnectionQueryServices queryServices, byte[] physicalTableName)
throws IOException, InterruptedException, SQLException {
// Group splits by RegionServer location
Map<String, List<PhoenixInputSplit>> splitsByServer =
groupSplitsByServer(unprocessedSplits, queryServices, physicalTableName);

List<InputSplit> coalescedSplits = new ArrayList<>();

// For each RegionServer, create one coalesced split with ALL regions from that server
for (Map.Entry<String, List<PhoenixInputSplit>> entry : splitsByServer.entrySet()) {
String serverName = entry.getKey();
List<PhoenixInputSplit> serverSplits = entry.getValue();

// Sort splits by start key for sequential processing
serverSplits.sort((s1, s2) -> Bytes.compareTo(s1.getKeyRange().getLowerRange(),
s2.getKeyRange().getLowerRange()));
// Create single coalesced split with ALL regions from this server
coalescedSplits.add(createCoalescedSplit(serverSplits, serverName));
}

return coalescedSplits;
}

/**
* Groups splits by RegionServer location for locality-aware coalescing. Uses
* ConnectionQueryServices to determine which server hosts each region.
* @param splits List of splits to group
* @param queryServices ConnectionQueryServices for querying region locations
* @param physicalTableName Physical HBase table name
* @return Map of server name to list of splits hosted on that server
*/
private Map<String, List<PhoenixInputSplit>> groupSplitsByServer(List<InputSplit> splits,
ConnectionQueryServices queryServices, byte[] physicalTableName)
throws IOException, SQLException {
Map<String, List<PhoenixInputSplit>> splitsByServer = new HashMap<>();
for (InputSplit split : splits) {
PhoenixInputSplit pSplit = (PhoenixInputSplit) split;
KeyRange keyRange = pSplit.getKeyRange();
HRegionLocation regionLocation =
queryServices.getTableRegionLocation(physicalTableName, keyRange.getLowerRange());
if (regionLocation == null) {
throw new IOException("Could not determine region location for key: "
+ Bytes.toStringBinary(keyRange.getLowerRange()));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking that this should never see a null if all regions are fully online and client cache is up to date. However (per what AI says), if the region the key maps to is currently offline due to a RIT event (say, in the middle of a split), then the return value will be null, so shouldn't we retry to ride over such RITs, just like how hbase-client does during data path such as scans? Otherwise, there is a high chance of hitting a RIT and treating it as an error, correct?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even during RIT, I believe meta would return cached location which might be outdated. It could give a false colocation but that is expected for some of regions as sync tool runs.

if (regionLocation.getServerName() == null) {
throw new IOException("Could not determine server name for region at key: "
+ Bytes.toStringBinary(keyRange.getLowerRange()));
}
String serverName = regionLocation.getServerName().getAddress().toString();
splitsByServer.computeIfAbsent(serverName, k -> new ArrayList<>()).add(pSplit);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Split {} assigned to server {}",
Bytes.toStringBinary(keyRange.getLowerRange()), serverName);
}
}

return splitsByServer;
}

/**
* Creates a coalesced PhoenixInputSplit containing multiple regions. Combines scans and KeyRanges
* from individual splits into a single split.
* @param splits List of splits to coalesce (from same RegionServer)
* @param serverLocation RegionServer location for data locality
* @return Coalesced PhoenixInputSplit
*/
private PhoenixInputSplit createCoalescedSplit(List<PhoenixInputSplit> splits,
String serverLocation) throws IOException, InterruptedException {

List<Scan> allScans = new ArrayList<>();
long totalSize = 0;
// Extract all scans from individual splits
for (PhoenixInputSplit split : splits) {
allScans.addAll(split.getScans());
totalSize += split.getLength();
}

LOGGER.info("Created coalesced split with {} regions, {} MB from server {}", splits.size(),
totalSize / (1024 * 1024), serverLocation);
// Create a new PhoenixInputSplit, keyRanges will be derived from scans in init()
return new PhoenixInputSplit(allScans, totalSize, serverLocation);
}
}
Loading