in phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java [959:1296]
/**
 * Builds the scans this iterator will execute for the key range [startKey, stopKey) and
 * returns them together with the region locations they were generated against.
 * <p>
 * There are two paths:
 * <ul>
 * <li><b>Point lookup</b> (not a local index, scan ranges are a point lookup and no skip scan
 * filter is used): a single scan taken from the statement context is returned and no region
 * locations are looked up (the region-location part of the result is {@code null}). When
 * there is exactly one key, no LIMIT and no aggregation, a copy of the scan is made and its
 * stop row is set equal to its start row.</li>
 * <li><b>General case</b>: region locations covering the range are fetched, and for every
 * region from the start region through the stop region (inclusive) the scan is intersected
 * first with each guide post that falls inside the region and then with the region's end key
 * (or {@code stopKey} for the last region). Guide-post based scans are only collected when
 * {@code useStatsForParallelization} is set; the estimates are accumulated either way.</li>
 * </ul>
 * Side effects visible in this method: assigns {@code hasGuidePosts} and
 * {@code numRegionLocationLookups} (both declared outside this method), calls
 * {@code generateEstimates(...)} on both paths and {@code sampleScans(...)} on the general
 * path.
 *
 * @param startKey lower bound of the key range; an empty array means no lower bound. May be
 *                 replaced locally by the scan's start row.
 * @param stopKey  upper bound of the key range; an empty array means no upper bound. May be
 *                 replaced locally by the scan's stop row or, for local indexes, by the end key
 *                 of the stop region.
 * @return the generated parallel scans plus the region locations recorded by the
 *         {@code ParallelScansCollector} ({@code null} locations on the point-lookup path)
 * @throws SQLException declared by the signature; a {@code PhoenixIOException} is raised
 *                      explicitly when copying the context scan fails with an IOException
 */
private ScansWithRegionLocations getParallelScans(byte[] startKey, byte[] stopKey)
throws SQLException {
ScanRanges scanRanges = context.getScanRanges();
PTable table = getTable();
boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
GuidePostEstimate estimates = new GuidePostEstimate();
// Fast path: point lookups never consult guide posts or region boundaries.
if (!isLocalIndex && scanRanges.isPointLookup() && !scanRanges.useSkipScanFilter()) {
List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(1);
List<Scan> scans = Lists.newArrayListWithExpectedSize(1);
Scan scanFromContext = context.getScan();
Integer limit = plan.getLimit();
boolean isAggregate = plan.getStatement().isAggregate();
if (scanRanges.getPointLookupCount() == 1 && limit == null && !isAggregate) {
// leverage bloom filter for single key point lookup by turning scan to
// Get Scan#isGetScan(). There should also be no limit on the point lookup query.
// The limit and the aggregate check is needed to handle cases where a child view
// extends the parent's PK and you insert data through the child but do a point
// lookup using the parent's PK. Since the parent's PK is only a prefix of the
// actual PK we can't do a Get but need to do a regular scan with the stop key
// set to the next key after the start key.
try {
// Work on a copy so the scan held by the context is not modified here.
scanFromContext = new Scan(context.getScan());
} catch (IOException e) {
LOGGER.error("Failure to construct point lookup scan", e);
throw new PhoenixIOException(e);
}
// Stop row == start row, using the same inclusivity flag as the start row.
scanFromContext.withStopRow(scanFromContext.getStartRow(),
scanFromContext.includeStartRow());
}
scans.add(scanFromContext);
parallelScans.add(scans);
// No guide posts are used on this path: NO_GUIDEPOST, no fallback timestamp
// (Long.MAX_VALUE) and gpsAvailableForAllRegions == false.
generateEstimates(scanRanges, table, GuidePostsInfo.NO_GUIDEPOST,
GuidePostsInfo.NO_GUIDEPOST.isEmptyGuidePost(), parallelScans, estimates,
Long.MAX_VALUE, false);
// we don't retrieve region location for the given scan range
return new ScansWithRegionLocations(parallelScans, null);
}
// Run a single zero byte through SchemaUtil.processSplit and keep everything after the
// first byte; the remainder is handed to ScanRanges.intersectScan as splitPostfix below.
byte[] sampleProcessedSaltByte =
SchemaUtil.processSplit(new byte[] { 0 }, table.getPKColumns());
byte[] splitPostfix =
Arrays.copyOfRange(sampleProcessedSaltByte, 1, sampleProcessedSaltByte.length);
boolean isSalted = table.getBucketNum() != null;
GuidePostsInfo gps = getGuidePosts();
// case when stats wasn't collected
hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST;
// Case when stats collection did run but there possibly wasn't enough data. In such a
// case we generate an empty guide post with the byte estimate being set as guide post
// width.
boolean emptyGuidePost = gps.isEmptyGuidePost();
// The *RegionBoundaryKey variables select which regions are visited; startKey/stopKey
// bound the scans generated inside those regions. They start out identical.
byte[] startRegionBoundaryKey = startKey;
byte[] stopRegionBoundaryKey = stopKey;
int columnsInCommon = 0;
ScanRanges prefixScanRanges = ScanRanges.EVERYTHING;
boolean traverseAllRegions = isSalted || isLocalIndex;
if (isLocalIndex) {
// TODO: when implementing PHOENIX-4585, we should change this to an assert
// as we should always have a data plan when a local index is being used.
if (dataPlan != null && dataPlan.getTableRef().getTable().getType() != PTableType.INDEX) { // Sanity check
// Note: columnsInCommon is assigned inline as the second argument of the call.
prefixScanRanges = computePrefixScanRanges(dataPlan.getContext().getScanRanges(), columnsInCommon=computeColumnsInCommon());
KeyRange prefixRange = prefixScanRanges.getScanRange();
// Only the region boundary keys are narrowed here; startKey/stopKey are untouched.
if (!prefixRange.lowerUnbound()) {
startRegionBoundaryKey = prefixRange.getLowerRange();
}
if (!prefixRange.upperUnbound()) {
stopRegionBoundaryKey = prefixRange.getUpperRange();
}
}
} else if (!traverseAllRegions) {
// Neither salted nor local index: tighten both the scan keys and the region boundary
// keys using the scan's own start/stop rows.
byte[] scanStartRow = scan.getStartRow();
// Use the scan start row only if it is set and lies after startKey.
if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
startRegionBoundaryKey = startKey = scanStartRow;
}
byte[] scanStopRow = scan.getStopRow();
// Use the scan stop row if stopKey is unbounded, or if the scan stop row is set and
// lies before stopKey.
if (stopKey.length == 0
|| (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) {
stopRegionBoundaryKey = stopKey = scanStopRow;
}
}
int regionIndex = 0;
int startRegionIndex = 0;
List<HRegionLocation> regionLocations;
if (isSalted && !isLocalIndex) {
// key prefix = salt num + view index id + tenant id
// If salting is used with tenant or view index id, scan start and end
// rowkeys will not be empty. We need to generate region locations for
// all the scan range such that we cover (each salt bucket num) + (prefix starting from
// index position 1 to cover view index and/or tenant id and/or remaining prefix).
if (scan.getStartRow().length > 0 && scan.getStopRow().length > 0) {
regionLocations = new ArrayList<>();
for (int i = 0; i < getTable().getBucketNum(); i++) {
// Per-bucket start key: byte 0 is the bucket number, the remaining bytes are
// copied from the scan's start row (its own first byte is discarded).
byte[] saltStartRegionKey = new byte[scan.getStartRow().length];
saltStartRegionKey[0] = (byte) i;
System.arraycopy(scan.getStartRow(), 1, saltStartRegionKey, 1,
scan.getStartRow().length - 1);
// Per-bucket stop key, built the same way from the scan's stop row.
byte[] saltStopRegionKey = new byte[scan.getStopRow().length];
saltStopRegionKey[0] = (byte) i;
System.arraycopy(scan.getStopRow(), 1, saltStopRegionKey, 1,
scan.getStopRow().length - 1);
regionLocations.addAll(
getRegionBoundaries(scanGrouper, saltStartRegionKey, saltStopRegionKey));
}
} else {
// If scan start and end rowkeys are empty, we end up fetching all region locations.
regionLocations =
getRegionBoundaries(scanGrouper, startRegionBoundaryKey, stopRegionBoundaryKey);
}
} else {
// For range scans, startRegionBoundaryKey and stopRegionBoundaryKey should refer
// to the boundary specified by the scan context.
regionLocations =
getRegionBoundaries(scanGrouper, startRegionBoundaryKey, stopRegionBoundaryKey);
}
// Recorded as the number of region locations returned by the lookups above.
numRegionLocationLookups = regionLocations.size();
List<byte[]> regionBoundaries = toBoundaries(regionLocations);
// stopIndex is later used as an index into regionLocations (see endRegionKey below), so
// regionBoundaries presumably holds one entry fewer than regionLocations, making size()
// the index of the last region - TODO confirm against toBoundaries.
int stopIndex = regionBoundaries.size();
if (startRegionBoundaryKey.length > 0) {
startRegionIndex = regionIndex = getIndexContainingInclusive(regionBoundaries, startRegionBoundaryKey);
}
if (stopRegionBoundaryKey.length > 0) {
// The search runs on the sub-list starting at regionIndex, so the result is shifted
// back by regionIndex to obtain an index into the full list.
stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopRegionBoundaryKey));
if (isLocalIndex) {
// For local indexes the scan range is extended to the end key of the stop region.
stopKey = regionLocations.get(stopIndex).getRegion().getEndKey();
}
}
ParallelScansCollector parallelScanCollector = new ParallelScansCollector(scanGrouper);
ImmutableBytesWritable currentKey = new ImmutableBytesWritable(startKey);
int gpsSize = gps.getGuidePostsCount();
int keyOffset = 0;
ImmutableBytesWritable currentGuidePost = ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY;
ImmutableBytesWritable guidePosts = gps.getGuidePosts();
ByteArrayInputStream stream = null;
DataInput input = null;
PrefixByteDecoder decoder = null;
// Index of currentGuidePost within the guide post list; also used to index
// gps.getGuidePostTimestamps() and passed to updateEstimates.
int guideIndex = 0;
boolean gpsForFirstRegion = false;
// Turns false once every guide post has been consumed (or there were none to begin with).
boolean intersectWithGuidePosts = true;
// Maintain min ts for gps in first or last region outside of
// gps that are in the scan range. We'll use this if we find
// no gps in range.
long fallbackTs = Long.MAX_VALUE;
// Determination of whether or not we found a guidepost in
// every region between the start and stop key. If not, then
// we cannot definitively say at what time the guideposts
// were collected.
boolean gpsAvailableForAllRegions = true;
try {
// True when the estimate for the guide post at guideIndex-1 has not been added yet and
// should be added once a later scan shows it is in range.
boolean delayAddingEst = false;
ImmutableBytesWritable firstRegionStartKey = null;
if (gpsSize > 0) {
stream = new ByteArrayInputStream(guidePosts.get(), guidePosts.getOffset(), guidePosts.getLength());
input = new DataInputStream(stream);
decoder = new PrefixByteDecoder(gps.getMaxLength());
firstRegionStartKey = new ImmutableBytesWritable(regionLocations.get(regionIndex).getRegion().getStartKey());
try {
int c;
// Continue walking guideposts until we get past the currentKey
// (the loop condition both decodes the next guide post into currentGuidePost and
// compares it with currentKey; it exits on the first guide post > currentKey).
while ((c=currentKey.compareTo(currentGuidePost = PrefixByteCodec.decode(decoder, input))) >= 0) {
// Detect if we found a guidepost that might be in the first region. This
// is for the case where the start key may be past the only guidepost in
// the first region.
if (!gpsForFirstRegion && firstRegionStartKey.compareTo(currentGuidePost) <= 0) {
gpsForFirstRegion = true;
}
// While we have gps in the region (but outside of start/stop key), track
// the min ts as a fallback for the time at which stats were calculated.
if (gpsForFirstRegion) {
fallbackTs =
Math.min(fallbackTs,
gps.getGuidePostTimestamps()[guideIndex]);
}
// Special case for gp == startKey in which case we want to
// count this gp (if it's in range) though we go past it.
delayAddingEst = (c == 0);
guideIndex++;
}
} catch (EOFException e) {
// expected. Thrown when we have decoded all guide posts.
// Reaching here means no guide post was found past currentKey.
intersectWithGuidePosts = false;
}
}
byte[] endRegionKey = regionLocations.get(stopIndex).getRegion().getEndKey();
byte[] currentKeyBytes = currentKey.copyBytes();
// Also covers gpsSize == 0, where guideIndex (0) is not < gpsSize.
intersectWithGuidePosts &= guideIndex < gpsSize;
// Merge bisect with guideposts for all but the last region
// NOTE(review): the loop bound below is inclusive of stopIndex, so the last region is
// processed here as well; it differs only in using stopKey as its endKey.
while (regionIndex <= stopIndex) {
HRegionLocation regionLocation = regionLocations.get(regionIndex);
RegionInfo regionInfo = regionLocation.getRegion();
byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
// Upper bound for scans generated in this iteration: stopKey for the last region,
// otherwise the boundary stored for this region.
byte[] endKey;
if (regionIndex == stopIndex) {
endKey = stopKey;
} else {
endKey = regionBoundaries.get(regionIndex);
}
if (isLocalIndex) {
if (dataPlan != null && dataPlan.getTableRef().getTable().getType() != PTableType.INDEX) { // Sanity check
ScanRanges dataScanRanges = dataPlan.getContext().getScanRanges();
// we can skip a region completely for local indexes if the data plan does not intersect
if (!dataScanRanges.intersectRegion(regionInfo.getStartKey(), regionInfo.getEndKey(), false)) {
// Advance the current key past the skipped region before moving on.
currentKeyBytes = endKey;
regionIndex++;
continue;
}
}
// Only attempt further pruning if the prefix range is using
// a skip scan since we've already pruned the range of regions
// based on the start/stop key.
if (columnsInCommon > 0 && prefixScanRanges.useSkipScanFilter()) {
byte[] regionStartKey = regionInfo.getStartKey();
ImmutableBytesWritable ptr = context.getTempPtr();
// Clip the region start key to the first columnsInCommon columns of the prefix
// schema; the clipped bytes are read back from ptr.
clipKeyRangeBytes(prefixScanRanges.getSchema(), 0, columnsInCommon, regionStartKey, ptr, false);
regionStartKey = ByteUtil.copyKeyBytesIfNecessary(ptr);
// Prune this region if there's no intersection
if (!prefixScanRanges.intersectRegion(regionStartKey, regionInfo.getEndKey(), false)) {
currentKeyBytes = endKey;
regionIndex++;
continue;
}
}
// keyOffset stays 0 for non-local-index tables; for local indexes it is derived
// from the region's start/end keys.
keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), regionInfo.getEndKey());
}
// Remember the array instance so we can later tell whether the guide post loop ran.
byte[] initialKeyBytes = currentKeyBytes;
// Stays -1 when endKey is empty, because the comparison is short-circuited away.
int gpsComparedToEndKey = -1;
// Becomes true if any iteration of the inner loop ended with delayAddingEst == false.
boolean everNotDelayed = false;
// Consume every remaining guide post that is <= endKey (or all of them when endKey is
// unbounded), producing one scan slice per guide post.
while (intersectWithGuidePosts && (endKey.length == 0
|| (gpsComparedToEndKey = currentGuidePost.compareTo(endKey)) <= 0)) {
List<Scan> newScans =
scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes,
keyOffset, splitPostfix, getTable().getBucketNum(),
gpsComparedToEndKey == 0);
// The guide post slices are only collected when stats-based parallelization is
// enabled; the estimate bookkeeping below runs in either case.
if (useStatsForParallelization) {
for (int newScanIdx = 0; newScanIdx < newScans.size(); newScanIdx++) {
Scan newScan = newScans.get(newScanIdx);
ScanUtil.setLocalIndexAttributes(newScan, keyOffset,
regionInfo.getStartKey(), regionInfo.getEndKey(),
newScan.getStartRow(), newScan.getStopRow());
if (regionLocation.getServerName() != null) {
newScan.setAttribute(BaseScannerRegionObserverConstants.SCAN_REGION_SERVER,
regionLocation.getServerName().getVersionedBytes());
}
boolean lastOfNew = newScanIdx == newScans.size() - 1;
// The third argument is true only for the last new scan when the guide post
// coincides exactly with endKey.
parallelScanCollector.addNewScan(plan, newScan,
gpsComparedToEndKey == 0 && lastOfNew, regionLocation);
}
}
if (newScans.size() > 0) {
// If we've been delaying adding estimates, add the previous
// gp estimates now that we know they are in range.
if (delayAddingEst) {
updateEstimates(gps, guideIndex-1, estimates);
}
// If we're not delaying adding estimates, add the
// current gp estimates.
// (delayAddingEst is reassigned inside the condition: it becomes true exactly
// when the current guide post equals endKey.)
if (! (delayAddingEst = gpsComparedToEndKey == 0) ) {
updateEstimates(gps, guideIndex, estimates);
}
} else {
delayAddingEst = false;
}
everNotDelayed |= !delayAddingEst;
currentKeyBytes = currentGuidePostBytes;
try {
currentGuidePost = PrefixByteCodec.decode(decoder, input);
currentGuidePostBytes = currentGuidePost.copyBytes();
guideIndex++;
} catch (EOFException e) {
// We have read all guide posts
intersectWithGuidePosts = false;
}
}
// Intentional reference comparison: currentKeyBytes is reassigned on every pass of
// the loop above, so a different array instance means the loop ran at least once.
boolean gpsInThisRegion = initialKeyBytes != currentKeyBytes;
if (!useStatsForParallelization) {
/*
* If we are not using stats for generating parallel scans, we need to reset the
* currentKey back to what it was at the beginning of the loop.
*/
currentKeyBytes = initialKeyBytes;
}
// Final slice for this region: from the current key up to endKey. Unlike the guide
// post slices above, this one is always handed to the collector.
List<Scan> newScans =
scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset,
splitPostfix, getTable().getBucketNum(), true);
for (int newScanIdx = 0; newScanIdx < newScans.size(); newScanIdx++) {
Scan newScan = newScans.get(newScanIdx);
ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(),
regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow());
if (regionLocation.getServerName() != null) {
newScan.setAttribute(BaseScannerRegionObserverConstants.SCAN_REGION_SERVER,
regionLocation.getServerName().getVersionedBytes());
}
boolean lastOfNew = newScanIdx == newScans.size() - 1;
parallelScanCollector.addNewScan(plan, newScan, lastOfNew, regionLocation);
}
if (newScans.size() > 0) {
// Boundary case of no GP in region after delaying adding of estimates
if (!gpsInThisRegion && delayAddingEst) {
updateEstimates(gps, guideIndex-1, estimates);
gpsInThisRegion = true;
delayAddingEst = false;
}
} else if (!gpsInThisRegion) {
delayAddingEst = false;
}
currentKeyBytes = endKey;
// We have a guide post in the region if the above loop was entered
// or if the current key is less than the region end key (since the loop
// may not have been entered if our scan end key is smaller than the
// first guide post in that region).
// gpsAfterStopKey is assigned inside the expression below and, because of
// short-circuiting, only when the first two alternatives are false.
boolean gpsAfterStopKey = false;
gpsAvailableForAllRegions &=
( gpsInThisRegion && everNotDelayed) || // GP in this region
( regionIndex == startRegionIndex && gpsForFirstRegion ) || // GP in first region (before start key)
( gpsAfterStopKey = ( regionIndex == stopIndex && intersectWithGuidePosts && // GP in last region (after stop key)
( endRegionKey.length == 0 || // then check if gp is in the region
currentGuidePost.compareTo(endRegionKey) < 0)));
if (gpsAfterStopKey) {
// If gp after stop key, but still in last region, track min ts as fallback
fallbackTs =
Math.min(fallbackTs,
gps.getGuidePostTimestamps()[guideIndex]);
}
regionIndex++;
}
generateEstimates(scanRanges, table, gps, emptyGuidePost, parallelScanCollector.getParallelScans(), estimates,
fallbackTs, gpsAvailableForAllRegions);
} finally {
// stream is only non-null when guide posts were decoded (gpsSize > 0).
if (stream != null) Closeables.closeQuietly(stream);
}
// Apply table sampling to the collected scans using the statement's sampling rate.
sampleScans(parallelScanCollector.getParallelScans(),this.plan.getStatement().getTableSamplingRate());
return new ScansWithRegionLocations(parallelScanCollector.getParallelScans(),
parallelScanCollector.getRegionLocations());
}