void PipeImpl::advanceWriteOperation()

in tensorpipe/core/pipe_impl.cc [761:835]


// Tries to move the write operation referenced by opIter forward through its
// state machine.
//
// Every attemptTransition call below names a source state, a destination
// state, a condition and a list of PipeImpl member functions to use as
// actions. Presumably the helper performs the move (and invokes the actions in
// the listed order) only when the op is currently in the source state and the
// condition is true -- confirm against the definition of writeOps_.
//
// The calls are issued in a fixed order, and each condition is computed right
// before its own call, so it observes anything the earlier calls changed.
//
// opIter:      iterator designating the write operation to advance.
// prevOpState: state of the preceding write operation. Every transition
//              requires it to have reached at least a given state, which is
//              what keeps callbacks, connection writes and channel sends in
//              the same order as the operations themselves.
//
// Asserts (through TP_DCHECK) that it is running inside the context's loop.
void PipeImpl::advanceWriteOperation(
    WriteOpIter opIter,
    WriteOperation::State prevOpState) {
  TP_DCHECK(context_->inLoop());

  WriteOperation& op = *opIter;

  // prevOpState is a by-value parameter that is never reassigned, so how far
  // the previous op has progressed can be worked out once, up front.
  const bool prevReachedReadingTargetDevices = prevOpState >=
      WriteOperation::WRITING_PAYLOADS_AND_READING_TARGET_DEVICES;
  const bool prevReachedSendingTensors =
      prevOpState >= WriteOperation::WRITING_PAYLOADS_AND_SENDING_TENSORS;
  const bool prevReachedFinished = prevOpState >= WriteOperation::FINISHED;

  // The pipe is in error and this op has not started yet: go straight to
  // FINISHED. Gated on the previous op being finished so that write callbacks
  // fire in order.
  const bool failBeforeStarting = error_ && prevReachedFinished;
  writeOps_.attemptTransition(
      opIter,
      /*from=*/WriteOperation::UNINITIALIZED,
      /*to=*/WriteOperation::FINISHED,
      /*cond=*/failBeforeStarting,
      /*actions=*/{&PipeImpl::callWriteCallback});

  // Shortcut for when no target device is missing: the descriptor reply does
  // not have to be read, so tensors are sent right away. Gated on the previous
  // op so that writes on the connection and sends on the channels happen in a
  // predictable, consistent order.
  const bool startAndSendTensors = !error_ && state_ == ESTABLISHED &&
      !op.hasMissingTargetDevices && prevReachedSendingTensors;
  writeOps_.attemptTransition(
      opIter,
      /*from=*/WriteOperation::UNINITIALIZED,
      /*to=*/WriteOperation::WRITING_PAYLOADS_AND_SENDING_TENSORS,
      /*cond=*/startAndSendTensors,
      /*actions=*/
      {&PipeImpl::writeDescriptorOfMessage,
       &PipeImpl::writePayloadsOfMessage,
       &PipeImpl::sendTensorsOfMessage});

  // Some target devices are missing: write the descriptor and payloads, then
  // read the descriptor reply. Gated on the previous op so that writes on the
  // descriptor connection and reads on the descriptor reply connection happen
  // in a predictable, consistent order.
  const bool startAndReadTargetDevices = !error_ && state_ == ESTABLISHED &&
      op.hasMissingTargetDevices && prevReachedReadingTargetDevices;
  writeOps_.attemptTransition(
      opIter,
      /*from=*/WriteOperation::UNINITIALIZED,
      /*to=*/WriteOperation::WRITING_PAYLOADS_AND_READING_TARGET_DEVICES,
      /*cond=*/startAndReadTargetDevices,
      /*actions=*/
      {&PipeImpl::writeDescriptorOfMessage,
       &PipeImpl::writePayloadsOfMessage,
       &PipeImpl::readDescriptorReplyOfMessage});

  // The pipe went into error while target devices were being read: finish as
  // soon as no payload write is outstanding and the descriptor reply read is
  // done. Gated on the previous op being finished so that write callbacks fire
  // in order.
  const bool failWhileReadingTargetDevices = error_ &&
      op.numPayloadsBeingWritten == 0 && op.doneReadingDescriptorReply &&
      prevReachedFinished;
  writeOps_.attemptTransition(
      opIter,
      /*from=*/WriteOperation::WRITING_PAYLOADS_AND_READING_TARGET_DEVICES,
      /*to=*/WriteOperation::FINISHED,
      /*cond=*/failWhileReadingTargetDevices,
      /*actions=*/{&PipeImpl::callWriteCallback});

  // The descriptor reply has been read: start sending tensors. Gated on the
  // previous op so that sends on the channels happen in a predictable,
  // consistent order.
  const bool targetDevicesKnown =
      !error_ && op.doneReadingDescriptorReply && prevReachedSendingTensors;
  writeOps_.attemptTransition(
      opIter,
      /*from=*/WriteOperation::WRITING_PAYLOADS_AND_READING_TARGET_DEVICES,
      /*to=*/WriteOperation::WRITING_PAYLOADS_AND_SENDING_TENSORS,
      /*cond=*/targetDevicesKnown,
      /*actions=*/{&PipeImpl::sendTensorsOfMessage});

  // No payload write and no tensor send is outstanding any more: finish. Note
  // that this condition does not look at error_. Gated on the previous op
  // being finished so that write callbacks fire in order.
  const bool nothingLeftInFlight = op.numPayloadsBeingWritten == 0 &&
      op.numTensorsBeingSent == 0 && prevReachedFinished;
  writeOps_.attemptTransition(
      opIter,
      /*from=*/WriteOperation::WRITING_PAYLOADS_AND_SENDING_TENSORS,
      /*to=*/WriteOperation::FINISHED,
      /*cond=*/nothingLeftInFlight,
      /*actions=*/{&PipeImpl::callWriteCallback});
}