static ncclResult_t ofi_process_cq()

in src/nccl_ofi_net.c [919:976]


/*
 * Drain the completion queue attached to nccl_ofi_comp.
 *
 * Repeatedly calls fi_cq_read() for up to nccl_ofi_comp->num_cqes entries at
 * a time and hands every successfully read batch to process_completions().
 * The loop ends when fi_cq_read() reports -FI_EAGAIN (nothing left to read).
 *
 * When fi_cq_read() reports -FI_EAVAIL, one error entry is fetched with
 * fi_cq_readerr(); the request recovered from its op_context is put in the
 * NCCL_OFI_REQ_ERROR state, its size is set to the reported length, and
 * polling continues.
 *
 * @param nccl_ofi_comp  Component whose cq, num_cqes and max_tag are used.
 *
 * @return ncclSuccess once the queue has been drained,
 *         the non-zero result of process_completions() if it fails,
 *         ncclSystemError if fi_cq_readerr() fails or fi_cq_read() returns
 *         any other value.
 */
static ncclResult_t ofi_process_cq(nccl_ofi_t *nccl_ofi_comp)
{
	ssize_t rc = 0;
	ncclResult_t ret = ncclSuccess;
	struct fi_cq_err_entry err_buffer = { 0 };
	uint64_t cqe_burst = nccl_ofi_comp->num_cqes;
	struct fi_cq_tagged_entry cqe_tagged_buffers[cqe_burst];
	nccl_ofi_req_t *req = NULL;
	struct fid_cq *cq = nccl_ofi_comp->cq;
	/* Every bit above the maximum tag value */
	uint64_t control_bit_mask = ~(nccl_ofi_comp->max_tag);

	while (true) {

		/* Zero-out buffers */
		memset(&cqe_tagged_buffers, 0, sizeof(cqe_tagged_buffers));

		/* Receive completions for the given endpoint */
		rc = fi_cq_read(cq, &cqe_tagged_buffers[0], cqe_burst);
		if (rc > 0) {
			/* rc is the number of entries that were read */
			ret = process_completions(
					&cqe_tagged_buffers[0], rc,
					control_bit_mask);
			if (OFI_UNLIKELY(ret != 0))
				goto exit;
		}
		else if (OFI_UNLIKELY(rc == -FI_EAVAIL)) {
			/* An error entry is pending; fetch it */
			rc = fi_cq_readerr(cq, &err_buffer, 0);
			if (OFI_UNLIKELY(rc < 0)) {
				NCCL_OFI_WARN("Unable to read from fi_cq_readerr. RC: %zd. Error: %s",
					     rc,
					     fi_cq_strerror(cq,
						err_buffer.prov_errno,
						err_buffer.err_data, NULL, 0));
				ret = ncclSystemError;
				goto exit;
			}

			/* TODO: Add debug log to dump failed request details */
			req = container_of(err_buffer.op_context,
					   nccl_ofi_req_t, ctx);
			req->state = NCCL_OFI_REQ_ERROR;
			req->size = err_buffer.len;
		}
		else if (rc == -FI_EAGAIN) {
			/* No completions to process */
			break;
		}
		else {
			/*
			 * The failure code lives in rc; ret is still
			 * ncclSuccess here, so it must not be used to
			 * build the error string.
			 */
			NCCL_OFI_WARN("Unable to retrieve completion queue entries. RC: %zd, ERROR: %s",
				     rc, fi_strerror((int)-rc));
			ret = ncclSystemError;
			goto exit;
		}
	}

exit:
	return ret;
}