void PipeImpl::advanceReadOperation()

in tensorpipe/core/pipe_impl.cc [669:759]


void PipeImpl::advanceReadOperation(
    ReadOpIter opIter,
    ReadOperation::State prevOpState) {
  TP_DCHECK(context_->inLoop());

  ReadOperation& op = *opIter;

  // Needs to go after previous op to ensure ordering of callback invocations.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::UNINITIALIZED,
      /*to=*/ReadOperation::ASKING_FOR_ALLOCATION,
      /*cond=*/error_ && prevOpState >= ReadOperation::ASKING_FOR_ALLOCATION,
      /*actions=*/{&PipeImpl::callReadDescriptorCallback});

  // The ordering on the "wire" (the primary connection) is descriptor of op N,
  // then payloads of op N, then descriptor of op N+1. Hence this transition
  // must happen after the previous op scheduled its payload read, not just its
  // descriptor read.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::UNINITIALIZED,
      /*to=*/ReadOperation::READING_DESCRIPTOR,
      /*cond=*/!error_ && state_ == ESTABLISHED &&
          prevOpState >= ReadOperation::READING_PAYLOADS_AND_RECEIVING_TENSORS,
      /*actions=*/{&PipeImpl::readDescriptorOfMessage});

  // Needs to go after previous op to ensure ordering of callback invocations.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::READING_DESCRIPTOR,
      /*to=*/ReadOperation::ASKING_FOR_ALLOCATION,
      /*cond=*/op.doneReadingDescriptor &&
          prevOpState >= ReadOperation::ASKING_FOR_ALLOCATION,
      /*actions=*/{&PipeImpl::callReadDescriptorCallback});

  // Needs to wait for previous op to have _received_ the read call, as we can
  // only have exactly one operation at a time for which we expect a read call.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::ASKING_FOR_ALLOCATION,
      /*to=*/ReadOperation::ASKING_FOR_ALLOCATION_FIRST_IN_LINE,
      /*cond=*/op.doneReadingDescriptor &&
          prevOpState >= ReadOperation::READING_PAYLOADS_AND_RECEIVING_TENSORS,
      /*actions=*/{&PipeImpl::expectReadCall});

  // Needs to go after previous op to ensure ordering of callback invocations.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::ASKING_FOR_ALLOCATION_FIRST_IN_LINE,
      /*to=*/ReadOperation::FINISHED,
      /*cond=*/error_ && op.doneGettingAllocation &&
          prevOpState >= ReadOperation::FINISHED,
      /*actions=*/{&PipeImpl::callReadCallback});

  // No need to order this with the previous operation, since all it needs is
  // to come after this own op's descriptor read.
  // This transition shortcuts writing the descriptor reply when all target
  // devices were provided by the sender.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::ASKING_FOR_ALLOCATION_FIRST_IN_LINE,
      /*to=*/ReadOperation::READING_PAYLOADS_AND_RECEIVING_TENSORS,
      /*cond=*/!error_ && op.doneGettingAllocation &&
          !op.hasMissingTargetDevices,
      /*actions=*/
      {&PipeImpl::readPayloadsOfMessage, &PipeImpl::receiveTensorsOfMessage});

  // No need to order this with the previous operation, since all it needs is
  // to come after this own op's descriptor read.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::ASKING_FOR_ALLOCATION_FIRST_IN_LINE,
      /*to=*/ReadOperation::READING_PAYLOADS_AND_RECEIVING_TENSORS,
      /*cond=*/!error_ && op.doneGettingAllocation &&
          op.hasMissingTargetDevices,
      /*actions=*/
      {&PipeImpl::readPayloadsOfMessage,
       &PipeImpl::writeDescriptorReplyOfMessage,
       &PipeImpl::receiveTensorsOfMessage});

  // Needs to go after previous op to ensure ordering of callback invocations.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::READING_PAYLOADS_AND_RECEIVING_TENSORS,
      /*to=*/ReadOperation::FINISHED,
      /*cond=*/op.numPayloadsBeingRead == 0 &&
          op.numTensorsBeingReceived == 0 &&
          prevOpState >= ReadOperation::FINISHED,
      /*actions=*/{&PipeImpl::callReadCallback});
}