public AggIterOutcome outputCurrentBatch()

in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java [1060:1235]


  /**
   * Produces the next outgoing batch of aggregated results, or signals that
   * there is nothing (more) to return.
   *
   * <p>The method takes one of the following paths, in this order:
   * <ol>
   *   <li><b>Empty EMIT</b> - {@code handleEmit} is set and either
   *       {@code batchHolders} is null or partition 0 holds no batch holders:
   *       an empty outgoing batch is allocated, {@code outcome} becomes
   *       {@code EMIT} and {@code AGG_EMIT} is returned.</li>
   *   <li><b>No schema</b> - {@code schema} is null: {@code outcome} becomes
   *       {@code NONE}, {@code allFlushed} is set and {@code AGG_NONE} is
   *       returned.</li>
   *   <li><b>Partition selection</b> - when {@code earlyOutput} is set the
   *       partition {@code earlyPartition} is used. Otherwise
   *       {@code nextPartitionToReturn} is advanced: every spilled partition met
   *       on the way has its remainder spilled, is registered with
   *       {@code spilledState}, re-initialized and has its writer closed;
   *       partitions whose current batch is missing or has no pending output
   *       are skipped.</li>
   *   <li><b>All partitions passed</b> - if {@code spilledState} is empty,
   *       {@code AGG_NONE} is returned; otherwise the next spilled partition is
   *       installed as the new incoming, setup is re-run and
   *       {@code AGG_RESTART} is returned.</li>
   *   <li><b>Normal output</b> - the values and keys of one batch holder are
   *       written to the outgoing container, {@code outcome} becomes
   *       {@code OK} and the partition's output index is advanced. If that was
   *       the partition's last batch, the partition is re-initialized and then
   *       either early output is ended, or {@code AGG_EMIT} is returned (when
   *       {@code handleEmit} is set), or - for the last partition with no
   *       spilled partitions left - {@code allFlushed} is set and
   *       {@code cleanup()} is called.</li>
   * </ol>
   *
   * <p>Side effects visible here include updates to {@code outcome},
   * {@code lastBatchOutputCount}, {@code outBatchIndex},
   * {@code nextPartitionToReturn}, {@code allFlushed} and the row counters used
   * for logging/metrics.
   *
   * @return {@code AGG_EMIT}, {@code AGG_NONE}, {@code AGG_RESTART} or
   *         {@code AGG_OK}, as described above
   * @throws UserException (resource error) if closing the spill writer of a
   *         spilled partition fails with an {@code IOException}
   * @throws RuntimeException wrapping any exception thrown by
   *         {@code initializeSetup} when restarting on a spilled partition
   */
  public AggIterOutcome outputCurrentBatch() {

    // Handle the case of an EMIT with an empty batch
    // (no batch holders array at all, or partition 0 holds no batch holders)
    if (handleEmit && (batchHolders == null || batchHolders[0].size() == 0)) {
      lastBatchOutputCount = 0; // empty
      allocateOutgoing(0);
      outgoing.getContainer().setValueCount(0);

      // When returning the last outgoing batch (following an incoming EMIT), then replace OK with EMIT
      outcome = IterOutcome.EMIT;
      handleEmit = false; // finish handling EMIT
      // outBatchIndex is null-checked here, unlike in the rest of the method
      if (outBatchIndex != null) {
        outBatchIndex[0] = 0; // reset, for the next EMIT
      }
      return AggIterOutcome.AGG_EMIT;
    }

    // when incoming was an empty batch, just finish up
    if (schema == null) {
      logger.trace("Incoming was empty; output is an empty batch.");
      outcome = IterOutcome.NONE; // no records were read
      allFlushed = true;
      return AggIterOutcome.AGG_NONE;
    }

    // Initialization (covers the case of early output)
    // These three locals are overwritten below when this is not an early output.
    ArrayList<BatchHolder> currPartition = batchHolders[earlyPartition];
    int currOutBatchIndex = outBatchIndex[earlyPartition];
    int partitionToReturn = earlyPartition;

    if (!earlyOutput) {
      // Update the next partition to return (if needed)
      // skip fully returned (or spilled) partitions
      while (nextPartitionToReturn < spilledState.getNumPartitions()) {
        //
        // If this partition was spilled - spill the rest of it and skip it
        //
        if (isSpilled(nextPartitionToReturn)) {
          spillAPartition(nextPartitionToReturn); // spill the rest
          // Record this partition (cycle, index, origin, batch count, spill file)
          // so that it can be picked up later via spilledState
          HashAggSpilledPartition sp = new HashAggSpilledPartition(
            spilledState.getCycle(),
            nextPartitionToReturn,
            originalPartition,
            spilledBatchesCount[nextPartitionToReturn],
            spillFiles[nextPartitionToReturn]);

          spilledState.addPartition(sp);

          reinitPartition(nextPartitionToReturn); // free the memory
          try {
            spillSet.close(writers[nextPartitionToReturn]);
          } catch (IOException ioe) {
            throw UserException.resourceError(ioe)
                .message("IO Error while closing output stream")
                .build(logger);
          }
          // Drop the reference to the writer that was just closed
          writers[nextPartitionToReturn] = null;
        }
        else {
          currPartition = batchHolders[nextPartitionToReturn];
          currOutBatchIndex = outBatchIndex[nextPartitionToReturn];
          // If curr batch (partition X index) is not empty - proceed to return it
          // (the index must still be inside the partition, and that batch must have pending output)
          if (currOutBatchIndex < currPartition.size() && 0 != currPartition.get(currOutBatchIndex).getNumPendingOutput()) {
            break;
          }
        }
        nextPartitionToReturn++; // else check next partition
      }

      // if passed the last partition - either done or need to restart and read spilled partitions
      if (nextPartitionToReturn >= spilledState.getNumPartitions()) {
        // The following "if" is probably never used; due to a similar check at the end of this method
        if (spilledState.isEmpty()) { // and no spilled partitions
          allFlushed = true;
          outcome = IterOutcome.NONE;
          if (phase.is2nd() && spillSet.getWriteBytes() > 0) {
            // Bytes written are converted to MB and rounded.
            // NOTE(review): the rounded value is narrowed to int before being passed on.
            stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
                (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
          }
          return AggIterOutcome.AGG_NONE;  // then return NONE
        }
        // Else - there are still spilled partitions to process - pick one and handle just like a new incoming
        buildComplete = false; // go back and call doWork() again
        handlingSpills = true; // beginning to work on the spill files
        // pick a spilled partition; set a new incoming ...
        HashAggSpilledPartition sp = spilledState.getNextSpilledPartition();
        // Create a new "incoming" out of the spilled partition spill file
        newIncoming = new SpilledRecordBatch(sp.getSpillFile(), sp.getSpilledBatches(), context, schema, oContext, spillSet);
        originalPartition = sp.getOriginPartition(); // used for the filename
        logger.trace("Reading back spilled original partition {} as an incoming",originalPartition);
        // Initialize .... new incoming, new set of partitions
        try {
          initializeSetup(newIncoming);
        } catch (Exception e) {
          // Any failure is rethrown unchecked, with the original exception kept as the cause
          throw new RuntimeException(e);
        }

        spilledState.updateCycle(stats, sp, updater);
        return AggIterOutcome.AGG_RESTART;
      }

      // The loop above stopped (via break) on a partition whose current batch has pending output;
      // currPartition and currOutBatchIndex already refer to it.
      partitionToReturn = nextPartitionToReturn;
    }

    // get the number of records in the batch holder that are pending output
    int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();

    // The following accounting is for logging, metrics, etc.
    rowsInPartition += numPendingOutput;
    if (!handlingSpills) { rowsNotSpilled += numPendingOutput; }
    else { rowsSpilledReturned += numPendingOutput; }
    if (earlyOutput) { rowsReturnedEarly += numPendingOutput; }

    allocateOutgoing(numPendingOutput);

    // Emit the aggregated values from the batch holder, then the matching keys from
    // the same partition's hash table; both use the same batch index and record count.
    currPartition.get(currOutBatchIndex).outputValues();
    int numOutputRecords = numPendingOutput;
    htables[partitionToReturn].outputKeys(currOutBatchIndex, outContainer, numPendingOutput);

    // set the value count for outgoing batch value vectors
    outgoing.getContainer().setValueCount(numOutputRecords);

    outgoing.getRecordBatchMemoryManager().updateOutgoingStats(numOutputRecords);
    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, outgoing, outgoing.getRecordBatchStatsContext());

    // Default outcome for a returned batch; replaced by EMIT below when finishing an EMIT
    outcome = IterOutcome.OK;

    if (EXTRA_DEBUG_SPILL && phase.is2nd()) {
      logger.debug("So far returned {} + SpilledReturned {}  total {} (spilled {})",rowsNotSpilled,rowsSpilledReturned,
        rowsNotSpilled+rowsSpilledReturned,
        rowsSpilled);
    }

    lastBatchOutputCount = numOutputRecords;
    // Move on to the next batch of this partition for the following call
    outBatchIndex[partitionToReturn]++;
    // if just flushed the last batch in the partition
    if (outBatchIndex[partitionToReturn] == currPartition.size()) {

      if (EXTRA_DEBUG_SPILL) {
        logger.debug("HashAggregate: {} Flushed partition {} with {} batches total {} rows",
            earlyOutput ? "(Early)" : "",
            partitionToReturn, outBatchIndex[partitionToReturn], rowsInPartition);
      }
      rowsInPartition = 0; // reset to count for the next partition

      // deallocate memory used by this partition, and re-initialize
      reinitPartition(partitionToReturn);

      // Exactly one of the following three follow-ups applies (or none of them)
      if (earlyOutput) {

        if (EXTRA_DEBUG_SPILL) {
          logger.debug("HASH AGG: Finished (early) re-init partition {}, mem allocated: {}", earlyPartition, allocator.getAllocatedMemory());
        }
        outBatchIndex[earlyPartition] = 0; // reset, for next time
        earlyOutput = false; // done with early output
      }
      else if (handleEmit) {
        // When returning the last outgoing batch (following an incoming EMIT), then replace OK with EMIT
        outcome = IterOutcome.EMIT;
        handleEmit = false; // finished handling EMIT
        outBatchIndex[partitionToReturn] = 0; // reset, for the next EMIT
        return AggIterOutcome.AGG_EMIT;
      }
      else if ((partitionToReturn + 1 == spilledState.getNumPartitions()) && spilledState.isEmpty()) { // last partition ?

        allFlushed = true; // next next() call will return NONE

        logger.trace("HashAggregate: All batches flushed.");

        // cleanup my internal state since there is nothing more to return
        cleanup();
      }
    }

    return AggIterOutcome.AGG_OK;
  }