in src/nccl_ofi_sendrecv.cpp [1849:1966]
/*
 * Post a tagged send of `size` bytes from `data` on a SENDRECV send
 * communicator.
 *
 * @param send_comm  Send communicator (must be a
 *                   nccl_net_ofi_sendrecv_send_comm_t).
 * @param data       Buffer to send.
 * @param size       Number of bytes to send.
 * @param tag        Currently unused: the communicator's own tag
 *                   (s_comm->tag) is passed to fi_tsend (see TODO below).
 * @param mhandle    Memory registration handle for `data`. If the handle
 *                   or its MR is NULL, no descriptor is passed to fi_tsend.
 * @param base_req   Out parameter.
 *                   - Success: set to the posted request.
 *                   - Retry: set to NULL when the provider returned
 *                     -FI_EAGAIN or a self-connect request is still
 *                     pending.
 *                   - Error: set to NULL.
 *
 * @return
 *   - 0 on success.
 *   - On a retry path, the return value of sendrecv_cq_process().
 *   - -EINVAL for a NULL endpoint or when the in-flight request limit is
 *     reached.
 *   - -ENOMEM when no request can be allocated.
 *   - Otherwise, the negative error code returned by fi_tsend().
 */
static int sendrecv_send_comm_send(nccl_net_ofi_send_comm_t *send_comm, void *data, size_t size, int tag,
				   nccl_net_ofi_mr_handle_t *mhandle, nccl_net_ofi_req_t **base_req)
{
	int ret = 0;
	nccl_net_ofi_sendrecv_send_comm_t *s_comm =
		(nccl_net_ofi_sendrecv_send_comm_t *)send_comm;
	auto *mr_handle = reinterpret_cast<nccl_net_ofi_sendrecv_mr_handle_t *>(mhandle);
	ssize_t rc = 0;
	nccl_net_ofi_sendrecv_req_t *req = NULL;
	void *desc = NULL;
	int dev_id = s_comm->base.base.dev_id;

	/* Validate endpoint */
	nccl_net_ofi_sendrecv_ep_t *ep =
		(nccl_net_ofi_sendrecv_ep_t *)s_comm->base.base.ep;
	if (OFI_UNLIKELY(ep == NULL)) {
		ret = -EINVAL;
		NCCL_OFI_WARN("Invalid endpoint provided");
		goto error;
	}

	/*
	 * Support only NCCL_OFI_MAX_SEND_REQUESTS inflight requests. Use >=
	 * rather than == so that an over-count can never slip past the limit.
	 */
	if (OFI_UNLIKELY(s_comm->num_inflight_reqs >= NCCL_OFI_MAX_SEND_REQUESTS)) {
		ret = -EINVAL;
		NCCL_OFI_WARN("Can not support more than %d inflight requests",
			      NCCL_OFI_MAX_SEND_REQUESTS);
		goto error;
	}

	/*
	 * When connecting to self, make sure the connect request has
	 * completed. If it has, free it together with the connection-info
	 * entry. If not, drive completion processing and hand back a NULL
	 * request so the caller retries the send.
	 */
	if (OFI_UNLIKELY(s_comm->conn_info != NULL)) {
		nccl_ofi_connection_info_t *conn_info = (nccl_ofi_connection_info_t *)s_comm->conn_info->ptr;
		if (conn_info->connect_to_self == 1) {
			assert(conn_info->req != NULL);
			nccl_net_ofi_sendrecv_req_t *self_req = (nccl_net_ofi_sendrecv_req_t *)conn_info->req;
			if (self_req->state == NCCL_OFI_SENDRECV_REQ_COMPLETED) {
				sendrecv_send_comm_free_req(s_comm, dev_id, self_req, false);
				nccl_ofi_freelist_entry_free(ep->conn_msg_fl, s_comm->conn_info);
				s_comm->conn_info = NULL;
			} else {
				NCCL_OFI_TRACE(NCCL_INIT | NCCL_NET,
					       "Self-connect request: %p hasn't completed. Current State: %s",
					       self_req,
					       sendrecv_req_state_get_string(self_req->state));
				ret = sendrecv_cq_process(ep->cq);
				*base_req = NULL;
				goto exit;
			}
		}
	}

	/*
	 * TODO: Use NCCL provided tags when using grouped receives aka
	 * props->maxRecvs > 1.
	 */

	/* Allocate NCCL OFI request */
	req = sendrecv_allocate_req(s_comm->nccl_ofi_reqs_fl);
	if (OFI_UNLIKELY(req == NULL)) {
		ret = -ENOMEM;
		NCCL_OFI_WARN("Unable to get NCCL OFI request for device %d",
			      dev_id);
		goto error;
	}

	req->comm = &s_comm->base.base;
	req->dev_id = dev_id;
	req->direction = NCCL_OFI_SENDRECV_SEND;

	/* Guard the handle itself as well as its MR before asking for a descriptor. */
	if (mr_handle != nullptr && mr_handle->mr != nullptr) {
		desc = fi_mr_desc(mr_handle->mr);
	}

	NCCL_OFI_TRACE_SEND_SENDRECV(req->dev_id, size, s_comm, 0, req, base_req);

	/*
	 * Try sending data to remote EP; Return NULL request
	 * if not able to send.
	 */
	rc = fi_tsend(s_comm->local_ep, data, size, desc,
		      s_comm->remote_ep, s_comm->tag, sendrecv_req_get_ofi_context(req));
	if (OFI_UNLIKELY(rc == -FI_EAGAIN)) {
		/* Make progress for next try; the unposted request is released below. */
		ret = sendrecv_cq_process(ep->cq);
		/* Return NULL request */
		*base_req = NULL;
		goto error;
	} else if (OFI_UNLIKELY(rc != 0)) {
		NCCL_OFI_WARN("Could not send request for device %d. RC: %zd",
			      dev_id, rc);
		/* libfabric error codes are small negative values; narrowing is explicit. */
		ret = static_cast<int>(rc);
		goto error;
	}

	(s_comm->num_inflight_reqs)++;

	/* Set request size */
	req->size = size;

	/* Return request to NCCL */
	*base_req = &req->base;
	goto exit;

error:
	/* Release any request that was allocated but never posted. */
	if (req) {
		sendrecv_send_comm_free_req(s_comm, dev_id, req, false);
	}
	/* Never leave the caller's out-parameter stale on a failed/retry path. */
	*base_req = NULL;
exit:
	return ret;
}