in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java [103:348]
/**
 * Drives the streaming-aggregation state machine over incoming record batches.
 *
 * <p>Processes records until one of the following happens: the output batch
 * fills up, the incoming schema changes, an EMIT boundary is reached, or the
 * incoming data is exhausted. Because the input is sorted on the group keys,
 * a group can be finalized as soon as the key value changes between
 * consecutive records (including across batch boundaries, via
 * {@code isSamePrev}).
 *
 * @param outerOutcome the {@link IterOutcome} the caller last received from
 *                     the incoming batch for this data set
 * @return an {@link AggOutcome} telling the enclosing record batch what to do
 *         next: return the current output, rebuild the aggregator after a
 *         schema change, clean up and stop, or propagate {@code this.outcome}
 *         as-is
 */
public AggOutcome doWork(IterOutcome outerOutcome) {
  // Terminal state: a previous call finished the aggregation, or the caller
  // already saw NONE. Report NONE and ask the operator to clean up.
  if (done || outerOutcome == NONE) {
    outcome = IterOutcome.NONE;
    return AggOutcome.CLEANUP_AND_RETURN;
  }
  try { // outside block to ensure that first is set to false after the first run.
    outputCount = 0;
    // allocate outgoing since either this is the first time or if a subsequent time we would
    // have sent the previous outgoing batch to downstream operator
    allocateOutgoing();
    if (firstBatchForDataSet) {
      // Integer.MAX_VALUE acts as a "no current record" sentinel when the first
      // batch of the data set is empty. getVectorIndex() presumably maps the
      // logical position through the selection vector, if any — confirm in the
      // generated concrete template.
      this.currentIndex = incoming.getRecordCount() == 0 ? Integer.MAX_VALUE : this.getVectorIndex(underlyingIndex);
      if (outerOutcome == OK_NEW_SCHEMA) {
        firstBatchForSchema = true;
      }
      // consume empty batches until we get one with data (unless we got an EMIT). If we got an emit
      // then this is the first batch, it was empty and we also got an emit.
      if (incoming.getRecordCount() == 0 ) {
        if (outerOutcome != EMIT) {
          outer:
          while (true) {
            IterOutcome out = outgoing.next(0, incoming);
            switch (out) {
              case OK_NEW_SCHEMA:
                //lastOutcome = out;
                firstBatchForSchema = true;
                // Deliberate fall-through: once the schema flag is recorded,
                // OK_NEW_SCHEMA is handled exactly like OK.
              case OK:
                if (incoming.getRecordCount() == 0) {
                  // Still empty — keep pulling batches.
                  continue;
                } else {
                  // First non-empty batch: establish the starting record index
                  // and leave the empty-batch loop.
                  currentIndex = this.getVectorIndex(underlyingIndex);
                  break outer;
                }
              case EMIT:
                outerOutcome = EMIT;
                if (incoming.getRecordCount() == 0) {
                  // When we see an EMIT we let the agg record batch know that it should either
                  // send out an EMIT or an OK_NEW_SCHEMA, followed by an EMIT. To do that we simply return
                  // RETURN_AND_RESET with the outcome so the record batch can take care of it.
                  return setOkAndReturnEmit();
                } else {
                  currentIndex = this.getVectorIndex(underlyingIndex);
                  break outer;
                }
              case NONE:
                // End of data while every batch so far was empty: report it
                // upstream as OK_NEW_SCHEMA, then fall through into the
                // terminal handling below (deliberate fall-through).
                out = IterOutcome.OK_NEW_SCHEMA;
              default:
                // Terminal or error outcome: remember it, mark the aggregation
                // done, and hand control back for cleanup.
                lastOutcome = out;
                outcome = out;
                done = true;
                return AggOutcome.CLEANUP_AND_RETURN;
            } // switch (outcome)
          } // while empty batches are seen
        } else {
          // First batch of the data set was empty AND carried an EMIT:
          // emit the (empty) result for this data set immediately.
          return setOkAndReturnEmit();
        }
      }
    }
    // A schema change noticed on a previous call requires the aggregator to be
    // regenerated before any more records can be processed.
    if (newSchema) {
      return AggOutcome.UPDATE_AGGREGATOR;
    }
    // if the previous iteration has an outcome that was terminal, don't do anything.
    if (lastOutcome != null /*&& lastOutcome != IterOutcome.OK_NEW_SCHEMA*/) {
      outcome = lastOutcome;
      return AggOutcome.CLEANUP_AND_RETURN;
    }
    // Main loop: each iteration consumes the remainder of the current incoming
    // batch, then pulls the next batch and bridges the group boundary between
    // the two batches.
    outside: while (true) {
      // loop through existing records, adding as necessary.
      if (!processRemainingRecordsInBatch()) {
        // output batch is full. Return.
        return setOkAndReturn(outerOutcome);
      }
      // if the current batch came with an EMIT, we're done since if we are here it means output batch consumed all
      // the rows in incoming batch
      if (outerOutcome == EMIT) {
        // output the last record
        outputToBatch(previousIndex);
        resetIndex();
        return setOkAndReturnEmit();
      }
      /**
       * Hold onto the previous incoming batch. When the incoming uses an
       * SV4, the InternalBatch DOES NOT clone or transfer the data. Instead,
       * it clones only the SV4, and assumes that the same hyper-list of
       * batches will be offered again after the next call to the incoming
       * next(). This is, in fact, how the SV4 works, so all is fine. The
       * trick to be aware of, however, is that this MUST BE TRUE even if
       * the incoming next() returns NONE: the incoming is obligated to continue
       * to offer the set of batches. That is, the incoming cannot try to be
       * tidy and release the batches one end-of-data or the following code
       * will fail, likely with an IndexOutOfBounds exception.
       */
      InternalBatch previous = new InternalBatch(incoming, context);
      try {
        while (true) {
          IterOutcome out = outgoing.next(0, incoming);
          if (EXTRA_DEBUG) {
            logger.debug("Received IterOutcome of {}", out);
          }
          switch (out) {
            case NONE:
              // End of data: flush whatever group is in progress (if any)
              // before cleaning up.
              done = true;
              lastOutcome = out;
              if (firstBatchForDataSet && addedRecordCount == 0) {
                return setOkAndReturn(NONE);
              } else if (addedRecordCount > 0) {
                outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
                // (output container full or not) as we are not going to insert any more records.
                if (EXTRA_DEBUG) {
                  logger.debug("Received no more batches, returning.");
                }
                return setOkAndReturn(NONE);
              } else {
                // not first batch and record Count == 0
                outcome = out;
                return AggOutcome.CLEANUP_AND_RETURN;
              }
            // EMIT is handled like OK, except that we do not loop back to process the
            // next incoming batch; we return instead
            case EMIT:
              if (incoming.getRecordCount() == 0) {
                // Empty EMIT batch: just flush the in-progress group, if any.
                if (addedRecordCount > 0) {
                  outputToBatchPrev(previous, previousIndex, outputCount);
                }
              } else {
                resetIndex();
                currentIndex = this.getVectorIndex(underlyingIndex);
                // Bridge the batch boundary: does the first record of this
                // batch continue the group ended by the previous batch?
                if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
                  if (EXTRA_DEBUG) {
                    logger.debug("New value was same as last value of previous batch, adding.");
                  }
                  addRecordInc(currentIndex);
                  previousIndex = currentIndex;
                  incIndex();
                  if (EXTRA_DEBUG) {
                    logger.debug("Continuing outside");
                  }
                } else { // not the same
                  if (EXTRA_DEBUG) {
                    logger.debug("This is not the same as the previous, add record and continue outside.");
                  }
                  if (addedRecordCount > 0) {
                    if (outputToBatchPrev(previous, previousIndex, outputCount)) {
                      if (EXTRA_DEBUG) {
                        logger.debug("Output container is full. flushing it.");
                      }
                      return setOkAndReturn(EMIT);
                    }
                  }
                  // important to set the previous index to -1 since we start a new group
                  previousIndex = -1;
                }
                // Consume the rest of this final (EMIT) batch, then flush the
                // last group.
                if (!processRemainingRecordsInBatch()) {
                  // output batch is full. Return.
                  return setOkAndReturn(EMIT);
                }
                outputToBatch(previousIndex); // currentIndex has been reset to int_max so use previous index.
              }
              resetIndex();
              return setOkAndReturnEmit();
            case NOT_YET:
              // No data available right now; propagate without losing state.
              this.outcome = out;
              return AggOutcome.RETURN_OUTCOME;
            case OK_NEW_SCHEMA:
              firstBatchForSchema = true;
              //lastOutcome = out;
              if (EXTRA_DEBUG) {
                logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
              }
              // Flush the in-progress group under the OLD schema before the
              // aggregator is regenerated for the new one.
              if (addedRecordCount > 0) {
                outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
                // (output container full or not) as we are not going to insert anymore records.
                if (EXTRA_DEBUG) {
                  logger.debug("Wrote out end of previous batch, returning.");
                }
                newSchema = true;
                return setOkAndReturn(OK_NEW_SCHEMA);
              }
              cleanup();
              return AggOutcome.UPDATE_AGGREGATOR;
            case OK:
              resetIndex();
              if (incoming.getRecordCount() == 0) {
                // Empty batch: keep pulling.
                continue;
              } else {
                currentIndex = this.getVectorIndex(underlyingIndex);
                // Bridge the batch boundary, same as the EMIT case above.
                if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
                  if (EXTRA_DEBUG) {
                    logger.debug("New value was same as last value of previous batch, adding.");
                  }
                  addRecordInc(currentIndex);
                  previousIndex = currentIndex;
                  incIndex();
                  if (EXTRA_DEBUG) {
                    logger.debug("Continuing outside");
                  }
                  continue outside;
                } else { // not the same
                  if (EXTRA_DEBUG) {
                    logger.debug("This is not the same as the previous, add record and continue outside.");
                  }
                  if (addedRecordCount > 0) {
                    if (outputToBatchPrev(previous, previousIndex, outputCount)) {
                      if (EXTRA_DEBUG) {
                        logger.debug("Output container is full. flushing it.");
                      }
                      previousIndex = -1;
                      return setOkAndReturn(OK);
                    }
                  }
                  previousIndex = -1;
                  continue outside;
                }
              }
            default:
              // Unexpected/terminal outcome: record it and hand back for cleanup.
              lastOutcome = out;
              outcome = out;
              return AggOutcome.CLEANUP_AND_RETURN;
          }
        }
      } finally {
        // make sure to clear previous
        if (previous != null) {
          previous.clear();
        }
      }
    }
  } finally {
    // Whatever path was taken, the very first invocation is now over.
    if (first) {
      first = false;
    }
  }
}