in src/nccl_ofi_net.c [1876:1978]
/*
 * Issue a flush for a buffer that was the target of a previous receive.
 *
 * The flush is implemented as a single fi_read() on the communicator's
 * local endpoint, addressed to that endpoint's own address
 * (local_ep_addr), which reads from `data` into the communicator's
 * flush buffer. The read is tracked by an NCCL OFI request handed back
 * through `request`.
 *
 * No read is issued (and *request is set to NULL with ncclSuccess) when
 * flushing is disabled via ofi_nccl_gdr_flush_disable(), when support_gdr
 * is false, or when `size` is 0.
 *
 * @param recvComm  Receive communicator (recvComm_t *). Must be non-NULL
 *                  whenever a flush is actually attempted.
 * @param data      Address of the buffer to flush; used as the target
 *                  address of the read.
 * @param size      Size of the received message; 0 skips the flush.
 * @param mhandle   Memory registration (struct fid_mr *) for `data`, or
 *                  NULL, in which case a key of 0 and a NULL descriptor
 *                  are passed to fi_read().
 * @param request   Out-parameter; must be non-NULL. Receives the request
 *                  tracking the read on success, NULL when no read was
 *                  issued or on error.
 *
 * @return ncclSuccess if the read was posted or no flush was needed;
 *         ncclSystemError on invalid arguments, request exhaustion, an
 *         unavailable MR key or an fi_read() failure; otherwise the error
 *         returned by nccl_ofi_progress() while retrying.
 */
static ncclResult_t ofi_iflush(void* recvComm, void* data, int size,
			       void *mhandle, void **request)
{
	ncclResult_t ret = ncclSuccess;
	recvComm_t *rComm = (recvComm_t *)recvComm;
	nccl_ofi_req_t *req = NULL;
	ssize_t rc = 0;
	struct fid_mr *mr_handle = (struct fid_mr *)mhandle;
	uint64_t cuda_key = 0ULL;
	void *desc = NULL;

	/*
	 * Every exit path below stores through `request`, including the
	 * "nothing to do" fast paths, so reject a NULL out-parameter before
	 * anything else instead of dereferencing it.
	 */
	if (OFI_UNLIKELY(request == NULL)) {
		NCCL_OFI_WARN("Invalid request provided");
		return ncclSystemError;
	}

	if (ofi_nccl_gdr_flush_disable() || !support_gdr)
		goto exit;

	/* Validate recvComm */
	if (OFI_UNLIKELY(rComm == NULL)) {
		ret = ncclSystemError;
		NCCL_OFI_WARN("Invalid recvComm provided");
		goto exit;
	}

	if (size == 0) {
		/*
		 * Flush is an expensive operation. So, don't send fi_read for
		 * 0-sized messages. Since, NCCL issues flush for every irecv(),
		 * we guarantee to sync data to GPU even without it.
		 */
		goto exit;
	}

	/* Support only NCCL_OFI_MAX_REQUESTS inflight requests. */
	if (OFI_UNLIKELY(rComm->num_inflight_reqs == NCCL_OFI_MAX_REQUESTS)) {
		ret = ncclSystemError;
		NCCL_OFI_WARN("Can not support more than %d inflight requests",
			      NCCL_OFI_MAX_REQUESTS);
		goto exit;
	}

	/* Allocate NCCL OFI request */
	req = allocate_nccl_ofi_request(rComm->nccl_ofi_reqs_fl);
	if (OFI_UNLIKELY(req == NULL)) {
		ret = ncclSystemError;
		NCCL_OFI_WARN("Unable to get NCCL OFI request for device %d",
			      rComm->dev);
		goto exit;
	}

	req->rComm = rComm;
	req->dev = rComm->dev;
	req->direction = NCCL_OFI_RECV;

	if (mr_handle != NULL) {
		/*
		 * Extract remote key.
		 *
		 * NOTE(review): `desc` is taken from the registration of
		 * `data`, but fi_read() documents its desc argument as the
		 * descriptor of the *local* buffer (here the flush buffer).
		 * Confirm this is intended for the providers in use.
		 */
		desc = fi_mr_desc(mr_handle);
		cuda_key = fi_mr_key(mr_handle);
		if (OFI_UNLIKELY(cuda_key == FI_KEY_NOTAVAIL)) {
			ret = ncclSystemError;
			NCCL_OFI_WARN("Memory registration may not have completed.");
			goto error;
		}
	}

	/* Issue RDMA read, retrying for as long as the provider is busy */
	do {
		/*
		 * The pointer is converted through uintptr_t, the integer
		 * type defined for pointer round-trips, before being widened
		 * to the 64-bit address fi_read() expects.
		 */
		rc = fi_read(rComm->local_ep, &rComm->flush_buff.host_buffer,
			     rComm->flush_buff.size,
			     desc,
			     rComm->local_ep_addr, (uint64_t)(uintptr_t)data,
			     cuda_key, &req->ctx);
		if (rc == 0) {
			break;
		}
		else if (rc == -FI_EAGAIN) {
			/*
			 * Process completions so that you have enough
			 * resources for issuing fi_read
			 */
			ret = nccl_ofi_progress(nccl_ofi_component[rComm->dev]);
			if (OFI_UNLIKELY(ret != ncclSuccess))
				goto error;
		}
		else {
			NCCL_OFI_WARN("Unable to issue read operation for dev %d. RC: %zd, ERROR: %s",
				      rComm->dev, rc, fi_strerror(-rc));
			ret = ncclSystemError;
			goto error;
		}
	} while (true);

	/* Count the request as inflight only once the read has been posted */
	rComm->num_inflight_reqs++;

	*request = req;

	return ret;

error:
	/*
	 * The request was never counted in num_inflight_reqs, hence `false`
	 * (presumably "do not decrement the inflight counter" - confirm
	 * against free_nccl_ofi_req()).
	 */
	if (req)
		free_nccl_ofi_req(req, false);
exit:
	*request = NULL;
	return ret;
}