in ncrx/libncrx.c [618:696]
/*
 * ncrx_process - feed an optional payload and the current time into @ncrx
 * @payload: message to parse and queue, or NULL to run only the
 *	     time-driven processing below
 * @now_mono: current time, compared against slot and message timestamps
 *	      and recorded in @ncrx->now_mono.  Expected not to go
 *	      backwards; if it does, a warning goes to stderr and
 *	      processing continues.
 * @now_real: handed unchanged to ncrx_queue_payload()
 * @ncrx: receiver state to update
 *
 * In order: reset the response length, refresh the ack timestamp when
 * nothing is pending, queue @payload, retire finished or timed-out slots
 * from the tail, retire timed-out out-of-sequence messages, build an ack
 * response if one is overdue, and finally build re-transmission requests
 * for outstanding holes.
 *
 * Returns the value of ncrx_queue_payload() when @payload is non-NULL,
 * 0 otherwise.
 */
int ncrx_process(const char *payload, uint64_t now_mono, uint64_t now_real,
		 struct ncrx *ncrx)
{
	/* head position before this call possibly queues a new message */
	const uint64_t prev_head_seq = ncrx->head_seq;
	struct ncrx_slot *hole, *next_hole;
	struct ncrx_msg *oos_msg;
	int crossed_stride;
	int rc = 0;

	if (now_mono < ncrx->now_mono) {
		fprintf(stderr, "ncrx: time regressed %"PRIu64"->%"PRIu64"\n",
			ncrx->now_mono, now_mono);
	}

	ncrx->now_mono = now_mono;
	/* start this round with an empty response */
	ncrx->resp_len = 0;

	/*
	 * With nothing left to ack, slide the last-ack timestamp forward
	 * so that a message arriving now does not immediately look like
	 * an expired ack timeout.
	 */
	if (ncrx->acked_seq == tail_seq(ncrx) - 1)
		ncrx->acked_at = now_mono;

	/* parse and queue @payload, if the caller supplied one */
	if (payload)
		rc = ncrx_queue_payload(payload, ncrx, now_real);

	/*
	 * Retire slots from the tail.  A slot that has no message, or that
	 * is still linked on the hole list, stops the sweep until its
	 * msg_timeout has elapsed; anything else is retired right away.
	 */
	while (ncrx->tail != ncrx->head) {
		struct ncrx_slot *tail_slot = &ncrx->slots[ncrx->tail];
		int pending = !tail_slot->msg ||
			      !list_empty(&tail_slot->hole_node);

		if (pending &&
		    tail_slot->timestamp + ncrx->p.msg_timeout > now_mono)
			break;

		retire_tail(ncrx);
	}

	/*
	 * Move out-of-sequence messages whose oos_timeout has passed onto
	 * the retired list, flagged as oos.  The sweep stops at the first
	 * message that has not timed out yet.
	 */
	for (;;) {
		oos_msg = msg_list_peek(&ncrx->oos_list);
		if (!oos_msg)
			break;
		if (oos_msg->rx_at_mono + ncrx->p.oos_timeout > now_mono)
			break;

		oos_msg->oos = 1;
		msg_list_del(oos_msg, &ncrx->oos_list);
		msg_list_append(oos_msg, &ncrx->retired_list);
	}

	/* acks enabled, something unacked, and the ack interval ran out? */
	if (ncrx->p.ack_intv) {
		if (ncrx->acked_seq != tail_seq(ncrx) - 1 &&
		    ncrx->acked_at + ncrx->p.ack_intv < now_mono)
			ncrx_build_resp(NULL, ncrx);
	}

	/*
	 * Did head_seq move into a different retx_stride bucket during this
	 * call?
	 * NOTE(review): divides by p.retx_stride; assumes it is non-zero -
	 * confirm it is validated where the parameters are set.
	 */
	crossed_stride = prev_head_seq / ncrx->p.retx_stride !=
			 ncrx->head_seq / ncrx->p.retx_stride;

	hole_list_for_each(hole, next_hole, &ncrx->hole_list) {
		/*
		 * On a stride crossing, ask again for every hole further
		 * away than one stride, so a missing seq is requested a
		 * minimum number of times no matter how fast messages
		 * arrive.
		 */
		int want_retx = crossed_stride &&
			slot_dist(hole - ncrx->slots, ncrx) > ncrx->p.retx_stride;

		/* independently, ask again once every retx_intv */
		if (now_mono - max(hole->timestamp, hole->retx_timestamp) >=
		    (unsigned)ncrx->p.retx_intv) {
			hole->retx_timestamp = now_mono;
			want_retx = 1;
		}

		if (want_retx)
			ncrx_build_resp(hole, ncrx);
	}

	return rc;
}