static int sendrecv_recv_comm_recv()

in src/nccl_ofi_sendrecv.cpp [1070:1170]


/*
 * Post receive buffer(s) on a sendrecv receive communicator.
 *
 * Allocates a request from the communicator's free list, progresses the
 * endpoint's CQ, and posts one fi_trecv() per buffer. All buffers are
 * matched against the communicator's own tag (r_comm->tag); the
 * NCCL-provided tags are not used yet.
 *
 * @param recv_comm  Receive communicator (a nccl_net_ofi_sendrecv_recv_comm_t)
 * @param n          Number of receive buffers; must be in [1, NCCL_OFI_MAX_RECVS]
 * @param buffers    Array of n receive buffers
 * @param sizes      Array of n buffer sizes in bytes
 * @param tags       Currently unused (see TODO in the loop)
 * @param mhandles   Array of n memory registration handles; an entry (or
 *                   its MR) may be NULL, in which case no descriptor is
 *                   passed to fi_trecv()
 * @param base_req   Output: the posted request on success, or NULL when the
 *                   provider returned -FI_EAGAIN
 *
 * @return 0 on success, including the -FI_EAGAIN case (where *base_req is
 *         set to NULL); a negative error code otherwise.
 */
static int sendrecv_recv_comm_recv(nccl_net_ofi_recv_comm_t *recv_comm, int n, void **buffers,
				   size_t *sizes, int *tags, nccl_net_ofi_mr_handle_t **mhandles,
				   nccl_net_ofi_req_t **base_req)
{
	int ret = 0;
	ssize_t rc = 0;
	nccl_net_ofi_sendrecv_req_t *req = NULL;
	nccl_net_ofi_sendrecv_ep_t *ep = NULL;
	nccl_net_ofi_sendrecv_recv_comm_t *r_comm =
		(nccl_net_ofi_sendrecv_recv_comm_t *)recv_comm;
	int dev_id = r_comm->base.base.dev_id;
	auto **mr_handles = reinterpret_cast<nccl_net_ofi_sendrecv_mr_handle_t **>(mhandles);

	/* Retrieve and validate endpoint */
	ep = (nccl_net_ofi_sendrecv_ep_t *)r_comm->base.base.ep;
	if (OFI_UNLIKELY(ep == NULL)) {
		ret = -EINVAL;
		NCCL_OFI_WARN("Invalid endpoint provided");
		goto error;
	}

	/*
	 * Validate arguments before allocating a request.
	 *
	 * Grouped receives are not supported, and every buffer below is
	 * posted with the same request's libfabric context. The bound on n is
	 * therefore enforced at runtime rather than only by assert(), so that
	 * release builds never post several receives against one context.
	 */
	if (OFI_UNLIKELY(n < 1 || n > NCCL_OFI_MAX_RECVS)) {
		ret = -EINVAL;
		NCCL_OFI_WARN("Unsupported number of receives %d (supported: 1 to %d)",
			      n, (int)NCCL_OFI_MAX_RECVS);
		goto error;
	}

	if (OFI_UNLIKELY(mr_handles == NULL)) {
		ret = -EINVAL;
		NCCL_OFI_WARN("Memory handles array is NULL");
		goto error;
	}

	/* Support only NCCL_OFI_MAX_REQUESTS inflight reqs. */
	if (OFI_UNLIKELY(r_comm->num_inflight_reqs >= NCCL_OFI_MAX_REQUESTS)) {
		ret = -EINVAL;
		NCCL_OFI_WARN("Can not support more than %d inflight requests",
			      NCCL_OFI_MAX_REQUESTS);
		goto error;
	}

	/* Allocate NCCL OFI request */
	req = sendrecv_allocate_req(r_comm->nccl_ofi_reqs_fl);
	if (OFI_UNLIKELY(req == NULL)) {
		ret = -EINVAL;
		NCCL_OFI_WARN("Unable to get NCCL OFI request for device %d",
			      dev_id);
		goto error;
	}

	/* Progress NCCL OFI */
	ret = sendrecv_cq_process(ep->cq);
	if (OFI_UNLIKELY(ret != 0))
		goto error;

	req->comm = &r_comm->base.base;
	req->dev_id = dev_id;
	req->direction = NCCL_OFI_SENDRECV_RECV;

	req->num_recvs = n;

	for (int recv_n = 0; recv_n < n; recv_n++) {
		void *desc = NULL;
		nccl_net_ofi_sendrecv_mr_handle_t *mr_handle = mr_handles[recv_n];

		/* A missing handle is treated like a missing MR: no descriptor. */
		if (mr_handle != nullptr && mr_handle->mr != nullptr) {
			desc = fi_mr_desc(mr_handle->mr);
		}

		NCCL_OFI_TRACE_RECV_SENDRECV(dev_id, r_comm, sizes[recv_n], req, base_req);

		/*
		 * TODO: Use NCCL provided tags when plugin supports grouped
		 * receives aka props->maxRecvs > 1.
		 */

		/* Try posting buffer to local EP */
		rc = fi_trecv(r_comm->local_ep, buffers[recv_n], sizes[recv_n],
			      desc, FI_ADDR_UNSPEC, r_comm->tag, 0, sendrecv_req_get_ofi_context(req));
		if (rc == -FI_EAGAIN) {
			/*
			 * Provider is temporarily out of resources. Report
			 * success with a NULL request (presumably so the caller
			 * retries later) and release the request below.
			 *
			 * NOTE(review): if NCCL_OFI_MAX_RECVS is ever raised
			 * above 1, releasing req here after an earlier buffer
			 * was already posted would leave that receive holding a
			 * freed context.
			 */
			ret = 0;
			*base_req = NULL;
			goto error;
		}
		else if (rc != 0) {
			NCCL_OFI_WARN("Unable to post receive buffer for dev %d. RC: %zd, ERROR: %s",
				      dev_id, rc, fi_strerror(-rc));
			ret = static_cast<int>(rc);
			goto error;
		}

	}

	(r_comm->num_inflight_reqs)++;

	/* Return request to NCCL */
	*base_req = (nccl_net_ofi_req_t *)req;

	goto exit;

 error:
	if (req)
		sendrecv_recv_comm_free_req(r_comm, dev_id, req, false);
 exit:
	return ret;
}