in hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java [419:636]
private boolean nextInternal(List<? super ExtendedCell> results, ScannerContext scannerContext)
throws IOException {
Preconditions.checkArgument(results.isEmpty(), "First parameter should be an empty list");
Preconditions.checkArgument(scannerContext != null, "Scanner context cannot be null");
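// Capture the current RPC call, if any, so checkClientDisconnect below can bail out
// early when the client has gone away mid-scan.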
Optional<RpcCall> rpcCall = RpcServer.getCurrentCall();
// Save the initial progress from the Scanner context in these local variables. The progress
// may need to be reset a few times if rows are being filtered out so we save the initial
// progress.
int initialBatchProgress = scannerContext.getBatchProgress();
long initialSizeProgress = scannerContext.getDataSizeProgress();
long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();
// Scope used when re-checking size/time limits after a row is filtered out; widened
// to BETWEEN_ROWS below when a hasFilterRow filter is present.
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
// The loop here is used only when, at some point during the next, we determine
// that due to the effects of filters or otherwise we have an empty row in the result.
// Then we loop and try again. Otherwise, we must get out on the first iteration via return:
// "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row
// and joinedHeap has no more data to read for the last row, if joinedContinuationRow is set).
while (true) {
resetProgress(scannerContext, initialBatchProgress, initialSizeProgress,
initialHeapSizeProgress);
checkClientDisconnect(rpcCall);
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
region.checkInterrupt();
// Let's see what we have in the storeHeap.
ExtendedCell current = this.storeHeap.peek();
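// shouldStop reports whether the scan is done with this heap: the peek returned
// nothing, or the current cell is at or past the stop row.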
boolean shouldStop = shouldStop(current);
// When hasFilterRow is true it means that all the cells for a particular row must be
// read before a filtering decision can be made. This means that filters for which
// hasFilterRow returns true run the risk of encountering out of memory errors in the
// case that they are applied to a table that has very large rows.
boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();
// If filter#hasFilterRow is true, partial results are not allowed since allowing them
// would prevent the filters from being evaluated. Thus, if it is true, change the
// scope of any limits that could potentially create partial results to
// LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
if (hasFilterRow) {
if (LOG.isTraceEnabled()) {
LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
+ " formed. Changing scope of limits that may create partials");
}
scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
limitScope = LimitScope.BETWEEN_ROWS;
}
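// Before reading anything for this row, bail out if the time limit has already been
// reached; returning mid-row is only legal when no hasFilterRow filter is in play.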
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
if (hasFilterRow) {
throw new IncompatibleFilterException(
"Filter whose hasFilterRow() returns true is incompatible with scans that must "
+ " stop mid-row because of a limit. ScannerContext:" + scannerContext);
}
return true;
}
// Check if we were getting data from the joinedHeap and hit the limit.
// If not, then we are on the main path - getting results from storeHeap.
if (joinedContinuationRow == null) {
// First, check if we are at a stop row. If so, there are no more results.
if (shouldStop) {
if (hasFilterRow) {
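// Give the row filter a final chance to transform the cells gathered so far
// before we report that the scan is finished.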
filter.filterRowCells((List<Cell>) results);
}
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
// Check if rowkey filter wants to exclude this row. If so, loop to next.
// Technically, if we hit limits before on this row, we don't need this call.
if (filterRowKey(current)) {
incrementCountOfRowsFilteredMetric(scannerContext);
// early check, see HBASE-16296
if (isFilterDoneInternal()) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
// Typically the count of rows scanned is incremented inside #populateResult. However,
// here we are filtering a row based purely on its row key, preventing us from calling
// #populateResult. Thus, perform the necessary increment to the rows scanned metric here.
incrementCountOfRowsScannedMetric(scannerContext);
boolean moreRows = nextRow(scannerContext, current);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
results.clear();
// Read nothing as the rowkey was filtered, but still need to check time limit
// We also check size limit because we might have read blocks in getting to this point.
if (scannerContext.checkAnyLimitReached(limitScope)) {
return true;
}
continue;
}
// Ok, we are good, let's try to get some results from the main heap.
populateResult(results, this.storeHeap, scannerContext, current);
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
if (hasFilterRow) {
throw new IncompatibleFilterException(
"Filter whose hasFilterRow() returns true is incompatible with scans that must "
+ " stop mid-row because of a limit. ScannerContext:" + scannerContext);
}
return true;
}
// Check for thread interrupt status in case we have been signaled from
// #interruptRegionOperation.
region.checkInterrupt();
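// Peek again: populateResult has advanced the heap, so this tells us whether the
// next cell starts a new row or the heap is exhausted.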
Cell nextKv = this.storeHeap.peek();
shouldStop = shouldStop(nextKv);
// Record whether the row was empty before filters were applied to it.
final boolean isEmptyRow = results.isEmpty();
// We have the part of the row necessary for filtering (all of it, usually).
// First filter with the filterRow(List).
FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
if (hasFilterRow) {
ret = filter.filterRowCellsWithRet((List<Cell>) results);
// We don't know how the results have changed after being filtered. Must set progress
// according to contents of results now.
if (scannerContext.getKeepProgress()) {
scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
initialHeapSizeProgress);
} else {
scannerContext.clearProgress();
}
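// Re-derive batch and size progress from whatever survived the row filter.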
scannerContext.incrementBatchProgress(results.size());
for (ExtendedCell cell : (List<ExtendedCell>) results) {
scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
cell.heapSize());
}
}
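// Drop the row if it was empty before filtering, if the row filter asked to exclude
// it, or if the older filterRow() hook vetoed it.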
if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
incrementCountOfRowsFilteredMetric(scannerContext);
results.clear();
boolean moreRows = nextRow(scannerContext, current);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
// This row was totally filtered out. If this is NOT the last row,
// we should continue on; otherwise, nothing else to do.
if (!shouldStop) {
// Read nothing as the cells were filtered, but still need to check time limit.
// We also check size limit because we might have read blocks in getting to this point.
if (scannerContext.checkAnyLimitReached(limitScope)) {
return true;
}
continue;
}
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
// Ok, we are done with storeHeap for this row.
// Now we may need to fetch additional, non-essential data into row.
// These values are not needed for filter to work, so we postpone their
// fetch to (possibly) reduce amount of data loads from disk.
if (this.joinedHeap != null) {
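// joinedHeapMayHaveData seeks the joined heap to the current row and reports
// whether it actually holds cells for that row.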
boolean mayHaveData = joinedHeapMayHaveData(current);
if (mayHaveData) {
joinedContinuationRow = current;
populateFromJoinedHeap(results, scannerContext);
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
return true;
}
}
}
} else {
// Populating from the joined heap was stopped by limits; populate some more.
populateFromJoinedHeap(results, scannerContext);
if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
return true;
}
}
// We may have just called populateFromJoinedHeap and hit the limits. If that is
// the case, we need to call it again on the next next() invocation.
if (joinedContinuationRow != null) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
// Finally, we are done with both joinedHeap and storeHeap.
// Double check to prevent empty rows from appearing in result. It could be
// the case when SingleColumnValueExcludeFilter is used.
if (results.isEmpty()) {
incrementCountOfRowsFilteredMetric(scannerContext);
boolean moreRows = nextRow(scannerContext, current);
if (!moreRows) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
if (!shouldStop) {
// We check size limit because we might have read blocks in the nextRow call above, or
// in the populateResult call. Only scans with hasFilterRow should reach this point,
// and for those scans which filter row _cells_ this is the only place we can actually
// enforce that the scan does not exceed limits since it bypasses all other checks above.
if (scannerContext.checkSizeLimit(limitScope)) {
return true;
}
continue;
}
}
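// The row produced results and no limit fired; report whether the scan is finished
// based on the stop-row check.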
if (shouldStop) {
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
} else {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
}
}
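
For context, a minimal caller sketch (not part of this file; the scan setup is illustrative only) of how this private method is normally reached: callers drive RegionScanner#next, which funnels into nextInternal until it reports no more values.

// Hedged sketch: drives a RegionScanner the way nextInternal expects to be driven.
// 'region' is assumed to be an HRegion reference obtained from the hosting RegionServer.
try (RegionScanner scanner = region.getScanner(new Scan())) {
  List<Cell> results = new ArrayList<>();
  boolean moreRows;
  do {
    // next() eventually funnels into nextInternal; true means more rows remain.
    moreRows = scanner.next(results);
    // ... consume one row's cells here ...
    results.clear(); // nextInternal requires an empty list on entry (see the Precondition above)
  } while (moreRows);
}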