public boolean next()

in phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java [599:738]


    /**
     * Scans rows from the wrapped {@code innerScanner}, applies whichever server-side write
     * mode is enabled for this scan (descending row key order upgrade, local index build,
     * row delete, upsert, column family/qualifier delete, empty key value insertion) and
     * folds every processed row into the ungrouped aggregators.
     *
     * <p>The scan loop stops when the inner scanner reports no more rows, when a dummy
     * result is read, or when the elapsed time since entry reaches {@code pageSizeMs}.
     * Any mutations still pending at that point are committed before returning.
     *
     * @param resultsToReturn output list. Receives one of: the dummy cells (when a dummy
     *                        result is read before any row was aggregated), the cells of the
     *                        single-row delete result, or a single cell carrying the
     *                        serialized aggregate value. Left untouched when no row was
     *                        aggregated and no dummy result was read.
     * @param scannerContext  passed through to {@code innerScanner.nextRaw}; when
     *                        {@code null} the single-argument overload is used instead
     * @return {@code true} when a dummy result is returned; otherwise the value returned by
     *         the last {@code innerScanner.nextRaw} call
     * @throws IOException a {@code DoNotRetryIOException} wrapping an
     *                     {@code InsufficientMemoryException} or
     *                     {@code DataExceedsCapacityException}; any other failure is logged
     *                     with the region name and rethrown unchanged
     */
    public boolean next(List<Cell> resultsToReturn, ScannerContext scannerContext)
            throws IOException {
        boolean hasMore;
        long startTime = EnvironmentEdgeManager.currentTimeMillis();
        Configuration conf = env.getConfiguration();
        final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan));
        // The (initially zero-sized) memory chunk is handed to the aggregators and released
        // by try-with-resources when this call returns or throws.
        try (MemoryManager.MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
            Aggregators aggregators = ServerAggregators.deserialize(
                    scan.getAttribute(BaseScannerRegionObserverConstants.AGGREGATORS), conf, em);
            Aggregator[] rowAggregators = aggregators.getAggregators();
            aggregators.reset(rowAggregators);
            Cell lastCell = null;
            boolean hasAny = false;
            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
            Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
            // Allocate the mutation list exactly once: pre-sized to the batch size plus 10%
            // headroom when this scan can generate mutations, default-sized otherwise.
            final boolean mayGenerateMutations = isDescRowKeyOrderUpgrade || isDelete || isUpsert
                    || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex;
            UngroupedAggregateRegionObserver.MutationList mutations = mayGenerateMutations
                    ? new UngroupedAggregateRegionObserver.MutationList(
                            Ints.saturatedCast(maxBatchSize + maxBatchSize / 10))
                    : new UngroupedAggregateRegionObserver.MutationList();
            Result atomicSingleRowDeleteResult = null;
            // nextRaw is invoked inside a region operation and while holding the scanner's
            // monitor; the region operation is always closed in the finally block below.
            region.startRegionOperation();
            try {
                synchronized (innerScanner) {
                    do {
                        ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
                        List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
                        // Results are potentially returned even when the return value of s.next is false
                        // since this is an indication of whether or not there are more values after the
                        // ones returned
                        hasMore = (scannerContext == null)
                            ? innerScanner.nextRaw(results)
                            : innerScanner.nextRaw(results, scannerContext);
                        if (isDummy(results)) {
                            if (!hasAny) {
                                // Nothing aggregated yet: hand the dummy result straight back.
                                resultsToReturn.addAll(results);
                                return true;
                            }
                            // Rows were already aggregated: stop scanning and return that
                            // partial aggregate instead of the dummy result.
                            break;
                        }
                        if (!results.isEmpty()) {
                            lastCell = results.get(0);
                            result.setKeyValues(results);
                            if (isDescRowKeyOrderUpgrade) {
                                if (!descRowKeyOrderUpgrade(results, ptr, mutations)) {
                                    // Row is skipped entirely: no commit check, no
                                    // aggregation. The loop condition is still evaluated.
                                    continue;
                                }
                            } else if (buildLocalIndex) {
                                buildLocalIndex(result, results, ptr);
                            } else if (isDelete) {
                                deleteRow(results, mutations);
                            } else if (isUpsert) {
                                upsert(result, ptr, mutations);
                            } else if (deleteCF != null && deleteCQ != null) {
                                deleteCForQ(result, results, mutations);
                            }
                            if (emptyCF != null) {
                                /*
                                 * If we've specified an emptyCF, then we need to insert an empty
                                 * key value "retroactively" for any key value that is visible at
                                 * the timestamp that the DDL was issued. Key values that are not
                                 * visible at this timestamp will not ever be projected up to
                                 * scans past this timestamp, so don't need to be considered.
                                 * We insert one empty key value per row per timestamp.
                                 */
                                insertEmptyKeyValue(results, mutations);
                            }
                            // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
                            if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
                                if (!isSingleRowDelete) {
                                    annotateAndCommit(mutations);
                                } else {
                                    atomicSingleRowDeleteResult =
                                            annotateCommitAndReturnResult(mutations);
                                }
                            }

                            // Index mutations are batched and committed separately from the
                            // data mutations, using the same thresholds.
                            if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
                                setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
                                ungroupedAggregateRegionObserver.commitBatch(region, indexMutations, blockingMemStoreSize);
                                indexMutations.clear();
                            }
                            aggregators.aggregate(rowAggregators, result);
                            hasAny = true;
                        }
                    } while (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs);
                    // Flush whatever is still pending after the loop ends.
                    if (!mutations.isEmpty()) {
                        if (!isSingleRowDelete) {
                            annotateAndCommit(mutations);
                        } else {
                            atomicSingleRowDeleteResult = annotateCommitAndReturnResult(mutations);
                        }
                    }
                    // NOTE(review): unlike the in-loop index commit above, this trailing flush
                    // does not call setIndexAndTransactionProperties first. Confirm whether
                    // that difference is intentional before changing it.
                    if (!indexMutations.isEmpty()) {
                        ungroupedAggregateRegionObserver.commitBatch(region, indexMutations, blockingMemStoreSize);
                        indexMutations.clear();
                    }
                }
            } catch (InsufficientMemoryException e) {
                throw new DoNotRetryIOException(e);
            } catch (DataExceedsCapacityException e) {
                throw new DoNotRetryIOException(e.getMessage(), e);
            } catch (Throwable e) {
                LOGGER.error("Exception in UngroupedAggregateRegionScanner for region "
                        + region.getRegionInfo().getRegionNameAsString(), e);
                throw e;
            } finally {
                region.closeRegionOperation();
            }
            if (!hasAny) {
                // No row was aggregated in this call, so there is nothing to emit.
                return hasMore;
            }
            if (isSingleRowDelete && atomicSingleRowDeleteResult != null) {
                // A single-row delete returns the cells of the commit result instead of an
                // aggregate cell.
                // NOTE(review): HBase's Result#listCells returns null for an empty Result;
                // confirm annotateCommitAndReturnResult never yields an empty, non-null
                // Result here, otherwise addAll would throw.
                resultsToReturn.addAll(atomicSingleRowDeleteResult.listCells());
                return hasMore;
            }
            final byte[] value = aggregators.toBytes(rowAggregators);
            final byte[] rowKey;
            if (pageSizeMs == Long.MAX_VALUE
                    && ScanUtil.isIncompatibleClientForServerReturnValidRowKey(scan)) {
                // Paging is not set. To be compatible with older clients, do not set the row key
                rowKey = UNGROUPED_AGG_ROW_KEY;
            } else {
                // Otherwise the aggregate cell is keyed by the row of the last cell read.
                rowKey = CellUtil.cloneRow(lastCell);
            }
            resultsToReturn.add(PhoenixKeyValueUtil.newKeyValue(rowKey, SINGLE_COLUMN_FAMILY,
                    SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
            return hasMore;
        }
    }