static int kmsg_ring_process_resps()

in ncrx/nctx.c [282:342]


static int kmsg_ring_process_resps(struct kmsg_ring *ring, int sock)
{
	char rx_buf[NCRX_PKT_MAX + 1];
	union sockaddr_in46 raddr;
	struct iovec iov = { .iov_base = rx_buf, .iov_len = NCRX_PKT_MAX };
	struct msghdr msgh = { .msg_name = &raddr.addr, .msg_iov = &iov,
			       .msg_iovlen = 1 };
	ssize_t len;
	char *pos, *tok;
	uint64_t seq;

next_packet:
	msgh.msg_namelen = sizeof(raddr);
	len = recvmsg(sock, &msgh, MSG_DONTWAIT);
	if (len < 0) {
		if (errno == EAGAIN)
			return 0;
		return -1;
	}

	rx_buf[len] = '\0';
	pos = rx_buf;
	tok = strsep(&pos, " ");

	/* "ncrx" header */
	if (strncmp(tok, "ncrx", 4)) {
		char addr_str[INET6_ADDRSTRLEN];

		if (raddr.addr.sa_family == AF_INET6)
			inet_ntop(AF_INET6, &raddr.in6.sin6_addr,
				  addr_str, sizeof(addr_str));
		else
			inet_ntop(AF_INET, &raddr.in4.sin_addr,
				  addr_str, sizeof(addr_str));

		fprintf(stderr, "Warning: malformed packet from [%s]:%u\n",
			addr_str, ntohs(raddr.in4.sin_port));
		goto next_packet;
	}
	tok += 4;

	/* <ack-seq> */
	if (sscanf(tok, "%"SCNu64, &seq))
		kmsg_ring_consume(ring, seq);

	/* <missing-seq>... */
	while ((tok = strsep(&pos, " "))) {
		if (sscanf(tok, "%"SCNu64, &seq)) {
			char *msg = kmsg_ring_peek(ring, seq);
			if (msg)
				send_kmsg(sock, msg, 0,
					  &raddr.addr, msgh.msg_namelen);
		}
	}

	/* stash remote address for emergency tx */
	ring->raddr = raddr;
	ring->raddr_len = msgh.msg_namelen;

	goto next_packet;
}