void ChannelImpl::advanceChunkSendOperation()

in tensorpipe/channel/cuda_basic/channel_impl.cc [117:229]


// Attempts, in a fixed order, every state transition of the chunk-send state
// machine for the operation pointed to by opIter.
//
// opIter:      iterator to the ChunkSendOperation to advance.
// prevOpState: state of the preceding operation; most transitions below are
//              gated on it having reached at least a given state.
//
// NOTE(review): the definition of attemptTransition is not visible here. The
// argument labels suggest it moves the op from `from` to `to` and runs
// `actions` only when the op is currently in `from` and `cond` holds — confirm
// against its definition. The `prevOpState >= X` gates presumably rely on the
// declaration order of ChunkSendOperation::State — verify against the enum.
//
// Transitions, in the order they are attempted (guards abbreviated):
//   UNINITIALIZED           -> FINISHED                 error_
//   UNINITIALIZED           -> SENDING_CPU_BUFFER       !error_, CPU buffer
//   UNINITIALIZED           -> ALLOCATING_CPU_BUFFER    !error_, non-CPU buffer
//   ALLOCATING_CPU_BUFFER   -> FINISHED                 error_, allocation done
//   ALLOCATING_CPU_BUFFER   -> COPYING_FROM_GPU_TO_CPU  !error_, allocation done
//   COPYING_FROM_GPU_TO_CPU -> FINISHED                 error_, copy done
//   COPYING_FROM_GPU_TO_CPU -> INVOKED_CALLBACK         !error_, copy done
//   INVOKED_CALLBACK        -> FINISHED                 error_
//   INVOKED_CALLBACK        -> SENDING_CPU_BUFFER       !error_
//   SENDING_CPU_BUFFER      -> FINISHED                 send done, CPU buffer
//   SENDING_CPU_BUFFER      -> FINISHED                 send done, non-CPU buffer
void ChannelImpl::advanceChunkSendOperation(
    ChunkSendOpIter opIter,
    ChunkSendOperation::State prevOpState) {
  // Debug-only check that we are being called from within the context's loop.
  TP_DCHECK(context_->inLoop());

  ChunkSendOperation& op = *opIter;

  // Needs to go after previous op invoked its callback because the last chunk
  // in a series (that corresponds to one operation) must invoke its callback
  // only when all chunks in the series are done.
  chunkSendOps_.attemptTransition(
      opIter,
      /*from=*/ChunkSendOperation::UNINITIALIZED,
      /*to=*/ChunkSendOperation::FINISHED,
      /*cond=*/error_ && prevOpState >= ChunkSendOperation::INVOKED_CALLBACK,
      /*actions=*/{&ChannelImpl::callSendCallback});

  // Needs to go after previous op to ensure predictable and consistent ordering
  // of send calls on CPU channel.
  // This transition shortcuts the allocation of/copy to staging memory when the
  // buffer is already on CPU.
  chunkSendOps_.attemptTransition(
      opIter,
      /*from=*/ChunkSendOperation::UNINITIALIZED,
      /*to=*/ChunkSendOperation::SENDING_CPU_BUFFER,
      /*cond=*/!error_ && op.isCpuBuffer &&
          prevOpState >= ChunkSendOperation::SENDING_CPU_BUFFER,
      /*actions=*/
      {&ChannelImpl::writeReadyToSend, &ChannelImpl::sendCpuBuffer});

  // Needs to go after previous op to ensure later operations are not holding
  // staging buffers while earlier ones are still blocked waiting for them,
  // because the staging buffer will only be returned to the allocator once the
  // operation is destroyed, but this won't happen until earlier operations have
  // completed, and if they are blocked waiting for buffers we may deadlock.
  chunkSendOps_.attemptTransition(
      opIter,
      /*from=*/ChunkSendOperation::UNINITIALIZED,
      /*to=*/ChunkSendOperation::ALLOCATING_CPU_BUFFER,
      /*cond=*/!error_ && !op.isCpuBuffer &&
          prevOpState >= ChunkSendOperation::ALLOCATING_CPU_BUFFER,
      /*actions=*/{&ChannelImpl::allocateSendCpuBuffer});

  // See above for why this needs to go after previous op.
  // Error path once the staging-buffer allocation has completed: the callback
  // is invoked and the staging buffer is returned in the same transition.
  chunkSendOps_.attemptTransition(
      opIter,
      /*from=*/ChunkSendOperation::ALLOCATING_CPU_BUFFER,
      /*to=*/ChunkSendOperation::FINISHED,
      /*cond=*/error_ && op.doneAllocatingCpuStagingBuffer &&
          prevOpState >= ChunkSendOperation::INVOKED_CALLBACK,
      /*actions=*/
      {&ChannelImpl::callSendCallback, &ChannelImpl::returnSendCpuBuffer});

  // Needs to go after previous op to ensure predictable and consistent ordering
  // of write calls on the control connection.
  chunkSendOps_.attemptTransition(
      opIter,
      /*from=*/ChunkSendOperation::ALLOCATING_CPU_BUFFER,
      /*to=*/ChunkSendOperation::COPYING_FROM_GPU_TO_CPU,
      /*cond=*/!error_ && op.doneAllocatingCpuStagingBuffer &&
          prevOpState >= ChunkSendOperation::COPYING_FROM_GPU_TO_CPU,
      /*actions=*/
      {&ChannelImpl::writeReadyToSend, &ChannelImpl::copyFromGpuToCpu});

  // See above for why this needs to go after previous op.
  // Error path once the GPU-to-CPU copy has completed: same actions as the
  // error path out of ALLOCATING_CPU_BUFFER.
  chunkSendOps_.attemptTransition(
      opIter,
      /*from=*/ChunkSendOperation::COPYING_FROM_GPU_TO_CPU,
      /*to=*/ChunkSendOperation::FINISHED,
      /*cond=*/error_ && op.doneCopyingFromGpuToCpu &&
          prevOpState >= ChunkSendOperation::INVOKED_CALLBACK,
      /*actions=*/
      {&ChannelImpl::callSendCallback, &ChannelImpl::returnSendCpuBuffer});

  // See above for why this needs to go after previous op.
  // Success path once the GPU-to-CPU copy has completed: the callback is
  // invoked here, before the staging buffer is sent.
  chunkSendOps_.attemptTransition(
      opIter,
      /*from=*/ChunkSendOperation::COPYING_FROM_GPU_TO_CPU,
      /*to=*/ChunkSendOperation::INVOKED_CALLBACK,
      /*cond=*/!error_ && op.doneCopyingFromGpuToCpu &&
          prevOpState >= ChunkSendOperation::INVOKED_CALLBACK,
      /*actions=*/{&ChannelImpl::callSendCallback});

  // Error path after the callback was already invoked by the transition into
  // INVOKED_CALLBACK above: only the staging buffer is returned, the callback
  // is not in the action list. This guard does not depend on prevOpState.
  chunkSendOps_.attemptTransition(
      opIter,
      /*from=*/ChunkSendOperation::INVOKED_CALLBACK,
      /*to=*/ChunkSendOperation::FINISHED,
      /*cond=*/error_,
      /*actions=*/{&ChannelImpl::returnSendCpuBuffer});

  // Needs to go after previous op to ensure predictable and consistent ordering
  // of send calls on CPU channel.
  chunkSendOps_.attemptTransition(
      opIter,
      /*from=*/ChunkSendOperation::INVOKED_CALLBACK,
      /*to=*/ChunkSendOperation::SENDING_CPU_BUFFER,
      /*cond=*/!error_ && prevOpState >= ChunkSendOperation::SENDING_CPU_BUFFER,
      /*actions=*/{&ChannelImpl::sendCpuBuffer});

  // The two transitions below leave SENDING_CPU_BUFFER once the send is done.
  // Neither guard checks error_ or prevOpState.
  //
  // CPU-buffer case: SENDING_CPU_BUFFER was entered straight from
  // UNINITIALIZED, a transition that did not invoke the callback, so the
  // callback is invoked here. No buffer is returned on this path.
  chunkSendOps_.attemptTransition(
      opIter,
      /*from=*/ChunkSendOperation::SENDING_CPU_BUFFER,
      /*to=*/ChunkSendOperation::FINISHED,
      /*cond=*/op.doneSendingCpuBuffer && op.isCpuBuffer,
      /*actions=*/{&ChannelImpl::callSendCallback});

  // Non-CPU-buffer case: SENDING_CPU_BUFFER was entered from INVOKED_CALLBACK,
  // so the callback has already run; only the staging buffer is returned.
  chunkSendOps_.attemptTransition(
      opIter,
      /*from=*/ChunkSendOperation::SENDING_CPU_BUFFER,
      /*to=*/ChunkSendOperation::FINISHED,
      /*cond=*/op.doneSendingCpuBuffer && !op.isCpuBuffer,
      /*actions=*/{&ChannelImpl::returnSendCpuBuffer});
}