public static List getSplits()

in hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java [328:511]


  /**
   * Computes the input splits for every table configured for this job.
   *
   * <p>
   * For each configured table the ranges are (when auto-adjust is enabled) merged, then binned to
   * tablets in one of three ways:
   * <ul>
   * <li>from offline metadata (offline scans);</li>
   * <li>via a tablet lookup that retries until every range is located ({@code IMMEDIATE}
   * consistency);</li>
   * <li>via a lookup that does not require tablet locations (any other consistency level).</li>
   * </ul>
   * The binned ranges are then turned into {@link BatchInputSplit}s (one per tablet, when the
   * BatchScanner optimization is enabled) or {@link RangeInputSplit}s.
   *
   * @param context the job context whose configuration holds the input settings
   * @param callingClass passed to {@code InputConfigurator} to look up this job's settings
   * @return the splits for all configured tables
   * @throws IOException if a table cannot be found, tablet lookup fails, or a tablet server host
   *         name cannot be resolved
   * @throws IllegalArgumentException if the BatchScanner optimization is requested together with
   *         offline scans, isolated scanners, local iterators, or with range auto-adjust disabled
   * @throws RuntimeException if the thread is interrupted while waiting to retry a tablet lookup;
   *         the thread's interrupt status is restored before throwing
   */
  public static List<InputSplit> getSplits(JobContext context, Class<?> callingClass)
      throws IOException {
    validateOptions(context, callingClass);
    LinkedList<InputSplit> splits = new LinkedList<>();
    // The BatchScanner setting is job-wide (not per table), so read it once.
    boolean batchScan = InputConfigurator.isBatchScan(callingClass, context.getConfiguration());
    // Host name lookups are cached across all tables; several tables usually share servers.
    Map<String,String> hostNameCache = new HashMap<>();
    // Only the client itself is a resource: the ClientContext below is the same object, and
    // registering it as a second resource would close the client twice.
    try (AccumuloClient client = createClient(context, callingClass)) {
      ClientContext clientContext = (ClientContext) client;
      Map<String,InputTableConfig> tableConfigs =
          InputConfigurator.getInputTableConfigs(callingClass, context.getConfiguration());
      for (Map.Entry<String,InputTableConfig> tableConfigEntry : tableConfigs.entrySet()) {

        String tableName = tableConfigEntry.getKey();
        InputTableConfig tableConfig = tableConfigEntry.getValue();

        TableId tableId;
        // resolve table name to id once, and use id from this point forward
        try {
          tableId = clientContext.getTableId(tableName);
        } catch (TableNotFoundException e) {
          throw new IOException(e);
        }

        boolean supportBatchScan = !(tableConfig.isOfflineScan()
            || tableConfig.shouldUseIsolatedScanners() || tableConfig.shouldUseLocalIterators());
        if (batchScan && !supportBatchScan) {
          throw new IllegalArgumentException("BatchScanner optimization not available for offline"
              + " scan, isolated, or local iterators");
        }

        boolean autoAdjust = tableConfig.shouldAutoAdjustRanges();
        if (batchScan && !autoAdjust) {
          throw new IllegalArgumentException(
              "AutoAdjustRanges must be enabled when using BatchScanner optimization");
        }

        List<Range> ranges =
            autoAdjust ? Range.mergeOverlapping(tableConfig.getRanges()) : tableConfig.getRanges();
        if (ranges.isEmpty()) {
          // no configured ranges: read the whole table using a single infinite range
          ranges = new ArrayList<>(1);
          ranges.add(new Range());
        }

        // get the metadata information for these ranges: location -> tablet extent -> ranges
        Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
        try {
          if (tableConfig.isOfflineScan()) {
            binnedRanges = binOfflineTable(context, tableId, ranges, callingClass);
            while (binnedRanges == null) {
              // Some tablets were still online, try again
              // sleep randomly between 100 and 200 ms
              sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS);
              binnedRanges = binOfflineTable(context, tableId, ranges, callingClass);
            }
          } else {
            ClientTabletCache tl = InputConfigurator.getTabletLocator(callingClass,
                context.getConfiguration(), tableId);
            // its possible that the cache could contain complete, but old information about a
            // tables tablets... so clear it
            tl.invalidateCache();

            if (InputConfigurator.getConsistencyLevel(callingClass, context.getConfiguration())
                == ConsistencyLevel.IMMEDIATE) {
              // every range must be bound to a tablet location; retry until none fail
              while (!tl.binRanges(clientContext, ranges, binnedRanges).isEmpty()) {
                clientContext.requireNotDeleted(tableId);
                clientContext.requireNotOffline(tableId, tableName);
                binnedRanges.clear();
                log.warn("Unable to locate bins for specified ranges. Retrying.");
                // sleep randomly between 100 and 200 ms
                sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS);
                tl.invalidateCache();
              }
            } else {
              // locations are not required here, so every tablet is binned under the empty key
              Map<String,Map<KeyExtent,List<Range>>> unhostedRanges = new HashMap<>();
              unhostedRanges.put("", new HashMap<>());
              BiConsumer<CachedTablet,Range> consumer = (ct, r) -> {
                unhostedRanges.get("").computeIfAbsent(ct.getExtent(), k -> new ArrayList<>())
                    .add(r);
              };
              List<Range> failures =
                  tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED);

              Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
                  .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2))
                  .backOffFactor(1.5).logInterval(Duration.ofMinutes(3)).createRetry();

              while (!failures.isEmpty()) {

                clientContext.requireNotDeleted(tableId);

                try {
                  retry.waitForNextAttempt(log,
                      String.format("locating tablets in table %s(%s) for %d ranges", tableName,
                          tableId, ranges.size()));
                } catch (InterruptedException e) {
                  // restore the interrupt status so callers can still observe the interruption
                  Thread.currentThread().interrupt();
                  throw new RuntimeException(e);
                }
                // discard partial results from the failed attempt before looking up again
                unhostedRanges.get("").clear();
                tl.invalidateCache();
                failures =
                    tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED);
              }
              binnedRanges = unhostedRanges;
            }
          }
        } catch (InvalidTabletHostingRequestException | TableOfflineException
            | TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
          throw new IOException(e);
        }

        // Either emit one split per (tablet, range) pair, or, when ranges must not be divided,
        // collect every location that serves each original range and emit one split per range.
        // Map from Range to locations; only used when ranges are not auto-adjusted.
        Map<Range,List<String>> splitsToAdd = null;

        if (!autoAdjust) {
          splitsToAdd = new HashMap<>();
        }

        for (Map.Entry<String,Map<KeyExtent,List<Range>>> tserverBin : binnedRanges.entrySet()) {
          String ip = tserverBin.getKey().split(":", 2)[0];
          // Ranges binned without a location are keyed by "". InetAddress.getByName("") returns
          // the loopback address, which would give the split a misleading locality hint, so such
          // splits carry no location (location stays null).
          String location = null;
          if (!ip.isEmpty()) {
            location = hostNameCache.get(ip);
            if (location == null) {
              location = InetAddress.getByName(ip).getCanonicalHostName();
              hostNameCache.put(ip, location);
            }
          }
          for (Map.Entry<KeyExtent,List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
            Range ke = extentRanges.getKey().toDataRange();
            if (batchScan) {
              // group ranges by tablet to be read by a BatchScanner
              ArrayList<Range> clippedRanges = new ArrayList<>();
              for (Range r : extentRanges.getValue()) {
                clippedRanges.add(ke.clip(r));
              }
              BatchInputSplit split = new BatchInputSplit(tableName, tableId, clippedRanges,
                  location == null ? new String[0] : new String[] {location});
              SplitUtils.updateSplit(split, tableConfig);

              splits.add(split);
            } else {
              // not grouping by tablet
              for (Range r : extentRanges.getValue()) {
                if (autoAdjust) {
                  // divide ranges into smaller ranges, based on the tablets
                  RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonical(),
                      ke.clip(r), location == null ? new String[0] : new String[] {location});
                  SplitUtils.updateSplit(split, tableConfig);
                  split.setOffline(tableConfig.isOfflineScan());
                  split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
                  split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());
                  splits.add(split);
                } else {
                  // don't divide ranges; remember every location that serves this range
                  List<String> locations = splitsToAdd.computeIfAbsent(r, k -> new ArrayList<>(1));
                  if (location != null) {
                    locations.add(location);
                  }
                }
              }
            }
          }
        }

        if (!autoAdjust) {
          for (Map.Entry<Range,List<String>> entry : splitsToAdd.entrySet()) {
            RangeInputSplit split = new RangeInputSplit(tableName, tableId.canonical(),
                entry.getKey(), entry.getValue().toArray(new String[0]));
            SplitUtils.updateSplit(split, tableConfig);
            split.setOffline(tableConfig.isOfflineScan());
            split.setIsolatedScan(tableConfig.shouldUseIsolatedScanners());
            split.setUsesLocalIterators(tableConfig.shouldUseLocalIterators());

            splits.add(split);
          }
        }
      }
    }
    return splits;
  }