in ncrx/nctx.c [282:342]
/*
 * Drain and handle every response packet currently queued on @sock.
 *
 * Each packet is parsed as "ncrx<ack-seq> <missing-seq>...": the first
 * space-separated token must start with "ncrx" and may carry an ack
 * sequence number right after it; every following token is a sequence
 * number the remote side is asking for.  The ack is handed to
 * kmsg_ring_consume() and each requested sequence that
 * kmsg_ring_peek() can still return is re-sent to the packet's source
 * address with send_kmsg().  The source address of the last well-formed
 * packet is stored in @ring->raddr / @ring->raddr_len.
 *
 * Packets that do not start with "ncrx" are reported on stderr and
 * skipped.
 *
 * Returns 0 once recvmsg() reports EAGAIN (nothing left to read) and -1
 * on any other recvmsg() failure, with errno left as set by recvmsg().
 */
static int kmsg_ring_process_resps(struct kmsg_ring *ring, int sock)
{
	char rx_buf[NCRX_PKT_MAX + 1];	/* +1 leaves room for the terminator */
	union sockaddr_in46 raddr;
	struct iovec iov = { .iov_base = rx_buf, .iov_len = NCRX_PKT_MAX };
	struct msghdr msgh = { .msg_name = &raddr.addr, .msg_iov = &iov,
			       .msg_iovlen = 1 };

	for (;;) {
		ssize_t len;
		char *pos, *tok;
		uint64_t seq;

		/* recvmsg() overwrites msg_namelen, so reset it every time */
		msgh.msg_namelen = sizeof(raddr);

		len = recvmsg(sock, &msgh, MSG_DONTWAIT);
		if (len < 0) {
			if (errno == EAGAIN)
				return 0;
			return -1;
		}

		/* make the payload a C string so it can be tokenized */
		rx_buf[len] = '\0';
		pos = rx_buf;
		tok = strsep(&pos, " ");

		/* "ncrx" header */
		if (strncmp(tok, "ncrx", 4)) {
			char addr_str[INET6_ADDRSTRLEN];

			if (raddr.addr.sa_family == AF_INET6)
				inet_ntop(AF_INET6, &raddr.in6.sin6_addr,
					  addr_str, sizeof(addr_str));
			else
				inet_ntop(AF_INET, &raddr.in4.sin_addr,
					  addr_str, sizeof(addr_str));
			fprintf(stderr, "Warning: malformed packet from [%s]:%u\n",
				addr_str, ntohs(raddr.in4.sin_port));
			continue;
		}
		tok += 4;

		/*
		 * <ack-seq>
		 *
		 * sscanf() returns EOF (non-zero) when the token is empty,
		 * so the result must be compared against 1; otherwise a
		 * bare "ncrx" header would consume an uninitialized seq.
		 */
		if (sscanf(tok, "%"SCNu64, &seq) == 1)
			kmsg_ring_consume(ring, seq);

		/*
		 * <missing-seq>...
		 *
		 * strsep() yields empty tokens for repeated or trailing
		 * spaces; requiring exactly one conversion keeps those from
		 * re-using the previously parsed seq.
		 */
		while ((tok = strsep(&pos, " "))) {
			char *msg;

			if (sscanf(tok, "%"SCNu64, &seq) != 1)
				continue;

			msg = kmsg_ring_peek(ring, seq);
			if (msg)
				send_kmsg(sock, msg, 0,
					  &raddr.addr, msgh.msg_namelen);
		}

		/* stash remote address for emergency tx */
		ring->raddr = raddr;
		ring->raddr_len = msgh.msg_namelen;
	}
}