in phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java [1450:1605]
private List<PeekingResultIterator> getIterators(List<List<Scan>> scan, ConnectionQueryServices services,
boolean isLocalIndex, Queue<PeekingResultIterator> allIterators, List<PeekingResultIterator> iterators,
boolean isReverse, long maxQueryEndTime, int splitSize, ScanWrapper previousScan, int retryCount) throws SQLException {
boolean success = false;
final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(splitSize);
allFutures.add(futures);
SQLException toThrow = null;
final HashCacheClient hashCacheClient = new HashCacheClient(context.getConnection());
int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
try {
submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper, maxQueryEndTime);
boolean clearedCache = false;
for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
Iterator<Pair<Scan, Future<PeekingResultIterator>>> scanPairItr = reverseIfNecessary(future,isReverse).iterator();
while (scanPairItr.hasNext()) {
Pair<Scan,Future<PeekingResultIterator>> scanPair = scanPairItr.next();
try {
long timeOutForScan = maxQueryEndTime - EnvironmentEdgeManager.currentTimeMillis();
if (forTestingSetTimeoutToMaxToLetQueryPassHere) {
timeOutForScan = Long.MAX_VALUE;
}
if (timeOutForScan < 0) {
throw new SQLExceptionInfo.Builder(OPERATION_TIMED_OUT).setMessage(
". Query couldn't be completed in the allotted time: "
+ queryTimeOut + " ms").build().buildException();
}
// make sure we apply the iterators in order
if (isLocalIndex && previousScan != null && previousScan.getScan() != null
&& (((!isReverse && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW),
previousScan.getScan().getStopRow()) < 0)
|| (isReverse && previousScan.getScan().getStopRow().length > 0 && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW),
previousScan.getScan().getStopRow()) > 0)
|| (Bytes.compareTo(scanPair.getFirst().getStopRow(), previousScan.getScan().getStopRow()) == 0))
&& Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_START_ROW_SUFFIX), previousScan.getScan().getAttribute(SCAN_START_ROW_SUFFIX))==0)) {
continue;
}
PeekingResultIterator iterator = scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS);
concatIterators.add(iterator);
previousScan.setScan(scanPair.getFirst());
} catch (ExecutionException e) {
LOGGER.warn("Getting iterators at BaseResultIterators encountered error "
+ "for table {}", TableName.valueOf(physicalTableName), e);
try { // Rethrow as SQLException
throw ClientUtil.parseServerException(e);
} catch (StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException e2){
// Catch only to try to recover from region boundary cache being out of date
if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
services.clearTableRegionCache(TableName.valueOf(physicalTableName));
context.getOverallQueryMetrics().cacheRefreshedDueToSplits();
}
// Resubmit just this portion of work again
Scan oldScan = scanPair.getFirst();
byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
if (e2 instanceof HashJoinCacheNotFoundException) {
LOGGER.debug(
"Retrying when Hash Join cache is not found on the server ,by sending the cache again");
if (retryCount <= 0) {
throw e2;
}
Long cacheId = ((HashJoinCacheNotFoundException)e2).getCacheId();
ServerCache cache = caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
if (cache .getCachePtr() != null) {
if (!hashCacheClient.addHashCacheToServer(startKey, cache, plan.getTableRef().getTable())) {
throw e2;
}
}
}
concatIterators =
recreateIterators(services, isLocalIndex, allIterators,
iterators, isReverse, maxQueryEndTime, previousScan,
clearedCache, concatIterators, scanPairItr, scanPair, retryCount-1);
} catch(ColumnFamilyNotFoundException cfnfe) {
if (scanPair.getFirst().getAttribute(LOCAL_INDEX_BUILD) != null) {
Thread.sleep(1000);
concatIterators =
recreateIterators(services, isLocalIndex, allIterators,
iterators, isReverse, maxQueryEndTime, previousScan,
clearedCache, concatIterators, scanPairItr, scanPair, retryCount);
}
}
} catch (CancellationException ce) {
LOGGER.warn("Iterator scheduled to be executed in Future was being cancelled", ce);
}
}
addIterator(iterators, concatIterators);
}
success = true;
return iterators;
} catch (TimeoutException e) {
OverAllQueryMetrics overAllQueryMetrics = context.getOverallQueryMetrics();
overAllQueryMetrics.queryTimedOut();
if (context.getScanRanges().isPointLookup()) {
overAllQueryMetrics.queryPointLookupTimedOut();
} else {
overAllQueryMetrics.queryScanTimedOut();
}
GLOBAL_QUERY_TIMEOUT_COUNTER.increment();
// thrown when a thread times out waiting for the future.get() call to return
toThrow = new SQLExceptionInfo.Builder(OPERATION_TIMED_OUT).setMessage(
". Query couldn't be completed in the allotted time: " + queryTimeOut + " ms")
.setRootCause(e).build().buildException();
} catch (SQLException e) {
if (e.getErrorCode() == OPERATION_TIMED_OUT.getErrorCode()) {
OverAllQueryMetrics overAllQueryMetrics = context.getOverallQueryMetrics();
overAllQueryMetrics.queryTimedOut();
if (context.getScanRanges().isPointLookup()) {
overAllQueryMetrics.queryPointLookupTimedOut();
} else {
overAllQueryMetrics.queryScanTimedOut();
}
GLOBAL_QUERY_TIMEOUT_COUNTER.increment();
}
toThrow = e;
} catch (Exception e) {
toThrow = ClientUtil.parseServerException(e);
} finally {
try {
if (!success) {
try {
close();
} catch (Exception e) {
if (toThrow == null) {
toThrow = ClientUtil.parseServerException(e);
} else {
toThrow.setNextException(ClientUtil.parseServerException(e));
}
} finally {
try {
SQLCloseables.closeAll(allIterators);
} catch (Exception e) {
if (toThrow == null) {
toThrow = ClientUtil.parseServerException(e);
} else {
toThrow.setNextException(ClientUtil.parseServerException(e));
}
}
}
}
} finally {
if (toThrow != null) {
GLOBAL_FAILED_QUERY_COUNTER.increment();
OverAllQueryMetrics overAllQueryMetrics = context.getOverallQueryMetrics();
overAllQueryMetrics.queryFailed();
if (context.getScanRanges().isPointLookup()) {
overAllQueryMetrics.queryPointLookupFailed();
} else {
overAllQueryMetrics.queryScanFailed();
}
throw toThrow;
}
}
}
return null; // Not reachable
}