in core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java [855:1036]
static void doLookup(ClientContext context, String server, Map<KeyExtent,List<Range>> requested,
Map<KeyExtent,List<Range>> failures, Map<KeyExtent,List<Range>> unscanned,
ResultReceiver receiver, List<Column> columns, ScannerOptions options,
Authorizations authorizations, TimeoutTracker timeoutTracker, long busyTimeout)
throws IOException, AccumuloSecurityException, AccumuloServerException {
if (requested.isEmpty()) {
return;
}
// copy requested to unscanned map. we will remove ranges as they are scanned in trackScanning()
for (Entry<KeyExtent,List<Range>> entry : requested.entrySet()) {
ArrayList<Range> ranges = new ArrayList<>();
for (Range range : entry.getValue()) {
ranges.add(new Range(range));
}
unscanned.put(KeyExtent.copyOf(entry.getKey()), ranges);
}
timeoutTracker.startingScan();
try {
final HostAndPort parsedServer = HostAndPort.fromString(server);
final TabletScanClientService.Client client;
if (timeoutTracker.getTimeOut() < context.getClientTimeoutInMillis()) {
client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context,
timeoutTracker.getTimeOut());
} else {
client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
}
// Tracks unclosed scan session id for the case when the following try block exits with an
// exception.
Long scanIdToClose = null;
try {
Timer timer = null;
if (log.isTraceEnabled()) {
log.trace(
"tid={} Starting multi scan, tserver={} #tablets={} #ranges={} ssil={} ssio={}",
Thread.currentThread().getId(), server, requested.size(),
sumSizes(requested.values()), options.serverSideIteratorList,
options.serverSideIteratorOptions);
timer = Timer.startNew();
}
TabletType ttype = TabletType.type(requested.keySet());
boolean waitForWrites = !ThriftScanner.serversWaitedForWrites.get(ttype).contains(server);
// @formatter:off
Map<TKeyExtent, List<TRange>> thriftTabletRanges = requested.entrySet().stream().collect(Collectors.toMap(
entry -> entry.getKey().toThrift(),
entry -> entry.getValue().stream().map(Range::toThrift).collect(Collectors.toList())
));
// @formatter:on
Map<String,String> execHints =
options.executionHints.isEmpty() ? null : options.executionHints;
InitialMultiScan imsr = client.startMultiScan(TraceUtil.traceInfo(), context.rpcCreds(),
thriftTabletRanges, columns.stream().map(Column::toThrift).collect(Collectors.toList()),
options.serverSideIteratorList, options.serverSideIteratorOptions,
ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites,
SamplerConfigurationImpl.toThrift(options.getSamplerConfiguration()),
options.batchTimeout, options.classLoaderContext, execHints, busyTimeout);
scanIdToClose = imsr.scanID;
if (waitForWrites) {
ThriftScanner.serversWaitedForWrites.get(ttype).add(server.toString());
}
MultiScanResult scanResult = imsr.result;
if (timer != null) {
log.trace("tid={} Got 1st multi scan results, #results={} {} in {}",
Thread.currentThread().getId(), scanResult.results.size(),
(scanResult.more ? "scanID=" + imsr.scanID : ""),
String.format("%.3f secs", timer.elapsed(MILLISECONDS) / 1000.0));
}
ArrayList<Entry<Key,Value>> entries = new ArrayList<>(scanResult.results.size());
for (TKeyValue kv : scanResult.results) {
entries.add(new SimpleImmutableEntry<>(new Key(kv.key), new Value(kv.value)));
}
if (!entries.isEmpty()) {
receiver.receive(entries);
}
if (!entries.isEmpty() || !scanResult.fullScans.isEmpty()) {
timeoutTracker.madeProgress();
}
trackScanning(failures, unscanned, scanResult);
AtomicLong nextOpid = new AtomicLong();
while (scanResult.more) {
timeoutTracker.check();
if (timer != null) {
log.trace("tid={} oid={} Continuing multi scan, scanid={}",
Thread.currentThread().getId(), nextOpid.get(), imsr.scanID);
timer.restart();
}
scanResult = client.continueMultiScan(TraceUtil.traceInfo(), imsr.scanID, busyTimeout);
if (timer != null) {
log.trace("tid={} oid={} Got more multi scan results, #results={} {} in {}",
Thread.currentThread().getId(), nextOpid.getAndIncrement(),
scanResult.results.size(), (scanResult.more ? " scanID=" + imsr.scanID : ""),
String.format("%.3f secs", timer.elapsed(MILLISECONDS) / 1000.0));
}
entries = new ArrayList<>(scanResult.results.size());
for (TKeyValue kv : scanResult.results) {
entries.add(new SimpleImmutableEntry<>(new Key(kv.key), new Value(kv.value)));
}
if (!entries.isEmpty()) {
receiver.receive(entries);
}
if (!entries.isEmpty() || !scanResult.fullScans.isEmpty()) {
timeoutTracker.madeProgress();
}
trackScanning(failures, unscanned, scanResult);
}
client.closeMultiScan(TraceUtil.traceInfo(), imsr.scanID);
scanIdToClose = null;
} finally {
try {
if (scanIdToClose != null) {
// If this code is running it is likely that an exception happened and the scan session
// was never closed. Make a best effort attempt to close the scan session which will
// clean up server side resources. When the batch scanner is closed it will interrupt
// the threads in its thread pool which could cause an interrupted exception in this
// code.
client.closeMultiScan(TraceUtil.traceInfo(), scanIdToClose);
}
} catch (Exception e) {
log.trace("Failed to close scan session in finally {} {}", server, scanIdToClose, e);
} finally {
ThriftUtil.returnClient(client, context);
}
}
} catch (TTransportException e) {
log.debug("Server : {} msg : {}", server, e.getMessage());
timeoutTracker.errorOccured();
throw new IOException(e);
} catch (ThriftSecurityException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
throw new AccumuloSecurityException(e.user, e.code, e);
} catch (TApplicationException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
throw new AccumuloServerException(server, e);
} catch (NoSuchScanIDException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
throw new IOException(e);
} catch (ScanServerBusyException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
throw new IOException(e);
} catch (TSampleNotPresentException e) {
log.debug("Server : " + server + " msg : " + e.getMessage(), e);
String tableInfo = "?";
if (e.getExtent() != null) {
TableId tableId = KeyExtent.fromThrift(e.getExtent()).tableId();
tableInfo = context.getPrintableTableInfoFromId(tableId);
}
String message = "Table " + tableInfo + " does not have sampling configured or built";
throw new SampleNotPresentException(message, e);
} catch (TException e) {
log.debug("Server : {} msg : {}", server, e.getMessage(), e);
timeoutTracker.errorOccured();
throw new IOException(e);
}
}