in hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java [328:511]
/**
 * Computes the input splits for every table configured for {@code callingClass}.
 *
 * <p>For each configured table the table name is resolved to an id once, the configured ranges are
 * merged (when auto-adjust is on) or used as-is, and the ranges are binned against the table's
 * tablets. If no ranges are configured, a single {@code new Range()} (all rows) is used. The binned
 * ranges are then turned into splits:
 * <ul>
 * <li>batch scan: one {@link BatchInputSplit} per tablet, holding that tablet's ranges clipped to
 * the tablet;</li>
 * <li>auto-adjust without batch scan: one {@link RangeInputSplit} per (tablet, range) pair, with the
 * range clipped to the tablet;</li>
 * <li>no auto-adjust: one {@link RangeInputSplit} per configured range, listing every location that
 * range was binned to.</li>
 * </ul>
 *
 * @param context the job whose configuration holds the input settings
 * @param callingClass the class passed to {@code InputConfigurator} when reading this job's settings
 * @return the computed splits; empty when no tables are configured
 * @throws IOException if a table cannot be found or its ranges cannot be binned (the underlying
 *         Accumulo exception is wrapped), or if a tablet server host name cannot be resolved
 * @throws IllegalArgumentException if the batch-scan optimization is combined with an offline,
 *         isolated or local-iterator scan, or with auto-adjust disabled
 * @throws RuntimeException if interrupted while waiting to retry tablet lookup in the
 *         non-IMMEDIATE consistency path; the thread's interrupt flag is restored first
 */
public static List<InputSplit> getSplits(JobContext context, Class<?> callingClass)
    throws IOException {
  validateOptions(context, callingClass);
  var conf = context.getConfiguration();
  List<InputSplit> splits = new ArrayList<>();
  try (AccumuloClient client = createClient(context, callingClass)) {
    // Cast rather than declaring a second resource: both names refer to the same object, and
    // listing it twice in the try-with-resources header would close it twice.
    var clientContext = (ClientContext) client;
    // The batch-scan setting is job-wide, so read it once instead of once per table.
    boolean batchScan = InputConfigurator.isBatchScan(callingClass, conf);
    Map<String,InputTableConfig> tableConfigs =
        InputConfigurator.getInputTableConfigs(callingClass, conf);
    for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {
      String tableName = tableConfigEntry.getKey();
      InputTableConfig tableConfig = tableConfigEntry.getValue();

      // resolve the table name to an id once, and use the id from this point forward
      TableId tableId;
      try {
        tableId = clientContext.getTableId(tableName);
      } catch (TableNotFoundException e) {
        throw new IOException(e);
      }

      boolean supportBatchScan = !(tableConfig.isOfflineScan()
          || tableConfig.shouldUseIsolatedScanners() || tableConfig.shouldUseLocalIterators());
      if (batchScan && !supportBatchScan) {
        throw new IllegalArgumentException("BatchScanner optimization not available for offline"
            + " scan, isolated, or local iterators");
      }
      boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
      if (batchScan && !autoAdjust) {
        throw new IllegalArgumentException(
            "AutoAdjustRanges must be enabled when using BatchScanner optimization");
      }

      List<Range> ranges =
          autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges();
      if (ranges.isEmpty()) {
        // nothing configured: read the whole table
        ranges = new ArrayList<>(1);
        ranges.add(new Range());
      }

      // location key -> tablet extent -> the ranges that were binned to that tablet
      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
      try {
        if (tableConfig.isOfflineScan()) {
          binnedRanges = binOfflineTable(context, tableId, ranges, callingClass);
          while (binnedRanges == null) {
            // some tablets were still online; sleep a random 100-199 ms and try again
            sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS);
            binnedRanges = binOfflineTable(context, tableId, ranges, callingClass);
          }
        } else {
          ClientTabletCache tl = InputConfigurator.getTabletLocator(callingClass, conf, tableId);
          // the cache could contain complete, but old, information about the table's tablets
          tl.invalidateCache();
          if (InputConfigurator.getConsistencyLevel(callingClass, conf)
              == ConsistencyLevel.IMMEDIATE) {
            // retry until binRanges reports no failed ranges
            while (!tl.binRanges(clientContext, ranges, binnedRanges).isEmpty()) {
              clientContext.requireNotDeleted(tableId);
              clientContext.requireNotOffline(tableId, tableName);
              binnedRanges.clear();
              log.warn("Unable to locate bins for specified ranges. Retrying.");
              // sleep a random 100-199 ms
              sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS);
              tl.invalidateCache();
            }
          } else {
            // Locations are not required here, so every tablet is grouped under the "" key.
            Map<KeyExtent,List<Range>> rangesByExtent = new HashMap<>();
            Map<String,Map<KeyExtent,List<Range>>> unhostedRanges = new HashMap<>();
            unhostedRanges.put("", rangesByExtent);
            BiConsumer<CachedTablet,Range> consumer = (ct, r) -> rangesByExtent
                .computeIfAbsent(ct.getExtent(), k -> new ArrayList<>()).add(r);
            List<Range> failures =
                tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED);
            Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
                .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2))
                .backOffFactor(1.5).logInterval(Duration.ofMinutes(3)).createRetry();
            while (!failures.isEmpty()) {
              clientContext.requireNotDeleted(tableId);
              try {
                retry.waitForNextAttempt(log,
                    String.format("locating tablets in table %s(%s) for %d ranges", tableName,
                        tableId, ranges.size()));
              } catch (InterruptedException e) {
                // restore the interrupt flag so code further up the stack can still observe it
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
              }
              // discard partial results from the failed attempt before looking up again
              rangesByExtent.clear();
              tl.invalidateCache();
              failures =
                  tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED);
            }
            binnedRanges = unhostedRanges;
          }
        }
      } catch (InvalidTabletHostingRequestException | TableOfflineException
          | TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
        throw new IOException(e);
      }

      // Only filled when auto-adjust is off: configured range -> every location it was binned to.
      Map<Range,List<String>> splitsToAdd = new HashMap<>();
      Map<String,String> hostNameCache = new HashMap<>();
      for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
        // keys look like host[:port]; only the host part is resolved
        String ip = tserverBin.getKey().split(":", 2)[0];
        // NOTE(review): in the non-IMMEDIATE path the key is "", and InetAddress.getByName("")
        // returns a loopback address, so those splits advertise the loopback host's canonical
        // name as their location -- confirm this is intended.
        String location = hostNameCache.get(ip);
        if (location == null) {
          location = InetAddress.getByName(ip).getCanonicalHostName();
          hostNameCache.put(ip, location);
        }
        for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
          Range tabletRange = extentRanges.getKey().toDataRange();
          if (batchScan) {
            // one split per tablet, holding all of its ranges, to be read by a BatchScanner
            ArrayList<Range> clippedRanges = new ArrayList<>();
            for (Range r : extentRanges.getValue()) {
              clippedRanges.add(tabletRange.clip(r));
            }
            BatchInputSplit split =
                new BatchInputSplit(tableName, tableId, clippedRanges, new String[] {location});
            SplitUtils.updateSplit(split, tableConfig);
            splits.add(split);
          } else if (autoAdjust) {
            // divide ranges into smaller ranges, one per tablet they overlap
            for (Range r : extentRanges.getValue()) {
              splits.add(newRangeInputSplit(tableName, tableId, tabletRange.clip(r),
                  new String[] {location}, tableConfig));
            }
          } else {
            // don't divide ranges; remember every location a range was seen at
            for (Range r : extentRanges.getValue()) {
              splitsToAdd.computeIfAbsent(r, k -> new ArrayList<>(1)).add(location);
            }
          }
        }
      }
      // empty unless auto-adjust is off
      for (Map.Entry<Range,List<String>> entry : splitsToAdd.entrySet()) {
        splits.add(newRangeInputSplit(tableName, tableId, entry.getKey(),
            entry.getValue().toArray(new String[0]), tableConfig));
      }
    }
  }
  return splits;
}

/**
 * Creates a {@link RangeInputSplit} for one range of a table. It applies the table's settings via
 * {@code SplitUtils.updateSplit} and copies its offline, isolated-scan and local-iterator flags.
 */
private static RangeInputSplit newRangeInputSplit(String tableName, TableId tableId, Range range,
    String[] locations, InputTableConfig tableConfig) {
  RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonical(), range, locations);
  SplitUtils.updateSplit(split, tableConfig);
  split.setOffline(tableConfig.isOfflineScan());
  split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
  split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
  return split;
}