in hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java [553:815]
public boolean next(List<? super ExtendedCell> outResult, ScannerContext scannerContext)
throws IOException {
// Overview: walks the cells exposed by this.heap, asks the query matcher what to do with each
// one, and appends the accepted cells to outResult. The walk stops when a limit tracked by
// scannerContext is reached, when the matcher reports the row or scan is done, or when the heap
// has no more cells.
//
// outResult: receives every accepted cell (after an optional Filter.transformCell) once
// countPerRow has passed storeOffset.
// scannerContext: must be non-null; it carries the limits and progress counters and receives the
// NextState chosen by this call.
// Returns: the value of scannerContext.hasMoreValues() for the NextState set just before
// returning.
// Throws: IllegalArgumentException when scannerContext is null; DoNotRetryIOException when a
// filter's transformCell returns something that is not an ExtendedCell; RowTooBigException when
// a user scan has added more than maxRowSize bytes in this call.
if (scannerContext == null) {
throw new IllegalArgumentException("Scanner context cannot be null");
}
// When checkFlushed() and reopenAfterFlush() both report true, return MORE_VALUES right away
// without reading any cell in this call.
if (checkFlushed() && reopenAfterFlush()) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
// If the heap was left null, then the scanners had previously run out anyway; close and
// return.
if (this.heap == null) {
// By this time a partial close should already have happened, because the heap is already null.
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
ExtendedCell cell = this.heap.peek();
// Nothing at the top of the heap: there is nothing left to return.
if (cell == null) {
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
// Only call setToNewRow if the row changes; this avoids confusing the query matcher
// when scanning intra-row.
// If no limit exists in the scope LimitScope.BETWEEN_CELLS then we are sure we are changing
// rows. Else it is possible we are still traversing the same row, so the per-row state is only
// reset when the matcher has no current row.
if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
this.countPerRow = 0;
matcher.setToNewRow(cell);
}
// Clear progress away unless the invoker has indicated it should be kept (or a row is being
// skipped).
if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) {
scannerContext.clearProgress();
}
// The current RPC call is only looked up for user scans; otherwise an empty Optional is used.
Optional<RpcCall> rpcCall =
matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty();
// Reusable closure to avoid allocations: forwards a block size to the RPC call (when there is
// one) and always to the scanner context's block progress.
IntConsumer recordBlockSize = blockSize -> {
if (rpcCall.isPresent()) {
rpcCall.get().incrementBlockBytesScanned(blockSize);
}
scannerContext.incrementBlockProgress(blockSize);
};
// Number of cells added to outResult during this call.
int count = 0;
// Sum of cellSize over the cells added to outResult during this call; compared against
// maxRowSize below.
long totalBytesRead = 0;
// Track the cells for metrics only if it is a user read request.
boolean onlyFromMemstore = matcher.isUserScan();
try {
LOOP: do {
// Check the time limit every cellsPerHeartbeatCheck scanned cells,
// or once preadMaxBytes has been exceeded, in which case we may want to return so that the
// switch to stream read can happen in the shipped method
// (see the longer note further down).
if (
kvsScanned % cellsPerHeartbeatCheck == 0
|| (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)
) {
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
}
}
// Do an object (identity) compare - prevCell was set from the same heap.
if (prevCell != cell) {
++kvsScanned;
}
checkScanOrder(prevCell, cell, comparator);
// cellSize is computed here, before any filter transformation, and is reused for all of the
// byte accounting below.
int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
bytesRead += cellSize;
if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) {
// Return immediately if we want to switch from pread to stream. We need this because we
// can
// only switch in the shipped method; if the user uses a filter that filters out everything
// and the rpc
// timeout is very large, then the shipped method will never be called until the whole scan
// is finished, but by that time we have already scanned all the data...
// See HBASE-20457 for more details.
// There is still a scenario that cannot be handled. If we have a very large row,
// which
// has millions of qualifiers, and filter.filterRow is used, then even if we set the flag
// here, we still need to scan all the qualifiers before returning...
scannerContext.returnImmediately();
}
heap.recordBlockSize(recordBlockSize);
// Remember this cell for the identity check and scan-order check of the next iteration.
prevCell = cell;
scannerContext.setLastPeekedCell(cell);
topChanged = false;
ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
switch (qcode) {
// The three INCLUDE* codes share the "add to result" logic and differ only in how the heap is
// advanced afterwards.
case INCLUDE:
case INCLUDE_AND_SEEK_NEXT_ROW:
case INCLUDE_AND_SEEK_NEXT_COL:
Filter f = matcher.getFilter();
if (f != null) {
Cell transformedCell = f.transformCell(cell);
// fast path, most filters just return the same cell instance
if (transformedCell != cell) {
if (transformedCell instanceof ExtendedCell) {
cell = (ExtendedCell) transformedCell;
} else {
throw new DoNotRetryIOException("Incorrect filter implementation, "
+ "the Cell returned by transformCell is not an ExtendedCell. Filter class: "
+ f.getClass().getName());
}
}
}
this.countPerRow++;
// Add to results only if we have skipped #storeOffset kvs;
// also update the metric accordingly.
if (this.countPerRow > storeOffset) {
outResult.add(cell);
// Update local tracking information
count++;
totalBytesRead += cellSize;
// Stays true only while every added cell is reported by the heap as coming from the
// memstore; once false, the read is accounted as mixed (see the finally block).
onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore();
// Update the progress of the scanner context
scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
scannerContext.incrementBatchProgress(1);
// For user scans, fail once the bytes added in this call exceed maxRowSize.
if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
String message = "Max row size allowed: " + maxRowSize
+ ", but the row is bigger than that, the row info: "
+ CellUtil.toString(cell, false) + ", already have process row cells = "
+ outResult.size() + ", it belong to region = "
+ store.getHRegion().getRegionInfo().getRegionNameAsString();
LOG.warn(message);
throw new RowTooBigException(message);
}
// A storeLimit of -1 or less disables this check. Otherwise, once offset + limit cells have
// been counted for this row, move on to the next row and end this call.
if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) {
// do what SEEK_NEXT_ROW does.
if (!matcher.moreRowsMayExistAfter(cell)) {
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
matcher.clearCurrentRow();
seekToNextRow(cell);
break LOOP;
}
}
// Advance the heap according to the specific INCLUDE* code.
if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
if (!matcher.moreRowsMayExistAfter(cell)) {
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
matcher.clearCurrentRow();
seekOrSkipToNextRow(cell);
} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
seekOrSkipToNextColumn(cell);
} else {
this.heap.next();
}
// Leaving via break LOOP falls through to the count check after the loop.
if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
break LOOP;
}
if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
break LOOP;
}
// Go straight to the loop condition, skipping the post-switch size check.
continue;
case DONE:
// Optimization for Gets! If DONE, no more to get on this row, early exit!
if (get) {
// Then no more to this row... exit.
close(false);// Do all cleanup except heap.close()
// Any metric update happens in the finally block below.
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
matcher.clearCurrentRow();
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
case DONE_SCAN:
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
case SEEK_NEXT_ROW:
// This is just a relatively simple end-of-scan shortcut: stop right here
// when the matcher says no more rows may exist after this cell (e.g. an endKey in the scan).
if (!matcher.moreRowsMayExistAfter(cell)) {
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
matcher.clearCurrentRow();
seekOrSkipToNextRow(cell);
// needToReturn() yields a non-null state when this call should end after the seek.
NextState stateAfterSeekNextRow = needToReturn();
if (stateAfterSeekNextRow != null) {
return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
}
break;
case SEEK_NEXT_COL:
seekOrSkipToNextColumn(cell);
// Same early-return check as in SEEK_NEXT_ROW.
NextState stateAfterSeekNextColumn = needToReturn();
if (stateAfterSeekNextColumn != null) {
return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
}
break;
case SKIP:
this.heap.next();
break;
case SEEK_NEXT_USING_HINT:
ExtendedCell nextKV = matcher.getNextKeyHint(cell);
if (nextKV != null) {
int difference = comparator.compare(nextKV, cell);
// Only seek when the hint lies strictly ahead of the current cell in the scan direction.
if (
((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0))
) {
seekAsDirection(nextKV);
NextState stateAfterSeekByHint = needToReturn();
if (stateAfterSeekByHint != null) {
return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
}
break;
}
}
// No hint, or the hint is not ahead of the current cell: just step to the next cell.
heap.next();
break;
default:
throw new RuntimeException("UNEXPECTED");
}
// One last chance to break due to size limit. The INCLUDE* cases above already check
// the limit and continue. For the various filtered cases, we need to check because the block
// size limit may have been exceeded even if we don't add cells to the result list.
if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
} while ((cell = this.heap.peek()) != null);
// Reached when the heap ran out of cells or an INCLUDE* branch executed break LOOP.
if (count > 0) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
// No more keys
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} finally {
// Runs on every exit from the try block, including returns and exceptions.
// Increment only if we have some result.
if (count > 0 && matcher.isUserScan()) {
// If true, increment the memstore metric; if not, the mixed one.
updateMetricsStore(onlyFromMemstore);
}
}
}