public static List scan()

in core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java [630:820]


  public static List<KeyValue> scan(ClientContext context, ScanState scanState, Duration timeOut)
      throws ScanTimedOutException, AccumuloException, AccumuloSecurityException,
      TableNotFoundException {

    Timer scanTimer = Timer.startNew();
    String lastError = null;
    String error = null;
    int tooManyFilesCount = 0;
    long sleepMillis = 100;
    final long maxSleepTime =
        context.getConfiguration().getTimeInMillis(Property.GENERAL_MAX_SCANNER_RETRY_PERIOD);

    List<KeyValue> results = null;

    Span parent = TraceUtil.startSpan(ThriftScanner.class, "scan");
    try (Scope scope = parent.makeCurrent()) {
      while (results == null && !scanState.finished) {
        if (Thread.currentThread().isInterrupted()) {
          throw new AccumuloException("Thread interrupted");
        }

        if (scanTimer.hasElapsed(timeOut)) {
          throw new ScanTimedOutException(
              "Failed to retrieve next batch of key values before timeout");
        }

        ScanAddress addr;
        long beginTime = System.nanoTime();
        try {
          addr = getNextScanAddress(context, scanState, timeOut, scanTimer, maxSleepTime);
        } finally {
          // track the initial time that we started tracking the time for getting the next scan
          // address
          if (scanState.startTimeNanos == 0) {
            scanState.startTimeNanos = beginTime;
          }

          // track the total amount of time spent getting the next scan address
          scanState.getNextScanAddressTimeNanos += System.nanoTime() - beginTime;
        }

        Span child2 = TraceUtil.startSpan(ThriftScanner.class, "scan::location",
            Map.of("tserver", addr.serverAddress));
        try (Scope scanLocation = child2.makeCurrent()) {
          results = scan(addr, scanState, context);
        } catch (AccumuloSecurityException e) {
          context.clearTableListCache();
          context.requireNotDeleted(scanState.tableId);
          e.setTableInfo(context.getPrintableTableInfoFromId(scanState.tableId));
          TraceUtil.setException(child2, e, true);
          throw e;
        } catch (TApplicationException tae) {
          TraceUtil.setException(child2, tae, true);
          throw new AccumuloServerException(addr.serverAddress, tae);
        } catch (TSampleNotPresentException tsnpe) {
          String message = "Table " + context.getPrintableTableInfoFromId(scanState.tableId)
              + " does not have sampling configured or built";
          TraceUtil.setException(child2, tsnpe, true);
          throw new SampleNotPresentException(message, tsnpe);
        } catch (NotServingTabletException e) {
          error = "Scan failed, not serving tablet " + addr.serverAddress;
          if (!error.equals(lastError)) {
            log.debug("{}", error);
          } else if (log.isTraceEnabled()) {
            log.trace("{}", error);
          }
          lastError = error;

          context.getTabletLocationCache(scanState.tableId).invalidateCache(addr.getExtent());

          // no need to try the current scan id somewhere else
          scanState.scanID = null;

          if (scanState.isolated) {
            TraceUtil.setException(child2, e, true);
            throw new IsolationException();
          }

          TraceUtil.setException(child2, e, false);
          sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer);
        } catch (ScanServerBusyException e) {
          error = "Scan failed, scan server was busy " + addr.serverAddress;
          if (!error.equals(lastError)) {
            log.debug("{}", error);
          } else if (log.isTraceEnabled()) {
            log.trace("{}", error);
          }
          lastError = error;

          if (scanState.isolated) {
            TraceUtil.setException(child2, e, true);
            throw new IsolationException();
          }

          TraceUtil.setException(child2, e, false);
          scanState.scanID = null;
        } catch (NoSuchScanIDException e) {
          error = "Scan failed, no such scan id " + scanState.scanID + " " + addr.serverAddress;
          if (!error.equals(lastError)) {
            log.debug("{}", error);
          } else if (log.isTraceEnabled()) {
            log.trace("{}", error);
          }
          lastError = error;

          if (scanState.isolated) {
            TraceUtil.setException(child2, e, true);
            throw new IsolationException();
          }

          TraceUtil.setException(child2, e, false);
          scanState.scanID = null;
        } catch (TooManyFilesException e) {
          error = "Tablet has too many files " + addr.serverAddress + " retrying...";
          if (error.equals(lastError)) {
            tooManyFilesCount++;
            if (tooManyFilesCount == 300) {
              log.warn("{}", error);
            } else if (log.isTraceEnabled()) {
              log.trace("{}", error);
            }
          } else {
            log.debug("{}", error);
            tooManyFilesCount = 0;
          }
          lastError = error;

          // not sure what state the scan session on the server side is
          // in after this occurs, so lets be cautious and start a new
          // scan session
          scanState.scanID = null;

          if (scanState.isolated) {
            TraceUtil.setException(child2, e, true);
            throw new IsolationException();
          }

          TraceUtil.setException(child2, e, false);
          sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer);
        } catch (TException e) {
          if (addr.serverType == ServerType.TSERVER) {
            // only tsever locations are in cache, invalidating a scan server would not find
            // anything the cache
            boolean wasInterruptedAfterClose =
                e.getCause() != null && e.getCause().getClass().equals(InterruptedIOException.class)
                    && scanState.closeInitiated;
            if (!wasInterruptedAfterClose) {
              context.getTabletLocationCache(scanState.tableId).invalidateCache(addr.getExtent());
            }
          }
          error = "Scan failed, thrift error " + e.getClass().getName() + "  " + e.getMessage()
              + " " + addr.serverAddress;
          if (!error.equals(lastError)) {
            log.debug("{}", error);
          } else if (log.isTraceEnabled()) {
            log.trace("{}", error);
          }
          lastError = error;

          // do not want to continue using the same scan id, if a timeout occurred could cause a
          // batch to be skipped
          // because a thread on the server side may still be processing the timed out continue scan
          scanState.scanID = null;

          if (scanState.isolated) {
            TraceUtil.setException(child2, e, true);
            throw new IsolationException();
          }

          TraceUtil.setException(child2, e, false);
          sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer);
        } finally {
          child2.end();
        }
      }

      if (results != null && results.isEmpty() && scanState.finished) {
        results = null;
      }

      return results;
    } catch (InterruptedException ex) {
      TraceUtil.setException(parent, ex, true);
      throw new AccumuloException(ex);
    } catch (InvalidTabletHostingRequestException e) {
      TraceUtil.setException(parent, e, true);
      throw new AccumuloException(e);
    } finally {
      parent.end();
    }
  }