in core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java [630:820]
/**
 * Retrieves the next batch of key/values for the scan described by {@code scanState}, retrying
 * against a freshly resolved scan address after each recoverable failure.
 *
 * <p>
 * The loop runs until a batch is obtained or {@code scanState.finished} is set. At the top of
 * every iteration the thread's interrupt status and the elapsed time are checked. Recoverable
 * failures (tablet not served, scan server busy, unknown scan id, too many files, generic thrift
 * errors) clear {@code scanState.scanID} so the next attempt starts a new scan session; when
 * {@code scanState.isolated} is set these failures are instead surfaced as an
 * {@code IsolationException}.
 *
 * @param context client context used for configuration, the tablet location cache and table
 *        metadata lookups
 * @param scanState mutable state of the scan; this method updates {@code scanID},
 *        {@code startTimeNanos} and {@code getNextScanAddressTimeNanos} on it
 * @param timeOut maximum time to spend in this call before giving up
 * @return the next batch of results, or {@code null} when the scan finished without producing a
 *         non-empty batch
 * @throws ScanTimedOutException if {@code timeOut} has elapsed before a batch was retrieved
 * @throws AccumuloException if the calling thread is interrupted, if an
 *         {@code InvalidTabletHostingRequestException} is raised, or (as an
 *         {@code AccumuloServerException}) if the server reports a {@code TApplicationException}
 * @throws AccumuloSecurityException rethrown from the scan after table information is attached
 * @throws TableNotFoundException not thrown directly here; presumably propagated from one of the
 *         called helpers
 */
public static List<KeyValue> scan(ClientContext context, ScanState scanState, Duration timeOut)
    throws ScanTimedOutException, AccumuloException, AccumuloSecurityException,
    TableNotFoundException {
  Timer scanTimer = Timer.startNew();
  // lastError is compared against the current error so that a repeated failure is only logged at
  // debug once and at trace afterwards
  String lastError = null;
  String error = null;
  int tooManyFilesCount = 0;
  long sleepMillis = 100;
  final long maxSleepTime =
      context.getConfiguration().getTimeInMillis(Property.GENERAL_MAX_SCANNER_RETRY_PERIOD);
  List<KeyValue> results = null;
  Span parent = TraceUtil.startSpan(ThriftScanner.class, "scan");
  try (Scope scope = parent.makeCurrent()) {
    while (results == null && !scanState.finished) {
      // isInterrupted() does not clear the flag, so callers can still observe the interruption
      if (Thread.currentThread().isInterrupted()) {
        throw new AccumuloException("Thread interrupted");
      }
      if (scanTimer.hasElapsed(timeOut)) {
        throw new ScanTimedOutException(
            "Failed to retrieve next batch of key values before timeout");
      }
      ScanAddress addr;
      long beginTime = System.nanoTime();
      try {
        addr = getNextScanAddress(context, scanState, timeOut, scanTimer, maxSleepTime);
      } finally {
        // track the initial time that we started tracking the time for getting the next scan
        // address
        if (scanState.startTimeNanos == 0) {
          scanState.startTimeNanos = beginTime;
        }
        // track the total amount of time spent getting the next scan address
        scanState.getNextScanAddressTimeNanos += System.nanoTime() - beginTime;
      }
      Span child2 = TraceUtil.startSpan(ThriftScanner.class, "scan::location",
          Map.of("tserver", addr.serverAddress));
      try (Scope scanLocation = child2.makeCurrent()) {
        results = scan(addr, scanState, context);
      } catch (AccumuloSecurityException e) {
        // not retried: refresh table info, attach it to the exception and rethrow
        context.clearTableListCache();
        context.requireNotDeleted(scanState.tableId);
        e.setTableInfo(context.getPrintableTableInfoFromId(scanState.tableId));
        TraceUtil.setException(child2, e, true);
        throw e;
      } catch (TApplicationException tae) {
        // not retried
        TraceUtil.setException(child2, tae, true);
        throw new AccumuloServerException(addr.serverAddress, tae);
      } catch (TSampleNotPresentException tsnpe) {
        // not retried
        String message = "Table " + context.getPrintableTableInfoFromId(scanState.tableId)
            + " does not have sampling configured or built";
        TraceUtil.setException(child2, tsnpe, true);
        throw new SampleNotPresentException(message, tsnpe);
      } catch (NotServingTabletException e) {
        error = "Scan failed, not serving tablet " + addr.serverAddress;
        if (!error.equals(lastError)) {
          log.debug("{}", error);
        } else if (log.isTraceEnabled()) {
          log.trace("{}", error);
        }
        lastError = error;
        // drop the cached location for this extent so the next attempt looks it up again
        context.getTabletLocationCache(scanState.tableId).invalidateCache(addr.getExtent());
        // no need to try the current scan id somewhere else
        scanState.scanID = null;
        if (scanState.isolated) {
          TraceUtil.setException(child2, e, true);
          throw new IsolationException();
        }
        TraceUtil.setException(child2, e, false);
        // NOTE(review): pause presumably sleeps and returns the next backoff interval - confirm
        sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer);
      } catch (ScanServerBusyException e) {
        error = "Scan failed, scan server was busy " + addr.serverAddress;
        if (!error.equals(lastError)) {
          log.debug("{}", error);
        } else if (log.isTraceEnabled()) {
          log.trace("{}", error);
        }
        lastError = error;
        if (scanState.isolated) {
          TraceUtil.setException(child2, e, true);
          throw new IsolationException();
        }
        TraceUtil.setException(child2, e, false);
        // this branch loops again without calling pause
        scanState.scanID = null;
      } catch (NoSuchScanIDException e) {
        // the message is built before scanID is cleared so that it reports the unknown id
        error = "Scan failed, no such scan id " + scanState.scanID + " " + addr.serverAddress;
        if (!error.equals(lastError)) {
          log.debug("{}", error);
        } else if (log.isTraceEnabled()) {
          log.trace("{}", error);
        }
        lastError = error;
        if (scanState.isolated) {
          TraceUtil.setException(child2, e, true);
          throw new IsolationException();
        }
        TraceUtil.setException(child2, e, false);
        // this branch loops again without calling pause
        scanState.scanID = null;
      } catch (TooManyFilesException e) {
        error = "Tablet has too many files " + addr.serverAddress + " retrying...";
        if (error.equals(lastError)) {
          // same failure as the previous attempt: warn once, on the 300th consecutive repeat
          tooManyFilesCount++;
          if (tooManyFilesCount == 300) {
            log.warn("{}", error);
          } else if (log.isTraceEnabled()) {
            log.trace("{}", error);
          }
        } else {
          log.debug("{}", error);
          tooManyFilesCount = 0;
        }
        lastError = error;
        // not sure what state the scan session on the server side is
        // in after this occurs, so lets be cautious and start a new
        // scan session
        scanState.scanID = null;
        if (scanState.isolated) {
          TraceUtil.setException(child2, e, true);
          throw new IsolationException();
        }
        TraceUtil.setException(child2, e, false);
        sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer);
      } catch (TException e) {
        if (addr.serverType == ServerType.TSERVER) {
          // only tserver locations are in cache, invalidating a scan server would not find
          // anything in the cache
          boolean wasInterruptedAfterClose =
              e.getCause() != null && e.getCause().getClass().equals(InterruptedIOException.class)
                  && scanState.closeInitiated;
          // keep the cached location when the failure was an interrupted I/O after close was
          // initiated on this scan
          if (!wasInterruptedAfterClose) {
            context.getTabletLocationCache(scanState.tableId).invalidateCache(addr.getExtent());
          }
        }
        error = "Scan failed, thrift error " + e.getClass().getName() + " " + e.getMessage()
            + " " + addr.serverAddress;
        if (!error.equals(lastError)) {
          log.debug("{}", error);
        } else if (log.isTraceEnabled()) {
          log.trace("{}", error);
        }
        lastError = error;
        // do not want to continue using the same scan id, if a timeout occurred could cause a
        // batch to be skipped
        // because a thread on the server side may still be processing the timed out continue scan
        scanState.scanID = null;
        if (scanState.isolated) {
          TraceUtil.setException(child2, e, true);
          throw new IsolationException();
        }
        TraceUtil.setException(child2, e, false);
        sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer);
      } finally {
        child2.end();
      }
    }
    // an empty batch from a finished scan is reported as null rather than as an empty list
    if (results != null && results.isEmpty() && scanState.finished) {
      results = null;
    }
    return results;
  } catch (InterruptedException ex) {
    // Restore the interrupt status before wrapping the exception so that the interruption is
    // not lost to callers; this matches the isInterrupted() check at the top of the loop, which
    // also leaves the flag set when it reports an interruption.
    Thread.currentThread().interrupt();
    TraceUtil.setException(parent, ex, true);
    throw new AccumuloException(ex);
  } catch (InvalidTabletHostingRequestException e) {
    TraceUtil.setException(parent, e, true);
    throw new AccumuloException(e);
  } finally {
    parent.end();
  }
}