in phoenix5-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java [121:193]
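    /**
     * Builds Hive {@link InputSplit}s from the scans of a compiled Phoenix {@link QueryPlan}.
     * When split-by-stats is enabled, each scan becomes its own split; otherwise all scans
     * targeting a region are bundled into one split. Every split carries its region's
     * location and size so Hive can schedule tasks for data locality.
     */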
    private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
            final List<KeyRange> splits, String query) throws IOException {
        if (qplan == null) {
            throw new NullPointerException("Query plan must not be null");
        }
        if (splits == null) {
            throw new NullPointerException("Key range splits must not be null");
        }
        final List<InputSplit> psplits = new ArrayList<>(splits.size());
        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
                .newJobContext(new Job(jobConf)));
        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                false);
        setScanCacheSize(jobConf);

        // Resolve HBase region locations and sizes so each split can report where its data
        // lives, letting Hive schedule tasks close to the owning region servers.
        try (org.apache.hadoop.hbase.client.Connection connection =
                ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(
                    qplan.getTableRef().getTable().getPhysicalName().toString()));
            // Compute region sizes once for the whole table instead of once per scan batch.
            RegionSizeCalculator sizeCalculator =
                    new RegionSizeCalculator(regionLocator, connection.getAdmin());
            for (List<Scan> scans : qplan.getScans()) {
                PhoenixInputSplit inputSplit;
                // All scans in this batch target the region that holds the first start row.
                HRegionLocation location =
                        regionLocator.getRegionLocation(scans.get(0).getStartRow(), false);
                long regionSize =
                        sizeCalculator.getRegionSize(location.getRegionInfo().getRegionName());
                String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
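                // Choose the split granularity: with split-by-stats, Phoenix statistics have
                // already subdivided this region into multiple scans, and each scan becomes
                // its own split; otherwise the region's scans are bundled into one split.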
                if (splitByStats) {
                    for (Scan aScan : scans) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Split for scan : " + aScan + " with scanAttribute : "
                                    + aScan.getAttributesMap()
                                    + " [scanCache, cacheBlock, scanBatch] : ["
                                    + aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", "
                                    + aScan.getBatch() + "] and regionLocation : "
                                    + regionLocation);
                        }
                        inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)),
                                tablePaths[0], regionLocation, regionSize);
                        inputSplit.setQuery(query);
                        psplits.add(inputSplit);
                    }
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Scan count[" + scans.size() + "] : "
                                + Bytes.toStringBinary(scans.get(0).getStartRow()) + " ~ "
                                + Bytes.toStringBinary(scans.get(scans.size() - 1).getStopRow()));
                        LOG.debug("First scan : " + scans.get(0) + " with scanAttribute : "
                                + scans.get(0).getAttributesMap()
                                + " [scanCache, cacheBlock, scanBatch] : ["
                                + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
                                + ", " + scans.get(0).getBatch() + "] and regionLocation : "
                                + regionLocation);
                        for (int i = 0, limit = scans.size(); i < limit; i++) {
                            LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : "
                                    + Bytes.toStringBinary(scans.get(i).getAttribute(
                                            BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
                        }
                    }
                    inputSplit = new PhoenixInputSplit(scans, tablePaths[0], regionLocation,
                            regionSize);
                    inputSplit.setQuery(query);
                    psplits.add(inputSplit);
                }
            }
        }

        return psplits;
    }
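
    // A minimal sketch of the call path, assuming the usual InputFormat wiring in this class:
    // getSplits(JobConf, int) compiles the pushed-down Hive query into a Phoenix QueryPlan
    // and delegates here. The helper name below is illustrative, not part of this file:
    //
    //   QueryPlan queryPlan = compileQueryPlan(jobConf, query);   // hypothetical helper
    //   List<KeyRange> keyRanges = queryPlan.getSplits();         // key ranges from the plan
    //   return generateSplits(jobConf, queryPlan, keyRanges, query);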