public IterOutcome innerNext()

in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java [177:448]
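Implements the merge receiver's k-way merge. The first call performs lazy initialization: it pulls an initial batch from every sender, patches senders that delivered no schema with zero-record dummy batches, builds the outgoing schema and vector container, generates a merge comparator, and seeds a priority queue with the head record of each stream. Every call then drains the queue into the outgoing batch, pulling replacement batches from senders as their current batches run out, and returns OK_NEW_SCHEMA for the first batch, OK for subsequent ones, and NONE once all streams are exhausted.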


  public IterOutcome innerNext() {
    if (fragProviders.length == 0) {
      return IterOutcome.NONE;
    }
    boolean schemaChanged = false;

    if (!outgoingBatchHasSpace) {
      logger.debug("Outgoing vectors were full on last iteration");
      allocateOutgoing();
      outgoingPosition = 0;
      outgoingBatchHasSpace = true;
    }

    if (!hasMoreIncoming) {
      logger.debug("next() was called after all values have been processed");
      outgoingPosition = 0;
      return IterOutcome.NONE;
    }

    List<UserBitShared.SerializedField> fieldList = null;
    boolean createDummyBatch = false;

    // lazy initialization
    if (!hasRun) {
      schemaChanged = true; // first iteration is always a schema change

      // set up each (non-empty) incoming record batch
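      // (one rawBatches entry per sender; null entries are replaced with dummy batches below)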
      final List<RawFragmentBatch> rawBatches = Lists.newArrayList();
      try {
        int p = 0;
        for (@SuppressWarnings("unused") final RawFragmentBatchProvider provider : fragProviders) {
          RawFragmentBatch rawBatch;
          // the temp holder may already hold a batch fetched while building the schema; use it before calling getNext()
          if (tempBatchHolder[p] != null) {
            rawBatch = tempBatchHolder[p];
            tempBatchHolder[p] = null;
          } else {
            rawBatch = getNext(p);
          }
          checkContinue();

          // If rawBatch is null, this sender is already done. Still add the null
          // entry to the list; dummy batches are created for all null entries later.
          if (rawBatch == null) {
            createDummyBatch = true;
            rawBatches.add(rawBatch);
            p++; // move to next sender
            continue;
          }

          if (fieldList == null && rawBatch.getHeader().getDef().getFieldCount() != 0) {
            // save the schema to fix up empty batches with no schema if needed.
            fieldList = rawBatch.getHeader().getDef().getFieldList();
          }

          if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
            rawBatches.add(rawBatch);
          } else {
            // keep reading until we get a batch with record count > 0, or we run out of batches (getNext() returns null)
            while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
              // Do nothing
            }
            if (rawBatch == null) {
              checkContinue();
            }
            if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) {
              createDummyBatch = true;
            }
            // Even if rawBatch is null, go ahead and add it to the list.
            // We will create dummy batches for all null batches later.
            rawBatches.add(rawBatch);
          }
          p++;
        }

        // If no batch arrived with schema from any of the providers, just return NONE.
        if (fieldList == null) {
          return IterOutcome.NONE;
        }

        // Go through and fix schema for empty batches.
        if (createDummyBatch) {
          // Create a dummy record batch definition with 0 record count. We cannot
          // use or modify the original field list, since it is referenced by a
          // valid record batch; instead, create a copy with value count = 0 for
          // all fields, purely for dummy schema generation.
          UserBitShared.RecordBatchDef dummyDef = UserBitShared.RecordBatchDef.newBuilder()
              .addAllField(createDummyFieldList(fieldList))
              .setRecordCount(0)
              .build();

          // Create dummy header
          BitData.FragmentRecordBatch dummyHeader = BitData.FragmentRecordBatch.newBuilder()
              .setIsLastBatch(true)
              .setDef(dummyDef)
              .build();

          for (int i = 0; i < p; i++) {
            RawFragmentBatch rawBatch = rawBatches.get(i);
            if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) {
              rawBatch = new RawFragmentBatch(dummyHeader, null, null);
              rawBatches.set(i, rawBatch);
            }
          }
        }
      } catch (Throwable t) {
        clearBatches(rawBatches);
        throw t;
      }

      // allocate the incoming record batch loaders
      senderCount = rawBatches.size();
      incomingBatches = new RawFragmentBatch[senderCount];
      batchOffsets = new int[senderCount];
      batchLoaders = new RecordBatchLoader[senderCount];
      for (int i = 0; i < senderCount; ++i) {
        incomingBatches[i] = rawBatches.get(i);
        batchLoaders[i] = new RecordBatchLoader(oContext.getAllocator());
      }

      // after this point all batches have moved to incomingBatches
      rawBatches.clear();

      int i = 0;
      for (final RawFragmentBatch batch : incomingBatches) {
        // initialize the incoming batchLoaders
        final UserBitShared.RecordBatchDef rbd = batch.getHeader().getDef();
        batchLoaders[i].load(rbd, batch.getBody());
        batch.release();
        ++batchOffsets[i];
        ++i;
      }

      // after this point all batches have been released and their bytebuf are in batchLoaders

      // Ensure all the incoming batches have the identical schema.
      // Note: RecordBatchLoader permutes the columns to obtain the same columns order for all batches.
      checkSameSchemaAmongBatches(batchLoaders);

      // create the outgoing schema and vector container, and allocate the initial batch
      final SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
      for (final VectorWrapper<?> v : batchLoaders[0]) {

        // add field to the output schema
        bldr.addField(v.getField());

        // allocate a new value vector
        container.addOrGet(v.getField());
      }
      allocateOutgoing();

      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);

      // generate code for merge operations (copy and compare)
      merger = createMerger();
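      // the generated merger supplies both halves of the merge: doEval() drives
      // the priority queue comparator below, and the copy side moves records
      // into the outgoing batch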

      // allocate the priority queue with the generated comparator
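      // the queue holds at most one Node per sender: the current head record of that sender's stream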
      this.pqueue = new PriorityQueue<>(fragProviders.length, new Comparator<Node>() {
        @Override
        public int compare(final Node node1, final Node node2) {
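          // Each side is addressed by a composite int: the batch (sender) id in
          // the high bits and the record offset in the low 16 bits; the generated
          // merger decodes this composite index inside doEval(). This relies on
          // batches holding fewer than 1 << 16 records.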
          final int leftIndex = (node1.batchId << 16) + node1.valueIndex;
          final int rightIndex = (node2.batchId << 16) + node2.valueIndex;
          try {
            return merger.doEval(leftIndex, rightIndex);
          } catch (SchemaChangeException e) {
            throw new UnsupportedOperationException(e);
          }
        }
      });

      // populate the priority queue with initial values
      for (int b = 0; b < senderCount; ++b) {
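        // skip this sender's leading empty batches; if its stream ends before any
        // records arrive, clear and drop the loader so it is excluded from the queue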
        while (batchLoaders[b] != null && batchLoaders[b].getRecordCount() == 0) {
          final RawFragmentBatch batch = getNext(b);
          incomingBatches[b] = batch;
          if (batch != null) {
            batchLoaders[b].load(batch.getHeader().getDef(), batch.getBody());
          } else {
            batchLoaders[b].clear();
            batchLoaders[b] = null;
            checkContinue();
          }
        }
        if (batchLoaders[b] != null) {
          pqueue.add(new Node(b, 0));
        }
      }

      hasRun = true;
      // finished lazy initialization
    }

    // Merge loop: repeatedly copy the lowest remaining record into the outgoing
    // batch until the batch fills up or every incoming stream is drained.
    while (outgoingBatchHasSpace) {
      // poll next value from pq and copy to outgoing batch
      final Node node = pqueue.poll();
      if (node == null) {
        break;
      }
      outgoingBatchHasSpace = copyRecordToOutgoingBatch(node); // false once the outgoing batch is full

      if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
        // reached the end of an incoming record batch
        RawFragmentBatch nextBatch = getNext(node.batchId);

        while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
          nextBatch = getNext(node.batchId);
        }

        assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
            : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
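        // a null batch means this stream is finished; checkContinue() still
        // throws if the fragment has been asked to stop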
        if (nextBatch == null) {
          checkContinue();
        }

        incomingBatches[node.batchId] = nextBatch;

        if (nextBatch == null) {
          // this sender's stream is exhausted
          boolean allBatchesEmpty = true;

          for (final RawFragmentBatch batch : incomingBatches) {
            // check whether every incoming stream is exhausted, so we can return OK_* or NONE
            if (batch != null) {
              allBatchesEmpty = false;
              break;
            }
          }

          if (allBatchesEmpty) {
            hasMoreIncoming = false;
            break;
          }

          // this stream is exhausted; since the pqueue no longer references it, it
          // will be ignored in subsequent iterations.
          continue;
        }

        final UserBitShared.RecordBatchDef rbd = incomingBatches[node.batchId].getHeader().getDef();
        batchLoaders[node.batchId].load(rbd, incomingBatches[node.batchId].getBody());
        incomingBatches[node.batchId].release();
        batchOffsets[node.batchId] = 0;

        // add front value from batch[x] to priority queue
        if (batchLoaders[node.batchId].getRecordCount() != 0) {
          node.valueIndex = 0;
          pqueue.add(node);
        }
      } else {
        node.valueIndex++;
        pqueue.add(node);
      }
    }

    // set the value counts in the outgoing vectors
    container.setValueCount(outgoingPosition);

    if (pqueue.isEmpty()) {
      state = BatchState.DONE;
    }

    if (schemaChanged) {
      return IterOutcome.OK_NEW_SCHEMA;
    } else {
      return IterOutcome.OK;
    }
  }
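
For reference, here is a minimal, self-contained sketch of the same k-way merge pattern: plain int arrays stand in for Drill's RecordBatchLoader streams, and a natural-order comparator stands in for the generated merger. All names here (KWayMergeSketch, merge, inputs) are illustrative, not part of the Drill code base.

import java.util.Arrays;
import java.util.PriorityQueue;

// Hypothetical sketch of the priority-queue k-way merge used by innerNext().
public class KWayMergeSketch {

  // One cursor per sorted input, mirroring MergingRecordBatch's Node:
  // 'source' plays the role of batchId, 'index' the role of valueIndex.
  static final class Node {
    final int source;
    int index;
    Node(int source, int index) { this.source = source; this.index = index; }
  }

  static int[] merge(int[][] inputs) {
    int total = 0;
    for (int[] in : inputs) {
      total += in.length;
    }
    int[] out = new int[total];

    // order nodes by the value each cursor currently points at
    PriorityQueue<Node> pq = new PriorityQueue<>(
        (a, b) -> Integer.compare(inputs[a.source][a.index], inputs[b.source][b.index]));

    // seed the queue with the head of each non-empty input,
    // as innerNext() does at the end of lazy initialization
    for (int s = 0; s < inputs.length; s++) {
      if (inputs[s].length > 0) {
        pq.add(new Node(s, 0));
      }
    }

    // merge loop: poll the smallest record, copy it out, then either
    // advance the cursor or let the drained input drop out of the queue
    int outPos = 0;
    while (!pq.isEmpty()) {
      Node node = pq.poll();
      out[outPos++] = inputs[node.source][node.index];
      if (++node.index < inputs[node.source].length) {
        pq.add(node);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    int[][] inputs = { {1, 4, 9}, {2, 3, 10}, {5} };
    System.out.println(Arrays.toString(merge(inputs))); // [1, 2, 3, 4, 5, 9, 10]
  }
}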