in phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java [599:738]
public boolean next(List<Cell> resultsToReturn, ScannerContext scannerContext)
        throws IOException {
    // Processes one page of rows from innerScanner. Each non-empty row may be turned
    // into mutations (which are committed in batches) and is then fed to the
    // aggregators deserialized from the scan. The page ends when innerScanner reports
    // no more rows, when a dummy result shows up, or when the time spent in this call
    // reaches pageSizeMs.
    //
    // If at least one row was aggregated, resultsToReturn receives either the cells of
    // the single-row delete result or a single key value holding the aggregated bytes.
    // The return value is the last "has more" flag reported by innerScanner, except
    // that a dummy result seen before any row was aggregated is handed back as-is
    // together with a return value of true.
    boolean hasMore;
    final long pageStartMs = EnvironmentEdgeManager.currentTimeMillis();
    final Configuration configuration = env.getConfiguration();
    final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan));
    // The memory chunk is released by try-with-resources on every exit path.
    try (MemoryManager.MemoryChunk chunk = tenantCache.getMemoryManager().allocate(0)) {
        final Aggregators aggregators = ServerAggregators.deserialize(
                scan.getAttribute(BaseScannerRegionObserverConstants.AGGREGATORS),
                configuration, chunk);
        final Aggregator[] aggregatorArray = aggregators.getAggregators();
        aggregators.reset(aggregatorArray);

        Cell lastSeenCell = null;
        boolean aggregatedAnyRow = false;
        final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
        final Tuple rowTuple;
        if (useQualifierAsIndex) {
            rowTuple = new PositionBasedMultiKeyValueTuple();
        } else {
            rowTuple = new MultiKeyValueTuple();
        }

        // Start with a default mutation list; when this scan can produce mutations,
        // swap it for one constructed with maxBatchSize plus one tenth of it.
        UngroupedAggregateRegionObserver.MutationList pendingMutations =
                new UngroupedAggregateRegionObserver.MutationList();
        final boolean deletesColumn = deleteCQ != null && deleteCF != null;
        final boolean producesMutations = isDescRowKeyOrderUpgrade || isDelete || isUpsert
                || deletesColumn || emptyCF != null || buildLocalIndex;
        if (producesMutations) {
            pendingMutations = new UngroupedAggregateRegionObserver.MutationList(
                    Ints.saturatedCast(maxBatchSize + maxBatchSize / 10));
        }

        Result singleRowDeleteResult = null;
        // Paired with closeRegionOperation() in the finally block below.
        region.startRegionOperation();
        try {
            synchronized (innerScanner) {
                do {
                    ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();

                    final List<Cell> rowCells;
                    if (useQualifierAsIndex) {
                        rowCells = new EncodedColumnQualiferCellsList(
                                minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(),
                                encodingScheme);
                    } else {
                        rowCells = new ArrayList<Cell>();
                    }

                    // nextRaw may fill rowCells even when it returns false: the flag only
                    // says whether further values exist after the ones just returned.
                    if (scannerContext == null) {
                        hasMore = innerScanner.nextRaw(rowCells);
                    } else {
                        hasMore = innerScanner.nextRaw(rowCells, scannerContext);
                    }

                    if (isDummy(rowCells)) {
                        if (aggregatedAnyRow) {
                            // Rows were already aggregated; stop and report them.
                            break;
                        }
                        // Nothing aggregated yet: pass the dummy straight through.
                        resultsToReturn.addAll(rowCells);
                        return true;
                    }
                    if (rowCells.isEmpty()) {
                        continue;
                    }

                    lastSeenCell = rowCells.get(0);
                    rowTuple.setKeyValues(rowCells);

                    // At most one of these per-row actions applies.
                    if (isDescRowKeyOrderUpgrade) {
                        if (!descRowKeyOrderUpgrade(rowCells, ptr, pendingMutations)) {
                            // Row is skipped entirely: no commit check, no aggregation.
                            continue;
                        }
                    } else if (buildLocalIndex) {
                        buildLocalIndex(rowTuple, rowCells, ptr);
                    } else if (isDelete) {
                        deleteRow(rowCells, pendingMutations);
                    } else if (isUpsert) {
                        upsert(rowTuple, ptr, pendingMutations);
                    } else if (deleteCF != null && deleteCQ != null) {
                        deleteCForQ(rowTuple, rowCells, pendingMutations);
                    }

                    if (emptyCF != null) {
                        // With an emptyCF specified, an empty key value has to be added
                        // "retroactively" for every key value visible at the timestamp
                        // the DDL was issued. Key values not visible at that timestamp
                        // are never projected to later scans, so they can be ignored.
                        // One empty key value is inserted per row per timestamp.
                        insertEmptyKeyValue(rowCells, pendingMutations);
                    }

                    if (ServerUtil.readyToCommit(pendingMutations.size(),
                            pendingMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
                        if (isSingleRowDelete) {
                            singleRowDeleteResult =
                                    annotateCommitAndReturnResult(pendingMutations);
                        } else {
                            annotateAndCommit(pendingMutations);
                        }
                    }

                    // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
                    if (ServerUtil.readyToCommit(indexMutations.size(),
                            indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
                        setIndexAndTransactionProperties(indexMutations, indexUUID,
                                indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
                        ungroupedAggregateRegionObserver.commitBatch(region, indexMutations,
                                blockingMemStoreSize);
                        indexMutations.clear();
                    }

                    aggregators.aggregate(aggregatorArray, rowTuple);
                    aggregatedAnyRow = true;
                } while (hasMore
                        && EnvironmentEdgeManager.currentTimeMillis() - pageStartMs < pageSizeMs);

                // Flush whatever is left over from the last partial batch.
                if (!pendingMutations.isEmpty()) {
                    if (isSingleRowDelete) {
                        singleRowDeleteResult = annotateCommitAndReturnResult(pendingMutations);
                    } else {
                        annotateAndCommit(pendingMutations);
                    }
                }
                // NOTE(review): unlike the in-loop index flush, this final flush does not
                // call setIndexAndTransactionProperties first - confirm this is intended.
                if (!indexMutations.isEmpty()) {
                    ungroupedAggregateRegionObserver.commitBatch(region, indexMutations,
                            blockingMemStoreSize);
                    indexMutations.clear();
                }
            }
        } catch (InsufficientMemoryException e) {
            throw new DoNotRetryIOException(e);
        } catch (DataExceedsCapacityException e) {
            throw new DoNotRetryIOException(e.getMessage(), e);
        } catch (Throwable e) {
            // Log with the region name, then rethrow the same exception.
            final String regionName = region.getRegionInfo().getRegionNameAsString();
            LOGGER.error("Exception in UngroupedAggregateRegionScanner for region "
                    + regionName, e);
            throw e;
        } finally {
            region.closeRegionOperation();
        }

        if (!aggregatedAnyRow) {
            // No row was aggregated, so there is nothing to add to resultsToReturn.
            return hasMore;
        }
        if (isSingleRowDelete && singleRowDeleteResult != null) {
            resultsToReturn.addAll(singleRowDeleteResult.listCells());
            return hasMore;
        }

        final byte[] value = aggregators.toBytes(aggregatorArray);
        final byte[] rowKey;
        if (pageSizeMs == Long.MAX_VALUE
                && ScanUtil.isIncompatibleClientForServerReturnValidRowKey(scan)) {
            // Paging is not set. To be compatible with older clients, do not set the row key
            rowKey = UNGROUPED_AGG_ROW_KEY;
        } else {
            rowKey = CellUtil.cloneRow(lastSeenCell);
        }
        final Cell aggregateCell = PhoenixKeyValueUtil.newKeyValue(rowKey, SINGLE_COLUMN_FAMILY,
                SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
        resultsToReturn.add(aggregateCell);
        return hasMore;
    }
}