private ScansWithRegionLocations getParallelScans()

in phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java [959:1296]


    /**
     * Builds the parallel scans for this plan over the row key range bounded by
     * {@code startKey} and {@code stopKey}, together with the region locations the scans
     * were generated for.
     * <p>
     * Two strategies are used:
     * <ul>
     * <li>A point lookup on a table that is not a local index, and that does not need a skip
     * scan filter, is served by a single scan derived from the statement context. No region
     * locations are looked up, and {@code null} is returned for them.</li>
     * <li>Otherwise the region boundaries covering the (possibly narrowed) key range are
     * fetched, and each region's range is intersected with the query's scan ranges. When
     * {@code useStatsForParallelization} is set, each region is additionally split at the
     * guideposts that fall inside it. The resulting scans are passed to {@code sampleScans}
     * with the statement's table sampling rate before returning.</li>
     * </ul>
     * In both cases guidepost-based estimates are accumulated in a {@code GuidePostEstimate}
     * and handed to {@code generateEstimates}.
     * <p>
     * Side effects (second strategy only): assigns the {@code hasGuidePosts} and
     * {@code numRegionLocationLookups} fields.
     *
     * @param startKey lower row key bound; an empty array is treated as unbounded
     * @param stopKey upper row key bound; an empty array is treated as unbounded
     * @return the parallel scans (a list of scan groups) and their region locations
     *         ({@code null} region locations on the point lookup path)
     * @throws SQLException if copying the context scan for a single-key point lookup fails
     *         (wrapped in a {@code PhoenixIOException}), or if a helper called here throws
     */
    private ScansWithRegionLocations getParallelScans(byte[] startKey, byte[] stopKey)
            throws SQLException {
        ScanRanges scanRanges = context.getScanRanges();
        PTable table = getTable();
        boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
        GuidePostEstimate estimates = new GuidePostEstimate();
        // Fast path: a point lookup on a non-local-index table that needs no skip scan
        // filter is served by a single scan, and region locations are not looked up.
        if (!isLocalIndex && scanRanges.isPointLookup() && !scanRanges.useSkipScanFilter()) {
            List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(1);
            List<Scan> scans = Lists.newArrayListWithExpectedSize(1);
            Scan scanFromContext = context.getScan();
            Integer limit = plan.getLimit();
            boolean isAggregate = plan.getStatement().isAggregate();
            if (scanRanges.getPointLookupCount() == 1 && limit == null && !isAggregate) {
                // leverage bloom filter for single key point lookup by turning scan to
                // Get Scan#isGetScan(). There should also be no limit on the point lookup query.
                // The limit and the aggregate check is needed to handle cases where a child view
                // extends the parent's PK and you insert data through the child but do a point
                // lookup using the parent's PK. Since the parent's PK is only a prefix of the
                // actual PK we can't do a Get but need to do a regular scan with the stop key
                // set to the next key after the start key.
                // Work on a copy so that the context's scan itself is left unmodified.
                try {
                    scanFromContext = new Scan(context.getScan());
                } catch (IOException e) {
                    LOGGER.error("Failure to construct point lookup scan", e);
                    throw new PhoenixIOException(e);
                }
                // Stop row == start row, with the stop row's inclusiveness matching the
                // start row's.
                scanFromContext.withStopRow(scanFromContext.getStartRow(),
                    scanFromContext.includeStartRow());
            }
            scans.add(scanFromContext);
            parallelScans.add(scans);
            generateEstimates(scanRanges, table, GuidePostsInfo.NO_GUIDEPOST,
                    GuidePostsInfo.NO_GUIDEPOST.isEmptyGuidePost(), parallelScans, estimates,
                    Long.MAX_VALUE, false);
            // we don't retrieve region location for the given scan range
            return new ScansWithRegionLocations(parallelScans, null);
        }
        // splitPostfix holds the bytes that SchemaUtil.processSplit produces after the first
        // byte when given the one-byte key {0}. It is passed to ScanRanges.intersectScan below.
        byte[] sampleProcessedSaltByte =
                SchemaUtil.processSplit(new byte[] { 0 }, table.getPKColumns());
        byte[] splitPostfix =
                Arrays.copyOfRange(sampleProcessedSaltByte, 1, sampleProcessedSaltByte.length);
        boolean isSalted = table.getBucketNum() != null;
        GuidePostsInfo gps = getGuidePosts();
        // case when stats wasn't collected
        hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST;
        // Case when stats collection did run but there possibly wasn't enough data. In such a
        // case we generate an empty guide post with the byte estimate being set as guide post
        // width.
        boolean emptyGuidePost = gps.isEmptyGuidePost();
        // Keys used to locate the first and last region to scan. They may be narrowed below:
        // for local indexes by the data plan's prefix scan ranges, and for other non-salted
        // tables by the scan's own start/stop rows.
        byte[] startRegionBoundaryKey = startKey;
        byte[] stopRegionBoundaryKey = stopKey;
        int columnsInCommon = 0;
        ScanRanges prefixScanRanges = ScanRanges.EVERYTHING;
        // Salted tables and local indexes are not narrowed to the scan's start/stop rows
        // (see the else-if branch below).
        boolean traverseAllRegions = isSalted || isLocalIndex;
        if (isLocalIndex) {
            // TODO: when implementing PHOENIX-4585, we should change this to an assert
            // as we should always have a data plan when a local index is being used.
            // Narrow the region range using the data plan's scan ranges. columnsInCommon is
            // assigned inside the call below (presumably the number of leading row key columns
            // shared with the data table; verify against computeColumnsInCommon) and is reused
            // later for per-region pruning.
            if (dataPlan != null && dataPlan.getTableRef().getTable().getType() != PTableType.INDEX) { // Sanity check
                prefixScanRanges = computePrefixScanRanges(dataPlan.getContext().getScanRanges(), columnsInCommon=computeColumnsInCommon());
                KeyRange prefixRange = prefixScanRanges.getScanRange();
                if (!prefixRange.lowerUnbound()) {
                    startRegionBoundaryKey = prefixRange.getLowerRange();
                }
                if (!prefixRange.upperUnbound()) {
                    stopRegionBoundaryKey = prefixRange.getUpperRange();
                }
            }
        } else if (!traverseAllRegions) {
            // Tighten startKey/stopKey (and the region bounds) to the scan's start/stop rows
            // whenever those are narrower.
            byte[] scanStartRow = scan.getStartRow();
            if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
                startRegionBoundaryKey = startKey = scanStartRow;
            }
            byte[] scanStopRow = scan.getStopRow();
            if (stopKey.length == 0
                    || (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) {
                stopRegionBoundaryKey = stopKey = scanStopRow;
            }
        }
        
        // regionIndex is the region currently being processed. startRegionIndex is the first
        // region to scan.
        int regionIndex = 0;
        int startRegionIndex = 0;

        List<HRegionLocation> regionLocations;
        if (isSalted && !isLocalIndex) {
            // key prefix = salt num + view index id + tenant id
            // If salting is used with tenant or view index id, scan start and end
            // rowkeys will not be empty. We need to generate region locations for
            // all the scan range such that we cover (each salt bucket num) + (prefix starting from
            // index position 1 to cover view index and/or tenant id and/or remaining prefix).
            // NOTE(review): the per-bucket lookups are concatenated. If a region's key range
            // overlaps more than one bucket's range it may be added more than once; confirm
            // this cannot happen or is harmless for toBoundaries/index lookups below.
            if (scan.getStartRow().length > 0 && scan.getStopRow().length > 0) {
                regionLocations = new ArrayList<>();
                for (int i = 0; i < getTable().getBucketNum(); i++) {
                    // Same start/stop rows as the scan, with the leading salt byte set to bucket i.
                    byte[] saltStartRegionKey = new byte[scan.getStartRow().length];
                    saltStartRegionKey[0] = (byte) i;
                    System.arraycopy(scan.getStartRow(), 1, saltStartRegionKey, 1,
                        scan.getStartRow().length - 1);

                    byte[] saltStopRegionKey = new byte[scan.getStopRow().length];
                    saltStopRegionKey[0] = (byte) i;
                    System.arraycopy(scan.getStopRow(), 1, saltStopRegionKey, 1,
                        scan.getStopRow().length - 1);

                    regionLocations.addAll(
                        getRegionBoundaries(scanGrouper, saltStartRegionKey, saltStopRegionKey));
                }
            } else {
                // If scan start and end rowkeys are empty, we end up fetching all region locations.
                regionLocations =
                    getRegionBoundaries(scanGrouper, startRegionBoundaryKey, stopRegionBoundaryKey);
            }
        } else {
            // For range scans, startRegionBoundaryKey and stopRegionBoundaryKey should refer
            // to the boundary specified by the scan context.
            regionLocations =
                getRegionBoundaries(scanGrouper, startRegionBoundaryKey, stopRegionBoundaryKey);
        }

        numRegionLocationLookups = regionLocations.size();
        // Translate the region bound keys into indexes: regionIndex/startRegionIndex is the
        // first region to scan and stopIndex is the last one (inclusive; it is used directly
        // as an index into regionLocations below). Initialising stopIndex to
        // regionBoundaries.size() presumes toBoundaries returns one fewer entry than
        // regionLocations. TODO confirm.
        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
        int stopIndex = regionBoundaries.size();
        if (startRegionBoundaryKey.length > 0) {
            startRegionIndex = regionIndex = getIndexContainingInclusive(regionBoundaries, startRegionBoundaryKey);
        }
        if (stopRegionBoundaryKey.length > 0) {
            stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopRegionBoundaryKey));
            if (isLocalIndex) {
                // Local indexes scan through the end key of the last region in range.
                stopKey = regionLocations.get(stopIndex).getRegion().getEndKey();
            }
        }
        ParallelScansCollector parallelScanCollector = new ParallelScansCollector(scanGrouper);
        
        // Key at which the next generated scan range starts.
        ImmutableBytesWritable currentKey = new ImmutableBytesWritable(startKey);
        
        int gpsSize = gps.getGuidePostsCount();
        // Row key offset passed to intersectScan; only recomputed per region for local indexes.
        int keyOffset = 0;
        // Guideposts are stored prefix-encoded in a single buffer and decoded one at a time
        // (PrefixByteDecoder/PrefixByteCodec). currentGuidePost is the most recently decoded
        // one, and guideIndex is its index into gps's per-guidepost arrays (timestamps and
        // the entries read by updateEstimates).
        ImmutableBytesWritable currentGuidePost = ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY;
        ImmutableBytesWritable guidePosts = gps.getGuidePosts();
        ByteArrayInputStream stream = null;
        DataInput input = null;
        PrefixByteDecoder decoder = null;
        int guideIndex = 0;
        // Set when a guidepost at or before the start key falls within the first region.
        boolean gpsForFirstRegion = false;
        // Cleared once every guidepost has been decoded (EOFException from the decoder).
        boolean intersectWithGuidePosts = true;
        // Maintain min ts for gps in first or last region outside of
        // gps that are in the scan range. We'll use this if we find
        // no gps in range.
        long fallbackTs = Long.MAX_VALUE;
        // Determination of whether or not we found a guidepost in
        // every region between the start and stop key. If not, then
        // we cannot definitively say at what time the guideposts
        // were collected.
        boolean gpsAvailableForAllRegions = true;
        try {
            // When true, the estimate for guidepost guideIndex-1 has not been added yet; it is
            // added once a scan that includes it is actually produced.
            boolean delayAddingEst = false;
            ImmutableBytesWritable firstRegionStartKey = null;
            if (gpsSize > 0) {
                stream = new ByteArrayInputStream(guidePosts.get(), guidePosts.getOffset(), guidePosts.getLength());
                input = new DataInputStream(stream);
                decoder = new PrefixByteDecoder(gps.getMaxLength());
                firstRegionStartKey = new ImmutableBytesWritable(regionLocations.get(regionIndex).getRegion().getStartKey());
                try {
                    int c;
                    // Continue walking guideposts until we get past the currentKey
                    while ((c=currentKey.compareTo(currentGuidePost = PrefixByteCodec.decode(decoder, input))) >= 0) {
                        // Detect if we found a guidepost that might be in the first region. This
                        // is for the case where the start key may be past the only guidepost in
                        // the first region.
                        if (!gpsForFirstRegion && firstRegionStartKey.compareTo(currentGuidePost) <= 0) {
                            gpsForFirstRegion = true;
                        }
                        // While we have gps in the region (but outside of start/stop key), track
                        // the min ts as a fallback for the time at which stats were calculated.
                        if (gpsForFirstRegion) {
                            fallbackTs =
                                    Math.min(fallbackTs,
                                        gps.getGuidePostTimestamps()[guideIndex]);
                        }
                        // Special case for gp == startKey in which case we want to
                        // count this gp (if it's in range) though we go past it.
                        delayAddingEst = (c == 0);
                        guideIndex++;
                    }
                } catch (EOFException e) {
                    // expected. Thrown when we have decoded all guide posts.
                    intersectWithGuidePosts = false;
                }
            }
            // End key of the last region in range, used to detect guideposts that lie after the
            // stop key but still inside that region.
            byte[] endRegionKey = regionLocations.get(stopIndex).getRegion().getEndKey();
            byte[] currentKeyBytes = currentKey.copyBytes();
            // Only split on guideposts if at least one remains undecoded-past.
            intersectWithGuidePosts &= guideIndex < gpsSize;
            // Walk every region from regionIndex through stopIndex (inclusive), splitting each
            // region's key range at the guideposts that fall inside it.
            while (regionIndex <= stopIndex) {
                HRegionLocation regionLocation = regionLocations.get(regionIndex);
                RegionInfo regionInfo = regionLocation.getRegion();
                byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
                // The last region ends at stopKey; earlier regions end at their boundary.
                byte[] endKey;
                if (regionIndex == stopIndex) {
                    endKey = stopKey;
                } else {
                    endKey = regionBoundaries.get(regionIndex);
                }
                if (isLocalIndex) {
                    if (dataPlan != null && dataPlan.getTableRef().getTable().getType() != PTableType.INDEX) { // Sanity check
                        ScanRanges dataScanRanges = dataPlan.getContext().getScanRanges();
                        // we can skip a region completely for local indexes if the data plan does not intersect
                        if (!dataScanRanges.intersectRegion(regionInfo.getStartKey(), regionInfo.getEndKey(), false)) {
                            currentKeyBytes = endKey;
                            regionIndex++;
                            continue;
                        }
                    }
                    // Only attempt further pruning if the prefix range is using
                    // a skip scan since we've already pruned the range of regions
                    // based on the start/stop key.
                    if (columnsInCommon > 0 && prefixScanRanges.useSkipScanFilter()) {
                        byte[] regionStartKey = regionInfo.getStartKey();
                        ImmutableBytesWritable ptr = context.getTempPtr();
                        clipKeyRangeBytes(prefixScanRanges.getSchema(), 0, columnsInCommon, regionStartKey, ptr, false);
                        regionStartKey = ByteUtil.copyKeyBytesIfNecessary(ptr);
                        // Prune this region if there's no intersection
                        if (!prefixScanRanges.intersectRegion(regionStartKey, regionInfo.getEndKey(), false)) {
                            currentKeyBytes = endKey;
                            regionIndex++;
                            continue;
                        }
                    }
                    keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), regionInfo.getEndKey());
                }
                // Where this region's ranges start; compared by reference after the loop below.
                byte[] initialKeyBytes = currentKeyBytes;
                // Result of comparing currentGuidePost with endKey; stays -1 when endKey is
                // empty because the comparison is short-circuited.
                int gpsComparedToEndKey = -1;
                // Whether any guidepost iteration in this region ended with delayAddingEst false.
                boolean everNotDelayed = false;
                // Produce one range per guidepost at or before this region's end key (every
                // remaining guidepost when endKey is empty). Scans are only added when
                // useStatsForParallelization is set, but the loop always runs so that the
                // guidepost cursor and the estimates advance.
                while (intersectWithGuidePosts && (endKey.length == 0
                        || (gpsComparedToEndKey = currentGuidePost.compareTo(endKey)) <= 0)) {
                    List<Scan> newScans =
                            scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes,
                                keyOffset, splitPostfix, getTable().getBucketNum(),
                                gpsComparedToEndKey == 0);
                    if (useStatsForParallelization) {
                        for (int newScanIdx = 0; newScanIdx < newScans.size(); newScanIdx++) {
                            Scan newScan = newScans.get(newScanIdx);
                            ScanUtil.setLocalIndexAttributes(newScan, keyOffset,
                                regionInfo.getStartKey(), regionInfo.getEndKey(),
                                newScan.getStartRow(), newScan.getStopRow());
                            if (regionLocation.getServerName() != null) {
                                newScan.setAttribute(BaseScannerRegionObserverConstants.SCAN_REGION_SERVER,
                                    regionLocation.getServerName().getVersionedBytes());
                            }
                            boolean lastOfNew = newScanIdx == newScans.size() - 1;
                            parallelScanCollector.addNewScan(plan, newScan,
                                gpsComparedToEndKey == 0 && lastOfNew, regionLocation);
                        }
                    }
                    if (newScans.size() > 0) {
                        // If we've been delaying adding estimates, add the previous
                        // gp estimates now that we know they are in range.
                        if (delayAddingEst) {
                            updateEstimates(gps, guideIndex-1, estimates);
                        }
                        // If we're not delaying adding estimates, add the
                        // current gp estimates.
                        if (! (delayAddingEst = gpsComparedToEndKey == 0) ) {
                            updateEstimates(gps, guideIndex, estimates);
                        }
                    } else {
                        delayAddingEst = false;
                    }
                    everNotDelayed |= !delayAddingEst;
                    // The next range starts at this guidepost.
                    currentKeyBytes = currentGuidePostBytes;
                    try {
                        currentGuidePost = PrefixByteCodec.decode(decoder, input);
                        currentGuidePostBytes = currentGuidePost.copyBytes();
                        guideIndex++;
                    } catch (EOFException e) {
                        // We have read all guide posts
                        intersectWithGuidePosts = false;
                    }
                }
                // Reference (not content) comparison on purpose: currentKeyBytes is only
                // replaced inside the guidepost loop above, and always with a freshly copied
                // array, so a different reference means that loop consumed a guidepost.
                boolean gpsInThisRegion = initialKeyBytes != currentKeyBytes;
                if (!useStatsForParallelization) {
                    /*
                     * If we are not using stats for generating parallel scans, we need to reset the
                     * currentKey back to what it was at the beginning of the loop.
                     */
                    currentKeyBytes = initialKeyBytes;
                }
                // Final range of this region: from currentKeyBytes up to endKey.
                List<Scan> newScans =
                        scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset,
                            splitPostfix, getTable().getBucketNum(), true);
                for (int newScanIdx = 0; newScanIdx < newScans.size(); newScanIdx++) {
                    Scan newScan = newScans.get(newScanIdx);
                    ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
                        regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
                    if (regionLocation.getServerName() != null) {
                        newScan.setAttribute(BaseScannerRegionObserverConstants.SCAN_REGION_SERVER,
                            regionLocation.getServerName().getVersionedBytes());
                    }
                    boolean lastOfNew = newScanIdx == newScans.size() - 1;
                    parallelScanCollector.addNewScan(plan, newScan, lastOfNew, regionLocation);
                }
                if (newScans.size() > 0) {
                    // Boundary case of no GP in region after delaying adding of estimates
                    if (!gpsInThisRegion && delayAddingEst) {
                        updateEstimates(gps, guideIndex-1, estimates);
                        gpsInThisRegion = true;
                        delayAddingEst = false;
                    }
                } else if (!gpsInThisRegion) {
                    delayAddingEst = false;
                }
                // The next region starts where this one ends.
                currentKeyBytes = endKey;
                // Guideposts count as available for this region if any of the following hold:
                //  - the guidepost loop above consumed a guidepost in this region and at least
                //    one of its iterations ended with delayAddingEst false;
                //  - this is the first region and a guidepost before the start key fell
                //    inside it (gpsForFirstRegion);
                //  - this is the last region and the next unconsumed guidepost lies before
                //    that region's end key, i.e. after the stop key but still in the region.
                // gpsAfterStopKey is only assigned when the first two conditions are false
                // (short-circuit ||), so the fallback timestamp below is tracked only then.
                boolean gpsAfterStopKey = false;
                gpsAvailableForAllRegions &= 
                    ( gpsInThisRegion && everNotDelayed) || // GP in this region
                    ( regionIndex == startRegionIndex && gpsForFirstRegion ) || // GP in first region (before start key)
                    ( gpsAfterStopKey = ( regionIndex == stopIndex && intersectWithGuidePosts && // GP in last region (after stop key)
                            ( endRegionKey.length == 0 || // then check if gp is in the region
                            currentGuidePost.compareTo(endRegionKey) < 0)));
                if (gpsAfterStopKey) {
                    // If gp after stop key, but still in last region, track min ts as fallback
                    fallbackTs =
                            Math.min(fallbackTs,
                                gps.getGuidePostTimestamps()[guideIndex]);
                }
                regionIndex++;
            }
            generateEstimates(scanRanges, table, gps, emptyGuidePost, parallelScanCollector.getParallelScans(), estimates,
                    fallbackTs, gpsAvailableForAllRegions);
        } finally {
            // Release the guidepost decoding stream regardless of how the loop exits.
            if (stream != null) Closeables.closeQuietly(stream);
        }
        sampleScans(parallelScanCollector.getParallelScans(),this.plan.getStatement().getTableSamplingRate());
        return new ScansWithRegionLocations(parallelScanCollector.getParallelScans(),
                parallelScanCollector.getRegionLocations());
    }