in tensorpipe/channel/cuda_basic/channel_impl.cc [380:506]
// Advance the state machine for one chunk-receive operation.
//
// Each call tries every legal transition for the operation at opIter, in a
// fixed order; attemptTransition only fires when the operation is currently
// in the given /*from*/ state and /*cond*/ holds. Several conditions gate on
// prevOpState (the state of the operation immediately preceding this one) to
// enforce cross-operation ordering: callbacks fire in order, reads on the
// control connection and recvs on the CPU channel are issued in order, and
// staging buffers are allocated in order to avoid deadlock (see the comments
// on the individual transitions below).
//
// Transitions split into an error path (error_ set: skip the remaining work,
// invoke the callback, return any staging buffer) and a normal path
// (UNINITIALIZED -> READING_READY_TO_SEND -> [ALLOCATING_CPU_BUFFER ->]
// RECEIVING_CPU_BUFFER -> [COPYING_FROM_CPU_TO_GPU ->
// COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK ->] FINISHED), where the
// bracketed states are skipped when the destination is CPU memory
// (op.isCpuBuffer), since no GPU staging copy is needed in that case.
//
// NOTE(review): presumably this is re-invoked whenever one of the op's
// done* flags flips or error_ is set, so each call makes at most incremental
// progress — confirm against the callers of this method.
void ChannelImpl::advanceChunkRecvOperation(
    ChunkRecvOpIter opIter,
    ChunkRecvOperation::State prevOpState) {
  TP_DCHECK(context_->inLoop());
  ChunkRecvOperation& op = *opIter;
  // Needs to go after previous op invoked its callback because the last chunk
  // in a series (that corresponds to one operation) must invoke its callback
  // only when all chunks in the series are done.
  chunkRecvOps_.attemptTransition(
      opIter,
      /*from=*/ChunkRecvOperation::UNINITIALIZED,
      /*to=*/ChunkRecvOperation::FINISHED,
      /*cond=*/error_ &&
          prevOpState >=
              ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK,
      /*actions=*/{&ChannelImpl::callRecvCallback});
  // Needs to go after previous op to ensure predictable and consistent ordering
  // of read calls on control connection.
  chunkRecvOps_.attemptTransition(
      opIter,
      /*from=*/ChunkRecvOperation::UNINITIALIZED,
      /*to=*/ChunkRecvOperation::READING_READY_TO_SEND,
      /*cond=*/!error_ &&
          prevOpState >= ChunkRecvOperation::READING_READY_TO_SEND,
      /*actions=*/{&ChannelImpl::readReadyToSend});
  // See above for why this needs to go after previous op.
  chunkRecvOps_.attemptTransition(
      opIter,
      /*from=*/ChunkRecvOperation::READING_READY_TO_SEND,
      /*to=*/ChunkRecvOperation::FINISHED,
      /*cond=*/error_ && op.doneReadingReadyToSend &&
          prevOpState >=
              ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK,
      /*actions=*/{&ChannelImpl::callRecvCallback});
  // Needs to go after previous op to ensure predictable and consistent ordering
  // of recv calls on CPU channel.
  // This operation shortcuts allocating staging memory when receiving directly
  // on CPU.
  chunkRecvOps_.attemptTransition(
      opIter,
      /*from=*/ChunkRecvOperation::READING_READY_TO_SEND,
      /*to=*/ChunkRecvOperation::RECEIVING_CPU_BUFFER,
      /*cond=*/!error_ && op.doneReadingReadyToSend && op.isCpuBuffer &&
          prevOpState >= ChunkRecvOperation::RECEIVING_CPU_BUFFER,
      /*actions=*/{&ChannelImpl::receiveCpuBuffer});
  // Needs to go after previous op to ensure later operations are not holding
  // staging buffers while earlier ones are still blocked waiting for them,
  // because the staging buffer will only be returned to the allocator once the
  // operation is destroyed, but this won't happen until earlier operations have
  // completed, and if they are blocked waiting for buffers we may deadlock.
  chunkRecvOps_.attemptTransition(
      opIter,
      /*from=*/ChunkRecvOperation::READING_READY_TO_SEND,
      /*to=*/ChunkRecvOperation::ALLOCATING_CPU_BUFFER,
      /*cond=*/!error_ && op.doneReadingReadyToSend && !op.isCpuBuffer &&
          prevOpState >= ChunkRecvOperation::ALLOCATING_CPU_BUFFER,
      /*actions=*/{&ChannelImpl::allocateRecvCpuBuffer});
  // See above for why this needs to go after previous op.
  // Error path out of ALLOCATING_CPU_BUFFER: besides invoking the callback,
  // the already-acquired staging buffer must be handed back to the allocator.
  chunkRecvOps_.attemptTransition(
      opIter,
      /*from=*/ChunkRecvOperation::ALLOCATING_CPU_BUFFER,
      /*to=*/ChunkRecvOperation::FINISHED,
      /*cond=*/error_ && op.doneAllocatingCpuStagingBuffer &&
          prevOpState >=
              ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK,
      /*actions=*/
      {&ChannelImpl::callRecvCallback, &ChannelImpl::returnRecvCpuBuffer});
  // Needs to go after previous op to ensure predictable and consistent ordering
  // of recv calls on CPU channel.
  chunkRecvOps_.attemptTransition(
      opIter,
      /*from=*/ChunkRecvOperation::ALLOCATING_CPU_BUFFER,
      /*to=*/ChunkRecvOperation::RECEIVING_CPU_BUFFER,
      /*cond=*/!error_ && op.doneAllocatingCpuStagingBuffer &&
          prevOpState >= ChunkRecvOperation::RECEIVING_CPU_BUFFER,
      /*actions=*/{&ChannelImpl::receiveCpuBuffer});
  // See above for why this needs to go after previous op.
  // Error path out of RECEIVING_CPU_BUFFER; only for GPU destinations
  // (!op.isCpuBuffer), which hold a staging buffer that must be returned.
  chunkRecvOps_.attemptTransition(
      opIter,
      /*from=*/ChunkRecvOperation::RECEIVING_CPU_BUFFER,
      /*to=*/ChunkRecvOperation::FINISHED,
      /*cond=*/error_ && op.doneReceivingCpuBuffer && !op.isCpuBuffer &&
          prevOpState >=
              ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK,
      /*actions=*/
      {&ChannelImpl::callRecvCallback, &ChannelImpl::returnRecvCpuBuffer});
  // This transition shortcuts the copy to GPU when receiving on CPU memory.
  chunkRecvOps_.attemptTransition(
      opIter,
      /*from=*/ChunkRecvOperation::RECEIVING_CPU_BUFFER,
      /*to=*/ChunkRecvOperation::FINISHED,
      /*cond=*/op.doneReceivingCpuBuffer && op.isCpuBuffer,
      /*actions=*/{&ChannelImpl::callRecvCallback});
  // GPU destination: once the CPU channel delivered the data into the staging
  // buffer, kick off the copy from the staging buffer to GPU memory.
  chunkRecvOps_.attemptTransition(
      opIter,
      /*from=*/ChunkRecvOperation::RECEIVING_CPU_BUFFER,
      /*to=*/ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU,
      /*cond=*/!error_ && op.doneReceivingCpuBuffer && !op.isCpuBuffer,
      /*actions=*/{&ChannelImpl::copyFromCpuToGpu});
  // See above for why this needs to go after previous op.
  // NOTE(review): the callback is invoked while the CPU-to-GPU copy may still
  // be in flight (the op only waits for doneCopyingFromCpuToGpu in the next
  // transition) — presumably the copy is ordered on a stream the caller
  // synchronizes with; confirm against copyFromCpuToGpu.
  chunkRecvOps_.attemptTransition(
      opIter,
      /*from=*/ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU,
      /*to=*/ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK,
      /*cond=*/prevOpState >=
          ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK,
      /*actions=*/{&ChannelImpl::callRecvCallback});
  // Once the copy to GPU has completed, return the staging buffer and finish.
  chunkRecvOps_.attemptTransition(
      opIter,
      /*from=*/ChunkRecvOperation::COPYING_FROM_CPU_TO_GPU_AND_INVOKED_CALLBACK,
      /*to=*/ChunkRecvOperation::FINISHED,
      /*cond=*/op.doneCopyingFromCpuToGpu,
      /*actions=*/{&ChannelImpl::returnRecvCpuBuffer});
}