in phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java [281:436]
/**
 * Processes the next page of data table rows and appends a single aggregate cell to
 * {@code results}.
 *
 * <p>Raw cells are read from the local scanner row by row, split into one {@link Put} and one
 * {@link Delete} per row, and handed to {@code prepareIndexMutations}. The loop stops when the
 * scanner is exhausted, a dummy row is seen, or either the data row count or the index mutation
 * count reaches {@code pageSizeInRows}. The collected mutations are then either rebuilt directly
 * (when {@code indexRowKeyforReadRepair} is set) or verified and/or rebuilt.
 *
 * <p>The value of the aggregate cell is the number of data rows processed in this call, or
 * {@code singleRowRebuildReturnCode} when {@code indexRowKeyforReadRepair} is set. Its row key is
 * the row of the last visited cell, or a key derived from the scan/region boundaries when no
 * cell was visited.
 *
 * <p>No cell is added when no local scanner is available or when {@code shouldVerify()} returns
 * {@code false}; in both cases the method returns {@code false}.
 *
 * @param results list that receives the aggregate cell
 * @return {@code true} if {@code hasMore} or {@code hasMoreIncr} is set after this call,
 *         {@code false} otherwise
 * @throws IOException if scanning or rebuilding fails; {@code shouldRetry} is set before any
 *         failure is rethrown
 */
public boolean next(List<Cell> results) throws IOException {
    if (indexRowKeyforReadRepair != null
            && singleRowRebuildReturnCode
                    == GlobalIndexChecker.RebuildReturnCode.NO_DATA_ROW.getValue()) {
        // The recorded single-row rebuild return code is NO_DATA_ROW: report that code
        // without scanning anything.
        results.add(newAggregateCell(getRowKeyForEmptyPage(), singleRowRebuildReturnCode));
        return false;
    }
    Map<byte[], List<Mutation>> indexMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
    Set<byte[]> mostRecentIndexRowKeys = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    Cell lastCell = null;
    int dataRowCount = 0;
    int indexMutationCount = 0;
    region.startRegionOperation();
    RegionScanner localScanner = null;
    try {
        localScanner = getLocalScanner();
        if (localScanner == null) {
            return false;
        }
        synchronized (localScanner) {
            if (!shouldVerify()) {
                skipped = true;
                return false;
            }
            do {
                /*
                 * If the region is closing and a large number of rows are being
                 * verified/rebuilt with IndexTool, not having this check would delay the
                 * region closing and affect availability, because this method holds the
                 * read lock on the region.
                 */
                ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
                List<Cell> row = new ArrayList<>();
                hasMore = localScanner.nextRaw(row);
                if (!row.isEmpty()) {
                    lastCell = row.get(0); // lastCell is any cell from the last visited row
                    if (isDummy(row)) {
                        break;
                    }
                    Put put = null;
                    Delete del = null;
                    for (Cell cell : row) {
                        // For CDC indexes, cells older than minTimestamp are ignored.
                        if (cell.getTimestamp() < minTimestamp
                                && indexMaintainer.isCDCIndex()) {
                            continue;
                        }
                        if (cell.getType().equals(Cell.Type.Put)) {
                            if (familyMap != null && !isColumnIncluded(cell)) {
                                continue;
                            }
                            if (put == null) {
                                put = new Put(CellUtil.cloneRow(cell));
                            }
                            put.add(cell);
                        } else {
                            // Every non-Put cell type is collected into the Delete.
                            if (del == null) {
                                del = new Delete(CellUtil.cloneRow(cell));
                            }
                            del.add(cell);
                        }
                    }
                    if (put == null && del == null) {
                        // Every cell of this row was filtered out; the row is not counted.
                        continue;
                    }
                    indexMutationCount += prepareIndexMutations(put, del, indexMutationMap,
                            mostRecentIndexRowKeys);
                    dataRowCount++;
                }
            } while (hasMore && indexMutationCount < pageSizeInRows
                    && dataRowCount < pageSizeInRows);
            if (!indexMutationMap.isEmpty()) {
                if (indexRowKeyforReadRepair != null) {
                    rebuildIndexRows(indexMutationMap, Collections.emptyList(),
                            verificationResult);
                } else {
                    verifyAndOrRebuildIndex(indexMutationMap, mostRecentIndexRowKeys);
                }
            }
            if (verify) {
                verificationResult.setScannedDataRowCount(
                        verificationResult.getScannedDataRowCount() + dataRowCount);
            }
        }
    } catch (Throwable e) {
        LOGGER.error("Exception in IndexRebuildRegionScanner for region "
                + region.getRegionInfo().getRegionNameAsString(), e);
        this.shouldRetry = true;
        throw e;
    } finally {
        try {
            region.closeRegionOperation();
        } finally {
            // Close the scanner even if releasing the region operation fails. The shared
            // innerScanner is not closed here.
            if (localScanner != null && localScanner != innerScanner) {
                localScanner.close();
            }
        }
    }
    if (indexRowKeyforReadRepair != null) {
        dataRowCount = singleRowRebuildReturnCode;
    }
    // lastCell stays null when the scan produced no cells; in that case nextStartKey is left
    // unchanged instead of dereferencing a null cell.
    if (minTimestamp != 0 && lastCell != null) {
        nextStartKey = ByteUtil.calculateTheClosestNextRowKeyForPrefix(
                CellUtil.cloneRow(lastCell));
    }
    byte[] rowKey = lastCell == null ? getRowKeyForEmptyPage() : CellUtil.cloneRow(lastCell);
    results.add(newAggregateCell(rowKey, dataRowCount));
    return hasMore || hasMoreIncr;
}

/**
 * Chooses the row key of the aggregate cell when no data cell is available to take it from.
 *
 * <p>Clients flagged by {@code ScanUtil.isIncompatibleClientForServerReturnValidRowKey} get
 * {@code UNGROUPED_AGG_ROW_KEY}. Otherwise the key is the largest possible row key within the
 * scan range (falling back to the region boundaries for an empty scan start/stop row); if that
 * cannot be computed, an inclusive scan boundary is used, or the empty end row as last resort.
 */
private byte[] getRowKeyForEmptyPage() {
    if (ScanUtil.isIncompatibleClientForServerReturnValidRowKey(scan)) {
        return UNGROUPED_AGG_ROW_KEY;
    }
    byte[] startKey = scan.getStartRow().length > 0 ? scan.getStartRow()
            : region.getRegionInfo().getStartKey();
    byte[] endKey = scan.getStopRow().length > 0 ? scan.getStopRow()
            : region.getRegionInfo().getEndKey();
    byte[] rowKey = ByteUtil.getLargestPossibleRowKeyInRange(startKey, endKey);
    if (rowKey != null) {
        return rowKey;
    }
    if (scan.includeStartRow()) {
        return startKey;
    }
    if (scan.includeStopRow()) {
        return endKey;
    }
    return HConstants.EMPTY_END_ROW;
}

/**
 * Builds the aggregate cell for {@code rowKey} whose value is {@code value} encoded with
 * {@code PLong}.
 */
private Cell newAggregateCell(byte[] rowKey, long value) {
    byte[] valueBytes = PLong.INSTANCE.toBytes(value);
    return PhoenixKeyValueUtil.newKeyValue(rowKey, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
            AGG_TIMESTAMP, valueBytes, 0, valueBytes.length);
}