public IterOutcome innerNext()

in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java [171:318]


  public IterOutcome innerNext() {
    recordCount = 0;
    if (state == BatchState.DONE) {
      return NONE;
    }

    // Check if any output remains from the previous record boundary
    if (hasOutputRecords) {
      return handleRemainingOutput();
    }

    // Reset the TopN state for the next iteration
    resetTopNState();

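    // Note whether the incoming batch carries an SV2 so its selection vector can be released when an
    // empty batch is drained below.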
    boolean incomingHasSv2 = false;
    switch (incoming.getSchema().getSelectionVectorMode()) {
      case NONE: {
        break;
      }
      case TWO_BYTE: {
        incomingHasSv2 = true;
        break;
      }
      case FOUR_BYTE: {
        throw UserException.internalError(null)
          .message("TopN doesn't support incoming with SV4 mode")
          .build(logger);
      }
      default:
        throw new UnsupportedOperationException("Unsupported SV mode detected in TopN incoming batch");
    }

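    // Accumulate incoming batches into the priority queue until the upstream reports NONE or an EMIT
    // boundary is reached.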
    outer: while (true) {
      Stopwatch watch = Stopwatch.createStarted();
      if (first) {
        lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
        // Create the SV4 object up front so it can be used for both empty and non-empty incoming batches at an EMIT boundary
        sv4 = new SelectionVector4(context.getAllocator(), 0);
        first = false;
      } else {
        lastKnownOutcome = next(incoming);
      }
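      // Treat an OK outcome as OK_NEW_SCHEMA when no schema has been established yet.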
      if (lastKnownOutcome == OK && schema == null) {
        lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
        container.clear();
      }
      logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
      switch (lastKnownOutcome) {
      case NONE:
        break outer;
      case NOT_YET:
        throw new UnsupportedOperationException();
      case OK_NEW_SCHEMA:
        // Only react when the schema truly changes; artificial schema changes are ignored.
        // Schema changes are handled the same way with or without EMIT: they are only supported
        // when union type is enabled.
        container.clear();
        firstBatchForSchema = true;
        if (!incoming.getSchema().equals(schema)) {
          if (schema != null) {
            if (!unionTypeEnabled) {
              throw new UnsupportedOperationException(String.format("TopN currently doesn't support changing " +
                "schemas with union type disabled. Please try enabling union type: %s and re-execute the query",
                ExecConstants.ENABLE_UNION_TYPE_KEY));
            } else {
              schema = SchemaUtil.mergeSchemas(this.schema, incoming.getSchema());
              purgeAndResetPriorityQueue();
              schemaChanged = true;
            }
          } else {
            schema = incoming.getSchema();
          }
        }
        // fall through.
      case OK:
      case EMIT:
        if (incoming.getRecordCount() == 0) {
          for (VectorWrapper<?> w : incoming) {
            w.clear();
          }
          // Release memory for incoming SV2 vector
          if (incomingHasSv2) {
            incoming.getSelectionVector2().clear();
          }
          break;
        }
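        // Track the rows and batches accumulated since the last purge; these counters drive the purge check below.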
        countSincePurge += incoming.getRecordCount();
        batchCount++;
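        // Capture the incoming batch as a RecordBatchData, coercing it to the merged schema if a schema change was seen.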
        RecordBatchData batch;
        if (schemaChanged) {
          batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext), oContext.getAllocator());
        } else {
          batch = new RecordBatchData(incoming, oContext.getAllocator());
        }
        boolean success = false;
        try {
          if (priorityQueue == null) {
            priorityQueue = createNewPriorityQueue(new ExpandableHyperContainer(batch.getContainer()), config.getLimit());
          } else if (!priorityQueue.isInitialized()) {
            // The priority queue was cleaned up after producing output for the previous record boundary,
            // so re-initialize it for the next one.
            priorityQueue.init(config.getLimit(), oContext.getAllocator(),
              schema.getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
          }
          priorityQueue.add(batch);
          // Once the static batch-count threshold is exceeded, purge to release the memory held by
          // record batches that can no longer contribute to the top N results.
          if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) {
            purge();
            countSincePurge = 0;
            batchCount = 0;
          }
          success = true;
        } catch (SchemaChangeException e) {
          throw schemaChangeException(e, logger);
        } finally {
          if (!success) {
            batch.clear();
          }
        }
        break;
      default:
        throw new UnsupportedOperationException();
      }

      // If the last seen outcome is EMIT, break out of the loop. This is done here so that a batch carrying
      // both records and the EMIT outcome is still processed by the case statements above.
      if (lastKnownOutcome == EMIT) {
        break;
      }
    }

    // The priority queue can be null here if the first batch arrived with OK_NEW_SCHEMA but was empty and the
    // second next() call returned NONE or EMIT.
    // It can be uninitialized here if only empty batches were received between two EMIT outcomes.
    if (schema == null || (priorityQueue == null || !priorityQueue.isInitialized())) {
      // builder may be null at this point if the first incoming batch is empty
      return handleEmptyBatches(lastKnownOutcome);
    }

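    // Produce the final ordered SV4 over the accumulated data and prepare the output container from the
    // queue's hyper-batch.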
    priorityQueue.generate();
    prepareOutputContainer(priorityQueue.getHyperBatch(), priorityQueue.getFinalSv4());

    // With the EMIT outcome control reaches this point multiple times, whereas without it control gets here
    // only once. In the EMIT case, a schema change in any iteration is reflected through lastKnownOutcome.
    return getFinalOutcome();
  }