in tensorpipe/channel/cuda_gdr/channel_impl.cc [410:476]
// Tries, in a fixed order, every state transition that could apply to the
// receive operation referenced by opIter. Each attempt is handed the expected
// source state, the target state, a condition, and the member function(s) to
// run as actions. Several transitions are additionally gated on prevOpState
// (the state of the previous op) so that calls on shared connections are
// issued in a consistent order.
//
// Must be invoked from within the context's loop (asserted below).
void ChannelImpl::advanceRecvOperation(
    RecvOpIter opIter,
    RecvOperation::State prevOpState) {
  TP_DCHECK(context_->inLoop());

  RecvOperation& recvOp = *opIter;

  // The gating on the previous op depends only on the by-value argument, so it
  // can be evaluated once up front.
  const bool prevStartedReadingDescriptor =
      prevOpState >= RecvOperation::READING_DESCRIPTOR;
  const bool prevReachedIbRecv =
      prevOpState >= RecvOperation::RECEIVING_OVER_IB;

  // Every other condition is computed immediately before its own transition,
  // so that it observes whatever the earlier transitions' actions left behind.

  // UNINITIALIZED -> FINISHED: there is an error, or nothing to receive.
  const bool nothingToDo = error_ || recvOp.length == 0;
  recvOps_.attemptTransition(
      opIter,
      RecvOperation::UNINITIALIZED,
      RecvOperation::FINISHED,
      nothingToDo,
      {&ChannelImpl::callRecvCallback});

  // UNINITIALIZED -> READING_DESCRIPTOR.
  // Must come after the previous op in order to keep the ordering of write
  // calls on the descriptor control connection predictable and consistent.
  const bool canReadDescriptor =
      !error_ && state_ == ESTABLISHED && prevStartedReadingDescriptor;
  recvOps_.attemptTransition(
      opIter,
      RecvOperation::UNINITIALIZED,
      RecvOperation::READING_DESCRIPTOR,
      canReadDescriptor,
      {&ChannelImpl::readDescriptor});

  // READING_DESCRIPTOR -> FINISHED: the descriptor read is done and there is
  // an error.
  const bool descriptorReadFailed = error_ && recvOp.doneReadingDescriptor;
  recvOps_.attemptTransition(
      opIter,
      RecvOperation::READING_DESCRIPTOR,
      RecvOperation::FINISHED,
      descriptorReadFailed,
      {&ChannelImpl::callRecvCallback});

  // READING_DESCRIPTOR -> WAITING_FOR_CUDA_EVENT.
  // Going after the previous op isn't strictly required here, but there is no
  // point in busy polling several events when only one of them is able to
  // make progress afterwards.
  const bool canWaitForCudaEvent =
      !error_ && recvOp.doneReadingDescriptor && prevReachedIbRecv;
  recvOps_.attemptTransition(
      opIter,
      RecvOperation::READING_DESCRIPTOR,
      RecvOperation::WAITING_FOR_CUDA_EVENT,
      canWaitForCudaEvent,
      {&ChannelImpl::waitForRecvCudaEvent});

  // WAITING_FOR_CUDA_EVENT -> FINISHED: the wait is done and there is an
  // error.
  const bool cudaEventWaitFailed = error_ && recvOp.doneWaitingForCudaEvent;
  recvOps_.attemptTransition(
      opIter,
      RecvOperation::WAITING_FOR_CUDA_EVENT,
      RecvOperation::FINISHED,
      cudaEventWaitFailed,
      {&ChannelImpl::callRecvCallback});

  // WAITING_FOR_CUDA_EVENT -> RECEIVING_OVER_IB.
  // Must come after the previous op in order to keep the ordering of recv
  // calls on the InfiniBand queue pair, and of write calls on the completion
  // control connection, predictable and consistent.
  const bool canReceiveOverIb =
      !error_ && recvOp.doneWaitingForCudaEvent && prevReachedIbRecv;
  recvOps_.attemptTransition(
      opIter,
      RecvOperation::WAITING_FOR_CUDA_EVENT,
      RecvOperation::RECEIVING_OVER_IB,
      canReceiveOverIb,
      {&ChannelImpl::recvOverIbAndWriteReadyToRecive});

  // RECEIVING_OVER_IB -> FINISHED: no chunk is being received anymore. Note
  // that this one is not conditioned on the error state.
  const bool noChunksInFlight = recvOp.numChunksBeingReceived == 0;
  recvOps_.attemptTransition(
      opIter,
      RecvOperation::RECEIVING_OVER_IB,
      RecvOperation::FINISHED,
      noChunksInFlight,
      {&ChannelImpl::callRecvCallback});
}