static int sendrecv_send_comm_send()

in src/nccl_ofi_sendrecv.cpp [1849:1966]


/*
 * Post one tagged send on a sendrecv send communicator.
 *
 * @param send_comm  Send communicator; treated as a
 *                   nccl_net_ofi_sendrecv_send_comm_t.
 * @param data       Buffer handed to fi_tsend().
 * @param size       Number of bytes to send; also recorded in the request.
 * @param tag        Currently unused: the communicator's own tag
 *                   (s_comm->tag) is what gets passed to fi_tsend().
 * @param mhandle    Memory registration handle; treated as a
 *                   nccl_net_ofi_sendrecv_mr_handle_t. Its `mr` member may
 *                   be nullptr, in which case no descriptor is passed.
 * @param base_req   Out: the new request on success; NULL when the send
 *                   could not be posted yet and must be retried. Not
 *                   written on the hard-failure paths.
 *
 * @return 0 on success.
 *         The result of sendrecv_cq_process() when the send has to be
 *         retried (pending self-connect request, or fi_tsend() returned
 *         -FI_EAGAIN); *base_req is NULL in that case.
 *         -EINVAL for a missing endpoint or when the inflight limit is
 *         reached, -ENOMEM when no request could be allocated, or the
 *         fi_tsend() return code for any other send failure.
 */
static int sendrecv_send_comm_send(nccl_net_ofi_send_comm_t *send_comm, void *data, size_t size, int tag,
				   nccl_net_ofi_mr_handle_t *mhandle, nccl_net_ofi_req_t **base_req)
{
	nccl_net_ofi_sendrecv_send_comm_t *s_comm =
		(nccl_net_ofi_sendrecv_send_comm_t *)send_comm;
	auto *mr_handle = reinterpret_cast<nccl_net_ofi_sendrecv_mr_handle_t *>(mhandle);
	int dev_id = s_comm->base.base.dev_id;

	/* The communicator must be attached to an endpoint. */
	nccl_net_ofi_sendrecv_ep_t *ep =
		(nccl_net_ofi_sendrecv_ep_t *)s_comm->base.base.ep;
	if (OFI_UNLIKELY(ep == NULL)) {
		NCCL_OFI_WARN("Invalid endpoint provided");
		return -EINVAL;
	}

	/* At most NCCL_OFI_MAX_SEND_REQUESTS sends may be outstanding. */
	if (OFI_UNLIKELY(s_comm->num_inflight_reqs == NCCL_OFI_MAX_SEND_REQUESTS)) {
		NCCL_OFI_WARN("Can not support more than %d inflight requests",
			      NCCL_OFI_MAX_SEND_REQUESTS);
		return -EINVAL;
	}

	/*
	 * Self-connection handling: the connect request must have completed
	 * before any data is sent. Once it has, release the request and the
	 * connection-info entry. Until then, drive the completion queue and
	 * hand back a NULL request so the caller retries the send.
	 */
	if (OFI_UNLIKELY(s_comm->conn_info != NULL)) {
		nccl_ofi_connection_info_t *self_conn = (nccl_ofi_connection_info_t *)s_comm->conn_info->ptr;
		if (self_conn->connect_to_self == 1) {
			assert(self_conn->req != NULL);
			nccl_net_ofi_sendrecv_req_t *connect_req = (nccl_net_ofi_sendrecv_req_t *)self_conn->req;

			if (connect_req->state != NCCL_OFI_SENDRECV_REQ_COMPLETED) {
				NCCL_OFI_TRACE(NCCL_INIT | NCCL_NET,
					       "Self-connect request: %p hasn't completed. Current State: %s",
					       connect_req,
					       sendrecv_req_state_get_string(connect_req->state));

				int progress_ret = sendrecv_cq_process(ep->cq);

				*base_req = NULL;
				return progress_ret;
			}

			sendrecv_send_comm_free_req(s_comm, dev_id, connect_req, false);
			nccl_ofi_freelist_entry_free(ep->conn_msg_fl, s_comm->conn_info);
			s_comm->conn_info = NULL;
		}
	}

	/*
	 * TODO: Use NCCL provided tags when using grouped receives aka
	 * props->maxRecvs > 1.
	 */

	/* Grab a request object from the communicator's freelist. */
	nccl_net_ofi_sendrecv_req_t *req = sendrecv_allocate_req(s_comm->nccl_ofi_reqs_fl);
	if (OFI_UNLIKELY(req == NULL)) {
		NCCL_OFI_WARN("Unable to get NCCL OFI request for device %d",
			      dev_id);
		return -ENOMEM;
	}

	req->comm = &s_comm->base.base;
	req->dev_id = dev_id;
	req->direction = NCCL_OFI_SENDRECV_SEND;

	/* A descriptor is only available when the handle carries a registration. */
	void *desc = (mr_handle->mr != nullptr) ? fi_mr_desc(mr_handle->mr) : nullptr;

	NCCL_OFI_TRACE_SEND_SENDRECV(req->dev_id, size, s_comm, 0, req, base_req);

	/* Post the tagged send to the remote endpoint. */
	ssize_t rc = fi_tsend(s_comm->local_ep, data, size, desc,
			      s_comm->remote_ep, s_comm->tag, sendrecv_req_get_ofi_context(req));

	if (OFI_UNLIKELY(rc == -FI_EAGAIN)) {
		/*
		 * Provider is out of resources for now: make progress so the
		 * next attempt can succeed, report "no request" to the caller
		 * and give the unused request back.
		 */
		int progress_ret = sendrecv_cq_process(ep->cq);
		*base_req = NULL;
		sendrecv_send_comm_free_req(s_comm, dev_id, req, false);
		return progress_ret;
	}

	if (OFI_UNLIKELY(rc != 0)) {
		NCCL_OFI_WARN("Could not send request for device %d. RC: %zd",
			      dev_id, rc);
		/* The send was never posted, so the request is released here. */
		sendrecv_send_comm_free_req(s_comm, dev_id, req, false);
		return (int)rc;
	}

	/* The send is now outstanding: account for it and publish the request. */
	(s_comm->num_inflight_reqs)++;
	req->size = size;
	*base_req = &req->base;

	return 0;
}