static int process_pending_reqs()

in src/nccl_ofi_rdma.cpp [1904:1958]


static int process_pending_reqs(nccl_net_ofi_rdma_ep_t *ep)
{
	int rc = 0;

	while (true) {
		nccl_net_ofi_rdma_req_t *req = NULL;
		nccl_net_ofi_mutex_lock(&ep->pending_reqs_lock);
		if (!ep->pending_reqs_queue->empty()) {
			req = ep->pending_reqs_queue->front();
			ep->pending_reqs_queue->pop_front();
		}
		nccl_net_ofi_mutex_unlock(&ep->pending_reqs_lock);
		if (req == NULL) { break; }

		switch (req->type) {
			case NCCL_OFI_RDMA_WRITE:
			case NCCL_OFI_RDMA_SEND:
			case NCCL_OFI_RDMA_CTRL_RX_BUFF:
			case NCCL_OFI_RDMA_EAGER_RX_BUFF:
				rc = send_progress(req);
				break;
			case NCCL_OFI_RDMA_READ:
			case NCCL_OFI_RDMA_EAGER_COPY:
			case NCCL_OFI_RDMA_SEND_CTRL:
			case NCCL_OFI_RDMA_FLUSH:
				rc = receive_progress(req, false);
				break;
			case NCCL_OFI_RDMA_RECV:
			case NCCL_OFI_RDMA_RECV_SEGMS:
			case NCCL_OFI_RDMA_SEND_CONN:
			case NCCL_OFI_RDMA_SEND_CLOSE:
			case NCCL_OFI_RDMA_RECV_CONN:
			case NCCL_OFI_RDMA_RECV_CONN_RESP:
			case NCCL_OFI_RDMA_SEND_CONN_RESP:
			case NCCL_OFI_RDMA_INVALID_TYPE:
			default:
				NCCL_OFI_WARN("Unexpected type: %d", req->type);
				return -EINVAL;
		}

		if ((rc != 0) && (rc != -FI_EAGAIN)) {
			NCCL_OFI_WARN("Unable to post request; RC: %d", rc);
			break;
		} else if (rc == -FI_EAGAIN) {
			/* Put the request in the front of the queue and try again later */
			nccl_net_ofi_mutex_lock(&ep->pending_reqs_lock);
			ep->pending_reqs_queue->push_front(req);
			nccl_net_ofi_mutex_unlock(&ep->pending_reqs_lock);
			rc = 0;
			break;
		}
		NCCL_OFI_TRACE_PENDING_REMOVE(req);
	}
	return rc;
}