in hbase-coprocessor/src/main/java/org/apache/hadoop/hbase/regionserver/CompactorScanner.java [97:183]
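/**
 * Fills {@code result} with the cells of the current row that should survive the compaction,
 * emitting at most {@code limit} cells per call (a limit of -1 means "no batching"). Cells that
 * do not fit in this batch are kept for the next invocation. Returns whether the wrapped
 * scanner still has more rows.
 */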
public boolean next(List<Cell> result, int limit) throws IOException {
    if (currentRowWorthValues.isEmpty()) {
        // 1) Read next row
        List<Cell> scanResult = new ArrayList<Cell>();
        hasMoreRows = internalScanner.next(scanResult);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Row: Result {} limit {} more rows? {}", scanResult, limit, hasMoreRows);
        }
        // 2) Traverse the result list, separating normal cells from shadow
        // cells and building a map for easy lookup of each cell's shadow cell.
        SortedMap<Cell, Optional<Cell>> cellToSc = CellUtils.mapCellsToShadowCells(scanResult);
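        // Note: in Omid, a shadow cell stores the commit timestamp of the transaction
        // that wrote its companion data cell (see how one is rebuilt further down).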
        // 3) Traverse the row's cells isolated above and check which ones
        // should be discarded
        Map<String, CellInfo> lastTimestampedCellsInRow = new HashMap<>();
        PeekingIterator<Map.Entry<Cell, Optional<Cell>>> iter
                = Iterators.peekingIterator(cellToSc.entrySet().iterator());
        while (iter.hasNext()) {
            Map.Entry<Cell, Optional<Cell>> entry = iter.next();
            Cell cell = entry.getKey();
            Optional<Cell> shadowCellOp = entry.getValue();
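            // Cells written after the low watermark may belong to transactions whose
            // outcome cannot be resolved safely here, so keep them untouched.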
            if (cell.getTimestamp() > lowWatermark) {
                retain(currentRowWorthValues, cell, shadowCellOp);
                continue;
            }
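            // Cells deleted outside of a transaction (plain HBase deletes) follow the
            // policy implemented in shouldRetainNonTransactionallyDeletedCell().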
            if (shouldRetainNonTransactionallyDeletedCell(cell)) {
                retain(currentRowWorthValues, cell, shadowCellOp);
                continue;
            }
            // During a minor compaction the coprocessor may only see a
            // subset of store files and may not have all the versions
            // of a cell available for consideration. Therefore, if it
            // deletes a cell with a tombstone during a minor compaction,
            // an older version of the cell may become visible again. So,
            // we have to remove tombstones only in major compactions.
            if (isMajorCompaction) {
                // Strong assumption: family delete cells arrive first, before any other column
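                // A committed tombstone makes the tombstone itself and the older cells it
                // covers unnecessary: skipToNextColumn() advances past the remaining cells
                // of this column without retaining any of them.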
                if (CellUtils.isTombstone(cell)) {
                    if (shadowCellOp.isPresent()) {
                        skipToNextColumn(cell, iter);
                    } else {
                        Optional<CommitTimestamp> commitTimestamp = queryCommitTimestamp(cell);
                        // Clean the cell only if its commit timestamp is valid
                        if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
                            skipToNextColumn(cell, iter);
                        }
                    }
                    continue;
                }
            }
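            // Track, per column, the newest cell that has a valid commit timestamp;
            // retainLastTimestampedCellsSaved() below adds only those survivors to the
            // row's worth values, while cells without a valid commit timestamp are discarded.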
            if (shadowCellOp.isPresent()) {
                saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCellOp.get());
            } else {
                Optional<CommitTimestamp> commitTimestamp = queryCommitTimestamp(cell);
                if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
                    // Build the missing shadow cell...
                    byte[] shadowCellValue = Bytes.toBytes(commitTimestamp.get().getValue());
                    Cell shadowCell = CellUtils.buildShadowCellFromCell(cell, shadowCellValue);
                    saveLastTimestampedCell(lastTimestampedCellsInRow, cell, shadowCell);
                } else {
                    LOG.trace("Discarding cell {}", cell);
                }
            }
        }
        retainLastTimestampedCellsSaved(currentRowWorthValues, lastTimestampedCellsInRow);
        // 4) Sort the list
        Collections.sort(currentRowWorthValues, CellComparator.getInstance());
    }
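    // Cells beyond the limit stay in currentRowWorthValues and are emitted on the
    // next invocation of this method.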
    // Chomp current row worth values up to the limit
    if (currentRowWorthValues.size() <= limit || limit == -1) {
        result.addAll(currentRowWorthValues);
        currentRowWorthValues.clear();
    } else {
        result.addAll(currentRowWorthValues.subList(0, limit));
        currentRowWorthValues.subList(0, limit).clear();
    }
    LOG.trace("Results to preserve {}", result);
    return hasMoreRows;
}