int ncrx_process()

in ncrx/libncrx.c [618:696]


int ncrx_process(const char *payload, uint64_t now_mono, uint64_t now_real,
		struct ncrx *ncrx)
{
	struct ncrx_slot *slot, *tmp_slot;
	struct ncrx_msg *msg;
	uint64_t old_head_seq = ncrx->head_seq;
	int dist_retx, ret = 0;

	if (now_mono < ncrx->now_mono)
		fprintf(stderr, "ncrx: time regressed %"PRIu64"->%"PRIu64"\n",
			ncrx->now_mono, now_mono);

	ncrx->now_mono = now_mono;
	ncrx->resp_len = 0;

	/*
	 * If fully acked, keep last ack timestamp current so that new
	 * messages arriving doesn't trigger ack timeout immediately.
	 */
	if (ncrx->acked_seq == tail_seq(ncrx) - 1)
		ncrx->acked_at = now_mono;

	/* parse and queue @payload */
	if (payload)
		ret = ncrx_queue_payload(payload, ncrx, now_real);

	/* retire complete & timed-out msgs from tail */
	while (ncrx->tail != ncrx->head) {
		slot = &ncrx->slots[ncrx->tail];

		if ((!slot->msg || !list_empty(&slot->hole_node)) &&
		    slot->timestamp + ncrx->p.msg_timeout > now_mono)
			break;
		retire_tail(ncrx);
	}

	/* retire timed-out oos msgs */
	while ((msg = msg_list_peek(&ncrx->oos_list))) {
		if (msg->rx_at_mono + ncrx->p.oos_timeout > now_mono)
			break;
		msg->oos = 1;
		msg_list_del(msg, &ncrx->oos_list);
		msg_list_append(msg, &ncrx->retired_list);
	}

	/* if enabled, ack pending and timeout expired? */
	if (ncrx->p.ack_intv && ncrx->acked_seq != tail_seq(ncrx) - 1 &&
	    ncrx->acked_at + ncrx->p.ack_intv < now_mono)
		ncrx_build_resp(NULL, ncrx);

	/* head passed one or more re-transmission boundaries? */
	dist_retx = old_head_seq / ncrx->p.retx_stride !=
		ncrx->head_seq / ncrx->p.retx_stride;

	hole_list_for_each(slot, tmp_slot, &ncrx->hole_list) {
		int retx = 0;

		/*
		 * If so, request re-tx of holes further away than stride.
		 * This ensures that a missing seq is requested at least
		 * certain number of times regardless of incoming rate.
		 */
		if (dist_retx &&
		    slot_dist(slot - ncrx->slots, ncrx) > ncrx->p.retx_stride)
			retx = 1;

		/* request re-tx every retx_intv */
		if (now_mono - max(slot->timestamp, slot->retx_timestamp) >=
		    (unsigned)ncrx->p.retx_intv) {
			slot->retx_timestamp = now_mono;
			retx = 1;
		}

		if (retx)
			ncrx_build_resp(slot, ncrx);
	}

	return ret;
}