in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java [171:318]
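/**
 * Accumulates incoming batches into a bounded priority queue, purging rows that can no longer be
 * part of the top N, until the upstream returns NONE or an EMIT boundary is reached, and then
 * materializes the sorted output. Any output left over from a previous record boundary is
 * returned first.
 */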
public IterOutcome innerNext() {
  recordCount = 0;
  if (state == BatchState.DONE) {
    return NONE;
  }
  // Check if anything is remaining from previous record boundary
  if (hasOutputRecords) {
    return handleRemainingOutput();
  }
  // Reset the TopN state for next iteration
  resetTopNState();
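  // Record whether the incoming batch carries an SV2, so its memory can be released
  // if the batch turns out to be empty.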
  boolean incomingHasSv2 = false;
  switch (incoming.getSchema().getSelectionVectorMode()) {
    case NONE: {
      break;
    }
    case TWO_BYTE: {
      incomingHasSv2 = true;
      break;
    }
    case FOUR_BYTE: {
      throw UserException.internalError(null)
        .message("TopN doesn't support incoming with SV4 mode")
        .build(logger);
    }
    default:
      throw new UnsupportedOperationException("Unsupported SV mode detected in TopN incoming batch");
  }
  outer: while (true) {
    Stopwatch watch = Stopwatch.createStarted();
    if (first) {
      lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
      // Create the SV4 object upfront to be used for both empty and non-empty incoming batches at EMIT boundary
      sv4 = new SelectionVector4(context.getAllocator(), 0);
      first = false;
    } else {
      lastKnownOutcome = next(incoming);
    }
    if (lastKnownOutcome == OK && schema == null) {
      lastKnownOutcome = IterOutcome.OK_NEW_SCHEMA;
      container.clear();
    }
    logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
    switch (lastKnownOutcome) {
      case NONE:
        break outer;
      case NOT_YET:
        throw new UnsupportedOperationException();
      case OK_NEW_SCHEMA:
        // Only act when the schema truly changes; artificial schema changes are ignored.
        // Schema change handling when EMIT is also seen is the same as without EMIT, i.e. it is
        // handled only if union type is enabled.
        container.clear();
        firstBatchForSchema = true;
        if (!incoming.getSchema().equals(schema)) {
          if (schema != null) {
            if (!unionTypeEnabled) {
              throw new UnsupportedOperationException(String.format("TopN currently doesn't support changing " +
                "schemas with union type disabled. Please try enabling union type: %s and re-execute the query",
                ExecConstants.ENABLE_UNION_TYPE_KEY));
            } else {
              schema = SchemaUtil.mergeSchemas(this.schema, incoming.getSchema());
              purgeAndResetPriorityQueue();
              schemaChanged = true;
            }
          } else {
            schema = incoming.getSchema();
          }
        }
        // fall through.
      case OK:
      case EMIT:
        if (incoming.getRecordCount() == 0) {
          for (VectorWrapper<?> w : incoming) {
            w.clear();
          }
          // Release memory for incoming SV2 vector
          if (incomingHasSv2) {
            incoming.getSelectionVector2().clear();
          }
          break;
        }
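        // Track how many records and batches have been buffered since the last purge; these
        // counters drive the purge threshold check below.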
        countSincePurge += incoming.getRecordCount();
        batchCount++;
        RecordBatchData batch;
        if (schemaChanged) {
          batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext), oContext.getAllocator());
        } else {
          batch = new RecordBatchData(incoming, oContext.getAllocator());
        }
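        // Release the copied batch if adding it to the priority queue fails part-way through.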
        boolean success = false;
        try {
          if (priorityQueue == null) {
            priorityQueue = createNewPriorityQueue(new ExpandableHyperContainer(batch.getContainer()), config.getLimit());
          } else if (!priorityQueue.isInitialized()) {
            // The priority queue was cleaned up after producing output for the first record boundary,
            // so initialize it again for the next record boundary.
            priorityQueue.init(config.getLimit(), oContext.getAllocator(),
              schema.getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
          }
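          // Add the batch's records to the priority queue; the heap tracks the current top-N
          // candidates while the underlying batches accumulate until the next purge.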
          priorityQueue.add(batch);
          // Based on a static threshold of the number of batches, purge to release the memory held
          // by record batches that can no longer contribute to the TopN result.
          if (countSincePurge > config.getLimit() && batchCount > batchPurgeThreshold) {
            purge();
            countSincePurge = 0;
            batchCount = 0;
          }
          success = true;
        } catch (SchemaChangeException e) {
          throw schemaChangeException(e, logger);
        } finally {
          if (!success) {
            batch.clear();
          }
        }
        break;
      default:
        throw new UnsupportedOperationException();
    }
    // If the last seen outcome is EMIT then break the loop. We do it here since we want to process a batch
    // with records and an EMIT outcome in the case statements above.
    if (lastKnownOutcome == EMIT) {
      break;
    }
  }
  // PriorityQueue can be null here if the first batch is received with OK_NEW_SCHEMA and is empty, and the
  // second next() call returned NONE or EMIT.
  // PriorityQueue can be uninitialized here if only an empty batch is received between two EMIT outcomes.
  if (schema == null || (priorityQueue == null || !priorityQueue.isInitialized())) {
    // builder may be null at this point if the first incoming batch is empty
    return handleEmptyBatches(lastKnownOutcome);
  }
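  // All batches for this record boundary have been consumed: materialize the final ordering and
  // expose it through the output container.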
  priorityQueue.generate();
  prepareOutputContainer(priorityQueue.getHyperBatch(), priorityQueue.getFinalSv4());
  // With EMIT outcome control will come here multiple times, whereas without EMIT it will come here only once.
  // In the EMIT case, if there is a schema change in any iteration then that is handled via lastKnownOutcome.
  return getFinalOutcome();
}