-
Notifications
You must be signed in to change notification settings - Fork 1k
PHOENIX-7799 Coalesce splits by region server to avoid hotspotting from concurrent mappers #2411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
3c54c86
c97f7e0
53e9a3b
6b75fec
6f40ab4
7328f93
fd46404
58ef6a9
6f226f6
1ccf4b6
e75c6c1
a5060ab
cffd2e6
2ef30e6
2dc4722
dd18dae
326e792
b7127cc
f588291
f81aa56
d60104f
359f345
1bcd693
7904c50
b9dfd3c
6c50f95
b8c00e4
8d6357e
4be0405
d54f970
a951251
6daafa2
05cf3da
9aed71c
050e3ed
0dccd70
a0014a9
e9c0c35
9517708
0a9c2ab
3e29320
59ac033
3abd1cb
a7d49f2
112c1ba
6fb8c13
63c5fd1
65a3fe9
66d90c0
ecc9cb0
b00c658
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,18 +21,24 @@ | |
| import java.sql.Connection; | ||
| import java.sql.SQLException; | ||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.hbase.HConstants; | ||
| import org.apache.hadoop.hbase.HRegionLocation; | ||
| import org.apache.hadoop.hbase.client.Scan; | ||
| import org.apache.hadoop.hbase.util.Bytes; | ||
| import org.apache.hadoop.io.NullWritable; | ||
| import org.apache.hadoop.mapreduce.InputSplit; | ||
| import org.apache.hadoop.mapreduce.JobContext; | ||
| import org.apache.hadoop.mapreduce.RecordReader; | ||
| import org.apache.hadoop.mapreduce.TaskAttemptContext; | ||
| import org.apache.hadoop.mapreduce.lib.db.DBWritable; | ||
| import org.apache.phoenix.jdbc.PhoenixConnection; | ||
| import org.apache.phoenix.mapreduce.util.ConnectionUtil; | ||
| import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; | ||
| import org.apache.phoenix.query.ConnectionQueryServices; | ||
| import org.apache.phoenix.query.KeyRange; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
@@ -94,15 +100,31 @@ public List<InputSplit> getSplits(JobContext context) throws IOException, Interr | |
| } catch (SQLException e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| if (completedRegions.isEmpty()) { | ||
| LOGGER.info("No completed regions for table {} - processing all {} splits", tableName, | ||
| allSplits.size()); | ||
| return allSplits; | ||
| } | ||
|
|
||
| List<InputSplit> unprocessedSplits = filterCompletedSplits(allSplits, completedRegions); | ||
| LOGGER.info("Found {} completed mapper regions for table {}, {} unprocessed splits remaining", | ||
| completedRegions.size(), tableName, unprocessedSplits.size()); | ||
|
|
||
| boolean enableSplitCoalescing = | ||
| conf.getBoolean(PhoenixSyncTableTool.PHOENIX_SYNC_TABLE_SPLIT_COALESCING, | ||
| PhoenixSyncTableTool.DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING); | ||
| LOGGER.info("Split coalescing enabled: {}, for table {}", enableSplitCoalescing, tableName); | ||
|
|
||
| if (enableSplitCoalescing && unprocessedSplits.size() > 1) { | ||
| try (Connection conn = ConnectionUtil.getInputConnection(conf)) { | ||
| PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); | ||
| byte[] physicalTableName = pConn.getTable(tableName).getPhysicalName().getBytes(); | ||
| List<InputSplit> coalescedSplits = | ||
| coalesceSplits(unprocessedSplits, pConn.getQueryServices(), physicalTableName); | ||
| LOGGER.info("Split coalescing: {} unprocessed splits {} coalesced splits for table {}", | ||
| unprocessedSplits.size(), coalescedSplits.size(), tableName); | ||
| return coalescedSplits; | ||
| } catch (Exception e) { | ||
| throw new IOException(String.format("Failed to coalesce splits for table %s. " | ||
| + "Split coalescing is enabled but failed due to: %s.", tableName, e.getMessage()), e); | ||
| } | ||
| } | ||
|
|
||
| return unprocessedSplits; | ||
| } | ||
|
|
||
|
|
@@ -133,6 +155,9 @@ private List<KeyRange> queryCompletedMapperRegions(Configuration conf, String ta | |
| */ | ||
| List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits, | ||
| List<KeyRange> completedRegions) { | ||
| if (completedRegions.isEmpty()) { | ||
| return allSplits; | ||
| } | ||
| allSplits.sort((s1, s2) -> { | ||
| PhoenixInputSplit ps1 = (PhoenixInputSplit) s1; | ||
| PhoenixInputSplit ps2 = (PhoenixInputSplit) s2; | ||
|
|
@@ -211,4 +236,97 @@ List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits, | |
| } | ||
| return unprocessedSplits; | ||
| } | ||
|
|
||
| /** | ||
| * Coalesces multiple region splits from the same RegionServer into single InputSplits. All | ||
| * regions from the same server are coalesced into one split, regardless of count or size. This | ||
| * reduces mapper count and avoids hot spotting when many concurrent mappers hit the same server. | ||
| * @param unprocessedSplits Splits remaining after filtering completed regions | ||
| * @param queryServices ConnectionQueryServices for querying region locations | ||
| * @param physicalTableName Physical HBase table name | ||
| * @return Coalesced splits with all regions per server combined into one split | ||
| */ | ||
| List<InputSplit> coalesceSplits(List<InputSplit> unprocessedSplits, | ||
| ConnectionQueryServices queryServices, byte[] physicalTableName) | ||
| throws IOException, InterruptedException, SQLException { | ||
| // Group splits by RegionServer location | ||
| Map<String, List<PhoenixInputSplit>> splitsByServer = | ||
| groupSplitsByServer(unprocessedSplits, queryServices, physicalTableName); | ||
|
|
||
| List<InputSplit> coalescedSplits = new ArrayList<>(); | ||
|
|
||
| // For each RegionServer, create one coalesced split with ALL regions from that server | ||
| for (Map.Entry<String, List<PhoenixInputSplit>> entry : splitsByServer.entrySet()) { | ||
| String serverName = entry.getKey(); | ||
| List<PhoenixInputSplit> serverSplits = entry.getValue(); | ||
|
|
||
| // Sort splits by start key for sequential processing | ||
| serverSplits.sort((s1, s2) -> Bytes.compareTo(s1.getKeyRange().getLowerRange(), | ||
| s2.getKeyRange().getLowerRange())); | ||
| // Create single coalesced split with ALL regions from this server | ||
| coalescedSplits.add(createCoalescedSplit(serverSplits, serverName)); | ||
| } | ||
|
|
||
| return coalescedSplits; | ||
| } | ||
|
|
||
| /** | ||
| * Groups splits by RegionServer location for locality-aware coalescing. Uses | ||
| * ConnectionQueryServices to determine which server hosts each region. | ||
| * @param splits List of splits to group | ||
| * @param queryServices ConnectionQueryServices for querying region locations | ||
| * @param physicalTableName Physical HBase table name | ||
| * @return Map of server name to list of splits hosted on that server | ||
| */ | ||
| private Map<String, List<PhoenixInputSplit>> groupSplitsByServer(List<InputSplit> splits, | ||
| ConnectionQueryServices queryServices, byte[] physicalTableName) | ||
| throws IOException, SQLException { | ||
| Map<String, List<PhoenixInputSplit>> splitsByServer = new HashMap<>(); | ||
| for (InputSplit split : splits) { | ||
| PhoenixInputSplit pSplit = (PhoenixInputSplit) split; | ||
| KeyRange keyRange = pSplit.getKeyRange(); | ||
| HRegionLocation regionLocation = | ||
| queryServices.getTableRegionLocation(physicalTableName, keyRange.getLowerRange()); | ||
| if (regionLocation == null) { | ||
| throw new IOException("Could not determine region location for key: " | ||
| + Bytes.toStringBinary(keyRange.getLowerRange())); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am thinking that this should never see a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even during RIT, I believe meta would return cached location which might be outdated. It could give a false colocation but that is expected for some of regions as sync tool runs. |
||
| if (regionLocation.getServerName() == null) { | ||
| throw new IOException("Could not determine server name for region at key: " | ||
| + Bytes.toStringBinary(keyRange.getLowerRange())); | ||
| } | ||
| String serverName = regionLocation.getServerName().getAddress().toString(); | ||
| splitsByServer.computeIfAbsent(serverName, k -> new ArrayList<>()).add(pSplit); | ||
| if (LOGGER.isDebugEnabled()) { | ||
| LOGGER.debug("Split {} assigned to server {}", | ||
| Bytes.toStringBinary(keyRange.getLowerRange()), serverName); | ||
| } | ||
| } | ||
|
|
||
| return splitsByServer; | ||
| } | ||
|
|
||
| /** | ||
| * Creates a coalesced PhoenixInputSplit containing multiple regions. Combines scans and KeyRanges | ||
| * from individual splits into a single split. | ||
| * @param splits List of splits to coalesce (from same RegionServer) | ||
| * @param serverLocation RegionServer location for data locality | ||
| * @return Coalesced PhoenixInputSplit | ||
| */ | ||
| private PhoenixInputSplit createCoalescedSplit(List<PhoenixInputSplit> splits, | ||
| String serverLocation) throws IOException, InterruptedException { | ||
|
|
||
| List<Scan> allScans = new ArrayList<>(); | ||
| long totalSize = 0; | ||
| // Extract all scans from individual splits | ||
| for (PhoenixInputSplit split : splits) { | ||
| allScans.addAll(split.getScans()); | ||
| totalSize += split.getLength(); | ||
| } | ||
|
|
||
| LOGGER.info("Created coalesced split with {} regions, {} MB from server {}", splits.size(), | ||
| totalSize / (1024 * 1024), serverLocation); | ||
| // Create a new PhoenixInputSplit, keyRanges will be derived from scans in init() | ||
| return new PhoenixInputSplit(allScans, totalSize, serverLocation); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keyRange is redundant now. It's just the span from
keyRanges.get(0).getLowerRange()tokeyRanges.get(last).getUpperRange(). You can keep only keyRanges and derive the overall bounds on demand: