in src/nccl_ofi_net.c [919:976]
/*
 * Drain the completion queue owned by @nccl_ofi_comp.
 *
 * Completions are read in bursts of up to nccl_ofi_comp->num_cqes entries
 * and each successful burst is handed to process_completions(). When the CQ
 * reports -FI_EAVAIL, one error entry is fetched with fi_cq_readerr() and the
 * request it belongs to is marked NCCL_OFI_REQ_ERROR. The loop ends when the
 * CQ reports -FI_EAGAIN (nothing left to read).
 *
 * @param nccl_ofi_comp  Component providing cq, num_cqes and max_tag.
 * @return ncclSuccess once the CQ has been drained, the (non-success) result
 *         of process_completions() if it fails, or ncclSystemError on any
 *         libfabric failure.
 */
static ncclResult_t ofi_process_cq(nccl_ofi_t *nccl_ofi_comp)
{
	ssize_t rc = 0;
	ncclResult_t ret = ncclSuccess;
	uint64_t cqe_burst = nccl_ofi_comp->num_cqes;
	/* NOTE(review): VLA sized by num_cqes; assumes num_cqes > 0 (a zero-length VLA is UB) — confirm at init. */
	struct fi_cq_tagged_entry cqe_tagged_buffers[cqe_burst];
	nccl_ofi_req_t *req = NULL;
	struct fid_cq *cq = nccl_ofi_comp->cq;
	/* Complement of max_tag; passed to process_completions() (presumably to separate control bits from the tag). */
	uint64_t control_bit_mask = ~(nccl_ofi_comp->max_tag);

	while (true) {
		/* Zero-out buffers */
		memset(&cqe_tagged_buffers, 0, sizeof(cqe_tagged_buffers));
		/* Receive completions for the given endpoint */
		rc = fi_cq_read(cq, &cqe_tagged_buffers[0], cqe_burst);
		if (rc > 0) {
			ret = process_completions(&cqe_tagged_buffers[0], rc,
						  control_bit_mask);
			if (OFI_UNLIKELY(ret != ncclSuccess))
				goto exit;
		} else if (OFI_UNLIKELY(rc == -FI_EAVAIL)) {
			/*
			 * Use a fresh, zeroed error entry for every read: with
			 * err_data_size == 0 on input the provider hands back its
			 * own err_data buffer and updates err_data_size, so values
			 * left over from a previous read must not be passed in again.
			 */
			struct fi_cq_err_entry err_buffer = { 0 };

			rc = fi_cq_readerr(cq, &err_buffer, 0);
			if (OFI_UNLIKELY(rc < 0)) {
				/* err_buffer was not filled in; report the readerr failure itself. */
				NCCL_OFI_WARN("Unable to read from fi_cq_readerr. RC: %zd. Error: %s",
					      rc, fi_strerror((int)-rc));
				ret = ncclSystemError;
				goto exit;
			}

			/* Without a context there is no request to mark; container_of() would be invalid. */
			if (OFI_UNLIKELY(err_buffer.op_context == NULL)) {
				NCCL_OFI_WARN("Received error completion without an associated request. Error: %s",
					      fi_cq_strerror(cq,
							     err_buffer.prov_errno,
							     err_buffer.err_data, NULL, 0));
				ret = ncclSystemError;
				goto exit;
			}

			/* TODO: Add debug log to dump failed request details */
			req = container_of(err_buffer.op_context,
					   nccl_ofi_req_t, ctx);
			req->state = NCCL_OFI_REQ_ERROR;
			req->size = err_buffer.len;
		} else if (rc == -FI_EAGAIN) {
			/* No completions to process */
			break;
		} else {
			/* rc holds the negative libfabric error code; ret is still ncclSuccess here. */
			NCCL_OFI_WARN("Unable to retrieve completion queue entries. RC: %zd, ERROR: %s",
				      rc, fi_strerror((int)-rc));
			ret = ncclSystemError;
			goto exit;
		}
	}
exit:
	return ret;
}