public boolean next()

in hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/CompactorScanner.java [97:183]


    public boolean next(List<Cell> result, int limit) throws IOException {

        if (currentRowWorthValues.isEmpty()) {
            // 1) Read next row
            List<Cell> scanResult = new ArrayList<Cell>();
            hasMoreRows = internalScanner.next(scanResult);
            if (LOG.isTraceEnabled()) {
                LOG.trace("Row: Result {} limit {} more rows? {}", scanResult, limit, hasMoreRows);
            }
            // 2) Traverse result list separating normal cells from shadow
            // cells and building a map to access easily the shadow cells.
            SortedMap<Cell, Optional<Cell>> cellToSc = CellUtils.mapCellsToShadowCells(scanResult);

            // 3) traverse the list of row key values isolated before and
            // check which ones should be discarded
            Map<String, CellInfo> lastTimestampedCellsInRow = new HashMap<>();
            PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter
                    = Iterators.peekingIterator(cellToSc.entrySet().iterator());
            while (iter.hasNext()) {
                Map.Entry<Cell, Optional<Cell>> entry = iter.next();
                Cell cell = entry.getKey();
                Optional<Cell> shadowCellOp = entry.getValue();

                if (cell.getTimestamp() > lowWatermark) {
                    retain(currentRowWorthValues, cell, shadowCellOp);
                    continue;
                }

                if (shouldRetainNonTransactionallyDeletedCell(cell)) {
                    retain(currentRowWorthValues, cell, shadowCellOp);
                    continue;
                }

                // During a minor compaction the coprocessor may only see a
                // subset of store files and may not have the all the versions
                // of a cell available for consideration. Therefore, if it
                // deletes a cell with a tombstone during a minor compaction,
                // an older version of the cell may become visible again. So,
                // we have to remove tombstones only in major compactions.
                if (isMajorCompaction) {
                    // Strong assumption that family delete cells arrive first before any other column
                    if (CellUtils.isTombstone(cell)) {
                        if (shadowCellOp.isPresent()) {
                            skipToNextColumn(cell, iter);
                        } else {
                            Optional<CommitTimestamp> commitTimestamp = queryCommitTimestamp(cell);
                            // Clean the cell only if it is valid
                            if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
                                skipToNextColumn(cell, iter);
                            }
                        }
                        continue;
                    }
                }

                if (shadowCellOp.isPresent()) {
                    saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCellOp.get());
                } else {
                    Optional<CommitTimestamp> commitTimestamp = queryCommitTimestamp(cell);
                    if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
                        // Build the missing shadow cell...
                        byte[] shadowCellValue = Bytes.toBytes(commitTimestamp.get().getValue());
                        Cell shadowCell = CellUtils.buildShadowCellFromCell(cell, shadowCellValue);
                        saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCell);
                    } else {
                        LOG.trace("Discarding cell {}", cell);
                    }
                }
            }
            retainLastTimestampedCellsSaved(currentRowWorthValues, lastTimestampedCellsInRow);

            // 4) Sort the list
            Collections.sort(currentRowWorthValues, CellComparator.getInstance());
        }

        // Chomp current row worth values up to the limit
        if (currentRowWorthValues.size() <= limit || limit == -1) {
            result.addAll(currentRowWorthValues);
            currentRowWorthValues.clear();
        } else {
            result.addAll(currentRowWorthValues.subList(0, limit));
            currentRowWorthValues.subList(0, limit).clear();
        }
        LOG.trace("Results to preserve {}", result);

        return hasMoreRows;
    }