in tensorpipe/core/pipe_impl.cc [761:835]
void PipeImpl::advanceWriteOperation(
WriteOpIter opIter,
WriteOperation::State prevOpState) {
TP_DCHECK(context_->inLoop());
WriteOperation& op = *opIter;
// Needs to go after previous op to ensure ordering of callback invocations.
writeOps_.attemptTransition(
opIter,
/*from=*/WriteOperation::UNINITIALIZED,
/*to=*/WriteOperation::FINISHED,
/*cond=*/error_ && prevOpState >= WriteOperation::FINISHED,
/*actions=*/{&PipeImpl::callWriteCallback});
// Needs to go after previous op to ensure predictable and consistent ordering
// of write calls on the connection and send calls on the channels.
// This transition shortcuts reading the target devices when they were all
// provided by the user.
writeOps_.attemptTransition(
opIter,
/*from=*/WriteOperation::UNINITIALIZED,
/*to=*/WriteOperation::WRITING_PAYLOADS_AND_SENDING_TENSORS,
/*cond=*/!error_ && state_ == ESTABLISHED &&
!op.hasMissingTargetDevices &&
prevOpState >= WriteOperation::WRITING_PAYLOADS_AND_SENDING_TENSORS,
/*actions=*/
{&PipeImpl::writeDescriptorOfMessage,
&PipeImpl::writePayloadsOfMessage,
&PipeImpl::sendTensorsOfMessage});
// Needs to go after previous op to ensure predictable and consistent ordering
// of write calls on the descriptor connection and read calls on the
// descriptor reply connection.
writeOps_.attemptTransition(
opIter,
/*from=*/WriteOperation::UNINITIALIZED,
/*to=*/WriteOperation::WRITING_PAYLOADS_AND_READING_TARGET_DEVICES,
/*cond=*/!error_ && state_ == ESTABLISHED && op.hasMissingTargetDevices &&
prevOpState >=
WriteOperation::WRITING_PAYLOADS_AND_READING_TARGET_DEVICES,
/*actions=*/
{&PipeImpl::writeDescriptorOfMessage,
&PipeImpl::writePayloadsOfMessage,
&PipeImpl::readDescriptorReplyOfMessage});
// Needs to go after previous op to ensure ordering of callback invocations.
writeOps_.attemptTransition(
opIter,
/*from=*/WriteOperation::WRITING_PAYLOADS_AND_READING_TARGET_DEVICES,
/*to=*/WriteOperation::FINISHED,
/*cond=*/error_ && op.numPayloadsBeingWritten == 0 &&
op.doneReadingDescriptorReply &&
prevOpState >= WriteOperation::FINISHED,
/*actions=*/{&PipeImpl::callWriteCallback});
// Needs to go after previous op to ensure predictable and consistent ordering
// of send calls on channels.
writeOps_.attemptTransition(
opIter,
/*from=*/WriteOperation::WRITING_PAYLOADS_AND_READING_TARGET_DEVICES,
/*to=*/WriteOperation::WRITING_PAYLOADS_AND_SENDING_TENSORS,
/*cond=*/!error_ && op.doneReadingDescriptorReply &&
prevOpState >= WriteOperation::WRITING_PAYLOADS_AND_SENDING_TENSORS,
/*actions=*/{&PipeImpl::sendTensorsOfMessage});
// Needs to go after previous op to ensure ordering of callback invocations.
writeOps_.attemptTransition(
opIter,
/*from=*/WriteOperation::WRITING_PAYLOADS_AND_SENDING_TENSORS,
/*to=*/WriteOperation::FINISHED,
/*cond=*/op.numPayloadsBeingWritten == 0 && op.numTensorsBeingSent == 0 &&
prevOpState >= WriteOperation::FINISHED,
/*actions=*/{&PipeImpl::callWriteCallback});
}