ncrx/nctx.c (362 lines of code) (raw):

/* * nctx - extended netconsole sender * * Copyright (C) 2016, Facebook, Inc. * All rights reserved. * * This source code is licensed under the license found in the LICENSE file in * the root directory of this source tree. */ #include <stdio.h> #include <string.h> #include <stdlib.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <time.h> #include <poll.h> #include <ctype.h> #include <errno.h> #include <arpa/inet.h> #include <sys/socket.h> #include <netinet/in.h> #include <netinet/udp.h> #include "ncrx.h" /* in msecs */ #define ACK_TIMEOUT 10000 #define EMG_TX_MAX_INTV 1000 #define EMG_TX_MIN_INTV 100 union sockaddr_in46 { struct sockaddr addr; struct sockaddr_in6 in6; struct sockaddr_in in4; }; struct kmsg_slot { char *msg; uint64_t ts; }; struct kmsg_ring { int head; int tail; int nr_slots; uint64_t head_seq; union sockaddr_in46 raddr; int raddr_len; int emg_tx_intv; uint64_t emg_tx_seq; uint64_t emg_tx_ts; struct kmsg_slot *slots; }; /* relative time in msecs */ static uint64_t current_msec(void) { struct timespec ts; if (clock_gettime(CLOCK_MONOTONIC, &ts)) { perror("clock_gettime"); exit(1); } return ts.tv_sec * 1000 + ts.tv_nsec / 1000000; } static int kmsg_ring_init(struct kmsg_ring *ring, int nr_slots) { memset(ring, 0, sizeof(*ring)); ring->slots = malloc(sizeof(ring->slots[0]) * nr_slots); if (!ring->slots) return -1; ring->nr_slots = nr_slots; return 0; } /* advance @ring's head by one, if head catches up with tail, clip it */ static void kmsg_ring_advance(struct kmsg_ring *ring) { struct kmsg_slot *slot; ring->head_seq++; ring->head = (ring->head + 1) % ring->nr_slots; slot = &ring->slots[ring->head]; if (ring->tail == ring->head) { free(slot->msg); memset(slot, 0, sizeof(*slot)); ring->tail = (ring->tail + 1) % ring->nr_slots; } } /* fill @ring with kmsgs from @devkmsg, returns 0 on success, -1 on failure */ static int kmsg_ring_fill(struct kmsg_ring *ring, int devkmsg) { char buf[NCRX_LINE_MAX]; struct kmsg_slot *slot; int level; uint64_t seq; ssize_t len; next_line: do { len = read(devkmsg, buf, sizeof(buf) - 1); /* * EPIPE indicates skipped messages. kmsgs are always * stored according to their sequence numbers, so we don't * need to do anything special on EPIPE. Keep reading. */ } while (len < 0 && errno == EPIPE); if (len < 0) { if (errno == EAGAIN) return 0; return -1; } /* read seq and see if it makes sense */ buf[len] = '\0'; if (sscanf(buf, "%d,%"SCNu64",", &level, &seq) != 2 || seq < ring->head_seq) { fprintf(stderr, "Warning: malformed kmsg \"%s\"\n", buf); goto next_line; } /* wind ring till head is at the right slot and store */ while (ring->head_seq < seq) kmsg_ring_advance(ring); slot = &ring->slots[ring->head]; slot->msg = strdup(buf); if (!slot->msg) return -1; slot->ts = current_msec(); kmsg_ring_advance(ring); goto next_line; } /* sequence number of the oldest occupied slot in @ring */ static uint64_t kmsg_ring_tail_seq(struct kmsg_ring *ring) { int nr; nr = ring->head - ring->tail; if (nr < 0) nr += ring->nr_slots; return ring->head_seq - nr; } /* peek kmsg matching @seq, NULL if not found */ static char *kmsg_ring_peek(struct kmsg_ring *ring, uint64_t seq) { int idx; if (seq < kmsg_ring_tail_seq(ring) || seq >= ring->head_seq) return NULL; idx = ring->head - (int)(ring->head_seq - seq); if (idx < 0) idx += ring->nr_slots; return ring->slots[idx].msg; } /* free slots upto @upto_seq, tail_seq is @upto_seq + 1 afterwards */ static void kmsg_ring_consume(struct kmsg_ring *ring, uint64_t upto_seq) { uint64_t tail_seq = kmsg_ring_tail_seq(ring); int tail = ring->tail; if (!ring->head_seq || upto_seq < tail_seq) return; if (upto_seq >= ring->head_seq) upto_seq = ring->head_seq - 1; while (tail_seq <= upto_seq) { struct kmsg_slot *slot = &ring->slots[ring->head]; free(slot->msg); memset(slot, 0, sizeof(*slot)); tail_seq++; tail = (tail + 1) % ring->nr_slots; /* made progress, reset emergency tx */ ring->emg_tx_intv = 0; } ring->tail = tail; } /* * Send @msg to @addr via @sock. If @msg is too long, split into * NCRX_PKT_MAX byte chunks with ncfrag header added. If @is_emg_tx is * set, add ncemg header. */ static void send_kmsg(int sock, char *msg, int is_emg_tx, struct sockaddr *addr, int addr_len) { char buf[NCRX_PKT_MAX + 1]; const int max_extra_len = sizeof(",ncemg=1,ncfrag=0000/0000"); const char *header, *body; int msg_len = strlen(msg); int header_len = msg_len, body_len = 0; int chunk_len, nr_chunks, i; if (!is_emg_tx && msg_len <= NCRX_PKT_MAX) { sendto(sock, msg, msg_len, 0, addr, addr_len); return; } /* need to insert extra header fields, detect header and body */ header = msg; body = memchr(msg, ';', msg_len); if (body) { header_len = body - header; body_len = msg_len - header_len - 1; body++; } chunk_len = NCRX_PKT_MAX - header_len - max_extra_len; if (chunk_len <= 0) { fprintf(stderr, "Error: invalid chunk_len %d in send_kmsg()\n", chunk_len); return; } /* * Transfer possibly multiple chunks with extra header fields. * * For emergency transfers due to missing acks, add "emg=1". * * If @msg needs to be split to fit NCRX_PKT_MAX, add * "ncfrag=<byte-offset>/<total-bytes>" to identify each chunk. */ memcpy(buf, header, header_len); nr_chunks = (body_len + chunk_len - 1) / chunk_len; for (i = 0; i < nr_chunks; i++) { int offset = i * chunk_len; int this_header = header_len; int this_chunk; this_chunk = body_len - offset; if (this_chunk > chunk_len) this_chunk = chunk_len; if (is_emg_tx && this_header < sizeof(buf)) this_header += snprintf(buf + this_header, sizeof(buf) - this_header, ",ncemg=1"); if (nr_chunks > 1 && this_header < sizeof(buf)) this_header += snprintf(buf + this_header, sizeof(buf) - this_header, ",ncfrag=%d/%d", offset, body_len); if (this_header < sizeof(buf)) this_header += snprintf(buf + this_header, sizeof(buf) - this_header, ";"); if (this_header + chunk_len > NCRX_PKT_MAX) { fprintf(stderr, "Error: this_header %d is too large for chunk_len %d in send_kmsg()\n", this_header, chunk_len); return; } memcpy(buf + this_header, body, this_chunk); sendto(sock, buf, this_header + this_chunk, 0, addr, addr_len); body += this_chunk; } } /* rx and handle response packets from @sock, returns 0 on success, -1 on err */ static int kmsg_ring_process_resps(struct kmsg_ring *ring, int sock) { char rx_buf[NCRX_PKT_MAX + 1]; union sockaddr_in46 raddr; struct iovec iov = { .iov_base = rx_buf, .iov_len = NCRX_PKT_MAX }; struct msghdr msgh = { .msg_name = &raddr.addr, .msg_iov = &iov, .msg_iovlen = 1 }; ssize_t len; char *pos, *tok; uint64_t seq; next_packet: msgh.msg_namelen = sizeof(raddr); len = recvmsg(sock, &msgh, MSG_DONTWAIT); if (len < 0) { if (errno == EAGAIN) return 0; return -1; } rx_buf[len] = '\0'; pos = rx_buf; tok = strsep(&pos, " "); /* "ncrx" header */ if (strncmp(tok, "ncrx", 4)) { char addr_str[INET6_ADDRSTRLEN]; if (raddr.addr.sa_family == AF_INET6) inet_ntop(AF_INET6, &raddr.in6.sin6_addr, addr_str, sizeof(addr_str)); else inet_ntop(AF_INET, &raddr.in4.sin_addr, addr_str, sizeof(addr_str)); fprintf(stderr, "Warning: malformed packet from [%s]:%u\n", addr_str, ntohs(raddr.in4.sin_port)); goto next_packet; } tok += 4; /* <ack-seq> */ if (sscanf(tok, "%"SCNu64, &seq)) kmsg_ring_consume(ring, seq); /* <missing-seq>... */ while ((tok = strsep(&pos, " "))) { if (sscanf(tok, "%"SCNu64, &seq)) { char *msg = kmsg_ring_peek(ring, seq); if (msg) send_kmsg(sock, msg, 0, &raddr.addr, msgh.msg_namelen); } } /* stash remote address for emergency tx */ ring->raddr = raddr; ring->raddr_len = msgh.msg_namelen; goto next_packet; } /* * Perform emergency tx if necessary. Must be called after @ring is filled * and responses are processed. Returns the duration in msecs after which * this function should be invoked again. If -1, timeout isn't necessary. */ static int kmsg_ring_emg_tx(struct kmsg_ring *ring, int sock) { struct kmsg_slot *slot = &ring->slots[ring->tail]; uint64_t target, now; uint64_t tail_seq; char *msg; /* if @ring is empty or remote site is not established, nothing to do */ if (ring->head == ring->tail || !ring->raddr_len) { ring->emg_tx_intv = 0; return -1; } /* calculate the next deadline, if in the future, return the diff */ if (!ring->emg_tx_intv) target = slot->ts + ACK_TIMEOUT; else target = ring->emg_tx_ts + ring->emg_tx_intv; now = current_msec(); if (target > now) return target - now; tail_seq = kmsg_ring_tail_seq(ring); if (!ring->emg_tx_intv) { /* new emg tx session */ ring->emg_tx_intv = EMG_TX_MIN_INTV; ring->emg_tx_seq = tail_seq; } else if (ring->emg_tx_seq < ring->head_seq) { /* in the middle of emg tx session */ ring->emg_tx_seq++; if (ring->emg_tx_seq < tail_seq) ring->emg_tx_seq = tail_seq; } else { /* finished one session, increase intv and repeat */ ring->emg_tx_intv *= 2; if (ring->emg_tx_intv < EMG_TX_MAX_INTV) ring->emg_tx_intv = EMG_TX_MAX_INTV; ring->emg_tx_seq = tail_seq; } msg = kmsg_ring_peek(ring, ring->emg_tx_seq); if (msg) send_kmsg(sock, msg, 1, &ring->raddr.addr, ring->raddr_len); ring->emg_tx_ts = now; return ring->emg_tx_intv; } static void usage_err(const char *err) { if (err) fprintf(stderr, "Error: %s\n", err); fprintf(stderr, "Usage: nctx [-n nr_slots] [-k devkmsg_path] ip port\n"); exit(1); } int main(int argc, char **argv) { union sockaddr_in46 laddr = { }; struct pollfd pfds[2] = { }; struct kmsg_ring kmsg_ring; const char *devkmsg_path = "/dev/kmsg"; int nr_slots = NCRX_DFL_NR_SLOTS; int sleep_dur = -1; int opt, port, sock, devkmsg; socklen_t addrlen; while ((opt = getopt(argc, argv, "n:k:h?")) != -1) { switch (opt) { case 'n': nr_slots = atoi(optarg); if (nr_slots <= 0) usage_err("nr_slots must be a positive number"); break; case 'k': devkmsg_path = optarg; break; default: usage_err(NULL); } } if (optind + 2 != argc) usage_err(NULL); if (inet_pton(AF_INET6, argv[optind], &laddr.in6.sin6_addr)) { laddr.addr.sa_family = AF_INET6; addrlen = sizeof(laddr.in6); } else if (inet_pton(AF_INET, argv[optind], &laddr.in4.sin_addr)) { laddr.addr.sa_family = AF_INET; addrlen = sizeof(laddr.in4); } else { usage_err("invalid IP address"); } port = atoi(argv[optind + 1]); if (port <= 0 || port > 65535) usage_err("invalid port number"); laddr.in4.sin_port = htons(port); sock = socket(laddr.addr.sa_family, SOCK_DGRAM, 0); if (sock < 0) { perror("socket"); return 1; } if (bind(sock, &laddr.addr, addrlen)) { perror("bind"); return 1; } devkmsg = open(devkmsg_path, O_RDONLY | O_NONBLOCK); if (devkmsg < 0) { perror("open"); return 1; } if (kmsg_ring_init(&kmsg_ring, nr_slots)) { perror("kmsg_ring_init"); return 1; } pfds[0].events = POLLIN; pfds[1].events = POLLIN; pfds[0].fd = devkmsg; pfds[1].fd = sock; while (poll(pfds, 2, sleep_dur) >= 0) { if (kmsg_ring_fill(&kmsg_ring, devkmsg)) { perror("kmsg_ring_fill"); return 1; } if (kmsg_ring_process_resps(&kmsg_ring, sock)) { perror("kmsg_ring_process_resps"); return 1; } sleep_dur = kmsg_ring_emg_tx(&kmsg_ring, sock); } perror("poll"); return 1; }