private void scan()

in hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java [3315:3559]


  /**
   * Performs one round of a scan RPC against the region scanner held by {@code rsh}, appending
   * {@link Result}s to {@code results} and filling in {@code builder}.
   * <p>
   * The cell-size limit for this call is the scanner's own max result size (when it is positive)
   * capped by {@code maxQuotaResultSize}. The block-size limit is {@code maxQuotaResultSize}. Any
   * response-cell bytes or block bytes already recorded on {@code rpcCall} are subtracted from
   * these limits first (for example, bytes accumulated by a coprocessor preScannerNext hook).
   * <p>
   * The read loop stops when any of the following happens:
   * <ul>
   * <li>{@code maxResults} Results have been added;</li>
   * <li>a size or time limit is reached at a row boundary;</li>
   * <li>the row limit {@code limitOfRows} causes the builder to report no more results;</li>
   * <li>the scanner reports there are no more rows.</li>
   * </ul>
   * <p>
   * Region and region-server scan metrics are updated in a {@code finally} block, so they are
   * recorded even when the scan throws. The coprocessor postScannerNext hook runs only when the
   * scan completes without an exception, and only if a coprocessor host is present.
   * @param controller         RPC controller, passed to {@code getTimeLimit} to derive the time
   *                           limit
   * @param request            the client's scan request. Its flags control partial results,
   *                           heartbeats and scan-metric tracking.
   * @param rsh                holder of the region ({@code rsh.r}) and region scanner
   *                           ({@code rsh.s}) to read from
   * @param maxQuotaResultSize maximum size allowed by server configuration and quotas. It caps
   *                           the cell-size limit and is used as the block-size limit.
   * @param maxResults         maximum number of Results to add during this call
   * @param limitOfRows        row limit for the scan. Values {@code <= 0} disable row-limit
   *                           checking.
   * @param results            output list. If it is non-empty on entry (e.g. a coprocessor
   *                           already added Results), partial results are not allowed.
   * @param builder            response builder. It receives moreResultsInRegion, and may also
   *                           receive the heartbeat flag, a cursor, per-result query metrics and
   *                           scan metrics.
   * @param rpcCall            the current RPC call, or {@code null} when there is no RPC context.
   *                           When {@code null}, returned cells are copied to the heap.
   * @throws IOException propagated from starting the region operation, from the region scanner,
   *                     or from other region calls made here
   */
  private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
    long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
    ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
    HRegion region = rsh.r;
    RegionScanner scanner = rsh.s;
    // Effective cell-size limit: the scanner's own max result size, when one is set (> 0),
    // capped by the server/quota limit. Otherwise just the server/quota limit.
    long maxResultSize;
    if (scanner.getMaxResultSize() > 0) {
      maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
    } else {
      maxResultSize = maxQuotaResultSize;
    }
    // Buffer for the cells returned by a single nextRaw call (cells within a row). ArrayList's
    // default capacity of 10 gets resized when a row has many versions or many column families,
    // and those resizes show up in profiles, so start at an arbitrary 32.
    // TODO: keep a record of the general size of returned results and size this from it.
    ArrayList<Cell> values = new ArrayList<>(32);
    region.startRegionOperation(Operation.SCAN);
    // Start time for the scan latency reported to metricsRegionServer in the finally block.
    long before = EnvironmentEdgeManager.currentTime();
    // Used to check whether we have matched the row limit set on the Scan.
    int numOfCompleteRows = 0;
    // Number of nextRaw calls made; can be > numOfCompleteRows. Reported as read-query load.
    int numOfNextRawCalls = 0;
    try {
      int numOfResults = 0;
      // Hold the scanner's monitor for the whole read, so this call does not interleave with
      // other code that synchronizes on the same scanner.
      synchronized (scanner) {
        // Results read from a non-primary replica (replicaId != 0) are created with stale=true.
        boolean stale = (region.getRegionInfo().getReplicaId() != 0);
        boolean clientHandlesPartials =
          request.hasClientHandlesPartials() && request.getClientHandlesPartials();
        boolean clientHandlesHeartbeats =
          request.hasClientHandlesHeartbeats() && request.getClientHandlesHeartbeats();

        // The server must return partial results in the correct order so that the client can
        // reconstruct full rows from them. If the coprocessor host has already added to the
        // result list, that ordering cannot be guaranteed. In that case partial results are not
        // formed.
        boolean serverGuaranteesOrderOfPartials = results.isEmpty();
        boolean allowPartialResults = clientHandlesPartials && serverGuaranteesOrderOfPartials;
        boolean moreRows = false;

        // A heartbeat message is sent when processing of the ScanRequest exceeds a time
        // threshold on the server. When that happens, the server stops the scan and sends back
        // whatever Results it has accumulated so far (possibly none). A timeout in the middle of
        // a row produces a partial Result. Heartbeats are therefore generated only when the
        // client can handle both heartbeats AND partials.
        boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;

        long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);

        // BETWEEN_CELLS lets a limit stop the scan inside a row, which yields a partial Result.
        // That scope is only used when the client can accept partials or heartbeats. Otherwise
        // limits are enforced only at row boundaries (BETWEEN_ROWS).
        final LimitScope sizeScope =
          allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
        final LimitScope timeScope =
          allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;

        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();

        // Configure the limits for this RPC. keepProgress is set to true because progress
        // towards the size limit must be kept across calls to nextRaw.
        ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
        // maxResultSize is the limit on either the data size of the cells read (key and value
        // parts) or the heap size those cells occupy.
        // maxQuotaResultSize is the max derived only from server configuration and quotas,
        // without the user's specified max. It is used for limits evaluated on blocks, not cells.
        // A coprocessor preScannerNext call may already have accumulated results. Subtract any
        // cell or block bytes already recorded on the call, so the request as a whole stays
        // within its limits.
        // Note: block size is tracked in StoreScanner. If the CP hook read cells from HBase, it
        // will have accumulated block bytes. If not, the block size here will be 0.
        // NOTE(review): either value can go negative if the hook already exceeded the limit.
        long maxCellSize = maxResultSize;
        long maxBlockSize = maxQuotaResultSize;
        if (rpcCall != null) {
          maxBlockSize -= rpcCall.getBlockBytesScanned();
          maxCellSize -= rpcCall.getResponseCellSize();
        }

        contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
        contextBuilder.setBatchLimit(scanner.getBatch());
        contextBuilder.setTimeLimit(timeScope, timeLimit);
        contextBuilder.setTrackMetrics(trackMetrics);
        ScannerContext scannerContext = contextBuilder.build();
        boolean limitReached = false;
        // Cumulative block-size progress observed after the previous nextRaw call. Used to turn
        // the cumulative progress into a per-call delta.
        long blockBytesScannedBefore = 0;
        while (numOfResults < maxResults) {
          // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The batch
          // limit is a limit on the number of cells per Result. Progress is kept across calls
          // (keepProgress is true), so batch progress must be reset between nextRaw invocations.
          // Otherwise batch progress from earlier calls would affect later ones.
          scannerContext.setBatchProgress(0);
          assert values.isEmpty();

          // Collect the values to be returned into 'values'.
          moreRows = scanner.nextRaw(values, scannerContext);

          // Block bytes read by this nextRaw call alone.
          // NOTE(review): blockBytesScannedBefore advances even when this call returns no cells.
          // Bytes scanned by such calls are therefore not attributed to any per-result
          // QueryMetrics entry below. Confirm this is intended.
          long blockBytesScanned = scannerContext.getBlockSizeProgress() - blockBytesScannedBefore;
          blockBytesScannedBefore = scannerContext.getBlockSizeProgress();

          if (rpcCall == null) {
            // With no RpcCall, copy any extended cells to the heap, because the scanner will be
            // closed afterwards. This can be an EXPENSIVE call: it may make an extra copy from
            // off-heap to on-heap buffers. See HBASE-26036 for details.
            CellUtil.cloneIfNecessary(values);
          }
          numOfNextRawCalls++;

          if (!values.isEmpty()) {
            if (limitOfRows > 0) {
              // First check whether the previous result was partial and this batch starts a new
              // row. If so, that row is now complete and numOfCompleteRows is incremented. With
              // no results yet in this RPC, the previous partial row comes from the scanner
              // holder. Otherwise it is the last Result collected so far.
              if (results.isEmpty()) {
                if (
                  rsh.rowOfLastPartialResult != null
                    && !CellUtil.matchingRows(values.get(0), rsh.rowOfLastPartialResult)
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              } else {
                Result lastResult = results.get(results.size() - 1);
                if (
                  lastResult.mayHaveMoreCellsInRow()
                    && !CellUtil.matchingRows(values.get(0), lastResult.getRow())
                ) {
                  numOfCompleteRows++;
                  checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,
                    builder);
                }
              }
              // If the row limit has been reached (the builder now says there are no more
              // results), stop. The cells just read belong to the next row and are dropped.
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
            boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
            Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);

            // One QueryMetrics entry per Result added, carrying this nextRaw call's block bytes.
            if (request.getScan().getQueryMetricsEnabled()) {
              builder.addQueryMetrics(ClientProtos.QueryMetrics.newBuilder()
                .setBlockBytesScanned(blockBytesScanned).build());
            }

            results.add(r);
            numOfResults++;
            // A Result that cannot have more cells in its row completes that row.
            if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
              numOfCompleteRows++;
              checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext, builder);
              if (builder.hasMoreResults() && !builder.getMoreResults()) {
                break;
              }
            }
          } else if (!moreRows && !results.isEmpty()) {
            // The scan has no more cells here, so the last Result must have
            // mayHaveMoreCellsInRow == false. Otherwise the following can happen (HBASE-21206):
            // the first nextRaw returns because of BATCH_LIMIT_REACHED and happens to exhaust all
            // cells of the scan, so the last Result has mayHaveMoreCellsInRow == true. The next
            // nextRaw then returns moreRows == false, which makes moreResultsInRegion false. The
            // response would then be in a contradictory state. Fix it by rebuilding the last
            // Result with the flag cleared.
            int lastIdx = results.size() - 1;
            Result r = results.get(lastIdx);
            if (r.mayHaveMoreCellsInRow()) {
              results.set(lastIdx, ClientInternalHelper.createResult(
                ClientInternalHelper.getExtendedRawCells(r), r.getExists(), r.isStale(), false));
            }
          }
          // Limits are re-checked with BETWEEN_ROWS scope after every nextRaw call.
          boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
          boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
          boolean resultsLimitReached = numOfResults >= maxResults;
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
            // With a block size limit, the size limit can be exceeded without collecting any
            // results. In that case we want to send a heartbeat and/or a cursor. We don't want to
            // send a heartbeat or cursor when results were collected, for example under cell
            // size or heap size limits.
            boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
            // Only mark a ScanResponse as a heartbeat when there are more values to read on the
            // server side. If there aren't, a heartbeat is wasteful: the client would issue
            // another ScanRequest only to learn that it already has all the values.
            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
              // Heartbeats happen when the time limit is reached, or when the size limit is
              // reached before any results are collected. The latter can happen for heavily
              // filtered scans that read too many blocks.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
                if (cursorCell != null) {
                  builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                }
              }
            }
            break;
          }
          values.clear();
        }
        // Record the heap size of the cells gathered in this call on the RPC.
        if (rpcCall != null) {
          rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
        }
        builder.setMoreResultsInRegion(moreRows);
        // If the client asked for server-side metric tracking, read the metrics from the scanner
        // context and add them to the response.
        if (trackMetrics) {
          // Rather than increment yet another counter in StoreScanner, set the value here from
          // the block size progress before writing it into the response.
          scannerContext.getMetrics().countOfBlockBytesScanned
            .set(scannerContext.getBlockSizeProgress());
          if (rpcCall != null) {
            scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
          }
          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();

          // pairBuilder is reused. Each setName/setValue overwrites the previous entry's fields
          // before build().
          for (Entry<String, Long> entry : metrics.entrySet()) {
            pairBuilder.setName(entry.getKey());
            pairBuilder.setValue(entry.getValue());
            metricBuilder.addMetrics(pairBuilder.build());
          }

          builder.setScanMetrics(metricBuilder.build());
        }
      }
    } finally {
      region.closeRegionOperation();
      // Update server-side metrics, even on error.
      long end = EnvironmentEdgeManager.currentTime();

      long responseCellSize = 0;
      long blockBytesScanned = 0;
      if (rpcCall != null) {
        responseCellSize = rpcCall.getResponseCellSize();
        blockBytesScanned = rpcCall.getBlockBytesScanned();
        rsh.updateBlockBytesScanned(blockBytesScanned);
      }
      region.getMetrics().updateScan();
      final MetricsRegionServer metricsRegionServer = server.getMetrics();
      if (metricsRegionServer != null) {
        metricsRegionServer.updateScan(region, end - before, responseCellSize, blockBytesScanned);
        // Read-query metering counts nextRaw calls, not the Results returned.
        metricsRegionServer.updateReadQueryMeter(region, numOfNextRawCalls);
      }
    }
    // Coprocessor postScannerNext hook. It is reached only if the scan did not throw.
    if (region.getCoprocessorHost() != null) {
      region.getCoprocessorHost().postScannerNext(scanner, results, maxResults, true);
    }
  }