static void doLookup()

in core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java [855:1036]


  static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Range>> requested,
      Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned,
      ResultReceiver receiver, List<Column> columns, ScannerOptions options,
      Authorizations authorizations, TimeoutTracker timeoutTracker, long busyTimeout)
      throws IOException, AccumuloSecurityException, AccumuloServerException {

    if (requested.isEmpty()) {
      return;
    }

    // copy requested to unscanned map. we will remove ranges as they are scanned in trackScanning()
    for (Entry<KeyExtent,List<Range>> entry : requested.entrySet()) {
      ArrayList<Range> ranges = new ArrayList<>();
      for (Range range : entry.getValue()) {
        ranges.add(new Range(range));
      }
      unscanned.put(KeyExtent.copyOf(entry.getKey()), ranges);
    }

    timeoutTracker.startingScan();
    try {
      final HostAndPort parsedServer = HostAndPort.fromString(server);
      final TabletScanClientService.Client client;
      if (timeoutTracker.getTimeOut() < context.getClientTimeoutInMillis()) {
        client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context,
            timeoutTracker.getTimeOut());
      } else {
        client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
      }

      // Tracks unclosed scan session id for the case when the following try block exits with an
      // exception.
      Long scanIdToClose = null;
      try {

        Timer timer = null;

        if (log.isTraceEnabled()) {
          log.trace(
              "tid={} Starting multi scan, tserver={}  #tablets={}  #ranges={} ssil={} ssio={}",
              Thread.currentThread().getId(), server, requested.size(),
              sumSizes(requested.values()), options.serverSideIteratorList,
              options.serverSideIteratorOptions);

          timer = Timer.startNew();
        }

        TabletType ttype = TabletType.type(requested.keySet());
        boolean waitForWrites = !ThriftScanner.serversWaitedForWrites.get(ttype).contains(server);

        // @formatter:off
        Map<TKeyExtent, List<TRange>> thriftTabletRanges = requested.entrySet().stream().collect(Collectors.toMap(
                        entry -> entry.getKey().toThrift(),
                        entry -> entry.getValue().stream().map(Range::toThrift).collect(Collectors.toList())
        ));
        // @formatter:on

        Map<String,String> execHints =
            options.executionHints.isEmpty() ? null : options.executionHints;

        InitialMultiScan imsr = client.startMultiScan(TraceUtil.traceInfo(), context.rpcCreds(),
            thriftTabletRanges, columns.stream().map(Column::toThrift).collect(Collectors.toList()),
            options.serverSideIteratorList, options.serverSideIteratorOptions,
            ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites,
            SamplerConfigurationImpl.toThrift(options.getSamplerConfiguration()),
            options.batchTimeout, options.classLoaderContext, execHints, busyTimeout);
        scanIdToClose = imsr.scanID;
        if (waitForWrites) {
          ThriftScanner.serversWaitedForWrites.get(ttype).add(server.toString());
        }

        MultiScanResult scanResult = imsr.result;

        if (timer != null) {
          log.trace("tid={} Got 1st multi scan results, #results={} {} in {}",
              Thread.currentThread().getId(), scanResult.results.size(),
              (scanResult.more ? "scanID=" + imsr.scanID : ""),
              String.format("%.3f secs", timer.elapsed(MILLISECONDS) / 1000.0));
        }

        ArrayList<Entry<Key,Value>> entries = new ArrayList<>(scanResult.results.size());
        for (TKeyValue kv : scanResult.results) {
          entries.add(new SimpleImmutableEntry<>(new Key(kv.key), new Value(kv.value)));
        }

        if (!entries.isEmpty()) {
          receiver.receive(entries);
        }

        if (!entries.isEmpty() || !scanResult.fullScans.isEmpty()) {
          timeoutTracker.madeProgress();
        }

        trackScanning(failures, unscanned, scanResult);

        AtomicLong nextOpid = new AtomicLong();

        while (scanResult.more) {

          timeoutTracker.check();

          if (timer != null) {
            log.trace("tid={} oid={} Continuing multi scan, scanid={}",
                Thread.currentThread().getId(), nextOpid.get(), imsr.scanID);
            timer.restart();
          }

          scanResult = client.continueMultiScan(TraceUtil.traceInfo(), imsr.scanID, busyTimeout);

          if (timer != null) {
            log.trace("tid={} oid={} Got more multi scan results, #results={} {} in {}",
                Thread.currentThread().getId(), nextOpid.getAndIncrement(),
                scanResult.results.size(), (scanResult.more ? " scanID=" + imsr.scanID : ""),
                String.format("%.3f secs", timer.elapsed(MILLISECONDS) / 1000.0));
          }

          entries = new ArrayList<>(scanResult.results.size());
          for (TKeyValue kv : scanResult.results) {
            entries.add(new SimpleImmutableEntry<>(new Key(kv.key), new Value(kv.value)));
          }

          if (!entries.isEmpty()) {
            receiver.receive(entries);
          }

          if (!entries.isEmpty() || !scanResult.fullScans.isEmpty()) {
            timeoutTracker.madeProgress();
          }

          trackScanning(failures, unscanned, scanResult);
        }

        client.closeMultiScan(TraceUtil.traceInfo(), imsr.scanID);
        scanIdToClose = null;

      } finally {
        try {
          if (scanIdToClose != null) {
            // If this code is running it is likely that an exception happened and the scan session
            // was never closed. Make a best effort attempt to close the scan session which will
            // clean up server side resources. When the batch scanner is closed it will interrupt
            // the threads in its thread pool which could cause an interrupted exception in this
            // code.
            client.closeMultiScan(TraceUtil.traceInfo(), scanIdToClose);
          }
        } catch (Exception e) {
          log.trace("Failed to close scan session in finally {} {}", server, scanIdToClose, e);
        } finally {
          ThriftUtil.returnClient(client, context);
        }
      }
    } catch (TTransportException e) {
      log.debug("Server : {} msg : {}", server, e.getMessage());
      timeoutTracker.errorOccured();
      throw new IOException(e);
    } catch (ThriftSecurityException e) {
      log.debug("Server : {} msg : {}", server, e.getMessage(), e);
      throw new AccumuloSecurityException(e.user, e.code, e);
    } catch (TApplicationException e) {
      log.debug("Server : {} msg : {}", server, e.getMessage(), e);
      throw new AccumuloServerException(server, e);
    } catch (NoSuchScanIDException e) {
      log.debug("Server : {} msg : {}", server, e.getMessage(), e);
      throw new IOException(e);
    } catch (ScanServerBusyException e) {
      log.debug("Server : {} msg : {}", server, e.getMessage(), e);
      throw new IOException(e);
    } catch (TSampleNotPresentException e) {
      log.debug("Server : " + server + " msg : " + e.getMessage(), e);
      String tableInfo = "?";
      if (e.getExtent() != null) {
        TableId tableId = KeyExtent.fromThrift(e.getExtent()).tableId();
        tableInfo = context.getPrintableTableInfoFromId(tableId);
      }
      String message = "Table " + tableInfo + " does not have sampling configured or built";
      throw new SampleNotPresentException(message, e);
    } catch (TException e) {
      log.debug("Server : {} msg : {}", server, e.getMessage(), e);
      timeoutTracker.errorOccured();
      throw new IOException(e);
    }
  }