public IterOutcome innerNext()

in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java [187:362]


  /**
   * Produces the next output batch of this streaming aggregate.
   * <p>
   * The method works in three phases:
   * <ol>
   * <li>Short-circuit states: returns {@code NONE} once {@code done} is set or after a
   * special (empty-input) batch has been sent, and returns an empty {@code EMIT} batch when
   * a previous call scheduled one by setting {@code sendEmit}.</li>
   * <li>Obtaining input: while no aggregator exists yet or {@code first} is still set, it
   * obtains the first upstream outcome, handles the "no grouping keys over empty input"
   * cases by emitting a special batch, and (re)creates the aggregator. On later calls it
   * pulls a new upstream batch only at the start of a data set, when NONE has not been seen,
   * the aggregator is not done and it has fully processed the previous batch; it then
   * re-runs the generated code's setup (unless {@code first} is still set).</li>
   * <li>Aggregation: runs {@code aggregator.doWork(...)} and maps the returned
   * {@code AggOutcome}, together with the aggregator's {@code IterOutcome}, to the outcome
   * returned downstream, updating the per-data-set / per-schema flags along the way.</li>
   * </ol>
   *
   * @return the outcome describing the batch now held in {@code container}
   * @throws UserException if re-running generated-code setup throws a
   *         {@code SchemaChangeException}, or if the aggregator returns
   *         {@code UPDATE_AGGREGATOR} with an outcome other than {@code EMIT}
   * @throws IllegalStateException if the upstream outcome or the aggregator's
   *         {@code AggOutcome} is not one this method handles
   */
  public IterOutcome innerNext() {

    // if a special batch has been sent, we have no data in the incoming so exit early.
    // 'done' is also set by the CLEANUP_AND_RETURN case below, after which we keep returning NONE.
    if (done || specialBatchSent) {
      assert (!sendEmit); // if special batch sent with emit then flag will not be set
      return NONE;
    }

    // We sent an OK_NEW_SCHEMA and also encountered the end of a data set. So we need to send
    // an EMIT with an empty batch now. The per-data-set flags are reset so that the next
    // data set starts from a clean state.
    if (sendEmit) {
      first = false; // first is set only in the case when we see a NONE after an empty first (and only) batch
      sendEmit = false;
      firstBatchForDataSet = true;
      firstBatchForSchema = false;
      recordCount = 0;
      container.setEmpty();
      specialBatchSent = false;
      return EMIT;
    }

    // this is only called on the first batch. Beyond this, the aggregator manages batches.
    // (Entered while no aggregator has been created yet or while 'first' is still set.)
    if (aggregator == null || first) {
      // NOTE(review): when 'first' is set and incoming already holds rows, that batch appears
      // to have been loaded by an earlier call (e.g. during schema discovery) and is reused as
      // the first OK_NEW_SCHEMA batch instead of pulling a new one — confirm against callers.
      if (first && incoming.getRecordCount() > 0) {
        first = false;
        lastKnownOutcome = OK_NEW_SCHEMA;
      } else {
        lastKnownOutcome = next(incoming);
      }
      logger.debug("Next outcome of {}", lastKnownOutcome);
      switch (lastKnownOutcome) {
        case NONE:

          if (first && getKeyExpressions().size() == 0) {
            // if we have a straight aggregate and empty input batch, we need to handle it in a different way
            // We want to produce the special batch only if we got a NONE as the first outcome after
            // OK_NEW_SCHEMA. If we get a NONE immediately after we see an EMIT, then we have already handled
            // the case of the empty batch
            constructSpecialBatch();
            // set state to indicate the fact that we have sent a special batch and input is empty
            specialBatchSent = true;
            // If outcome is NONE then we send the special batch in the first iteration and the NONE
            // outcome in the next iteration. If outcome is EMIT, we can send the special
            // batch and the EMIT outcome at the same time.
            return IterOutcome.OK;
          }
          // else fall thru
        case NOT_YET:
        case OK_NEW_SCHEMA:
          // NONE (when no special batch was needed), NOT_YET and OK_NEW_SCHEMA all (re)create
          // the aggregator and mark the next batch as the first one for this schema.
          createAggregator();
          firstBatchForSchema = true;
          break;
        case EMIT:
          // if we get an EMIT with an empty batch as the first (and therefore only) batch
          // we have to do the special handling
          if (firstBatchForDataSet && getKeyExpressions().size() == 0 && incoming.getRecordCount() == 0) {
            constructSpecialBatch();
            // If outcome is NONE then we send the special batch in the first iteration and the NONE
            // outcome in the next iteration. If outcome is EMIT, we can send the special
            // batch and the EMIT outcome at the same time. (unless the finalOutcome is OK_NEW_SCHEMA)
            return  getFinalOutcome();
          }
          // else fall thru
        case OK:
          // EMIT with data and OK reuse the existing aggregator.
          // NOTE(review): if the aggregator has not been created yet (first upstream outcome is
          // OK or a non-empty EMIT), aggregator.doWork() below would dereference null; this
          // relies on upstream sending OK_NEW_SCHEMA first — confirm.
          break;
        default:
          throw new IllegalStateException(String.format("unknown outcome %s", lastKnownOutcome));
      }
    } else {
      // If this is not the first batch and previous batch is fully processed with no error condition or NONE is not
      // seen then it will call next() on upstream to get new batch. Otherwise just process the previous incoming batch
      // A new batch is pulled only when all of these hold: NONE has not been seen, we are at the
      // start of a data set, the aggregator is not done, and it has consumed the previous batch.
      if ( lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone()
        && aggregator.previousBatchProcessed()) {
        // NOTE(review): this calls incoming.next() directly, whereas the first-batch path above
        // uses the next(incoming) helper; confirm whether any bookkeeping done by that helper
        // is intentionally skipped here.
        lastKnownOutcome = incoming.next();
        if (!first ) {
          // Setup needs to be called again. During setup, generated code saves a reference to the vectors
          // pointed to by the incoming batch so that the de-referencing of the vector wrappers to get to
          // the vectors  does not have to be done at each call to eval. However, after an EMIT is seen,
          // the vectors are replaced and the reference to the old vectors is no longer valid
          try {
            aggregator.setup(oContext, incoming, this, maxOutputRowCount);
          } catch (SchemaChangeException e) {
            UserException.Builder exceptionBuilder = UserException.functionError(e)
                .message("A Schema change exception occured in calling setup() in generated code.");
            throw exceptionBuilder.build(logger);
          }
        }
      }
    }
    // Run the aggregation over the current incoming batch and publish the produced row count
    // on the output container.
    AggOutcome aggOutcome = aggregator.doWork(lastKnownOutcome);
    recordCount = aggregator.getOutputCount();
    container.setRecordCount(recordCount);
    logger.debug("Aggregator response {}, records {}", aggOutcome, aggregator.getOutputCount());
    // get the returned IterOutcome from aggregator and based on AggOutcome and returned IterOutcome update the
    // lastKnownOutcome below. For example: if AggOutcome is RETURN_AND_RESET then lastKnownOutcome is always set to
    // EMIT
    IterOutcome returnOutcome = aggregator.getOutcome();
    switch (aggOutcome) {
      case CLEANUP_AND_RETURN:
        // Aggregation is finished: zero the output vectors (unless 'first' is still set), mark
        // the operator done so the next call returns NONE, and have ExternalSortBatch release
        // batches associated with the incoming batch.
        if (!first) {
          container.zeroVectors();
        }
        done = true;
        ExternalSortBatch.releaseBatches(incoming);
        return returnOutcome;
      case RETURN_AND_RESET:
        // We could have got a string of batches, all empty, until we hit an emit
        if (firstBatchForDataSet && getKeyExpressions().size() == 0 && recordCount == 0) {
          // if we have a straight aggregate and empty input batch, we need to handle it in a different way
          constructSpecialBatch();
          // If outcome is NONE then we send the special batch in the first iteration and the NONE
          // outcome in the next iteration. If outcome is EMIT, we can send the special
          // batch and the EMIT outcome at the same time.
          return getFinalOutcome();
        }
        // End of a data set: the next batch starts a new data set under the same schema.
        firstBatchForDataSet = true;
        firstBatchForSchema = false;
        if(first) {
          first = false;
        }
        // Since AggOutcome is RETURN_AND_RESET and returned IterOutcome is OK_NEW_SCHEMA from Aggregator that means it
        // has seen first batch with OK_NEW_SCHEMA and then last batch with EMIT outcome. In that case if all the input
        // batch is processed to produce output batch it need to send and empty batch with EMIT outcome in subsequent
        // next call.
        if(returnOutcome == OK_NEW_SCHEMA) {
          // NOTE(review): the 'aggregator == null' test is always false here, since
          // aggregator.doWork() was already called above.
          sendEmit = (aggregator == null) || aggregator.previousBatchProcessed();
        }
        // Release external sort batches after EMIT is seen
        ExternalSortBatch.releaseBatches(incoming);
        lastKnownOutcome = EMIT;
        return returnOutcome;
      case RETURN_OUTCOME:
        // In case of complex writer expression, vectors would be added to batch run-time.
        // We have to re-build the schema.
        if (complexWriters != null) {
          container.buildSchema(SelectionVectorMode.NONE);
        }
        if (returnOutcome == IterOutcome.NONE ) {
          lastKnownOutcome = NONE;
          // we will set the 'done' flag in the next call to innerNext and use the lastKnownOutcome
          // to determine whether we should set the flag or not.
          // This is so that if someone calls getRecordCount in between calls to innerNext, we will
          // return the correct record count (if the done flag is set, we will return 0).
          if (first) {
            first = false;
            return OK_NEW_SCHEMA;
          } else {
            return OK;
          }
        } else if (returnOutcome == OK && first) {
          // The first batch returned downstream is reported as OK_NEW_SCHEMA rather than OK.
          lastKnownOutcome = OK_NEW_SCHEMA;
          returnOutcome = OK_NEW_SCHEMA;
        }
        first = false;
        return returnOutcome;
      case UPDATE_AGGREGATOR:
        // We could get this either between data sets or within a data set.
        // If the former, we can handle the change and so need to update the aggregator and
        // continue. If the latter, we cannot (currently) handle the schema change, so throw
        // an exception.
        // This case is not tested since there are no unit tests for this and there is no support
        // from the sort operator for this case
        if (returnOutcome == EMIT) {
          createAggregator();
          lastKnownOutcome = EMIT;
          return OK_NEW_SCHEMA;
        } else {
          throw UserException.schemaChangeError(SchemaChangeException.schemaChanged(
                  "Streaming aggregate does not support schema changes", incomingSchema,
                  incoming.getSchema()))
              .build(logger);
        }
      default:
        throw new IllegalStateException(String.format("Unknown state %s.", aggOutcome));
    }
  }