public boolean next()

in hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java [553:815]


  public boolean next(List<? super ExtendedCell> outResult, ScannerContext scannerContext)
    throws IOException {
    if (scannerContext == null) {
      throw new IllegalArgumentException("Scanner context cannot be null");
    }
    if (checkFlushed() && reopenAfterFlush()) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      // By this time partial close should happened because already heap is null
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    ExtendedCell cell = this.heap.peek();
    if (cell == null) {
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row

    // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
    // rows. Else it is possible we are still traversing the same row so we must perform the row
    // comparison.
    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.currentRow() == null) {
      this.countPerRow = 0;
      matcher.setToNewRow(cell);
    }

    // Clear progress away unless invoker has indicated it should be kept.
    if (!scannerContext.getKeepProgress() && !scannerContext.getSkippingRow()) {
      scannerContext.clearProgress();
    }

    Optional<RpcCall> rpcCall =
      matcher.isUserScan() ? RpcServer.getCurrentCall() : Optional.empty();
    // re-useable closure to avoid allocations
    IntConsumer recordBlockSize = blockSize -> {
      if (rpcCall.isPresent()) {
        rpcCall.get().incrementBlockBytesScanned(blockSize);
      }
      scannerContext.incrementBlockProgress(blockSize);
    };

    int count = 0;
    long totalBytesRead = 0;
    // track the cells for metrics only if it is a user read request.
    boolean onlyFromMemstore = matcher.isUserScan();
    try {
      LOOP: do {
        // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
        // Or if the preadMaxBytes is reached and we may want to return so we can switch to stream
        // in
        // the shipped method below.
        if (
          kvsScanned % cellsPerHeartbeatCheck == 0
            || (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)
        ) {
          if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
            return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
          }
        }
        // Do object compare - we set prevKV from the same heap.
        if (prevCell != cell) {
          ++kvsScanned;
        }
        checkScanOrder(prevCell, cell, comparator);
        int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
        bytesRead += cellSize;
        if (scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes) {
          // return immediately if we want to switch from pread to stream. We need this because we
          // can
          // only switch in the shipped method, if user use a filter to filter out everything and
          // rpc
          // timeout is very large then the shipped method will never be called until the whole scan
          // is finished, but at that time we have already scan all the data...
          // See HBASE-20457 for more details.
          // And there is still a scenario that can not be handled. If we have a very large row,
          // which
          // have millions of qualifiers, and filter.filterRow is used, then even if we set the flag
          // here, we still need to scan all the qualifiers before returning...
          scannerContext.returnImmediately();
        }

        heap.recordBlockSize(recordBlockSize);

        prevCell = cell;
        scannerContext.setLastPeekedCell(cell);
        topChanged = false;
        ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
        switch (qcode) {
          case INCLUDE:
          case INCLUDE_AND_SEEK_NEXT_ROW:
          case INCLUDE_AND_SEEK_NEXT_COL:
            Filter f = matcher.getFilter();
            if (f != null) {
              Cell transformedCell = f.transformCell(cell);
              // fast path, most filters just return the same cell instance
              if (transformedCell != cell) {
                if (transformedCell instanceof ExtendedCell) {
                  cell = (ExtendedCell) transformedCell;
                } else {
                  throw new DoNotRetryIOException("Incorrect filter implementation, "
                    + "the Cell returned by transformCell is not an ExtendedCell. Filter class: "
                    + f.getClass().getName());
                }
              }
            }
            this.countPerRow++;

            // add to results only if we have skipped #storeOffset kvs
            // also update metric accordingly
            if (this.countPerRow > storeOffset) {
              outResult.add(cell);

              // Update local tracking information
              count++;
              totalBytesRead += cellSize;

              /**
               * Increment the metric if all the cells are from memstore. If not we will account it
               * for mixed reads
               */
              onlyFromMemstore = onlyFromMemstore && heap.isLatestCellFromMemstore();
              // Update the progress of the scanner context
              scannerContext.incrementSizeProgress(cellSize, cell.heapSize());
              scannerContext.incrementBatchProgress(1);

              if (matcher.isUserScan() && totalBytesRead > maxRowSize) {
                String message = "Max row size allowed: " + maxRowSize
                  + ", but the row is bigger than that, the row info: "
                  + CellUtil.toString(cell, false) + ", already have process row cells = "
                  + outResult.size() + ", it belong to region = "
                  + store.getHRegion().getRegionInfo().getRegionNameAsString();
                LOG.warn(message);
                throw new RowTooBigException(message);
              }

              if (storeLimit > -1 && this.countPerRow >= (storeLimit + storeOffset)) {
                // do what SEEK_NEXT_ROW does.
                if (!matcher.moreRowsMayExistAfter(cell)) {
                  close(false);// Do all cleanup except heap.close()
                  return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
                }
                matcher.clearCurrentRow();
                seekToNextRow(cell);
                break LOOP;
              }
            }

            if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
              if (!matcher.moreRowsMayExistAfter(cell)) {
                close(false);// Do all cleanup except heap.close()
                return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
              }
              matcher.clearCurrentRow();
              seekOrSkipToNextRow(cell);
            } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
              seekOrSkipToNextColumn(cell);
            } else {
              this.heap.next();
            }

            if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
              break LOOP;
            }
            if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
              break LOOP;
            }
            continue;

          case DONE:
            // Optimization for Gets! If DONE, no more to get on this row, early exit!
            if (get) {
              // Then no more to this row... exit.
              close(false);// Do all cleanup except heap.close()
              // update metric
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            matcher.clearCurrentRow();
            return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();

          case DONE_SCAN:
            close(false);// Do all cleanup except heap.close()
            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

          case SEEK_NEXT_ROW:
            // This is just a relatively simple end of scan fix, to short-cut end
            // us if there is an endKey in the scan.
            if (!matcher.moreRowsMayExistAfter(cell)) {
              close(false);// Do all cleanup except heap.close()
              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
            }
            matcher.clearCurrentRow();
            seekOrSkipToNextRow(cell);
            NextState stateAfterSeekNextRow = needToReturn();
            if (stateAfterSeekNextRow != null) {
              return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
            }
            break;

          case SEEK_NEXT_COL:
            seekOrSkipToNextColumn(cell);
            NextState stateAfterSeekNextColumn = needToReturn();
            if (stateAfterSeekNextColumn != null) {
              return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
            }
            break;

          case SKIP:
            this.heap.next();
            break;

          case SEEK_NEXT_USING_HINT:
            ExtendedCell nextKV = matcher.getNextKeyHint(cell);
            if (nextKV != null) {
              int difference = comparator.compare(nextKV, cell);
              if (
                ((!scan.isReversed() && difference > 0) || (scan.isReversed() && difference < 0))
              ) {
                seekAsDirection(nextKV);
                NextState stateAfterSeekByHint = needToReturn();
                if (stateAfterSeekByHint != null) {
                  return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
                }
                break;
              }
            }
            heap.next();
            break;

          default:
            throw new RuntimeException("UNEXPECTED");
        }

        // One last chance to break due to size limit. The INCLUDE* cases above already check
        // limit and continue. For the various filtered cases, we need to check because block
        // size limit may have been exceeded even if we don't add cells to result list.
        if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
        }
      } while ((cell = this.heap.peek()) != null);

      if (count > 0) {
        return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
      }

      // No more keys
      close(false);// Do all cleanup except heap.close()
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } finally {
      // increment only if we have some result
      if (count > 0 && matcher.isUserScan()) {
        // if true increment memstore metrics, if not the mixed one
        updateMetricsStore(onlyFromMemstore);
      }
    }
  }