private List getIterators()

in phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java [1450:1605]


    private List<PeekingResultIterator> getIterators(List<List<Scan>> scan, ConnectionQueryServices services,
            boolean isLocalIndex, Queue<PeekingResultIterator> allIterators, List<PeekingResultIterator> iterators,
            boolean isReverse, long maxQueryEndTime, int splitSize, ScanWrapper previousScan, int retryCount) throws SQLException {
        boolean success = false;
        final List<List<Pair<Scan,Future<PeekingResultIterator>>>> futures = Lists.newArrayListWithExpectedSize(splitSize);
        allFutures.add(futures);
        SQLException toThrow = null;
        final HashCacheClient hashCacheClient = new HashCacheClient(context.getConnection());
        int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
        try {
            submitWork(scan, futures, allIterators, splitSize, isReverse, scanGrouper, maxQueryEndTime);
            boolean clearedCache = false;
            for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
                List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
                Iterator<Pair<Scan, Future<PeekingResultIterator>>> scanPairItr = reverseIfNecessary(future,isReverse).iterator();
                while (scanPairItr.hasNext()) {
                    Pair<Scan,Future<PeekingResultIterator>> scanPair = scanPairItr.next();
                    try {
                        long timeOutForScan = maxQueryEndTime - EnvironmentEdgeManager.currentTimeMillis();
                        if (forTestingSetTimeoutToMaxToLetQueryPassHere) {
                            timeOutForScan = Long.MAX_VALUE;
                        }
                        if (timeOutForScan < 0) {
                            throw new SQLExceptionInfo.Builder(OPERATION_TIMED_OUT).setMessage(
                                    ". Query couldn't be completed in the allotted time: "
                                            + queryTimeOut + " ms").build().buildException();
                        }
                        // make sure we apply the iterators in order
                        if (isLocalIndex && previousScan != null && previousScan.getScan() != null
                                && (((!isReverse && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW),
                                        previousScan.getScan().getStopRow()) < 0)
                                || (isReverse && previousScan.getScan().getStopRow().length > 0 && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW),
                                        previousScan.getScan().getStopRow()) > 0)
                                || (Bytes.compareTo(scanPair.getFirst().getStopRow(), previousScan.getScan().getStopRow()) == 0)) 
                                    && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_START_ROW_SUFFIX), previousScan.getScan().getAttribute(SCAN_START_ROW_SUFFIX))==0)) {
                            continue;
                        }
                        PeekingResultIterator iterator = scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS);
                        concatIterators.add(iterator);
                        previousScan.setScan(scanPair.getFirst());
                    } catch (ExecutionException e) {
                        LOGGER.warn("Getting iterators at BaseResultIterators encountered error "
                                + "for table {}", TableName.valueOf(physicalTableName), e);
                        try { // Rethrow as SQLException
                            throw ClientUtil.parseServerException(e);
                        } catch (StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException e2){
                            // Catch only to try to recover from region boundary cache being out of date
                            if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries
                                services.clearTableRegionCache(TableName.valueOf(physicalTableName));
                                context.getOverallQueryMetrics().cacheRefreshedDueToSplits();
                            }
                            // Resubmit just this portion of work again
                            Scan oldScan = scanPair.getFirst();
                            byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
                            if (e2 instanceof HashJoinCacheNotFoundException) {
                                LOGGER.debug(
                                        "Retrying when Hash Join cache is not found on the server ,by sending the cache again");
                                if (retryCount <= 0) {
                                    throw e2;
                                }
                                Long cacheId = ((HashJoinCacheNotFoundException)e2).getCacheId();
                                ServerCache cache = caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
                                if (cache .getCachePtr() != null) {
                                    if (!hashCacheClient.addHashCacheToServer(startKey, cache, plan.getTableRef().getTable())) {
                                        throw e2;
                                    }
                                }
                            }
                            concatIterators =
                                    recreateIterators(services, isLocalIndex, allIterators,
                                        iterators, isReverse, maxQueryEndTime, previousScan,
                                        clearedCache, concatIterators, scanPairItr, scanPair, retryCount-1);
                        } catch(ColumnFamilyNotFoundException cfnfe) {
                            if (scanPair.getFirst().getAttribute(LOCAL_INDEX_BUILD) != null) {
                                Thread.sleep(1000);
                                concatIterators =
                                        recreateIterators(services, isLocalIndex, allIterators,
                                            iterators, isReverse, maxQueryEndTime, previousScan,
                                            clearedCache, concatIterators, scanPairItr, scanPair, retryCount);
                            }
                            
                        }
                    } catch (CancellationException ce) {
                        LOGGER.warn("Iterator scheduled to be executed in Future was being cancelled", ce);
                    }
                }
                addIterator(iterators, concatIterators);
            }
            success = true;
            return iterators;
        } catch (TimeoutException e) {
            OverAllQueryMetrics overAllQueryMetrics = context.getOverallQueryMetrics();
            overAllQueryMetrics.queryTimedOut();
            if (context.getScanRanges().isPointLookup()) {
                overAllQueryMetrics.queryPointLookupTimedOut();
            } else {
                overAllQueryMetrics.queryScanTimedOut();
            }
            GLOBAL_QUERY_TIMEOUT_COUNTER.increment();
            // thrown when a thread times out waiting for the future.get() call to return
            toThrow = new SQLExceptionInfo.Builder(OPERATION_TIMED_OUT).setMessage(
                    ". Query couldn't be completed in the allotted time: " + queryTimeOut + " ms")
                    .setRootCause(e).build().buildException();
        } catch (SQLException e) {
            if (e.getErrorCode() == OPERATION_TIMED_OUT.getErrorCode()) {
                OverAllQueryMetrics overAllQueryMetrics = context.getOverallQueryMetrics();
                overAllQueryMetrics.queryTimedOut();
                if (context.getScanRanges().isPointLookup()) {
                    overAllQueryMetrics.queryPointLookupTimedOut();
                } else {
                    overAllQueryMetrics.queryScanTimedOut();
                }
                GLOBAL_QUERY_TIMEOUT_COUNTER.increment();
            }
            toThrow = e;
        } catch (Exception e) {
            toThrow = ClientUtil.parseServerException(e);
        } finally {
            try {
                if (!success) {
                    try {
                        close();
                    } catch (Exception e) {
                        if (toThrow == null) {
                            toThrow = ClientUtil.parseServerException(e);
                        } else {
                            toThrow.setNextException(ClientUtil.parseServerException(e));
                        }
                    } finally {
                        try {
                            SQLCloseables.closeAll(allIterators);
                        } catch (Exception e) {
                            if (toThrow == null) {
                                toThrow = ClientUtil.parseServerException(e);
                            } else {
                                toThrow.setNextException(ClientUtil.parseServerException(e));
                            }
                        }
                    }
                }
            } finally {
                if (toThrow != null) {
                    GLOBAL_FAILED_QUERY_COUNTER.increment();
                    OverAllQueryMetrics overAllQueryMetrics = context.getOverallQueryMetrics();
                    overAllQueryMetrics.queryFailed();
                    if (context.getScanRanges().isPointLookup()) {
                        overAllQueryMetrics.queryPointLookupFailed();
                    } else {
                        overAllQueryMetrics.queryScanFailed();
                    }
                    throw toThrow;
                }
            }
        }
        return null; // Not reachable
    }