static int kmsg_ring_emg_tx()

in ncrx/nctx.c [349:399]


static int kmsg_ring_emg_tx(struct kmsg_ring *ring, int sock)
{
	struct kmsg_slot *slot = &ring->slots[ring->tail];
	uint64_t target, now;
	uint64_t tail_seq;
	char *msg;

	/* if @ring is empty or remote site is not established, nothing to do */
	if (ring->head == ring->tail || !ring->raddr_len) {
		ring->emg_tx_intv = 0;
		return -1;
	}

	/* calculate the next deadline, if in the future, return the diff */
	if (!ring->emg_tx_intv)
		target = slot->ts + ACK_TIMEOUT;
	else
		target = ring->emg_tx_ts + ring->emg_tx_intv;

	now = current_msec();

	if (target > now)
		return target - now;

	tail_seq = kmsg_ring_tail_seq(ring);

	if (!ring->emg_tx_intv) {
		/* new emg tx session */
		ring->emg_tx_intv = EMG_TX_MIN_INTV;
		ring->emg_tx_seq = tail_seq;
	} else if (ring->emg_tx_seq < ring->head_seq) {
		/* in the middle of emg tx session */
		ring->emg_tx_seq++;
		if (ring->emg_tx_seq < tail_seq)
			ring->emg_tx_seq = tail_seq;
	} else {
		/* finished one session, increase intv and repeat */
		ring->emg_tx_intv *= 2;
		if (ring->emg_tx_intv < EMG_TX_MAX_INTV)
			ring->emg_tx_intv = EMG_TX_MAX_INTV;
		ring->emg_tx_seq = tail_seq;
	}

	msg = kmsg_ring_peek(ring, ring->emg_tx_seq);
	if (msg)
		send_kmsg(sock, msg, 1, &ring->raddr.addr, ring->raddr_len);

	ring->emg_tx_ts = now;

	return ring->emg_tx_intv;
}