ncrx/libncrx.c (570 lines of code) (raw):

/* * ncrx - extended netconsole receiver library * * Copyright (C) 2016, Facebook, Inc. * All rights reserved. * * This source code is licensed under the license found in the LICENSE file in * the root directory of this source tree. */ #include <stdio.h> #include <stdlib.h> #include <stdint.h> #include <string.h> #include <time.h> #include <errno.h> #include <assert.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <netinet/udp.h> #include "ncrx.h" /* oos history is tracked with a uint32_t */ #define NCRX_OOS_MAX 32 struct ncrx_msg_list { struct ncrx_list head; int nr; /* number of msgs on the list */ }; struct ncrx_slot { struct ncrx_msg *msg; uint64_t timestamp; /* last rx on this slot */ uint64_t retx_timestamp; /* last retransmission */ struct ncrx_list hole_node; /* anchored @ ncrx->hole_list */ }; struct ncrx { struct ncrx_param p; uint64_t now_mono; /* latest time in msecs */ int head; /* next slot to use */ int tail; /* last slot in use */ uint64_t head_seq; /* next expected seq, unset=0 */ struct ncrx_slot *slots; /* msg slots */ struct ncrx_list hole_list; /* missing or !complete slots */ uint32_t oos_history; /* bit history of oos msgs */ struct ncrx_msg_list oos_list; /* buffered oos msgs */ struct ncrx_msg_list retired_list; /* msgs to be fetched by user */ uint64_t acked_seq; /* last seq acked, unset=max */ uint64_t acked_at; /* and when */ /* response buffer for ncrx_response() */ char resp_buf[NCRX_PKT_MAX + 1]; int resp_len; }; static const struct ncrx_param ncrx_dfl_param = { .nr_slots = NCRX_DFL_NR_SLOTS, .ack_intv = NCRX_DFL_ACK_INTV, .retx_intv = NCRX_DFL_RETX_INTV, .retx_stride = NCRX_DFL_RETX_STRIDE, .msg_timeout = NCRX_DFL_MSG_TIMEOUT, .oos_thr = NCRX_DFL_OOS_THR, .oos_intv = NCRX_DFL_OOS_INTV, .oos_timeout = NCRX_DFL_OOS_TIMEOUT, }; /* utilities mostly stolen from kernel */ #define min(x, y) ({ \ typeof(x) _min1 = (x); \ typeof(y) _min2 = (y); \ (void) (&_min1 == &_min2); \ _min1 < _min2 ? _min1 : _min2; }) #define max(x, y) ({ \ typeof(x) _max1 = (x); \ typeof(y) _max2 = (y); \ (void) (&_max1 == &_max2); \ _max1 > _max2 ? _max1 : _max2; }) #define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER) #define container_of(ptr, type, member) ({ \ const typeof( ((type *)0)->member ) *__mptr = (ptr); \ (type *)( (char *)__mptr - offsetof(type,member) );}) /* ncrx_msg from its ->node */ #define node_to_msg(ptr) container_of(ptr, struct ncrx_msg, node) /* iterate msg_list */ #define msg_list_for_each(pos, n, list) \ for (pos = node_to_msg((list)->head.next), \ n = node_to_msg(pos->node.next); \ &pos->node != &(list)->head; \ pos = n, n = node_to_msg(pos->node.next)) /* ncrx_slot from its ->hole_node */ #define hole_to_slot(ptr) \ container_of(ptr, struct ncrx_slot, hole_node) /* iterate hole_list */ #define hole_list_for_each(pos, n, list) \ for (pos = hole_to_slot((list)->next), \ n = hole_to_slot(pos->hole_node.next); \ &pos->hole_node != (list); \ pos = n, n = hole_to_slot(pos->hole_node.next)) static unsigned int hweight32(uint32_t w) { w -= (w >> 1) & 0x55555555; w = (w & 0x33333333) + ((w >> 2) & 0x33333333); w = (w + (w >> 4)) & 0x0f0f0f0f; return (w * 0x01010101) >> 24; } static void init_list(struct ncrx_list *head) { head->next = head; head->prev = head; } static int list_empty(struct ncrx_list *head) { return head->next == head; } static void list_del(struct ncrx_list *head) { struct ncrx_list *prev = head->prev; struct ncrx_list *next = head->next; prev->next = next; next->prev = prev; init_list(head); } static void list_append(struct ncrx_list *node, struct ncrx_list *list) { struct ncrx_list *prev = list->prev; assert(node->next == node && node->prev == node); node->next = list; node->prev = prev; prev->next = node; list->prev = node; } static void msg_list_del(struct ncrx_msg *msg, struct ncrx_msg_list *list) { list_del(&msg->node); list->nr--; if (!list->nr) assert(list->head.next == &list->head && list->head.prev == &list->head); } static void msg_list_append(struct ncrx_msg *msg, struct ncrx_msg_list *list) { list_append(&msg->node, &list->head); list->nr++; } static struct ncrx_msg *msg_list_peek(struct ncrx_msg_list *list) { if (list_empty(&list->head)) return NULL; return node_to_msg(list->head.next); } static struct ncrx_msg *msg_list_pop(struct ncrx_msg_list *list) { struct ncrx_msg *msg; msg = msg_list_peek(list); if (msg) msg_list_del(msg, list); return msg; } /* * Parse @payload into @msg. The data is not copied into @msg's buffer. * @msg->text and ->dict are updated to point into @payload instead. */ static int parse_packet(const char *payload, struct ncrx_msg *msg) { char buf[1024]; char *p, *tok; int idx; memset(msg, 0, sizeof(*msg)); p = strchr(payload, ';'); if (!p || p - payload >= (signed)sizeof(buf)) goto einval; memcpy(buf, payload, p - payload); buf[p - payload] = '\0'; msg->text = p + 1; msg->text_len = strlen(msg->text); if (msg->text_len > NCRX_LINE_MAX) msg->text_len = NCRX_LINE_MAX; /* <level>,<sequnum>,<timestamp>,<contflag>[,KEY=VAL]* */ idx = 0; p = buf; while ((tok = strsep(&p, ","))) { char *endp, *key, *val; unsigned long long v; switch (idx++) { case 0: v = strtoul(tok, &endp, 0); if (*endp != '\0' || v > UINT8_MAX) goto einval; msg->facility = v >> 3; msg->level = v & ((1 << 3) - 1); continue; case 1: v = strtoull(tok, &endp, 0); if (*endp != '\0') goto einval; msg->seq = v; continue; case 2: v = strtoull(tok, &endp, 0); if (*endp != '\0') goto einval; msg->ts_usec = v; continue; case 3: if (tok[0] == 'c') msg->cont_start = 1; else if (tok[0] == '+') msg->cont = 1; continue; } val = tok; key = strsep(&val, "="); if (!val) continue; if (!strcmp(key, "ncfrag")) { unsigned nf_off, nf_len; if (sscanf(val, "%u/%u", &nf_off, &nf_len) != 2) goto einval; if (!msg->text_len || nf_len >= NCRX_LINE_MAX || nf_off + msg->text_len > nf_len) goto einval; msg->ncfrag_off = nf_off; msg->ncfrag_len = msg->text_len; msg->ncfrag_left = nf_len - msg->ncfrag_len; msg->text_len = nf_len; } else if (!strcmp(key, "ncemg")) { v = strtoul(val, &endp, 0); if (*endp != '\0') goto einval; msg->emg = v; } } return 0; einval: errno = EINVAL; return -1; } /* how far @idx is behind @ncrx->head */ static int slot_dist(int idx, struct ncrx *ncrx) { int dist = ncrx->head - idx; return dist >= 0 ? dist : dist + ncrx->p.nr_slots; } /* number of occupied slots */ static int nr_queued(struct ncrx *ncrx) { return slot_dist(ncrx->tail, ncrx); } /* seq of the last queued message */ static uint64_t tail_seq(struct ncrx *ncrx) { return ncrx->head_seq - nr_queued(ncrx); } /* slot index of a message with sequence number @ncrx->head_seq + @delta */ static int seq_delta_idx(struct ncrx *ncrx, int delta) { int idx = ncrx->head + delta; if (idx < 0) return idx + ncrx->p.nr_slots; else if (idx >= ncrx->p.nr_slots) return idx - ncrx->p.nr_slots; else return idx; } /* is @slot completely empty? */ static int slot_is_free(struct ncrx_slot *slot) { return !slot->msg && list_empty(&slot->hole_node); } /* @slot may have just been completed, if so, remove it from hole_list */ static void slot_maybe_complete(struct ncrx_slot *slot) { struct ncrx_msg *msg = slot->msg; if (!msg || msg->ncfrag_left || list_empty(&slot->hole_node)) return; list_del(&slot->hole_node); } /* retire the last queued slot whether complete or not */ static void retire_tail(struct ncrx *ncrx) { int ntail = (ncrx->tail + 1) % ncrx->p.nr_slots; struct ncrx_slot *slot = &ncrx->slots[ncrx->tail]; struct ncrx_slot *nslot = &ncrx->slots[ntail]; if (slot->msg) { msg_list_append(slot->msg, &ncrx->retired_list); slot->msg = NULL; } list_del(&slot->hole_node); /* free slot is never a hole */ ncrx->tail = ntail; /* * Activities of past msgs are considered activities for newer ones * too. This prevents oos interval verdicts from flipping as * sequence progresses. */ nslot->timestamp = max(slot->timestamp, nslot->timestamp); } /* make room for message with seq ncrx->head_seq + @delta */ static void make_room(struct ncrx *ncrx, int delta) { int i; /* head_seq is for the next msg, need to advance for 0 @delta too */ for (i = 0; i <= delta; i++) { struct ncrx_slot *slot; int max_busy = ncrx->p.nr_slots - ncrx->p.retx_stride; /* a new slot is considered hole until it gets completed */ slot = &ncrx->slots[ncrx->head]; assert(slot_is_free(slot)); list_append(&slot->hole_node, &ncrx->hole_list); slot->timestamp = ncrx->now_mono; slot->retx_timestamp = 0; /* * Wind the ring buffer and push out if overflowed. Always * keep at least one stride empty so that retransmissions * of expired slots don't count as oos. */ ncrx->head_seq++; ncrx->head = (ncrx->head + 1) % ncrx->p.nr_slots; if (slot_dist(ncrx->tail, ncrx) > max_busy) retire_tail(ncrx); } } /* * Get slot for @tmsg. On success, returns pointer to the slot which may * be free or occupied with partial or complete message. Returns NULL with * errno set to ERANGE if oos, NULL / ENOENT if already retired. */ static struct ncrx_slot *get_seq_slot(struct ncrx_msg *tmsg, struct ncrx *ncrx) { struct ncrx_slot *slot; int64_t delta; int idx; /* new seq stream */ if (!ncrx->head_seq) { ncrx->head_seq = tmsg->seq; ncrx->acked_seq = UINT64_MAX; tmsg->seq_reset = 1; } delta = tmsg->seq - ncrx->head_seq; /* * Consider oos if outside reorder window or if the slot is * complete and the last activity on it was more than oos_intv ago. * Emergency messages are never considered oos as they don't follow * the usual transmission pattern and may repeat indefinitely. */ if (-delta > ncrx->p.nr_slots || delta > ncrx->p.nr_slots) { errno = ERANGE; return NULL; } idx = seq_delta_idx(ncrx, delta); slot = &ncrx->slots[idx]; if (-delta > nr_queued(ncrx)) { int is_free = slot_is_free(slot); if (!tmsg->emg && (!is_free || slot->timestamp + ncrx->p.oos_intv < ncrx->now_mono)) { errno = ERANGE; return NULL; } if (is_free) slot->timestamp = ncrx->now_mono; errno = ENOENT; return NULL; } make_room(ncrx, delta); slot->timestamp = ncrx->now_mono; return slot; } /* make @src's copy, if @src is a fragment, allocate full size as it may grow */ static struct ncrx_msg *copy_msg(struct ncrx_msg *src) { struct ncrx_msg *dst; assert(!src->dict && !src->dict_len); dst = malloc(sizeof(*dst) + src->text_len + 1); if (!dst) return NULL; *dst = *src; init_list(&dst->node); dst->text = dst->buf; if (src->ncfrag_len) { memset(dst->text, 0, src->text_len + 1); memcpy(dst->text + src->ncfrag_off, src->text, src->ncfrag_len); dst->ncfrag_off = 0; dst->ncfrag_len = 0; } else { memcpy(dst->text, src->text, src->text_len); dst->text[dst->text_len] = '\0'; } return dst; } /* * @tmsg is a newly parsed msg which is out-of-sequence. Queue it on * @ncrx->oos_list until the message times out, gets pushed out by other * oos messages or the sequence stream gets reset. */ static int queue_oos_msg(struct ncrx_msg *tmsg, struct ncrx *ncrx) { struct ncrx_slot *slot; struct ncrx_msg *msg, *nmsg, *first; msg = copy_msg(tmsg); if (!msg) return -1; msg_list_append(msg, &ncrx->oos_list); /* * Shifted left automatically on each new msg. Set oos and see if * there have been too many oos among the last 32 messages. */ ncrx->oos_history |= 1; if ((signed)hweight32(ncrx->oos_history) < ncrx->p.oos_thr) { /* nope, handle oos overflow and handle */ if (ncrx->oos_list.nr > NCRX_OOS_MAX) { msg = msg_list_pop(&ncrx->oos_list); if (msg) { msg->oos = 1; msg_list_append(msg, &ncrx->retired_list); } } return 0; } /* * The current sequence stream seems no good. Let's reset by * retiring all pending, picking the oos msg with the lowest seq, * queueing it to reset the seq and then queueing all other oos * msgs. If a msg is still oos after reset, just retire it. */ while (ncrx->tail != ncrx->head) retire_tail(ncrx); ncrx->head_seq = 0; ncrx->acked_seq = UINT64_MAX; first = node_to_msg(ncrx->oos_list.head.next); msg_list_for_each(msg, nmsg, &ncrx->oos_list) first = msg->seq < first->seq ? msg : first; msg_list_del(first, &ncrx->oos_list); slot = get_seq_slot(first, ncrx); slot->msg = first; slot_maybe_complete(slot); while ((msg = msg_list_pop(&ncrx->oos_list))) { slot = get_seq_slot(msg, ncrx); if (slot) { slot->msg = msg; slot_maybe_complete(slot); } else { msg->oos = 1; msg_list_append(msg, &ncrx->retired_list); } } return 0; } /* @payload has just been received, parse and queue it */ static int ncrx_queue_payload(const char *payload, struct ncrx *ncrx, uint64_t now_real) { struct ncrx_msg tmsg; struct ncrx_slot *slot; int new_msg = 0; if (parse_packet(payload, &tmsg)) return -1; tmsg.rx_at_mono = ncrx->now_mono; tmsg.rx_at_real = now_real; ncrx->oos_history <<= 1; /* ack immediately if logging source is doing emergency transmissions */ if (tmsg.emg) { ncrx->acked_seq = UINT64_MAX; ncrx->acked_at = 0; } /* get the matching slot and allocate a new message if empty */ slot = get_seq_slot(&tmsg, ncrx); if (slot && !slot->msg) { slot->msg = copy_msg(&tmsg); new_msg = 1; } if (!slot || !slot->msg) { if (errno == ENOENT) return 0; if (errno == ERANGE) return queue_oos_msg(&tmsg, ncrx); return -1; } if (!new_msg && slot->msg->ncfrag_left) { struct ncrx_msg *msg = slot->msg; int off = tmsg.ncfrag_off; int i; for (i = 0; i < tmsg.ncfrag_len; i++) { if (msg->text[off + i]) continue; msg->text[off + i] = tmsg.text[i]; msg->ncfrag_left--; } } slot_maybe_complete(slot); return 0; } /* * Build ncrx_response() output. Ack for the last retired msg is always * added. If @slot is non-NULL, re-transmission for it is also added. */ static void ncrx_build_resp(struct ncrx_slot *slot, struct ncrx *ncrx) { /* no msg received? */ if (!ncrx->head_seq) return; /* "ncrx<ack-seq>" */ if (!ncrx->resp_len) { ncrx->acked_seq = tail_seq(ncrx) - 1; ncrx->acked_at = ncrx->now_mono; ncrx->resp_len = snprintf(ncrx->resp_buf, NCRX_PKT_MAX, "ncrx%"PRIu64, ncrx->acked_seq); } /* " <missing-seq>..." truncated to NCRX_PKT_MAX */ if (slot) { int idx = slot - ncrx->slots; int len; len = snprintf(ncrx->resp_buf + ncrx->resp_len, NCRX_PKT_MAX - ncrx->resp_len, " %"PRIu64, ncrx->head_seq - slot_dist(idx, ncrx)); if (ncrx->resp_len + len <= NCRX_PKT_MAX) { ncrx->resp_len += len; ncrx->resp_buf[ncrx->resp_len] = '\0'; } } } int ncrx_process(const char *payload, uint64_t now_mono, uint64_t now_real, struct ncrx *ncrx) { struct ncrx_slot *slot, *tmp_slot; struct ncrx_msg *msg; uint64_t old_head_seq = ncrx->head_seq; int dist_retx, ret = 0; if (now_mono < ncrx->now_mono) fprintf(stderr, "ncrx: time regressed %"PRIu64"->%"PRIu64"\n", ncrx->now_mono, now_mono); ncrx->now_mono = now_mono; ncrx->resp_len = 0; /* * If fully acked, keep last ack timestamp current so that new * messages arriving doesn't trigger ack timeout immediately. */ if (ncrx->acked_seq == tail_seq(ncrx) - 1) ncrx->acked_at = now_mono; /* parse and queue @payload */ if (payload) ret = ncrx_queue_payload(payload, ncrx, now_real); /* retire complete & timed-out msgs from tail */ while (ncrx->tail != ncrx->head) { slot = &ncrx->slots[ncrx->tail]; if ((!slot->msg || !list_empty(&slot->hole_node)) && slot->timestamp + ncrx->p.msg_timeout > now_mono) break; retire_tail(ncrx); } /* retire timed-out oos msgs */ while ((msg = msg_list_peek(&ncrx->oos_list))) { if (msg->rx_at_mono + ncrx->p.oos_timeout > now_mono) break; msg->oos = 1; msg_list_del(msg, &ncrx->oos_list); msg_list_append(msg, &ncrx->retired_list); } /* if enabled, ack pending and timeout expired? */ if (ncrx->p.ack_intv && ncrx->acked_seq != tail_seq(ncrx) - 1 && ncrx->acked_at + ncrx->p.ack_intv < now_mono) ncrx_build_resp(NULL, ncrx); /* head passed one or more re-transmission boundaries? */ dist_retx = old_head_seq / ncrx->p.retx_stride != ncrx->head_seq / ncrx->p.retx_stride; hole_list_for_each(slot, tmp_slot, &ncrx->hole_list) { int retx = 0; /* * If so, request re-tx of holes further away than stride. * This ensures that a missing seq is requested at least * certain number of times regardless of incoming rate. */ if (dist_retx && slot_dist(slot - ncrx->slots, ncrx) > ncrx->p.retx_stride) retx = 1; /* request re-tx every retx_intv */ if (now_mono - max(slot->timestamp, slot->retx_timestamp) >= (unsigned)ncrx->p.retx_intv) { slot->retx_timestamp = now_mono; retx = 1; } if (retx) ncrx_build_resp(slot, ncrx); } return ret; } const char *ncrx_response(struct ncrx *ncrx, int *lenp) { if (lenp) *lenp = ncrx->resp_len; if (ncrx->resp_len) return ncrx->resp_buf; return NULL; } /* parse out the dictionary in a complete message, if it exists */ static void terminate_msg_and_dict(struct ncrx_msg *msg) { msg->dict = strchr(msg->text, '\n'); if (msg->dict) { int len = msg->text_len; msg->text_len = msg->dict - msg->text; msg->text[msg->text_len] = '\0'; msg->dict_len = len - msg->text_len - 1; msg->dict++; } } struct ncrx_msg *ncrx_next_msg(struct ncrx *ncrx) { struct ncrx_msg *msg = msg_list_pop(&ncrx->retired_list); if (msg) terminate_msg_and_dict(msg); return msg; } uint64_t ncrx_invoke_process_at(struct ncrx *ncrx) { uint64_t when = UINT64_MAX; struct ncrx_msg *msg; /* ack enabled and pending? */ if (ncrx->p.ack_intv && ncrx->head_seq && ncrx->acked_seq != tail_seq(ncrx) - 1) when = min(when, ncrx->acked_at + ncrx->p.ack_intv); /* * Holes to request for retransmission? msg_timeout is the same * condition but way longer. Checking on retx_intv is enough. */ if (!list_empty(&ncrx->hole_list)) when = min(when, ncrx->now_mono + ncrx->p.retx_intv); /* oos timeout */ if ((msg = msg_list_peek(&ncrx->oos_list))) when = min(when, msg->rx_at_mono + ncrx->p.oos_timeout); /* min 10ms intv to avoid busy loop in case something goes bonkers */ return max(when, ncrx->now_mono + 10); } struct ncrx *ncrx_create(const struct ncrx_param *param) { const struct ncrx_param *dfl = &ncrx_dfl_param; struct ncrx_param *p; struct ncrx *ncrx; int i; ncrx = calloc(1, sizeof(*ncrx)); if (!ncrx) return NULL; p = &ncrx->p; if (param) { p->nr_slots = param->nr_slots ?: dfl->nr_slots; p->ack_intv = param->ack_intv ?: dfl->ack_intv; p->retx_intv = param->retx_intv ?: dfl->retx_intv; p->retx_stride = param->retx_stride ?: dfl->retx_stride; p->msg_timeout = param->msg_timeout ?: dfl->msg_timeout; p->oos_thr = param->oos_thr ?: dfl->oos_thr; p->oos_intv = param->oos_intv ?: dfl->oos_intv; p->oos_timeout = param->oos_timeout ?: dfl->oos_timeout; } else { *p = *dfl; } ncrx->acked_seq = UINT64_MAX; init_list(&ncrx->hole_list); init_list(&ncrx->oos_list.head); init_list(&ncrx->retired_list.head); ncrx->slots = calloc(ncrx->p.nr_slots, sizeof(ncrx->slots[0])); if (!ncrx->slots) return NULL; for (i = 0; i < ncrx->p.nr_slots; i++) init_list(&ncrx->slots[i].hole_node); return ncrx; } void ncrx_destroy(struct ncrx *ncrx) { struct ncrx_msg *msg; int i; for (i = 0; i < ncrx->p.nr_slots; i++) free(ncrx->slots[i].msg); while ((msg = msg_list_pop(&ncrx->oos_list))) free(msg); while ((msg = msg_list_pop(&ncrx->retired_list))) free(msg); free(ncrx->slots); free(ncrx); }