in src/nccl_ofi_rdma.cpp [1904:1958]
/*
 * Drain the endpoint's pending-request queue, re-posting each request
 * through send_progress() or receive_progress() depending on its type.
 *
 * Requests are taken one at a time from the front of
 * ep->pending_reqs_queue. ep->pending_reqs_lock is held only while the
 * queue itself is inspected or modified, never across the progress call.
 *
 * Returns:
 *   0        the queue was emptied, or a request returned -FI_EAGAIN.
 *            In the -FI_EAGAIN case the request is pushed back onto the
 *            front of the queue so a later call retries it first.
 *   -EINVAL  a request had a type this routine does not dispatch. The
 *            request is not put back on the queue.
 *   other    any other non-zero code returned by the progress routine,
 *            logged and propagated. The request is not put back.
 */
static int process_pending_reqs(nccl_net_ofi_rdma_ep_t *ep)
{
	int ret = 0;

	for (;;) {
		nccl_net_ofi_rdma_req_t *pending = NULL;

		/* Take the oldest pending request, if any, under the queue lock */
		nccl_net_ofi_mutex_lock(&ep->pending_reqs_lock);
		if (ep->pending_reqs_queue->empty() == false) {
			pending = ep->pending_reqs_queue->front();
			ep->pending_reqs_queue->pop_front();
		}
		nccl_net_ofi_mutex_unlock(&ep->pending_reqs_lock);

		if (pending == NULL) {
			/* Nothing left to post */
			return ret;
		}

		switch (pending->type) {
		case NCCL_OFI_RDMA_WRITE:
		case NCCL_OFI_RDMA_SEND:
		case NCCL_OFI_RDMA_CTRL_RX_BUFF:
		case NCCL_OFI_RDMA_EAGER_RX_BUFF:
			ret = send_progress(pending);
			break;
		case NCCL_OFI_RDMA_READ:
		case NCCL_OFI_RDMA_EAGER_COPY:
		case NCCL_OFI_RDMA_SEND_CTRL:
		case NCCL_OFI_RDMA_FLUSH:
			ret = receive_progress(pending, false);
			break;
		/*
		 * These types are never expected on the pending queue. They are
		 * listed explicitly so a newly added type has to be placed
		 * deliberately.
		 */
		case NCCL_OFI_RDMA_RECV:
		case NCCL_OFI_RDMA_RECV_SEGMS:
		case NCCL_OFI_RDMA_SEND_CONN:
		case NCCL_OFI_RDMA_SEND_CLOSE:
		case NCCL_OFI_RDMA_RECV_CONN:
		case NCCL_OFI_RDMA_RECV_CONN_RESP:
		case NCCL_OFI_RDMA_SEND_CONN_RESP:
		case NCCL_OFI_RDMA_INVALID_TYPE:
		default:
			NCCL_OFI_WARN("Unexpected type: %d", pending->type);
			return -EINVAL;
		}

		if (ret == -FI_EAGAIN) {
			/*
			 * The request could not be posted yet. Restore it at the
			 * head of the queue so ordering is preserved, then stop
			 * and let a later call retry it.
			 */
			nccl_net_ofi_mutex_lock(&ep->pending_reqs_lock);
			ep->pending_reqs_queue->push_front(pending);
			nccl_net_ofi_mutex_unlock(&ep->pending_reqs_lock);
			return 0;
		}

		if (ret != 0) {
			NCCL_OFI_WARN("Unable to post request; RC: %d", ret);
			return ret;
		}

		/* Posted successfully; it has left the pending queue for good */
		NCCL_OFI_TRACE_PENDING_REMOVE(pending);
	}
}