in core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java [848:983]
private static List<KeyValue> scanRpc(ScanAddress addr, ScanState scanState,
ClientContext context, long busyTimeout) throws AccumuloSecurityException,
NotServingTabletException, TException, NoSuchScanIDException, TooManyFilesException,
TSampleNotPresentException, ScanServerBusyException {
Timer timer = null;
final TInfo tinfo = TraceUtil.traceInfo();
final HostAndPort parsedLocation = HostAndPort.fromString(addr.serverAddress);
TabletScanClientService.Client client =
ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedLocation, context);
String old = Thread.currentThread().getName();
try {
ScanResult sr;
if (scanState.prevLoc != null && !scanState.prevLoc.equals(addr)) {
scanState.scanID = null;
}
scanState.prevLoc = addr;
if (scanState.scanID == null) {
Thread.currentThread().setName("Starting scan tserver=" + addr.serverAddress + " tableId="
+ addr.getExtent().tableId());
if (log.isTraceEnabled()) {
String msg = "Starting scan server=" + addr.serverAddress + " tablet=" + addr.getExtent()
+ " range=" + scanState.range + " ssil=" + scanState.serverSideIteratorList + " ssio="
+ scanState.serverSideIteratorOptions + " context=" + scanState.classLoaderContext;
log.trace("tid={} {}", Thread.currentThread().getId(), msg);
timer = Timer.startNew();
}
TabletType ttype = TabletType.type(addr.getExtent());
boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(addr.serverAddress);
InitialScan is = client.startScan(tinfo, scanState.context.rpcCreds(),
addr.getExtent().toThrift(), scanState.range.toThrift(),
scanState.columns.stream().map(Column::toThrift).collect(Collectors.toList()),
scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions,
scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated,
scanState.readaheadThreshold,
SamplerConfigurationImpl.toThrift(scanState.samplerConfig), scanState.batchTimeOut,
scanState.classLoaderContext, scanState.executionHints, busyTimeout);
if (waitForWrites) {
serversWaitedForWrites.get(ttype).add(addr.serverAddress);
}
sr = is.result;
if (sr.more) {
scanState.scanID = is.scanID;
} else {
client.closeScan(tinfo, is.scanID);
}
} else {
String msg =
"Continuing scan tserver=" + addr.serverAddress + " scanid=" + scanState.scanID;
Thread.currentThread().setName(msg);
if (log.isTraceEnabled()) {
log.trace("tid={} {}", Thread.currentThread().getId(), msg);
timer = Timer.startNew();
}
sr = client.continueScan(tinfo, scanState.scanID, busyTimeout);
if (!sr.more) {
client.closeScan(tinfo, scanState.scanID);
scanState.scanID = null;
}
}
if (sr.more) {
if (timer != null) {
log.trace("tid={} Finished scan in {} #results={} scanid={}",
Thread.currentThread().getId(),
String.format("%.3f secs", timer.elapsed(MILLISECONDS) / 1000.0), sr.results.size(),
scanState.scanID);
}
} else {
if (addr.getExtent().endRow() == null) {
scanState.finished = true;
if (timer != null) {
log.trace("tid={} Completely finished scan in {} #results={}",
Thread.currentThread().getId(),
String.format("%.3f secs", timer.elapsed(MILLISECONDS) / 1000.0),
sr.results.size());
}
} else if (scanState.range.getEndKey() == null || !scanState.range
.afterEndKey(new Key(addr.getExtent().endRow()).followingKey(PartialKey.ROW))) {
scanState.startRow = addr.getExtent().endRow();
scanState.skipStartRow = true;
if (timer != null) {
log.trace("tid={} Finished scanning tablet in {} #results={}",
Thread.currentThread().getId(),
String.format("%.3f secs", timer.elapsed(MILLISECONDS) / 1000.0),
sr.results.size());
}
} else {
scanState.finished = true;
if (timer != null) {
log.trace("tid={} Completely finished in {} #results={}",
Thread.currentThread().getId(),
String.format("%.3f secs", timer.elapsed(MILLISECONDS) / 1000.0),
sr.results.size());
}
}
}
Key.decompress(sr.results);
if (!sr.results.isEmpty() && !scanState.finished) {
scanState.range = new Range(new Key(sr.results.get(sr.results.size() - 1).key), false,
scanState.range.getEndKey(), scanState.range.isEndKeyInclusive());
}
List<KeyValue> results = new ArrayList<>(sr.results.size());
for (TKeyValue tkv : sr.results) {
results.add(new KeyValue(new Key(tkv.key), tkv.value));
}
return results;
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e.user, e.code, e);
} finally {
ThriftUtil.returnClient(client, context);
Thread.currentThread().setName(old);
}
}