in prov/efa/src/rxr/rxr_ep.c [1428:1572]
void rxr_ep_progress_internal(struct rxr_ep *ep)
{
	struct rxr_rx_entry *rx_entry;
	struct rxr_tx_entry *tx_entry;
	struct rxr_read_entry *read_entry;
	struct dlist_entry *tmp;
	ssize_t ret;
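
	/* Timer-based check on the pool of available rx data buffers. */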
	rxr_ep_check_available_data_bufs_timer(ep);

	/* Poll the EFA completion queue. */
	rxr_ep_poll_cq(ep, ep->rdm_cq, rxr_env.efa_cq_read_size, 0);

	/* Poll the SHM completion queue if the shared-memory path is enabled. */
	if (ep->use_shm)
		rxr_ep_poll_cq(ep, ep->shm_cq, rxr_env.shm_cq_read_size, 1);
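
	/*
	 * Bulk-post rx buffers back to the device. A failure here is
	 * reported as an error CQ entry and ends this progress pass.
	 */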
	ret = rxr_ep_bulk_post_recv(ep);
	if (OFI_UNLIKELY(ret)) {
		if (rxr_cq_handle_cq_error(ep, ret))
			assert(0 &&
			       "error writing error cq entry after failed post recv");
		return;
	}
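
	/* See whether any peers in RNR backoff are ready to resume. */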
	rxr_ep_check_peer_backoff_timer(ep);

	/*
	 * Send any queued ctrl packets.
	 */
	dlist_foreach_container_safe(&ep->rx_entry_queued_list,
				     struct rxr_rx_entry,
				     rx_entry, queued_entry, tmp) {
		if (rx_entry->state == RXR_RX_QUEUED_CTRL)
			ret = rxr_pkt_post_ctrl(ep, RXR_RX_ENTRY, rx_entry,
						rx_entry->queued_ctrl.type,
						rx_entry->queued_ctrl.inject);
		else
			ret = rxr_ep_send_queued_pkts(ep,
						      &rx_entry->queued_pkts);
		if (ret == -FI_EAGAIN)
			break;
		if (OFI_UNLIKELY(ret))
			goto rx_err;

		dlist_remove(&rx_entry->queued_entry);
		rx_entry->state = RXR_RX_RECV;
	}
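
	/*
	 * Likewise for TX entries: post the queued ctrl packet, or
	 * retransmit packets that were queued after an RNR.
	 */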
	dlist_foreach_container_safe(&ep->tx_entry_queued_list,
				     struct rxr_tx_entry,
				     tx_entry, queued_entry, tmp) {
		if (tx_entry->state == RXR_TX_QUEUED_CTRL)
			ret = rxr_pkt_post_ctrl(ep, RXR_TX_ENTRY, tx_entry,
						tx_entry->queued_ctrl.type,
						tx_entry->queued_ctrl.inject);
		else
			ret = rxr_ep_send_queued_pkts(ep, &tx_entry->queued_pkts);
		if (ret == -FI_EAGAIN)
			break;
		if (OFI_UNLIKELY(ret))
			goto tx_err;

		dlist_remove(&tx_entry->queued_entry);
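
		/*
		 * Restore the state the entry was in before the RNR;
		 * entries that were mid data transfer go back on the
		 * tx_pending_list so the rest of their window is sent below.
		 */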
		if (tx_entry->state == RXR_TX_QUEUED_REQ_RNR)
			tx_entry->state = RXR_TX_REQ;
		else if (tx_entry->state == RXR_TX_QUEUED_DATA_RNR) {
			tx_entry->state = RXR_TX_SEND;
			dlist_insert_tail(&tx_entry->entry,
					  &ep->tx_pending_list);
		}
	}

	/*
	 * Send data packets until window or tx queue is exhausted.
	 */
	dlist_foreach_container(&ep->tx_pending_list, struct rxr_tx_entry,
				tx_entry, entry) {
		if (tx_entry->window > 0)
			tx_entry->send_flags |= FI_MORE;
		else
			continue;
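
		/*
		 * FI_MORE hints to the lower-level provider that more sends
		 * will follow so it can batch them; the flag is cleared
		 * before the last packet of the window or when the device
		 * tx queue is nearly full.
		 */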
		while (tx_entry->window > 0) {
			if (ep->max_outstanding_tx - ep->tx_pending <= 1 ||
			    tx_entry->window <= ep->max_data_payload_size)
				tx_entry->send_flags &= ~FI_MORE;
			/*
			 * The core's TX queue is full so we can't do any
			 * additional work.
			 */
			if (ep->tx_pending == ep->max_outstanding_tx)
				goto out;

			ret = rxr_pkt_post_data(ep, tx_entry);
			if (OFI_UNLIKELY(ret)) {
				tx_entry->send_flags &= ~FI_MORE;
				goto tx_err;
			}
		}
	}

	/*
	 * Send read requests until all are posted or an error is encountered.
	 */
	dlist_foreach_container_safe(&ep->read_pending_list, struct rxr_read_entry,
				     read_entry, pending_entry, tmp) {
		/*
		 * The core's TX queue is full so we can't do any
		 * additional work.
		 */
		if (ep->tx_pending == ep->max_outstanding_tx)
			goto out;

		ret = rxr_read_post(ep, read_entry);
		if (ret == -FI_EAGAIN)
			break;
		if (OFI_UNLIKELY(ret))
			goto read_err;

		dlist_remove(&read_entry->pending_entry);
	}
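
	/*
	 * Error paths: report the failure as an error CQ entry for the
	 * affected entry; if even that write fails there is no way left
	 * to surface the error, so assert.
	 */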
out:
	return;
rx_err:
	if (rxr_cq_handle_rx_error(ep, rx_entry, ret))
		assert(0 &&
		       "error writing error cq entry when handling RX error");
	return;
tx_err:
	if (rxr_cq_handle_tx_error(ep, tx_entry, ret))
		assert(0 &&
		       "error writing error cq entry when handling TX error");
	return;
read_err:
	if (rxr_read_handle_error(ep, read_entry, ret))
		assert(0 &&
		       "error writing err cq entry while handling RDMA error");
	return;
}
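
/*
 * For context only (not part of the excerpt above): this internal routine
 * is normally driven by the endpoint's progress callback, which serializes
 * it under the endpoint lock. The sketch below shows that pattern; apart
 * from rxr_ep_progress_internal(), the names used here (rxr_ep_progress,
 * util_ep, fastlock_acquire/release) are assumptions based on libfabric's
 * util endpoint framework, not lines quoted from this file.
 */
void rxr_ep_progress(struct util_ep *util_ep)
{
	struct rxr_ep *ep;

	ep = container_of(util_ep, struct rxr_ep, util_ep);

	/* Progress must not race with data-path calls on the same ep. */
	fastlock_acquire(&ep->util_ep.lock);
	rxr_ep_progress_internal(ep);
	fastlock_release(&ep->util_ep.lock);
}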