void rxr_ep_progress_internal()

in prov/efa/src/rxr/rxr_ep.c [1428:1572]
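
Internal progress engine for an RXR endpoint: it polls the EFA completion queue (and the SHM completion queue when SHM is enabled), reposts receive buffers, flushes queued control packets and packets queued after RNR, sends pending data packets within each entry's window, and posts pending RDMA read requests.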


void rxr_ep_progress_internal(struct rxr_ep *ep)
{
	struct rxr_rx_entry *rx_entry;
	struct rxr_tx_entry *tx_entry;
	struct rxr_read_entry *read_entry;
	struct dlist_entry *tmp;
	ssize_t ret;

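	/* Check whether an earlier receive data buffer shortage has timed out. */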
	rxr_ep_check_available_data_bufs_timer(ep);

	// Poll the EFA completion queue
	rxr_ep_poll_cq(ep, ep->rdm_cq, rxr_env.efa_cq_read_size, 0);

	// Poll the SHM completion queue if enabled
	if (ep->use_shm)
		rxr_ep_poll_cq(ep, ep->shm_cq, rxr_env.shm_cq_read_size, 1);

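	/* Repost receive buffers in bulk so the receive queue stays filled. */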
	ret = rxr_ep_bulk_post_recv(ep);

	if (OFI_UNLIKELY(ret)) {
		if (rxr_cq_handle_cq_error(ep, ret))
			assert(0 &&
			       "error writing error cq entry after failed post recv");
		return;
	}

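	/* Let peers whose RNR backoff period has expired resume transmission. */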
	rxr_ep_check_peer_backoff_timer(ep);

	/*
	 * Send any queued RX ctrl packets, and retry any RX packets that
	 * could not be sent earlier.
	 */
	dlist_foreach_container_safe(&ep->rx_entry_queued_list,
				     struct rxr_rx_entry,
				     rx_entry, queued_entry, tmp) {
		if (rx_entry->state == RXR_RX_QUEUED_CTRL)
			ret = rxr_pkt_post_ctrl(ep, RXR_RX_ENTRY, rx_entry,
						rx_entry->queued_ctrl.type,
						rx_entry->queued_ctrl.inject);
		else
			ret = rxr_ep_send_queued_pkts(ep,
						      &rx_entry->queued_pkts);
		if (ret == -FI_EAGAIN)
			break;
		if (OFI_UNLIKELY(ret))
			goto rx_err;

		dlist_remove(&rx_entry->queued_entry);
		rx_entry->state = RXR_RX_RECV;
	}

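	/*
	 * Likewise for TX entries: post queued ctrl packets and retry
	 * packets that could not be sent earlier (e.g. after an RNR).
	 */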
	dlist_foreach_container_safe(&ep->tx_entry_queued_list,
				     struct rxr_tx_entry,
				     tx_entry, queued_entry, tmp) {
		if (tx_entry->state == RXR_TX_QUEUED_CTRL)
			ret = rxr_pkt_post_ctrl(ep, RXR_TX_ENTRY, tx_entry,
						tx_entry->queued_ctrl.type,
						tx_entry->queued_ctrl.inject);
		else
			ret = rxr_ep_send_queued_pkts(ep, &tx_entry->queued_pkts);

		if (ret == -FI_EAGAIN)
			break;
		if (OFI_UNLIKELY(ret))
			goto tx_err;

		dlist_remove(&tx_entry->queued_entry);

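		/*
		 * Restore the state the entry was in before the RNR; a data
		 * send also goes back on the pending list so its remaining
		 * window is sent below.
		 */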
		if (tx_entry->state == RXR_TX_QUEUED_REQ_RNR)
			tx_entry->state = RXR_TX_REQ;
		else if (tx_entry->state == RXR_TX_QUEUED_DATA_RNR) {
			tx_entry->state = RXR_TX_SEND;
			dlist_insert_tail(&tx_entry->entry,
					  &ep->tx_pending_list);
		}
	}

	/*
	 * Send data packets until window or tx queue is exhausted.
	 */
	dlist_foreach_container(&ep->tx_pending_list, struct rxr_tx_entry,
				tx_entry, entry) {
		if (tx_entry->window <= 0)
			continue;

		tx_entry->send_flags |= FI_MORE;

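		/*
		 * FI_MORE stays set while additional packets will follow; it
		 * is cleared for the last packet of the window or when the
		 * device TX queue is almost full.
		 */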
		while (tx_entry->window > 0) {
			if (ep->max_outstanding_tx - ep->tx_pending <= 1 ||
			    tx_entry->window <= ep->max_data_payload_size)
				tx_entry->send_flags &= ~FI_MORE;
			/*
			 * The core's TX queue is full so we can't do any
			 * additional work.
			 */
			if (ep->tx_pending == ep->max_outstanding_tx)
				goto out;
			ret = rxr_pkt_post_data(ep, tx_entry);
			if (OFI_UNLIKELY(ret)) {
				tx_entry->send_flags &= ~FI_MORE;
				goto tx_err;
			}
		}
	}

	/*
	 * Post read requests until all are posted or an error is
	 * encountered.
	 */
	dlist_foreach_container_safe(&ep->read_pending_list, struct rxr_read_entry,
				     read_entry, pending_entry, tmp) {
		/*
		 * The core's TX queue is full so we can't do any
		 * additional work.
		 */
		if (ep->tx_pending == ep->max_outstanding_tx)
			goto out;

		ret = rxr_read_post(ep, read_entry);
		if (ret == -FI_EAGAIN)
			break;

		if (OFI_UNLIKELY(ret))
			goto read_err;

		dlist_remove(&read_entry->pending_entry);
	}

out:
	return;
rx_err:
	if (rxr_cq_handle_rx_error(ep, rx_entry, ret))
		assert(0 &&
		       "error writing error cq entry when handling RX error");
	return;
tx_err:
	if (rxr_cq_handle_tx_error(ep, tx_entry, ret))
		assert(0 &&
		       "error writing error cq entry when handling TX error");
	return;

read_err:
	if (rxr_read_handle_error(ep, read_entry, ret))
		assert(0 &&
		       "error writing err cq entry while handling RDMA error");
	return;
}
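
For context, a minimal sketch of how this routine is typically driven. This is a reconstruction, not part of the excerpt above: it assumes the public progress callback (rxr_ep_progress) serializes access by taking the endpoint lock before calling rxr_ep_progress_internal().

void rxr_ep_progress(struct util_ep *util_ep)
{
	struct rxr_ep *ep;

	ep = container_of(util_ep, struct rxr_ep, util_ep);

	/* Serialize progress against other operations on this endpoint. */
	fastlock_acquire(&ep->util_ep.lock);
	rxr_ep_progress_internal(ep);
	fastlock_release(&ep->util_ep.lock);
}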