public abstract void doSetup()

in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java [285:491]


  /**
   * Setup hook for the partitioner; no implementation is visible in this template.
   * NOTE(review): presumably supplied by runtime-generated code, with the {@code @Named}
   * values binding the arguments to names used by that code -- confirm against the
   * code generator.
   *
   * @param context  fragment context made available to the implementation
   * @param incoming the incoming record batch
   * @param outgoing the outgoing batches, one element per {@link OutgoingRecordBatch}
   * @throws SchemaChangeException declared for implementations to signal a schema problem
   */
  public abstract void doSetup(@Named("context") FragmentContext context,
                               @Named("incoming") RecordBatch incoming,
                               @Named("outgoing") OutgoingRecordBatch[] outgoing)
                       throws SchemaChangeException;
  /**
   * Evaluation hook for a single incoming record; no implementation is visible in this template.
   * NOTE(review): the returned int presumably selects the outgoing partition for the
   * record -- confirm against the caller, which is outside this excerpt.
   *
   * @param inIndex index of the record in the incoming batch
   * @return an int computed by the implementation for the given record
   * @throws SchemaChangeException declared for implementations to signal a schema problem
   */
  public abstract int doEval(@Named("inIndex") int inIndex) throws SchemaChangeException;

  public class OutgoingRecordBatch implements PartitionOutgoingBatch, VectorAccessible {

    private final AccountingDataTunnel tunnel;
    private final HashPartitionSender operator;
    private final FragmentContext context;
    private final VectorContainer vectorContainer;
    private final int oppositeMinorFragmentId;
    private final OperatorStats stats;

    private boolean isLast;
    private boolean dropAll;
    private int recordCount;
    private int totalRecords;

    public OutgoingRecordBatch(OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel,
                               FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
      this.context = context;
      this.operator = operator;
      this.tunnel = tunnel;
      this.stats = stats;
      this.oppositeMinorFragmentId = oppositeMinorFragmentId;
      this.vectorContainer = new VectorContainer(allocator);
    }

    protected void copy(int inIndex) throws IOException {
      try {
        doEval(inIndex, recordCount);
      } catch (SchemaChangeException e) {
        throw new UnsupportedOperationException(e);
      }
      recordCount++;
      totalRecords++;
      if (recordCount == outgoingRecordBatchSize) {
        flush(false);
      }
    }

    @Override
    public void terminate() {
      // receiver already terminated, don't send anything to it from now on
      dropAll = true;
    }

    @RuntimeOverridden
    protected void doSetup(@Named("incoming") RecordBatch incoming,
                           @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException { };

    @RuntimeOverridden
    protected void doEval(@Named("inIndex") int inIndex,
                          @Named("outIndex") int outIndex) throws SchemaChangeException { };

    public void flush(boolean schemaChanged) throws IOException {
      if (dropAll) {
        // If we are in dropAll mode, we still want to copy the data, because we
        // can't stop copying a single outgoing
        // batch with out stopping all outgoing batches. Other option is check
        // for status of dropAll before copying
        // every single record in copy method which has the overhead for every
        // record all the time. Resetting the output
        // count, reusing the same buffers and copying has overhead only for
        // outgoing batches whose receiver has
        // terminated.

        // Reset the count to 0 and use existing buffers for exhausting input where receiver of this batch is terminated
        recordCount = 0;
        return;
      }
      final FragmentHandle handle = context.getHandle();

      // We need to send the last batch when
      //   1. we are actually done processing the incoming RecordBatches and no more input available
      //   2. receiver wants to terminate (possible in case of queries involving limit clause). Even when receiver wants
      //      to terminate we need to send at least one batch with "isLastBatch" set to true, so that receiver knows
      //      sender has acknowledged the terminate request. After sending the last batch, all further batches are
      //      dropped.
      //   3. Partitioner thread is interrupted due to cancellation of fragment.
      final boolean isLastBatch = isLast || Thread.currentThread().isInterrupted();

      // if the batch is not the last batch and the current recordCount is zero, then no need to send any RecordBatches
      if (!isLastBatch && recordCount == 0) {
        return;
      }

      vectorContainer.setValueCount(recordCount);

      FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLastBatch,
          handle.getQueryId(),
          handle.getMajorFragmentId(),
          handle.getMinorFragmentId(),
          operator.getOppositeMajorFragmentId(),
          oppositeMinorFragmentId,
          getWritableBatch());

      updateStats(writableBatch);
      stats.startWait();
      try {
        tunnel.sendRecordBatch(writableBatch);
      } finally {
        stats.stopWait();
      }

      // If the current batch is the last batch, then set a flag to ignore any
      // requests to flush the data
      // This is possible when the receiver is terminated, but we still get data
      // from input operator
      if (isLastBatch) {
        dropAll = true;
      }

      // If this flush is not due to schema change, allocate space for existing vectors.
      if (!schemaChanged) {
        // reset values and reallocate the buffer for each value vector based on the incoming batch.
        // NOTE: the value vector is directly referenced by generated code; therefore references
        // must remain valid.
        recordCount = 0;
        vectorContainer.zeroVectors();
        allocateOutgoingRecordBatch();
      }
    }

    private void allocateOutgoingRecordBatch() {
      vectorContainer.allocate(outgoingRecordBatchSize);
    }

    public void updateStats(FragmentWritableBatch writableBatch) {
      stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount());
      stats.addLongStat(Metric.BATCHES_SENT, 1);
      stats.addLongStat(Metric.RECORDS_SENT, writableBatch.getHeader().getDef().getRecordCount());
    }

    /**
     * Initialize the OutgoingBatch based on the current schema in incoming RecordBatch
     */
    public void initializeBatch() {
      vectorContainer.buildFrom(incoming.getSchema());
      allocateOutgoingRecordBatch();
      try {
        doSetup(incoming, vectorContainer);
      } catch (SchemaChangeException e) {
        throw new UnsupportedOperationException(e);
      }
    }

    public void resetBatch() {
      isLast = false;
      recordCount = 0;
      vectorContainer.clear();
    }

    public void setIsLast() {
      isLast = true;
    }

    @Override
    public BatchSchema getSchema() {
      return incoming.getSchema();
    }

    @Override
    public int getRecordCount() {
      return recordCount;
    }

    @Override
    public long getTotalRecords() {
      return totalRecords;
    }

    @Override
    public TypedFieldId getValueVectorId(SchemaPath path) {
      return vectorContainer.getValueVectorId(path);
    }

    @Override
    public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) {
      return vectorContainer.getValueAccessorById(clazz, fieldIds);
    }

    @Override
    public Iterator<VectorWrapper<?>> iterator() {
      return vectorContainer.iterator();
    }

    @Override
    public SelectionVector2 getSelectionVector2() {
      throw new UnsupportedOperationException();
    }

    @Override
    public SelectionVector4 getSelectionVector4() {
      throw new UnsupportedOperationException();
    }

    public WritableBatch getWritableBatch() {
      return WritableBatch.getBatchNoHVWrap(recordCount, this, false);
    }

    public void clear(){
      vectorContainer.clear();
    }
  }