in tensorpipe/channel/cuda_basic/channel_impl.cc [117:229]
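// Advances the state machine of one chunk send operation: each
// attemptTransition below moves the chunk to its next state when its condition
// holds and then runs the listed actions, and the previous chunk's state
// (prevOpState) gates the transitions that must remain ordered across chunks.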
void ChannelImpl::advanceChunkSendOperation(
ChunkSendOpIter opIter,
ChunkSendOperation::State prevOpState) {
TP_DCHECK(context_->inLoop());
ChunkSendOperation& op = *opIter;
// Needs to go after the previous op has invoked its callback, because the
// last chunk in a series (the series corresponding to one user operation)
// must invoke its callback only once all chunks in the series are done.
chunkSendOps_.attemptTransition(
opIter,
/*from=*/ChunkSendOperation::UNINITIALIZED,
/*to=*/ChunkSendOperation::FINISHED,
/*cond=*/error_ && prevOpState >= ChunkSendOperation::INVOKED_CALLBACK,
/*actions=*/{&ChannelImpl::callSendCallback});
// Needs to go after previous op to ensure predictable and consistent ordering
// of send calls on CPU channel.
// This transition skips allocating a staging buffer and copying into it,
// since the buffer is already in CPU memory.
chunkSendOps_.attemptTransition(
opIter,
/*from=*/ChunkSendOperation::UNINITIALIZED,
/*to=*/ChunkSendOperation::SENDING_CPU_BUFFER,
/*cond=*/!error_ && op.isCpuBuffer &&
prevOpState >= ChunkSendOperation::SENDING_CPU_BUFFER,
/*actions=*/
{&ChannelImpl::writeReadyToSend, &ChannelImpl::sendCpuBuffer});
// Needs to go after previous op to ensure later operations are not holding
// staging buffers while earlier ones are still blocked waiting for them. The
// staging buffer is only returned to the allocator once the operation is
// destroyed, which cannot happen until all earlier operations have completed;
// if those earlier operations are blocked waiting for buffers, we may
// deadlock.
chunkSendOps_.attemptTransition(
opIter,
/*from=*/ChunkSendOperation::UNINITIALIZED,
/*to=*/ChunkSendOperation::ALLOCATING_CPU_BUFFER,
/*cond=*/!error_ && !op.isCpuBuffer &&
prevOpState >= ChunkSendOperation::ALLOCATING_CPU_BUFFER,
/*actions=*/{&ChannelImpl::allocateSendCpuBuffer});
// See above for why this needs to go after previous op.
chunkSendOps_.attemptTransition(
opIter,
/*from=*/ChunkSendOperation::ALLOCATING_CPU_BUFFER,
/*to=*/ChunkSendOperation::FINISHED,
/*cond=*/error_ && op.doneAllocatingCpuStagingBuffer &&
prevOpState >= ChunkSendOperation::INVOKED_CALLBACK,
/*actions=*/
{&ChannelImpl::callSendCallback, &ChannelImpl::returnSendCpuBuffer});
// Needs to go after previous op to ensure predictable and consistent ordering
// of write calls on the control connection.
chunkSendOps_.attemptTransition(
opIter,
/*from=*/ChunkSendOperation::ALLOCATING_CPU_BUFFER,
/*to=*/ChunkSendOperation::COPYING_FROM_GPU_TO_CPU,
/*cond=*/!error_ && op.doneAllocatingCpuStagingBuffer &&
prevOpState >= ChunkSendOperation::COPYING_FROM_GPU_TO_CPU,
/*actions=*/
{&ChannelImpl::writeReadyToSend, &ChannelImpl::copyFromGpuToCpu});
// See above for why this needs to go after previous op.
chunkSendOps_.attemptTransition(
opIter,
/*from=*/ChunkSendOperation::COPYING_FROM_GPU_TO_CPU,
/*to=*/ChunkSendOperation::FINISHED,
/*cond=*/error_ && op.doneCopyingFromGpuToCpu &&
prevOpState >= ChunkSendOperation::INVOKED_CALLBACK,
/*actions=*/
{&ChannelImpl::callSendCallback, &ChannelImpl::returnSendCpuBuffer});
// See above for why this needs to go after previous op.
chunkSendOps_.attemptTransition(
opIter,
/*from=*/ChunkSendOperation::COPYING_FROM_GPU_TO_CPU,
/*to=*/ChunkSendOperation::INVOKED_CALLBACK,
/*cond=*/!error_ && op.doneCopyingFromGpuToCpu &&
prevOpState >= ChunkSendOperation::INVOKED_CALLBACK,
/*actions=*/{&ChannelImpl::callSendCallback});
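// The callback has already been invoked (when the GPU-to-CPU copy completed),
// so on error the only thing left to do is return the staging buffer to the
// allocator.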
chunkSendOps_.attemptTransition(
opIter,
/*from=*/ChunkSendOperation::INVOKED_CALLBACK,
/*to=*/ChunkSendOperation::FINISHED,
/*cond=*/error_,
/*actions=*/{&ChannelImpl::returnSendCpuBuffer});
// Needs to go after previous op to ensure predictable and consistent ordering
// of send calls on CPU channel.
chunkSendOps_.attemptTransition(
opIter,
/*from=*/ChunkSendOperation::INVOKED_CALLBACK,
/*to=*/ChunkSendOperation::SENDING_CPU_BUFFER,
/*cond=*/!error_ && prevOpState >= ChunkSendOperation::SENDING_CPU_BUFFER,
/*actions=*/{&ChannelImpl::sendCpuBuffer});
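// For buffers that were already in CPU memory there is no staging buffer to
// return: once the CPU channel has finished sending, invoke the callback and
// finish.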
chunkSendOps_.attemptTransition(
opIter,
/*from=*/ChunkSendOperation::SENDING_CPU_BUFFER,
/*to=*/ChunkSendOperation::FINISHED,
/*cond=*/op.doneSendingCpuBuffer && op.isCpuBuffer,
/*actions=*/{&ChannelImpl::callSendCallback});
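// For GPU buffers the callback was already invoked when the GPU-to-CPU copy
// completed; once the CPU channel is done with the staging buffer, return it
// to the allocator and finish.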
chunkSendOps_.attemptTransition(
opIter,
/*from=*/ChunkSendOperation::SENDING_CPU_BUFFER,
/*to=*/ChunkSendOperation::FINISHED,
/*cond=*/op.doneSendingCpuBuffer && !op.isCpuBuffer,
/*actions=*/{&ChannelImpl::returnSendCpuBuffer});
}
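// Illustrative sketch (an assumption, not code from this file or from
// TensorPipe's actual state-machine helper): a minimal stand-in showing the
// attemptTransition pattern used above, i.e. a helper that moves an operation
// to its next state only when it is in the expected `from` state and the
// condition holds, and then runs the listed actions. The real call sites pass
// ChannelImpl member-function pointers as actions; here a std::function and
// the hypothetical Op/OpList types stand in.

#include <cstdio>
#include <functional>
#include <iterator>
#include <list>
#include <vector>

namespace sketch {

struct Op {
  enum State { UNINITIALIZED, ALLOCATING, COPYING, INVOKED_CALLBACK, FINISHED };
  State state = UNINITIALIZED;
};

class OpList {
 public:
  using Iter = std::list<Op>::iterator;
  using Action = std::function<void(Iter)>;

  Iter emplace() {
    ops_.emplace_back();
    return std::prev(ops_.end());
  }

  // Move *opIter from `from` to `to` if it is currently in `from` and `cond`
  // holds, then run the actions in order. At the call sites above, `cond` also
  // encodes the ordering constraint with respect to the previous op.
  void attemptTransition(
      Iter opIter,
      Op::State from,
      Op::State to,
      bool cond,
      std::vector<Action> actions) {
    if (opIter->state != from || !cond) {
      return;
    }
    opIter->state = to;
    for (const auto& action : actions) {
      action(opIter);
    }
  }

 private:
  std::list<Op> ops_;
};

} // namespace sketch

int main() {
  sketch::OpList ops;
  auto opIter = ops.emplace();
  bool error = false;
  // Only the transition whose `from` state and condition match actually fires.
  ops.attemptTransition(
      opIter,
      /*from=*/sketch::Op::UNINITIALIZED,
      /*to=*/sketch::Op::ALLOCATING,
      /*cond=*/!error,
      /*actions=*/{[](sketch::OpList::Iter) { std::printf("allocating\n"); }});
  return 0;
}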