in src/nccl_ofi_rdma.cpp [1131:1200]
/**
 * Handle an rx buffer that carries an eager message.
 *
 * The rx buffer count of the buffer's rail is decremented first. The buffer
 * is then offered to the communicator's message buffer as a
 * NCCL_OFI_MSGBUFF_BUFF element under msg_seq_num:
 *
 *  - If the insert succeeds, no receive request exists for this sequence
 *    number yet; the eager data stays parked in the message buffer and is
 *    consumed later, when the receiver calls recv().
 *  - If the slot is already occupied (NCCL_OFI_MSGBUFF_INVALID_IDX with
 *    status NCCL_OFI_MSGBUFF_INPROGRESS), the existing element must be a
 *    receive request. Zero-length messages are completed immediately after
 *    re-posting the rx buffer; otherwise an eager copy request is allocated
 *    and progressed.
 *
 * @param r_comm       Receive communicator that owns the message buffer.
 * @param msg_seq_num  Sequence number of the eager message.
 * @param rx_buff_req  Rx buffer request holding the eager payload.
 *
 * @return 0 on success, -EINVAL on an unexpected message buffer result,
 *         status or element type, or the non-zero code returned by a
 *         failing helper.
 */
static inline int handle_eager_recv(nccl_net_ofi_rdma_recv_comm_t *r_comm,
				    uint16_t msg_seq_num,
				    nccl_net_ofi_rdma_req_t *rx_buff_req)
{
	int ret;
	nccl_net_ofi_rdma_ep_t *ep = (nccl_net_ofi_rdma_ep_t *)r_comm->base.base.ep;

	/* Decrease rx buffer count. It will be incremented again when reposting */
	ret = decrease_rx_buff_cnt(ep, get_rx_buff_data(rx_buff_req)->rail);
	if (ret != 0) {
		return ret;
	}

	nccl_ofi_msgbuff_status_t stat;
	nccl_ofi_msgbuff_result_t mb_res = nccl_ofi_msgbuff_insert(r_comm->msgbuff, msg_seq_num,
		rx_buff_req, NCCL_OFI_MSGBUFF_BUFF, &stat);

	if (mb_res == NCCL_OFI_MSGBUFF_SUCCESS) {
		/* Inserted! In this case receiver has not yet called recv() for this message, so
		   return success and initiate eager read when receiver calls recv(). */
		return 0;
	}

	/* The only tolerated insert failure is an already-occupied slot whose
	   element is still in progress; anything else is a protocol error. */
	if (OFI_UNLIKELY(mb_res != NCCL_OFI_MSGBUFF_INVALID_IDX)) {
		NCCL_OFI_WARN("Unexpected message insert result (%d) (eager recv)", (int)mb_res);
		return -EINVAL;
	}

	if (OFI_UNLIKELY(stat != NCCL_OFI_MSGBUFF_INPROGRESS)) {
		NCCL_OFI_WARN("Unexpected message status (%d) (eager recv)", (int)stat);
		return -EINVAL;
	}

	/* In this case, there is already a req entry here. Initiate eager copy. */
	void *elem;
	nccl_ofi_msgbuff_elemtype_t type;
	mb_res = nccl_ofi_msgbuff_retrieve(r_comm->msgbuff, msg_seq_num, &elem, &type, &stat);
	/* `type` is only inspected when the retrieve succeeded (short-circuit ||) */
	if (OFI_UNLIKELY(mb_res != NCCL_OFI_MSGBUFF_SUCCESS || type != NCCL_OFI_MSGBUFF_REQ)) {
		NCCL_OFI_WARN("Invalid message retrieval result for msg %hu", msg_seq_num);
		return -EINVAL;
	}

	nccl_net_ofi_rdma_req_t *recv_req = (nccl_net_ofi_rdma_req_t *)elem;
	rdma_req_recv_data_t *recv_data = get_recv_data(recv_req);
	rdma_req_rx_buff_data_t *rx_buff_data = get_rx_buff_data(rx_buff_req);

	if (rx_buff_data->recv_len == 0) {
		/* Special case: for zero-sized messages, we can skip the local read */

		/* Re-post rx buffer */
		ret = check_post_rx_buff_req(rx_buff_req);
		if (ret != 0) {
			NCCL_OFI_WARN("Failed call to check_post_rx_buff_req");
			return ret;
		}

		/* Nothing to copy: report a size of 0 together with the request's
		   total completion count (presumably finishing the receive request
		   in one step -- confirm against inc_req_completion). */
		ret = inc_req_completion(recv_req, 0, recv_data->total_num_compls);
		return ret;
	}

	/* Non-empty payload: set up the eager copy request on the receive
	   request, then progress it. */
	ret = alloc_eager_copy_req(recv_req, r_comm, rx_buff_req);
	if (ret != 0) {
		NCCL_OFI_WARN("Failed call to alloc_eager_copy_req");
		return ret;
	}

	ret = receive_progress(recv_data->eager_copy_req, true);
	if (ret != 0) {
		NCCL_OFI_WARN("Failed to post eager read: %d", ret);
		return ret;
	}

	return 0;
}