in src/nccl_ofi_rdma.cpp [4222:4327]
static int flush(nccl_net_ofi_recv_comm_t *recv_comm, int n, void **buffers,
		 int *sizes, nccl_net_ofi_mr_handle_t **mhandles,
		 nccl_net_ofi_req_t **base_req)
{
	int ret = 0;
	int flush_n = 0;
	bool network_busy = false;
	nccl_net_ofi_rdma_ep_t *ep = NULL;
	nccl_net_ofi_rdma_recv_comm_t *r_comm =
		(nccl_net_ofi_rdma_recv_comm_t *)recv_comm;
	nccl_net_ofi_rdma_req_t *req = NULL;
	ssize_t rc = 0;
	nccl_net_ofi_rdma_mr_handle_t **mr_handles = (nccl_net_ofi_rdma_mr_handle_t **)mhandles;
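
	/* Bail out if the communicator already has the maximum number of
	 * in-flight requests outstanding. */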
	if (OFI_UNLIKELY(r_comm->num_inflight_reqs == NCCL_OFI_MAX_REQUESTS)) {
		ret = -ENOSPC;
		NCCL_OFI_WARN("Cannot support more than %d inflight requests",
			      NCCL_OFI_MAX_REQUESTS);
		goto error;
	}
	ep = (nccl_net_ofi_rdma_ep_t *)r_comm->base.base.ep;
	assert(ep != NULL);

	/* Process any pending requests */
	network_busy = false;
	rc = process_cq_if_pending(ep);
	if (rc == -EAGAIN) {
		/* Network is still busy. */
		network_busy = true;
	} else if (rc != 0) {
		ret = rc;
		goto error;
	}
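
	/* Nothing to flush if the GDR flush is disabled via
	 * ofi_nccl_gdr_flush_disable() or GPUDirect RDMA is not in use. */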
	if (ofi_nccl_gdr_flush_disable() || support_gdr == GDR_UNSUPPORTED)
		goto exit;

#if HAVE_CUDA
	if (cuda_flush) {
		ret = nccl_net_ofi_cuda_flush_gpudirect_rdma_writes();
		if (ret != 0) {
			NCCL_OFI_WARN("Error performing CUDA GDR flush");
		}
		goto exit;
	}
#endif
	/*
	 * Find the first non-zero-sized request for which we will issue the
	 * flush. A single operation can flush all requests at once.
	 */
	flush_n = -1;
	for (int recv_n = 0; recv_n < n; recv_n++) {
		if (sizes[recv_n] != 0) {
			flush_n = recv_n;
			break;
		}
	}

	if (flush_n == -1) {
		/*
		 * Flush is an expensive operation, so don't send an fi_read
		 * for 0-sized messages. Since NCCL issues a flush for every
		 * irecv(), data is still guaranteed to be synced to the GPU
		 * even without it.
		 */
		goto exit;
	}
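
	/* Build the flush request against the chosen receive buffer and its
	 * registered MR handle. */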
	ret = rdma_comm_alloc_flush_req(r_comm, buffers[flush_n], mr_handles[flush_n], &req);
	if (OFI_UNLIKELY(ret != 0)) {
		goto error;
	}

	NCCL_OFI_TRACE_FLUSH(req, base_req);
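
	/* If the CQ was drained above, issue the flush immediately; otherwise
	 * park it on the endpoint's pending queue to be progressed later. */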
	if (!network_busy) {
		rc = receive_progress(req, true);
		if (OFI_UNLIKELY(rc != 0)) {
			NCCL_OFI_WARN("Call to receive_progress failed: %zd", rc);
			ret = rc;
			goto error;
		}
	} else {
		/* Add to pending reqs queue */
		nccl_net_ofi_mutex_lock(&ep->pending_reqs_lock);
		ep->pending_reqs_queue->push_back(req);
		nccl_net_ofi_mutex_unlock(&ep->pending_reqs_lock);
		ret = 0;
		NCCL_OFI_TRACE_PENDING_INSERT(req);
	}
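
	/* Account for the new in-flight request and hand it back to the caller
	 * so it can be polled for completion. */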
	(r_comm->num_inflight_reqs)++;

	*base_req = &req->base;

	return ret;

 error:
	if (req)
		req->free(req, false);
 exit:
	*base_req = NULL;
	return ret;
}