static int sendrecv_cq_process()

in src/nccl_ofi_sendrecv.cpp [294:362]


/*
 * Drain all currently available completions from a send/recv completion
 * queue.
 *
 * Repeatedly calls fi_cq_read() for up to cq_read_count entries at a time and
 * passes each batch to sendrecv_process_completions(). When the CQ reports
 * -FI_EAVAIL, one error entry is read with fi_cq_readerr() and dispatched to
 * the handle_error_entry callback of the nccl_net_ofi_context_t that embeds
 * the entry's op_context.
 *
 * @param cq  Completion queue to drain.
 * @return 0 once the queue reports no more completions (or an error entry is
 *         not yet readable), otherwise the non-zero value returned by a
 *         failing libfabric call, by sendrecv_process_completions(), or by an
 *         error-entry handler (-EINVAL if an error entry has no context).
 */
static int sendrecv_cq_process(struct fid_cq *cq)
{
	ssize_t rc = 0;
	int ret = 0;
	struct fi_cq_tagged_entry cqe_tagged_buffers[cq_read_count];

	while (true) {
		/* Receive completions for the given endpoint */
		rc = fi_cq_read(cq, cqe_tagged_buffers, cq_read_count);
		if (rc > 0) {
			ret = sendrecv_process_completions(cqe_tagged_buffers, rc);
			if (OFI_UNLIKELY(ret != 0)) {
				goto exit;
			}
		} else if (OFI_UNLIKELY(rc == -FI_EAVAIL)) {
			nccl_net_ofi_context_t *ctx;
			/*
			 * On call to fi_cq_readerr, Libfabric requires some members of
			 * err_entry to be zero-initialized or point to valid data.  A
			 * previous fi_cq_readerr call may have written output into those
			 * members, so declare the entry here to obtain a freshly zeroed
			 * struct for every error read instead of reusing stale state.
			 */
			struct fi_cq_err_entry err_buffer = {};

			rc = fi_cq_readerr(cq, &err_buffer, 0);

			if (OFI_UNLIKELY(rc == -FI_EAGAIN)) {
				/*
				 * Error not available yet.
				 * fi_cq_read will keep returning -FI_EAVAIL so just bail out and try again later.
				 */
				break;
			} else if (OFI_UNLIKELY(rc < 0)) {
				NCCL_OFI_WARN("Unable to read from fi_cq_readerr. RC: %zd. Error: %s",
					      rc,
					      fi_strerror(-rc));
				ret = (int)rc;
				goto exit;
			}

			if (err_buffer.op_context == NULL) {
				NCCL_OFI_WARN("Received error entry without a context.");
				ret = -EINVAL;
				goto exit;
			}

			/* op_context points at the ofi_ctx member embedded in our context */
			ctx = container_of(err_buffer.op_context,
					   nccl_net_ofi_context_t, ofi_ctx);
			ret = ctx->handle_error_entry(ctx, cq, &err_buffer, 0);
			if (ret != 0) {
				goto exit;
			}
		} else if (rc == -FI_EAGAIN || rc == 0) {
			/*
			 * No completions to process. A zero return is not expected
			 * from fi_cq_read, but treat it as "nothing available" rather
			 * than reporting it as a failure.
			 */
			break;
		} else {
			NCCL_OFI_WARN("Unable to retrieve completion queue entries. RC: %zd, ERROR: %s",
				      rc, fi_strerror(-rc));
			ret = (int)rc;
			goto exit;
		}
	}

 exit:
	return ret;
}