in src/nccl_ofi_sendrecv.cpp [294:362]
/*
 * Drain a Libfabric completion queue.
 *
 * Reads batches of up to cq_read_count completions with fi_cq_read() and
 * passes each batch to sendrecv_process_completions(). When the queue
 * signals a pending error entry (-FI_EAVAIL), the entry is fetched with
 * fi_cq_readerr() and dispatched to the handle_error_entry callback of the
 * context that embeds the entry's op_context.
 *
 * @param cq  completion queue to poll
 *
 * @return 0 when polling stops because no further completion (or error
 *         entry) is available yet (-FI_EAGAIN),
 *         -EINVAL when an error entry has no operation context,
 *         otherwise the value reported by the failing read call,
 *         completion processing, or error handler
 */
static int sendrecv_cq_process(struct fid_cq *cq)
{
	ssize_t rc = 0;
	int ret = 0;
	struct fi_cq_tagged_entry cqe_tagged_buffers[cq_read_count];

	while (true) {
		/* Receive completions for the given endpoint */
		rc = fi_cq_read(cq, cqe_tagged_buffers, cq_read_count);
		if (rc > 0) {
			ret = sendrecv_process_completions(cqe_tagged_buffers, rc);
			if (OFI_UNLIKELY(ret != 0)) {
				goto exit;
			}
		} else if (OFI_UNLIKELY(rc == -FI_EAVAIL)) {
			/*
			 * On call to fi_cq_readerr, Libfabric requires some
			 * members of err_entry to be zero-initialized or point
			 * to valid data. The entry is therefore scoped to this
			 * branch so that every call starts from an all-zero
			 * struct; reusing one entry across loop iterations
			 * would hand the provider whatever err_data /
			 * err_data_size the previous call left behind.
			 */
			struct fi_cq_err_entry err_buffer = {};
			nccl_net_ofi_context_t *ctx;

			rc = fi_cq_readerr(cq, &err_buffer, 0);
			if (OFI_UNLIKELY(rc == -FI_EAGAIN)) {
				/*
				 * Error not available yet.
				 * fi_cq_read will keep returning -FI_EAVAIL so just bail out and try again later.
				 */
				break;
			} else if (OFI_UNLIKELY(rc < 0)) {
				NCCL_OFI_WARN("Unable to read from fi_cq_readerr. RC: %zd. Error: %s",
					      rc,
					      fi_strerror(-rc));
				ret = static_cast<int>(rc);
				goto exit;
			}

			if (err_buffer.op_context == NULL) {
				NCCL_OFI_WARN("Received error entry without a context.");
				ret = -EINVAL;
				goto exit;
			}

			/* Recover the owning context from the embedded ofi_ctx member */
			ctx = container_of(err_buffer.op_context,
					   nccl_net_ofi_context_t, ofi_ctx);
			ret = ctx->handle_error_entry(ctx, cq, &err_buffer, 0);
			if (ret != 0) {
				goto exit;
			}
		} else if (rc == -FI_EAGAIN) {
			/* No completions to process */
			break;
		} else {
			NCCL_OFI_WARN("Unable to retrieve completion queue entries. RC: %zd, ERROR: %s",
				      rc, fi_strerror(-rc));
			ret = static_cast<int>(rc);
			goto exit;
		}
	}

exit:
	return ret;
}