in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java [1060:1235]
public AggIterOutcome outputCurrentBatch() {
  /*
   * Produces one outgoing batch from the aggregated data (if any is pending) and tells the
   * caller how to proceed:
   *   AGG_EMIT    - an EMIT was being handled and this (possibly empty) batch completes it
   *   AGG_NONE    - there is nothing to return
   *   AGG_RESTART - a spilled partition was installed as the new incoming; the build must rerun
   *   AGG_OK      - a regular batch was produced
   * The "outcome" field is set alongside to the matching IterOutcome (EMIT / NONE / OK).
   */

  // EMIT requested while no batch holders carry any data: answer with an empty batch
  if (handleEmit && (batchHolders == null || batchHolders[0].size() == 0)) {
    lastBatchOutputCount = 0; // nothing goes out
    allocateOutgoing(0);
    outgoing.getContainer().setValueCount(0);
    // the batch that closes an EMIT is reported as EMIT rather than OK
    outcome = IterOutcome.EMIT;
    handleEmit = false; // EMIT handling is complete
    if (outBatchIndex != null) {
      outBatchIndex[0] = 0; // start from the first batch again on the next EMIT
    }
    return AggIterOutcome.AGG_EMIT;
  }

  // No schema means no records were ever read from the incoming; simply finish
  if (schema == null) {
    logger.trace("Incoming was empty; output is an empty batch.");
    outcome = IterOutcome.NONE;
    allFlushed = true;
    return AggIterOutcome.AGG_NONE;
  }

  // Defaults describe the early-output partition; overridden below for regular output
  int partitionToReturn = earlyPartition;
  ArrayList<BatchHolder> partitionBatches = batchHolders[partitionToReturn];
  int batchIdx = outBatchIndex[partitionToReturn];

  if (!earlyOutput) {
    // Move nextPartitionToReturn forward until it points at a partition that still has
    // pending output; spilled partitions are finalized and skipped along the way.
    for (; nextPartitionToReturn < spilledState.getNumPartitions(); nextPartitionToReturn++) {
      if (!isSpilled(nextPartitionToReturn)) {
        partitionBatches = batchHolders[nextPartitionToReturn];
        batchIdx = outBatchIndex[nextPartitionToReturn];
        // a non-empty current batch in this partition is the one to return
        if (batchIdx < partitionBatches.size()
            && partitionBatches.get(batchIdx).getNumPendingOutput() != 0) {
          break;
        }
        continue; // nothing left here; look at the following partition
      }

      // This partition was spilled: spill whatever remains, register it for later
      // processing, re-initialize the partition and close its writer.
      spillAPartition(nextPartitionToReturn);
      HashAggSpilledPartition spilled = new HashAggSpilledPartition(
          spilledState.getCycle(),
          nextPartitionToReturn,
          originalPartition,
          spilledBatchesCount[nextPartitionToReturn],
          spillFiles[nextPartitionToReturn]);
      spilledState.addPartition(spilled);
      reinitPartition(nextPartitionToReturn);
      try {
        spillSet.close(writers[nextPartitionToReturn]);
      } catch (IOException closeFailure) {
        throw UserException.resourceError(closeFailure)
            .message("IO Error while closing output stream")
            .build(logger);
      }
      writers[nextPartitionToReturn] = null;
    }

    // Ran past the last partition: either everything is done, or spilled data must be read back
    if (nextPartitionToReturn >= spilledState.getNumPartitions()) {
      // (the end of this method performs a similar check, so this branch is likely rare)
      if (spilledState.isEmpty()) {
        allFlushed = true;
        outcome = IterOutcome.NONE;
        if (phase.is2nd() && spillSet.getWriteBytes() > 0) {
          // record the total spilled volume, in MB
          stats.setLongStat(Metric.SPILL_MB,
              (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
        }
        return AggIterOutcome.AGG_NONE;
      }

      // Spilled partitions remain: take one and treat it exactly like a fresh incoming
      buildComplete = false; // the build phase has to run again
      handlingSpills = true; // from now on the input comes from spill files
      HashAggSpilledPartition nextSpill = spilledState.getNextSpilledPartition();
      newIncoming = new SpilledRecordBatch(
          nextSpill.getSpillFile(), nextSpill.getSpilledBatches(), context, schema, oContext, spillSet);
      originalPartition = nextSpill.getOriginPartition(); // used for the filename
      logger.trace("Reading back spilled original partition {} as an incoming",originalPartition);
      // set up against the new incoming (new set of partitions)
      try {
        initializeSetup(newIncoming);
      } catch (Exception setupFailure) {
        throw new RuntimeException(setupFailure);
      }
      spilledState.updateCycle(stats, nextSpill, updater);
      return AggIterOutcome.AGG_RESTART;
    }

    partitionToReturn = nextPartitionToReturn;
  }

  // Number of records the selected batch holder still has to output
  final int pendingRows = partitionBatches.get(batchIdx).getNumPendingOutput();

  // Bookkeeping counters (used for logging / metrics)
  rowsInPartition += pendingRows;
  if (handlingSpills) {
    rowsSpilledReturned += pendingRows;
  } else {
    rowsNotSpilled += pendingRows;
  }
  if (earlyOutput) {
    rowsReturnedEarly += pendingRows;
  }

  // Fill the outgoing batch: aggregated values first, then the keys from the hash table
  allocateOutgoing(pendingRows);
  partitionBatches.get(batchIdx).outputValues();
  htables[partitionToReturn].outputKeys(batchIdx, outContainer, pendingRows);

  // Publish the record count on the outgoing vectors and update the batch statistics
  outgoing.getContainer().setValueCount(pendingRows);
  outgoing.getRecordBatchMemoryManager().updateOutgoingStats(pendingRows);
  RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, outgoing, outgoing.getRecordBatchStatsContext());

  outcome = IterOutcome.OK;

  if (EXTRA_DEBUG_SPILL && phase.is2nd()) {
    logger.debug("So far returned {} + SpilledReturned {} total {} (spilled {})",
        rowsNotSpilled, rowsSpilledReturned, rowsNotSpilled+rowsSpilledReturned, rowsSpilled);
  }

  lastBatchOutputCount = pendingRows;
  outBatchIndex[partitionToReturn]++;

  // More batches remain in this partition: a plain OK is all there is to report
  if (outBatchIndex[partitionToReturn] != partitionBatches.size()) {
    return AggIterOutcome.AGG_OK;
  }

  // The batch just produced was the last one of its partition
  if (EXTRA_DEBUG_SPILL) {
    logger.debug("HashAggregate: {} Flushed partition {} with {} batches total {} rows",
        earlyOutput ? "(Early)" : "",
        partitionToReturn, outBatchIndex[partitionToReturn], rowsInPartition);
  }
  rowsInPartition = 0; // the count restarts with the next partition
  reinitPartition(partitionToReturn); // release and re-initialize the finished partition

  if (earlyOutput) {
    if (EXTRA_DEBUG_SPILL) {
      logger.debug("HASH AGG: Finished (early) re-init partition {}, mem allocated: {}", earlyPartition, allocator.getAllocatedMemory());
    }
    outBatchIndex[earlyPartition] = 0; // ready for the next early output
    earlyOutput = false; // early output is finished
    return AggIterOutcome.AGG_OK;
  }

  if (handleEmit) {
    // the batch that closes an EMIT is reported as EMIT rather than OK
    outcome = IterOutcome.EMIT;
    handleEmit = false; // EMIT handling is complete
    outBatchIndex[partitionToReturn] = 0; // start from the first batch again on the next EMIT
    return AggIterOutcome.AGG_EMIT;
  }

  // Last partition and no spilled partitions pending: everything has been returned
  if ((partitionToReturn + 1 == spilledState.getNumPartitions()) && spilledState.isEmpty()) {
    allFlushed = true; // marks that all output has been flushed
    logger.trace("HashAggregate: All batches flushed.");
    // nothing more to return, so internal state can be released
    cleanup();
  }
  return AggIterOutcome.AGG_OK;
}