in ncrx/nctx.c [349:399]
/*
 * Emergency (re)transmission step for @ring over @sock.
 *
 * Each call whose deadline has passed picks one sequence number
 * (@ring->emg_tx_seq), sends the corresponding message to the recorded
 * remote address if kmsg_ring_peek() can still find it, and re-arms the
 * timer.  A "session" walks emg_tx_seq from the tail sequence up to
 * @ring->head_seq; once a session completes, the interval is doubled
 * (bounded by EMG_TX_MAX_INTV) and the walk restarts from the tail.
 *
 * Returns -1 if @ring is empty or no remote address is set (the emergency
 * tx state is reset in that case).  Otherwise returns how long the caller
 * should wait before calling again, in the units of current_msec()
 * (presumably milliseconds - confirm against current_msec()): either the
 * time remaining until the pending deadline, or the current interval right
 * after a transmission attempt.
 */
static int kmsg_ring_emg_tx(struct kmsg_ring *ring, int sock)
{
	struct kmsg_slot *slot = &ring->slots[ring->tail];
	uint64_t target, now;
	uint64_t tail_seq;
	char *msg;

	/* if @ring is empty or remote site is not established, nothing to do */
	if (ring->head == ring->tail || !ring->raddr_len) {
		/* a zero interval marks "no emg tx session in progress" */
		ring->emg_tx_intv = 0;
		return -1;
	}

	/*
	 * Calculate the next deadline.  Outside a session it is ACK_TIMEOUT
	 * past the tail slot's timestamp; inside a session it is one
	 * interval past the previous emergency transmission.
	 */
	if (!ring->emg_tx_intv)
		target = slot->ts + ACK_TIMEOUT;
	else
		target = ring->emg_tx_ts + ring->emg_tx_intv;

	/* deadline still in the future, tell the caller how long to wait */
	now = current_msec();
	if (target > now)
		return (int)(target - now);

	tail_seq = kmsg_ring_tail_seq(ring);

	if (!ring->emg_tx_intv) {
		/* new emg tx session */
		ring->emg_tx_intv = EMG_TX_MIN_INTV;
		ring->emg_tx_seq = tail_seq;
	} else if (ring->emg_tx_seq < ring->head_seq) {
		/* in the middle of emg tx session, advance to the next seq */
		ring->emg_tx_seq++;
		/* the tail may have moved past us meanwhile, catch up to it */
		if (ring->emg_tx_seq < tail_seq)
			ring->emg_tx_seq = tail_seq;
	} else {
		/*
		 * Finished one session, back off and repeat from the tail.
		 * The doubled interval is capped at EMG_TX_MAX_INTV so that
		 * it can never grow without bound.
		 */
		ring->emg_tx_intv *= 2;
		if (ring->emg_tx_intv > EMG_TX_MAX_INTV)
			ring->emg_tx_intv = EMG_TX_MAX_INTV;
		ring->emg_tx_seq = tail_seq;
	}

	/* peek may return NULL for this seq, in which case nothing is sent */
	msg = kmsg_ring_peek(ring, ring->emg_tx_seq);
	if (msg)
		send_kmsg(sock, msg, 1, &ring->raddr.addr, ring->raddr_len);

	/* re-arm the timer even when nothing was sent this round */
	ring->emg_tx_ts = now;

	return ring->emg_tx_intv;
}