in src/nccl_ofi_rdma.cpp [1003:1090]
/*
 * Handle a control message received on send communicator `s_comm` for
 * message sequence number `msg_seq_num`. `rx_buff_req` is the rx buffer
 * request holding the control message.
 *
 * Two cases:
 *  1. The rx buffer request can be inserted into s_comm->msgbuff: send()
 *     has not been called for this message yet. The buffer is left in the
 *     msgbuff so the RDMA write can be initiated later by send(), and
 *     decrease_rx_buff_cnt() is called for the buffer's rail.
 *  2. The msgbuff slot already holds a send request. That request is
 *     retrieved and then:
 *       - non-eager: its send data is updated from the control message and
 *         the RDMA write is started via send_progress(); on -FI_EAGAIN the
 *         request is appended to the endpoint's pending request queue;
 *       - eager: the request is shrunk to the receiver's buffer length if
 *         that is smaller, and its completion count is incremented.
 *     The rx buffer is then reposted.
 *
 * Returns 0 on success or a negative error code on failure.
 */
static inline int handle_ctrl_recv(nccl_net_ofi_rdma_send_comm_t *s_comm,
				   uint16_t msg_seq_num,
				   nccl_net_ofi_rdma_req_t *rx_buff_req)
{
	int ret;
	nccl_ofi_msgbuff_status_t stat;
	nccl_net_ofi_rdma_ep_t *ep = (nccl_net_ofi_rdma_ep_t *)s_comm->base.base.ep;

	nccl_ofi_msgbuff_result_t mb_res = nccl_ofi_msgbuff_insert(s_comm->msgbuff, msg_seq_num,
		rx_buff_req, NCCL_OFI_MSGBUFF_BUFF, &stat);
	if (mb_res == NCCL_OFI_MSGBUFF_SUCCESS) {
		/* Inserted! In this case sender has not yet called send() for this message, so
		   return success and initiate RDMA write when sender calls send(). */
		return decrease_rx_buff_cnt(ep, get_rx_buff_data(rx_buff_req)->rail);
	}

	/* The only tolerated insert failure is INVALID_IDX with status INPROGRESS,
	   which this code treats as "a send request entry already occupies the slot". */
	if (OFI_UNLIKELY(mb_res != NCCL_OFI_MSGBUFF_INVALID_IDX || stat != NCCL_OFI_MSGBUFF_INPROGRESS)) {
		/* Log both values that were checked so either failure cause is visible. */
		NCCL_OFI_WARN("Unexpected message insert result (%d), status (%d) (ctrl recv) for msg %hu",
			      (int)mb_res, (int)stat, msg_seq_num);
		return -EINVAL;
	}

	/* Already a req entry here */
	void *elem;
	nccl_ofi_msgbuff_elemtype_t type;
	mb_res = nccl_ofi_msgbuff_retrieve(s_comm->msgbuff, msg_seq_num, &elem, &type, &stat);
	if (OFI_UNLIKELY(mb_res != NCCL_OFI_MSGBUFF_SUCCESS || type != NCCL_OFI_MSGBUFF_REQ)) {
		NCCL_OFI_WARN("Invalid message retrieval result (%d), type (%d) for msg %hu",
			      (int)mb_res, (int)type, msg_seq_num);
		return -EINVAL;
	}

	nccl_net_ofi_rdma_req_t *req = (nccl_net_ofi_rdma_req_t *)elem;
	assert(req->msg_seq_num == msg_seq_num);
	rdma_req_send_data_t *send_data = get_send_data(req);

	if (!send_data->eager) {
		ret = update_send_data_from_remote(s_comm, rx_buff_req, req);
		if (OFI_UNLIKELY(ret != 0)) {
			NCCL_OFI_WARN("Failed to copy ctrl data");
			return ret;
		}

		/* Initiate rdma write */
		ret = send_progress(req);
		if (ret == -FI_EAGAIN) {
			/* Trace before publishing: once req is on the shared pending
			   queue and the lock is dropped, another thread may progress it,
			   so req must not be touched afterwards. */
			NCCL_OFI_TRACE_PENDING_INSERT(req);

			/* Add to pending reqs queue */
			nccl_net_ofi_mutex_lock(&ep->pending_reqs_lock);
			ep->pending_reqs_queue->push_back(req);
			nccl_net_ofi_mutex_unlock(&ep->pending_reqs_lock);
			ret = 0;
		} else if (OFI_UNLIKELY(ret != 0)) {
			return ret;
		}
	} else {
		/* The control message is only consulted in the eager case. */
		rdma_req_rx_buff_data_t *rx_buff_data = get_rx_buff_data(rx_buff_req);
		nccl_net_ofi_rdma_ctrl_msg_t *ctrl_msg = get_rx_ctrl_msg(rx_buff_data);

		/* If recv buffer is smaller than send buffer, we reduce the size of the send req, even if we
		   have already eagerly sent the whole send buffer. The receive side will discard the extra data.
		   remote_len, size and buff_len are all updated under req_lock so they change together. */
		nccl_net_ofi_mutex_lock(&req->req_lock);
		send_data->remote_len = ctrl_msg->buff_len;
		if (send_data->remote_len < send_data->buff_len) {
			NCCL_OFI_TRACE(NCCL_NET,
				       "Remote recv buffer (%zu) smaller than send buffer (%zu) in eager send",
				       send_data->remote_len, send_data->buff_len);
			req->size = send_data->remote_len;
			send_data->buff_len = send_data->remote_len;
		}
		nccl_net_ofi_mutex_unlock(&req->req_lock);

		/* In the eager case, increment completion count for send req */
		ret = inc_req_completion(req, 0, send_data->total_num_compls);
		if (ret != 0) {
			NCCL_OFI_WARN("Failed to increase completion count");
			return ret;
		}
	}

	/* Attempt to re-post rx buffer */
	ret = repost_rx_buff(ep, rx_buff_req);
	if (ret != 0) {
		NCCL_OFI_WARN("Failed to repost rx buff");
		return ret;
	}
	return 0;
}