public AggOutcome doWork()

in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java [103:348]


  /**
   * Runs one cycle of the streaming-aggregation state machine: consumes rows from
   * {@code incoming}, groups adjacent equal-key rows, and fills the outgoing batch.
   *
   * <p>Heavily stateful: mutates {@code outcome}, {@code lastOutcome}, {@code done},
   * {@code currentIndex}, {@code previousIndex}, {@code firstBatchForDataSet},
   * {@code firstBatchForSchema}, {@code newSchema}, {@code outputCount} and {@code first}.
   * The enclosing record batch interprets the returned {@link AggOutcome} together with
   * the {@code outcome} field set here.
   *
   * @param outerOutcome the iterator outcome that accompanied the current incoming batch
   *                     (may be overwritten with EMIT while draining empty batches)
   * @return the aggregation outcome telling the caller whether to return a batch,
   *         update the aggregator (schema change), or clean up and stop
   */
  public AggOutcome doWork(IterOutcome outerOutcome) {
    // Terminal guard: once done (or upstream says NONE) there is nothing left to emit.
    if (done || outerOutcome == NONE) {
      outcome = IterOutcome.NONE;
      return AggOutcome.CLEANUP_AND_RETURN;
    }

    try { // outside block to ensure that first is set to false after the first run.
      outputCount = 0;
      // allocate outgoing since either this is the first time or if a subsequent time we would
      // have sent the previous outgoing batch to downstream operator
      allocateOutgoing();

      if (firstBatchForDataSet) {
        // Integer.MAX_VALUE is the "no current row" sentinel used for an empty batch;
        // otherwise start at the first row of the incoming batch.
        this.currentIndex = incoming.getRecordCount() == 0 ? Integer.MAX_VALUE : this.getVectorIndex(underlyingIndex);

        if (outerOutcome == OK_NEW_SCHEMA) {
          firstBatchForSchema = true;
        }
        // consume empty batches until we get one with data (unless we got an EMIT). If we got an emit
        // then this is the first batch, it was empty and we also got an emit.
        if (incoming.getRecordCount() == 0 ) {
          if (outerOutcome != EMIT) {
            outer:
            while (true) {
              IterOutcome out = outgoing.next(0, incoming);
              switch (out) {
                case OK_NEW_SCHEMA:
                  //lastOutcome = out;
                  firstBatchForSchema = true;
                  // deliberate fall-through: OK_NEW_SCHEMA is handled like OK below
                case OK:
                  if (incoming.getRecordCount() == 0) {
                    continue; // still empty; keep draining
                  } else {
                    currentIndex = this.getVectorIndex(underlyingIndex);
                    break outer; // got data; leave the drain loop
                  }
                case EMIT:
                  outerOutcome = EMIT;
                  if (incoming.getRecordCount() == 0) {
                    // When we see an EMIT we let the  agg record batch know that it should either
                    // send out an EMIT or an OK_NEW_SCHEMA, followed by an EMIT. To do that we simply return
                    // RETURN_AND_RESET with the outcome so the record batch can take care of it.
                    return setOkAndReturnEmit();
                  } else {
                    currentIndex = this.getVectorIndex(underlyingIndex);
                    break outer;
                  }

                case NONE:
                  // NOTE(review): NONE is rewritten to OK_NEW_SCHEMA and then deliberately
                  // falls through to default, recording it as the terminal outcome.
                  out = IterOutcome.OK_NEW_SCHEMA;
                default:
                  // Any other outcome (STOP, NOT_YET, ...) is terminal for this operator.
                  lastOutcome = out;
                  outcome = out;
                  done = true;
                  return AggOutcome.CLEANUP_AND_RETURN;
              } // switch (out)
            } // while empty batches are seen
          } else {
            // First batch was empty AND carried EMIT: emit an (empty) result immediately.
            return setOkAndReturnEmit();
          }
        }
      }

      // A schema change was observed on a previous call; caller must rebuild the aggregator.
      if (newSchema) {
        return AggOutcome.UPDATE_AGGREGATOR;
      }

      // if the previous iteration has an outcome that was terminal, don't do anything.
      if (lastOutcome != null /*&& lastOutcome != IterOutcome.OK_NEW_SCHEMA*/) {
        outcome = lastOutcome;
        return AggOutcome.CLEANUP_AND_RETURN;
      }

      // Main loop: each iteration finishes the current incoming batch, then pulls the next.
      outside: while (true) {
        // loop through existing records, adding as necessary.
        if (!processRemainingRecordsInBatch()) {
          // output batch is full. Return.
          return setOkAndReturn(outerOutcome);
        }
        // if the current batch came with an EMIT, we're done since if we are here it means output batch consumed all
        // the rows in incoming batch
        if (outerOutcome == EMIT) {
          // output the last record
          outputToBatch(previousIndex);
          resetIndex();
          return setOkAndReturnEmit();
        }

        /**
         * Hold onto the previous incoming batch. When the incoming uses an
         * SV4, the InternalBatch DOES NOT clone or transfer the data. Instead,
         * it clones only the SV4, and assumes that the same hyper-list of
         * batches will be offered again after the next call to the incoming
         * next(). This is, in fact, how the SV4 works, so all is fine. The
         * trick to be aware of, however, is that this MUST BE TRUE even if
         * the incoming next() returns NONE: the incoming is obligated to continue
         * to offer the set of batches. That is, the incoming cannot try to be
         * tidy and release the batches one end-of-data or the following code
         * will fail, likely with an IndexOutOfBounds exception.
         */

        InternalBatch previous = new InternalBatch(incoming, context);

        try {
          while (true) {

            IterOutcome out = outgoing.next(0, incoming);
            if (EXTRA_DEBUG) {
              logger.debug("Received IterOutcome of {}", out);
            }
            switch (out) {
              case NONE:
                // Upstream exhausted: flush any in-progress group, then finish.
                done = true;
                lastOutcome = out;
                if (firstBatchForDataSet && addedRecordCount == 0) {
                  return setOkAndReturn(NONE);
                } else if (addedRecordCount > 0) {
                  outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
                  // (output container full or not) as we are not going to insert any more records.
                  if (EXTRA_DEBUG) {
                    logger.debug("Received no more batches, returning.");
                  }
                  return setOkAndReturn(NONE);
                } else {
                  // not first batch and record Count == 0
                  outcome = out;
                  return AggOutcome.CLEANUP_AND_RETURN;
                }
                // EMIT is handled like OK, except that we do not loop back to process the
                // next incoming batch; we return instead
              case EMIT:
                if (incoming.getRecordCount() == 0) {
                  // Empty EMIT batch: just flush the pending group, if any.
                  if (addedRecordCount > 0) {
                    outputToBatchPrev(previous, previousIndex, outputCount);
                  }
                } else {
                  resetIndex();
                  currentIndex = this.getVectorIndex(underlyingIndex);
                  if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
                    if (EXTRA_DEBUG) {
                      logger.debug("New value was same as last value of previous batch, adding.");
                    }
                    addRecordInc(currentIndex);
                    previousIndex = currentIndex;
                    incIndex();
                    if (EXTRA_DEBUG) {
                      logger.debug("Continuing outside");
                    }
                  } else { // not the same
                    if (EXTRA_DEBUG) {
                      logger.debug("This is not the same as the previous, add record and continue outside.");
                    }
                    if (addedRecordCount > 0) {
                      if (outputToBatchPrev(previous, previousIndex, outputCount)) {
                        if (EXTRA_DEBUG) {
                          logger.debug("Output container is full. flushing it.");
                        }
                        return setOkAndReturn(EMIT);
                      }
                    }
                    // important to set the previous index to -1 since we start a new group
                    previousIndex = -1;
                  }
                  if (!processRemainingRecordsInBatch()) {
                    // output batch is full. Return.
                    return setOkAndReturn(EMIT);
                  }
                  outputToBatch(previousIndex); // currentIndex has been reset to int_max so use previous index.
                }
                resetIndex();
                return setOkAndReturnEmit();

              case NOT_YET:
                // Nothing available yet; surface the outcome unchanged to the caller.
                this.outcome = out;
                return AggOutcome.RETURN_OUTCOME;

              case OK_NEW_SCHEMA:
                firstBatchForSchema = true;
                //lastOutcome = out;
                if (EXTRA_DEBUG) {
                  logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
                }
                if (addedRecordCount > 0) {
                  // Flush the in-progress group under the OLD schema before rebuilding.
                  outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
                  // (output container full or not) as we are not going to insert anymore records.
                  if (EXTRA_DEBUG) {
                    logger.debug("Wrote out end of previous batch, returning.");
                  }
                  newSchema = true;
                  return setOkAndReturn(OK_NEW_SCHEMA);
                }
                cleanup();
                return AggOutcome.UPDATE_AGGREGATOR;
              case OK:
                resetIndex();
                if (incoming.getRecordCount() == 0) {
                  continue; // empty batch: pull the next one
                } else {
                  currentIndex = this.getVectorIndex(underlyingIndex);
                  if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
                    // First row of the new batch continues the group ending the previous batch.
                    if (EXTRA_DEBUG) {
                      logger.debug("New value was same as last value of previous batch, adding.");
                    }
                    addRecordInc(currentIndex);
                    previousIndex = currentIndex;
                    incIndex();
                    if (EXTRA_DEBUG) {
                      logger.debug("Continuing outside");
                    }
                    continue outside;
                  } else { // not the same
                    if (EXTRA_DEBUG) {
                      logger.debug("This is not the same as the previous, add record and continue outside.");
                    }
                    if (addedRecordCount > 0) {
                      if (outputToBatchPrev(previous, previousIndex, outputCount)) {
                        if (EXTRA_DEBUG) {
                          logger.debug("Output container is full. flushing it.");
                        }
                        previousIndex = -1;
                        return setOkAndReturn(OK);
                      }
                    }
                    previousIndex = -1;
                    continue outside;
                  }
                }
              default:
                // Unexpected outcome (e.g. STOP): record it and let the caller clean up.
                lastOutcome = out;
                outcome = out;
                return AggOutcome.CLEANUP_AND_RETURN;
            }
          }
        } finally {
          // make sure to clear previous
          if (previous != null) {
            previous.clear();
          }
        }
      }
    } finally {
      // First-run flag is cleared no matter which path returned above.
      if (first) {
        first = false;
      }
    }
  }