in src/nccl_ofi_sendrecv.cpp [1070:1170]
/**
 * Post receive buffer(s) on a SENDRECV receive communicator.
 *
 * Takes a request from the communicator's request freelist, processes the
 * endpoint's completion queue, and then posts each of the @p n buffers with
 * fi_trecv() on the communicator's local endpoint using the communicator's
 * own tag.
 *
 * @param recv_comm  Receive communicator (cast to the SENDRECV recv comm type)
 * @param n          Number of buffers to post; asserted to be
 *                   <= NCCL_OFI_MAX_RECVS
 * @param buffers    Destination buffers, one per receive
 * @param sizes      Size of each destination buffer
 * @param tags       Not used by this function (the communicator's tag is
 *                   used for every post)
 * @param mhandles   Memory registration handles, one per receive; a
 *                   descriptor is passed to libfabric only when the handle's
 *                   `mr` is non-null
 * @param base_req   Output: set to the new request on success, set to NULL
 *                   when libfabric reports -FI_EAGAIN, left untouched on
 *                   every other failure
 *
 * @return 0 on success, and also 0 when the post hit -FI_EAGAIN (in which
 *         case *base_req is NULL);
 *         -EINVAL if the endpoint is missing, NCCL_OFI_MAX_REQUESTS requests
 *         are already inflight, no request could be obtained, or @p mhandles
 *         is NULL;
 *         otherwise the non-zero value returned by CQ processing or by
 *         fi_trecv().
 */
static int sendrecv_recv_comm_recv(nccl_net_ofi_recv_comm_t *recv_comm, int n, void **buffers,
				   size_t *sizes, int *tags, nccl_net_ofi_mr_handle_t **mhandles,
				   nccl_net_ofi_req_t **base_req)
{
	nccl_net_ofi_sendrecv_recv_comm_t *r_comm =
		(nccl_net_ofi_sendrecv_recv_comm_t *)recv_comm;
	int dev_id = r_comm->base.base.dev_id;
	auto **mr_handles = reinterpret_cast<nccl_net_ofi_sendrecv_mr_handle_t **>(mhandles);
	nccl_net_ofi_sendrecv_req_t *req = NULL;
	int ret = 0;

	/*
	 * Nothing has been acquired yet for the first three checks, so those
	 * failures can return directly without any cleanup.
	 */
	nccl_net_ofi_sendrecv_ep_t *ep = (nccl_net_ofi_sendrecv_ep_t *)r_comm->base.base.ep;
	if (OFI_UNLIKELY(ep == NULL)) {
		NCCL_OFI_WARN("Invalid endpoint provided");
		return -EINVAL;
	}

	/* The communicator is capped at NCCL_OFI_MAX_REQUESTS inflight requests */
	if (OFI_UNLIKELY(r_comm->num_inflight_reqs == NCCL_OFI_MAX_REQUESTS)) {
		NCCL_OFI_WARN("Can not support more than %d inflight requests",
			      NCCL_OFI_MAX_REQUESTS);
		return -EINVAL;
	}

	/* Grab a request object from the communicator's freelist */
	req = sendrecv_allocate_req(r_comm->nccl_ofi_reqs_fl);
	if (OFI_UNLIKELY(req == NULL)) {
		NCCL_OFI_WARN("Unable to get NCCL OFI request for device %d",
			      dev_id);
		return -EINVAL;
	}

	/* From here on, every failure must hand the request back */

	/* Make progress on the completion queue before posting */
	ret = sendrecv_cq_process(ep->cq);
	if (OFI_UNLIKELY(ret != 0)) {
		goto release_req;
	}

	/* Describe the operation in the request */
	req->comm = &r_comm->base.base;
	req->dev_id = dev_id;
	req->direction = NCCL_OFI_SENDRECV_RECV;
	req->num_recvs = n;

	if (OFI_UNLIKELY(mr_handles == NULL)) {
		NCCL_OFI_WARN("Memory handles array is NULL");
		ret = -EINVAL;
		goto release_req;
	}

	/* Currently, plugin doesn't support grouped receives */
	assert(n <= NCCL_OFI_MAX_RECVS);

	for (int idx = 0; idx < n; ++idx) {
		auto *handle = mr_handles[idx];

		/* Only hand libfabric a descriptor when a registration exists */
		void *desc = (handle->mr != nullptr) ? fi_mr_desc(handle->mr) : nullptr;

		NCCL_OFI_TRACE_RECV_SENDRECV(dev_id, r_comm, sizes[idx], req, base_req);

		/*
		 * TODO: Use NCCL provided tags when plugin supports grouped
		 * receives aka props->maxRecvs > 1.
		 */

		/* Try posting buffer to local EP */
		const ssize_t rc = fi_trecv(r_comm->local_ep, buffers[idx], sizes[idx],
					    desc, FI_ADDR_UNSPEC, r_comm->tag, 0,
					    sendrecv_req_get_ofi_context(req));
		if (rc == 0) {
			continue;
		}

		if (rc == -FI_EAGAIN) {
			/*
			 * Not a failure: report "no request" and leave ret at 0.
			 * The request is still released below.
			 */
			*base_req = NULL;
		} else {
			NCCL_OFI_WARN("Unable to post receive buffer for dev %d. RC: %zd, ERROR: %s",
				      dev_id, rc, fi_strerror(-rc));
			ret = static_cast<int>(rc);
		}
		goto release_req;
	}

	/* All posts succeeded: account for the request and hand it to NCCL */
	r_comm->num_inflight_reqs++;
	*base_req = (nccl_net_ofi_req_t *)req;
	return ret;

release_req:
	/* Inflight counter was never bumped for this request, hence `false` */
	sendrecv_recv_comm_free_req(r_comm, dev_id, req, false);
	return ret;
}