in tensorpipe/core/pipe_impl.cc [669:759]
// Tries, in order, every state transition of the read operation pointed to by
// opIter. Each attemptTransition call names the state the op must currently be
// in (from), the state it would move to (to), the condition that must hold and
// the member functions to run when the transition is taken.
//
// opIter identifies the operation to advance; prevOpState is the state of the
// operation that precedes it, and is used to keep wire reads and user callbacks
// ordered across consecutive operations.
//
// Must be called from the context's loop (checked in debug builds).
void PipeImpl::advanceReadOperation(
    ReadOpIter opIter,
    ReadOperation::State prevOpState) {
  TP_DCHECK(context_->inLoop());

  ReadOperation& op = *opIter;

  // Error path: skip the descriptor read entirely and go straight to invoking
  // the descriptor callback.
  // Needs to go after previous op to ensure ordering of callback invocations.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::UNINITIALIZED,
      /*to=*/ReadOperation::ASKING_FOR_ALLOCATION,
      /*cond=*/error_ && prevOpState >= ReadOperation::ASKING_FOR_ALLOCATION,
      /*actions=*/{&PipeImpl::callReadDescriptorCallback});

  // The ordering on the "wire" (the primary connection) is descriptor of op N,
  // then payloads of op N, then descriptor of op N+1. Hence this transition
  // must happen after the previous op scheduled its payload read, not just its
  // descriptor read.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::UNINITIALIZED,
      /*to=*/ReadOperation::READING_DESCRIPTOR,
      /*cond=*/!error_ && state_ == ESTABLISHED &&
          prevOpState >= ReadOperation::READING_PAYLOADS_AND_RECEIVING_TENSORS,
      /*actions=*/{&PipeImpl::readDescriptorOfMessage});

  // Needs to go after previous op to ensure ordering of callback invocations.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::READING_DESCRIPTOR,
      /*to=*/ReadOperation::ASKING_FOR_ALLOCATION,
      /*cond=*/op.doneReadingDescriptor &&
          prevOpState >= ReadOperation::ASKING_FOR_ALLOCATION,
      /*actions=*/{&PipeImpl::callReadDescriptorCallback});

  // Needs to wait for previous op to have _received_ the read call, as we can
  // only have exactly one operation at a time for which we expect a read call.
  // This must NOT be gated on op.doneReadingDescriptor: ops that got here from
  // READING_DESCRIPTOR already satisfied it, while ops that got here through
  // the error path above never ran readDescriptorOfMessage (so the flag is
  // presumably never set for them) and would otherwise be stuck in this state,
  // never reaching FINISHED and stalling every operation queued behind them.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::ASKING_FOR_ALLOCATION,
      /*to=*/ReadOperation::ASKING_FOR_ALLOCATION_FIRST_IN_LINE,
      /*cond=*/prevOpState >=
          ReadOperation::READING_PAYLOADS_AND_RECEIVING_TENSORS,
      /*actions=*/{&PipeImpl::expectReadCall});

  // Error path once the allocation has been provided: nothing is read, the
  // read callback is invoked right away.
  // Needs to go after previous op to ensure ordering of callback invocations.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::ASKING_FOR_ALLOCATION_FIRST_IN_LINE,
      /*to=*/ReadOperation::FINISHED,
      /*cond=*/error_ && op.doneGettingAllocation &&
          prevOpState >= ReadOperation::FINISHED,
      /*actions=*/{&PipeImpl::callReadCallback});

  // No need to order this with the previous operation, since all it needs is
  // to come after this own op's descriptor read.
  // This transition shortcuts writing the descriptor reply when all target
  // devices were provided by the sender.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::ASKING_FOR_ALLOCATION_FIRST_IN_LINE,
      /*to=*/ReadOperation::READING_PAYLOADS_AND_RECEIVING_TENSORS,
      /*cond=*/!error_ && op.doneGettingAllocation &&
          !op.hasMissingTargetDevices,
      /*actions=*/
      {&PipeImpl::readPayloadsOfMessage, &PipeImpl::receiveTensorsOfMessage});

  // No need to order this with the previous operation, since all it needs is
  // to come after this own op's descriptor read.
  // Same as the transition above, except that a descriptor reply is also
  // written because some target devices were missing.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::ASKING_FOR_ALLOCATION_FIRST_IN_LINE,
      /*to=*/ReadOperation::READING_PAYLOADS_AND_RECEIVING_TENSORS,
      /*cond=*/!error_ && op.doneGettingAllocation &&
          op.hasMissingTargetDevices,
      /*actions=*/
      {&PipeImpl::readPayloadsOfMessage,
       &PipeImpl::writeDescriptorReplyOfMessage,
       &PipeImpl::receiveTensorsOfMessage});

  // The op is done once no payload read and no tensor receive is outstanding.
  // Needs to go after previous op to ensure ordering of callback invocations.
  readOps_.attemptTransition(
      opIter,
      /*from=*/ReadOperation::READING_PAYLOADS_AND_RECEIVING_TENSORS,
      /*to=*/ReadOperation::FINISHED,
      /*cond=*/op.numPayloadsBeingRead == 0 &&
          op.numTensorsBeingReceived == 0 &&
          prevOpState >= ReadOperation::FINISHED,
      /*actions=*/{&PipeImpl::callReadCallback});
}