in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java [177:448]
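/**
 * Performs one iteration of the n-way merge. On the first call it lazily
 * initializes the incoming streams, batch loaders, generated merger, and the
 * priority queue; every call then copies the smallest remaining record across
 * all senders into the outgoing batch until the batch fills or input is
 * exhausted.
 */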
public IterOutcome innerNext() {
if (fragProviders.length == 0) {
return IterOutcome.NONE;
}
boolean schemaChanged = false;
if (!outgoingBatchHasSpace) {
logger.debug("Outgoing vectors were full on last iteration");
allocateOutgoing();
outgoingPosition = 0;
outgoingBatchHasSpace = true;
}
if (!hasMoreIncoming) {
logger.debug("next() was called after all values have been processed");
outgoingPosition = 0;
return IterOutcome.NONE;
}
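// schema captured from the first non-empty incoming batch; used to patch up schema-less batches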
List<UserBitShared.SerializedField> fieldList = null;
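// set when any sender delivers a null or schema-less batch, so a zero-record dummy batch must be synthesized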
boolean createDummyBatch = false;
// lazy initialization
if (!hasRun) {
schemaChanged = true; // first iteration is always a schema change
// set up each (non-empty) incoming record batch
final List<RawFragmentBatch> rawBatches = Lists.newArrayList();
try {
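// p indexes the sender (provider) currently being read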
int p = 0;
for (@SuppressWarnings("unused") final RawFragmentBatchProvider provider : fragProviders) {
RawFragmentBatch rawBatch;
// check the temp holder before calling getNext(); a batch may have been stashed there while building the schema
if (tempBatchHolder[p] != null) {
rawBatch = tempBatchHolder[p];
tempBatchHolder[p] = null;
} else {
rawBatch = getNext(p);
}
checkContinue();
// If rawBatch is null, go ahead and add it to the list. We will create dummy batches
// for all null batches later.
if (rawBatch == null) {
checkContinue();
createDummyBatch = true;
rawBatches.add(rawBatch);
p++; // move to next sender
continue;
}
if (fieldList == null && rawBatch.getHeader().getDef().getFieldCount() != 0) {
// save the schema to fix up empty batches with no schema if needed.
fieldList = rawBatch.getHeader().getDef().getFieldList();
}
if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
rawBatches.add(rawBatch);
} else {
// keep reading until we get a batch with record count > 0, or run out of
// batches to read (i.e. getNext() returns null)
while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
// Do nothing
}
if (rawBatch == null) {
checkContinue();
}
if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) {
createDummyBatch = true;
}
// Even if rawBatch is null, go ahead and add it to the list.
// We will create dummy batches for all null batches later.
rawBatches.add(rawBatch);
}
p++;
}
// If no batch with a schema arrived from any of the providers, just return NONE.
if (fieldList == null) {
return IterOutcome.NONE;
}
// Go through and fix schema for empty batches.
if (createDummyBatch) {
// Create dummy record batch definition with 0 record count
UserBitShared.RecordBatchDef dummyDef = UserBitShared.RecordBatchDef.newBuilder()
// We cannot use/modify the original field list, as it is used by the
// valid record batches. Create a copy of the field list with
// valueCount = 0 for every field; this copy is used only for dummy
// schema generation.
.addAllField(createDummyFieldList(fieldList))
.setRecordCount(0)
.build();
// Create dummy header
BitData.FragmentRecordBatch dummyHeader = BitData.FragmentRecordBatch.newBuilder()
.setIsLastBatch(true)
.setDef(dummyDef)
.build();
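// Replace every null or schema-less batch with the dummy batch so that all senders present a uniform schema.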
for (int i = 0; i < p; i++) {
RawFragmentBatch rawBatch = rawBatches.get(i);
if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) {
rawBatch = new RawFragmentBatch(dummyHeader, null, null);
rawBatches.set(i, rawBatch);
}
}
}
} catch (Throwable t) {
clearBatches(rawBatches);
throw t;
}
// allocate the incoming record batch loaders
senderCount = rawBatches.size();
incomingBatches = new RawFragmentBatch[senderCount];
batchOffsets = new int[senderCount];
batchLoaders = new RecordBatchLoader[senderCount];
for (int i = 0; i < senderCount; ++i) {
incomingBatches[i] = rawBatches.get(i);
batchLoaders[i] = new RecordBatchLoader(oContext.getAllocator());
}
// after this point all batches have moved to incomingBatches
rawBatches.clear();
int i = 0;
for (final RawFragmentBatch batch : incomingBatches) {
// initialize the incoming batchLoaders
final UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
batchLoaders[i].load(rbd, batch.getBody());
batch.release();
++batchOffsets[i];
++i;
}
// after this point all batches have been released and their ByteBufs are owned by the batchLoaders
// Ensure all the incoming batches have the identical schema.
// Note: RecordBatchLoader permutes the columns to obtain the same column order for all batches.
checkSameSchemaAmongBatches(batchLoaders);
// create the outgoing schema and vector container, and allocate the initial batch
final SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
for (final VectorWrapper<?> v : batchLoaders[0]) {
// add field to the output schema
bldr.addField(v.getField());
// allocate a new value vector
container.addOrGet(v.getField());
}
allocateOutgoing();
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
// generate code for merge operations (copy and compare)
merger = createMerger();
// allocate the priority queue with the generated comparator
this.pqueue = new PriorityQueue<>(fragProviders.length, new Comparator<Node>() {
@Override
public int compare(final Node node1, final Node node2) {
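// Composite index: the high 16 bits select the sender's batch, the low 16
// bits the row within it; the generated doEval() is expected to decode
// this composite to locate each record in its value vectors.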
final int leftIndex = (node1.batchId << 16) + node1.valueIndex;
final int rightIndex = (node2.batchId << 16) + node2.valueIndex;
try {
return merger.doEval(leftIndex, rightIndex);
} catch (SchemaChangeException e) {
throw new UnsupportedOperationException(e);
}
}
});
// populate the priority queue with initial values
for (int b = 0; b < senderCount; ++b) {
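// skip any leading zero-record batches from this sender before seeding the queue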
while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) {
final RawFragmentBatch batch = getNext(b);
incomingBatches[b] = batch;
if (batch != null) {
batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
} else {
batchLoaders[b].clear();
batchLoaders[b] = null;
checkContinue();
}
}
if (batchLoaders[b] != null) {
pqueue.add(new Node(b, 0));
}
}
hasRun = true;
// finished lazy initialization
}
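// Main merge loop: repeatedly pull the minimum record from the priority
// queue and copy it into the outgoing batch, advancing within the current
// incoming batch or fetching the sender's next batch as needed.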
while (outgoingBatchHasSpace) {
// poll next value from pq and copy to outgoing batch
final Node node = pqueue.poll();
if (node == null) {
break;
}
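// returns false once the outgoing batch has no more room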
outgoingBatchHasSpace = copyRecordToOutgoingBatch(node);
if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
// reached the end of an incoming record batch
RawFragmentBatch nextBatch = getNext(node.batchId);
while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
nextBatch = getNext(node.batchId);
}
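// Sanity check: a sender may only run dry after every record it delivered has been emitted.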
assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
: String.format("Stream %d input count: %d output count: %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
if (nextBatch == null) {
checkContinue();
}
incomingBatches[node.batchId] = nextBatch;
if (nextBatch == null) {
// this sender's stream is exhausted
boolean allBatchesEmpty = true;
for (final RawFragmentBatch batch : incomingBatches) {
// see if all batches are empty so we can return OK_* or NONE
if (batch != null) {
allBatchesEmpty = false;
break;
}
}
if (allBatchesEmpty) {
hasMoreIncoming = false;
break;
}
// this sender is exhausted; since the pqueue no longer references it, the
// sender is simply ignored in subsequent iterations.
continue;
}
final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
batchLoaders[node.batchId].load(rbd, incomingBatches[node.batchId].getBody());
incomingBatches[node.batchId].release();
batchOffsets[node.batchId] = 0;
// add front value from batch[x] to priority queue
if (batchLoaders[node.batchId].getRecordCount() != 0) {
node.valueIndex = 0;
pqueue.add(node);
}
} else {
node.valueIndex++;
pqueue.add(node);
}
}
// set the value counts in the outgoing vectors
container.setValueCount(outgoingPosition);
if (pqueue.isEmpty()) {
state = BatchState.DONE;
}
if (schemaChanged) {
return IterOutcome.OK_NEW_SCHEMA;
} else {
return IterOutcome.OK;
}
}