in hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java [3315:3559]
/**
 * Runs the server-side loop of a single scan RPC against the region scanner held by {@code rsh}.
 * <p>
 * Repeatedly calls {@link RegionScanner#nextRaw} and appends one {@link Result} per non-empty call
 * to {@code results}. The loop stops when any of the following happens:
 * <ul>
 * <li>{@code maxResults} results have been added by this method;</li>
 * <li>the row-limit check ({@code checkLimitOfRows}) marks the response as having no more
 * results;</li>
 * <li>the size or time limit tracked by the {@link ScannerContext} is reached;</li>
 * <li>the scanner reports that there are no more rows.</li>
 * </ul>
 * The response {@code builder} is then populated with {@code moreResultsInRegion}, optional
 * heartbeat/cursor information, and optional scan/query metrics. Server-side scan metrics are
 * updated in a {@code finally} block even when the scan fails. The coprocessor
 * {@code postScannerNext} hook only runs when the scan completes without throwing.
 * @param controller         RPC controller, used together with {@code rpcCall} to compute the time
 *                           limit for this call
 * @param request            the scan request; supplies the client's partial/heartbeat support,
 *                           whether scan metrics should be tracked, and whether per-result query
 *                           metrics are enabled
 * @param rsh                holder providing the region, the scanner, the row of the last partial
 *                           result sent to the client, and whether a cursor is needed
 * @param maxQuotaResultSize maximum result size from server-side configuration and quotas (without
 *                           the user's own max); also used as the block-size limit
 * @param maxResults         maximum number of results this method will add to {@code results}
 * @param limitOfRows        row limit of the scan; only enforced when greater than 0
 * @param results            output list; may already contain results added by the coprocessor
 *                           {@code preScannerNext} hook, in which case partial results are disabled
 *                           for this call
 * @param builder            response builder to populate
 * @param rpcCall            the current RPC call, or {@code null}; when {@code null}, cells are
 *                           copied via {@code CellUtil.cloneIfNecessary}, and RPC-level size and
 *                           block-bytes accounting is skipped
 * @throws IOException propagated from the region and scanner calls made here (e.g. starting the
 *                     region operation or {@code nextRaw})
 */
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
  long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
  ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
  HRegion region = rsh.r;
  RegionScanner scanner = rsh.s;
  // Effective result-size limit: the scanner's own max (if it set a positive one), capped by the
  // server/quota limit.
  long maxResultSize;
  if (scanner.getMaxResultSize() > 0) {
    maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
  } else {
    maxResultSize = maxQuotaResultSize;
  }
  // This list holds the cells of a row. The default ArrayList capacity is 10, so with many
  // versions or many column families it would be resized, and resizes show up in the profiler.
  // The capacity is therefore set above 10; 32 is arbitrary for now.
  // TODO: keep a record of the general size of the results being returned.
  ArrayList<Cell> values = new ArrayList<>(32);
  // Started before the try block, so the closeRegionOperation() in the finally block only runs
  // for an operation that was actually started.
  region.startRegionOperation(Operation.SCAN);
  long before = EnvironmentEdgeManager.currentTime();
  // Used to check if we've matched the row limit set on the Scan
  int numOfCompleteRows = 0;
  // Count of times we call nextRaw; can be > numOfCompleteRows.
  int numOfNextRawCalls = 0;
  try {
    int numOfResults = 0;
    // Lock the scanner for the whole batch, so that other callers that also synchronize on it
    // cannot interleave with this loop.
    synchronized (scanner) {
      // Results served from a non-default region replica (replica id != 0) are flagged as stale.
      boolean stale = (region.getRegionInfo().getReplicaId() != 0);
      boolean clientHandlesPartials =
        request.hasClientHandlesPartials() && request.getClientHandlesPartials();
      boolean clientHandlesHeartbeats =
        request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();
      // On the server side we must ensure that the correct ordering of partial results is
      // returned to the client to allow them to properly reconstruct the partial results.
      // If the coprocessor host is adding to the result list, we cannot guarantee the
      // correct ordering of partial results and so we prevent partial results from being
      // formed.
      boolean serverGuaranteesOrderOfPartials = results.isEmpty();
      boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
      boolean moreRows = false;
      // Heartbeat messages occur when the processing of the ScanRequest exceeds a
      // certain time threshold on the server. When the time threshold is exceeded, the
      // server stops the scan and sends back whatever Results it has accumulated within
      // that time period (may be empty). Since heartbeat messages have the potential to
      // create partial Results (in the event that the timeout occurs in the middle of a
      // row), we must only generate heartbeat messages when the client can handle both
      // heartbeats AND partials
      boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
      long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);
      // Limits may only be enforced in the middle of a row (BETWEEN_CELLS) when the client can
      // cope with the resulting partial results; otherwise they are only checked between rows.
      final LimitScope sizeScope =
        allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
      final LimitScope timeScope =
        allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
      boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
      // Configure with limits for this RPC. Set keep progress true since size progress
      // towards size limit should be kept between calls to nextRaw
      ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
      // maxResultSize - either we can reach this much size for all cells(being read) data or sum
      // of heap size occupied by cells(being read). Cell data means its key and value parts.
      // maxQuotaResultSize - max results just from server side configuration and quotas, without
      // user's specified max. We use this for evaluating limits based on blocks (not cells).
      // We may have accumulated some results in coprocessor preScannerNext call. Subtract any
      // cell or block size from maximum here so we adhere to total limits of request.
      // Note: we track block size in StoreScanner. If the CP hook got cells from hbase, it will
      // have accumulated block bytes. If not, this will be 0 for block size.
      long maxCellSize = maxResultSize;
      long maxBlockSize = maxQuotaResultSize;
      if (rpcCall != null) {
        maxBlockSize -= rpcCall.getBlockBytesScanned();
        maxCellSize -= rpcCall.getResponseCellSize();
      }
      contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
      contextBuilder.setBatchLimit(scanner.getBatch());
      contextBuilder.setTimeLimit(timeScope, timeLimit);
      contextBuilder.setTrackMetrics(trackMetrics);
      ScannerContext scannerContext = contextBuilder.build();
      boolean limitReached = false;
      // Cumulative block-size progress seen after the previous nextRaw call. It is used to
      // derive the number of block bytes scanned by each individual call (reported in
      // per-result query metrics).
      long blockBytesScannedBefore = 0;
      while (numOfResults < maxResults) {
        // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
        // batch limit is a limit on the number of cells per Result. Thus, if progress is
        // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
        // reset the batch progress between nextRaw invocations since we don't want the
        // batch progress from previous calls to affect future calls
        scannerContext.setBatchProgress(0);
        // values is cleared at the bottom of every iteration that does not break out.
        assert values.isEmpty();
        // Collect values to be returned here
        moreRows = scanner.nextRaw(values, scannerContext);
        long blockBytesScanned = scannerContext.getBlockSizeProgress() - blockBytesScannedBefore;
        blockBytesScannedBefore = scannerContext.getBlockSizeProgress();
        if (rpcCall == null) {
          // When there is no RpcCall context, copy the cells (EC) to the heap, because the
          // scanner may be closed afterwards. This can be an EXPENSIVE call: it may make an
          // extra copy from off-heap to on-heap buffers. See HBASE-26036 for details.
          CellUtil.cloneIfNecessary(values);
        }
        numOfNextRawCalls++;
        if (!values.isEmpty()) {
          if (limitOfRows > 0) {
            // First we need to check if the last result is partial and we have a row change. If
            // so then we need to increase the numOfCompleteRows.
            if (results.isEmpty()) {
              // Nothing returned yet in this call: compare against the partial row remembered
              // from a previous RPC on this scanner (if any).
              if (
                rsh.rowOfLastPartialResult != null
                  && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
              ) {
                numOfCompleteRows++;
                checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                  builder);
              }
            } else {
              Result lastResult = results.get(results.size() - 1);
              if (
                lastResult.mayHaveMoreCellsInRow()
                  && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
              ) {
                numOfCompleteRows++;
                checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                  builder);
              }
            }
            // checkLimitOfRows presumably sets moreResults=false once the row limit is hit;
            // in that case the freshly read cells are not added to the response.
            if (builder.hasMoreResults() && !builder.getMoreResults()) {
              break;
            }
          }
          boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
          Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
          if (request.getScan().getQueryMetricsEnabled()) {
            builder.addQueryMetrics(ClientProtos.QueryMetrics.newBuilder()
              .setBlockBytesScanned(blockBytesScanned).build());
          }
          results.add(r);
          numOfResults++;
          // A Result that cannot have more cells completes its row, so count it towards the
          // row limit.
          if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
            numOfCompleteRows++;
            checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
            if (builder.hasMoreResults() && !builder.getMoreResults()) {
              break;
            }
          }
        } else if (!moreRows && !results.isEmpty()) {
          // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
          // last result is false. Otherwise it's possible that: the first nextRaw returned
          // because BATCH_LIMIT_REACHED (BTW it happen to exhaust all cells of the scan),so the
          // last result's mayHaveMoreCellsInRow will be true. while the following nextRaw will
          // return with moreRows=false, which means moreResultsInRegion would be false, it will
          // be a contradictory state (HBASE-21206).
          int lastIdx = results.size() - 1;
          Result r = results.get(lastIdx);
          if (r.mayHaveMoreCellsInRow()) {
            results.set(lastIdx, ClientInternalHelper.createResult(
              ClientInternalHelper.getExtendedRawCells(r), r.getExists(), r.isStale(), false));
          }
        }
        // Limits are evaluated at row granularity here, after each nextRaw call.
        boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
        boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
        boolean resultsLimitReached = numOfResults >= maxResults;
        limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
        if (limitReached || !moreRows) {
          // With block size limit, we may exceed size limit without collecting any results.
          // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
          // or cursor if results were collected, for example for cell size or heap size limits.
          boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
          // We only want to mark a ScanResponse as a heartbeat message in the event that
          // there are more values to be read server side. If there aren't more values,
          // marking it as a heartbeat is wasteful because the client will need to issue
          // another ScanRequest only to realize that they already have all the values
          if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
            // Heartbeat messages occur when the time limit has been reached, or size limit has
            // been reached before collecting any results. This can happen for heavily filtered
            // scans which scan over too many blocks.
            builder.setHeartbeatMessage(true);
            if (rsh.needCursor) {
              Cell cursorCell = scannerContext.getLastPeekedCell();
              if (cursorCell != null) {
                builder.setCursor(ProtobufUtil.toCursor(cursorCell));
              }
            }
          }
          break;
        }
        values.clear();
      }
      // Add this batch's heap size to the RPC call's running total (the total that was read
      // when the cell-size limit was computed above).
      if (rpcCall != null) {
        rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
      }
      builder.setMoreResultsInRegion(moreRows);
      // Check to see if the client requested that we track metrics server side. If the
      // client requested metrics, retrieve the metrics from the scanner context.
      if (trackMetrics) {
        // rather than increment yet another counter in StoreScanner, just set the value here
        // from block size progress before writing into the response
        scannerContext.getMetrics().countOfBlockBytesScanned
          .set(scannerContext.getBlockSizeProgress());
        if (rpcCall != null) {
          scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
        }
        // Copy every metric from the scanner context into the protobuf response.
        Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
        ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
        NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
        for (Entry<String, Long> entry : metrics.entrySet()) {
          pairBuilder.setName(entry.getKey());
          pairBuilder.setValue(entry.getValue());
          metricBuilder.addMetrics(pairBuilder.build());
        }
        builder.setScanMetrics(metricBuilder.build());
      }
    }
  } finally {
    region.closeRegionOperation();
    // Update serverside metrics, even on error.
    long end = EnvironmentEdgeManager.currentTime();
    // Without an RpcCall, response size and block bytes are reported as 0.
    long responseCellSize = 0;
    long blockBytesScanned = 0;
    if (rpcCall != null) {
      responseCellSize = rpcCall.getResponseCellSize();
      blockBytesScanned = rpcCall.getBlockBytesScanned();
      rsh.updateBlockBytesScanned(blockBytesScanned);
    }
    region.getMetrics().updateScan();
    final MetricsRegionServer metricsRegionServer = server.getMetrics();
    if (metricsRegionServer != null) {
      metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
      metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
    }
  }
  // coprocessor postNext hook; this is only reached when the scan above did not throw, and it
  // runs after the region operation has been closed.
  if (region.getCoprocessorHost() != null) {
    region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
  }
}