bpf_queue.c

// SPDX-License-Identifier: Apache-2.0
/* Copyright (c) 2024 Elastic NV */

#include <sys/epoll.h>
#include <sys/param.h>
#include <sys/sysinfo.h>

#include <ctype.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <bpf/btf.h>

#include "quark.h"
#include "bpf_probes_skel.h"
#include "elastic-ebpf/GPL/Events/EbpfEventProto.h"

struct bpf_queue {
	struct bpf_probes	*probes;
	struct ring_buffer	*ringbuf;
};

static int	bpf_queue_populate(struct quark_queue *);
static int	bpf_queue_update_stats(struct quark_queue *);
static void	bpf_queue_close(struct quark_queue *);

struct quark_queue_ops queue_ops_bpf = {
	.open		= bpf_queue_open,
	.populate	= bpf_queue_populate,
	.update_stats	= bpf_queue_update_stats,
	.close		= bpf_queue_close,
};

/*
 * Map libbpf logs into quark_verbose.
 * fmt has a newline, we have to prepend program_invocation_short_name, so we
 * can't use vwarn, as it prepends itself and adds a newline.
 */
static int
libbpf_print_fn(enum libbpf_print_level level, const char *fmt, va_list ap)
{
	int	 pri;
	char	*nfmt;

	if (level == LIBBPF_WARN || level == LIBBPF_INFO)
		pri = QUARK_VL_WARN;
	else if (level == LIBBPF_DEBUG)
		pri = QUARK_VL_DEBUG;
	else
		pri = QUARK_VL_WARN; /* fallback in case they add something new */

	if (pri > quark_verbose)
		return (0);

	/* best effort in out of mem situations */
	if (asprintf(&nfmt, "%s: %s", program_invocation_short_name, fmt) == -1)
		vfprintf(stderr, fmt, ap);
	else {
		vfprintf(stderr, nfmt, ap);
		free(nfmt);
	}

	return (0);
}

struct ebpf_ctx {
	struct ebpf_pid_info		*pids;
	struct ebpf_cred_info		*creds;
	struct ebpf_tty_dev		*ctty;
	char				*comm;
	struct ebpf_namespace_info	*ns;
	char				*cwd;
};

/* Copy the common process context of an ebpf event into a raw_task. */
static void
ebpf_ctx_to_task(struct ebpf_ctx *ebpf_ctx, struct raw_task *task)
{
	task->cap_inheritable = 0;	/* unavailable */
	task->cap_permitted = ebpf_ctx->creds->cap_permitted;
	task->cap_effective = ebpf_ctx->creds->cap_effective;
	task->cap_bset = 0;		/* unavailable */
	task->cap_ambient = 0;		/* unavailable */
	task->start_boottime = ebpf_ctx->pids->start_time_ns; /* XXX check format */
	task->uid = ebpf_ctx->creds->ruid;
	task->gid = ebpf_ctx->creds->rgid;
	task->suid = ebpf_ctx->creds->suid;
	task->sgid = ebpf_ctx->creds->sgid;
	task->euid = ebpf_ctx->creds->euid;
	task->egid = ebpf_ctx->creds->egid;
	task->pgid = ebpf_ctx->pids->pgid;
	task->sid = ebpf_ctx->pids->sid;
	task->ppid = ebpf_ctx->pids->ppid;
	/* skip exit_* */
	task->tty_major = ebpf_ctx->ctty->major;
	task->tty_minor = ebpf_ctx->ctty->minor;
	task->uts_inonum = ebpf_ctx->ns->uts_inonum;
	task->ipc_inonum = ebpf_ctx->ns->ipc_inonum;
	task->mnt_inonum = ebpf_ctx->ns->mnt_inonum;
	task->net_inonum = ebpf_ctx->ns->net_inonum;
	if (ebpf_ctx->cwd != NULL)
		task->cwd = strdup(ebpf_ctx->cwd);
	strlcpy(task->comm, ebpf_ctx->comm, sizeof(task->comm));
}

/*
 * Translate one ebpf event from the ring buffer into a raw_event.
 * Returns NULL on allocation failure or an unhandled event type.
 */
static struct raw_event *
ebpf_events_to_raw(struct ebpf_event_header *ev)
{
	struct raw_event		*raw;
	struct ebpf_varlen_field	*field;
	struct ebpf_ctx			 ebpf_ctx;

	bzero(&ebpf_ctx, sizeof(ebpf_ctx));
	raw = NULL;

	switch (ev->type) {
	case EBPF_EVENT_PROCESS_FORK: {
		struct ebpf_process_fork_event *fork;

		fork = (struct ebpf_process_fork_event *)ev;
		if ((raw = raw_event_alloc(RAW_WAKE_UP_NEW_TASK)) == NULL)
			goto bad;
		raw->pid = fork->child_pids.tgid;
		raw->time = ev->ts;
		ebpf_ctx.pids = &fork->child_pids;
		ebpf_ctx.creds = &fork->creds;
		ebpf_ctx.ctty = &fork->ctty;
		ebpf_ctx.comm = fork->comm;
		ebpf_ctx.ns = &fork->ns;
		ebpf_ctx.cwd = NULL;

		/* the macro doesn't take a pointer so we can't pass down :) */
		FOR_EACH_VARLEN_FIELD(fork->vl_fields, field) {
			switch (field->type) {
			case EBPF_VL_FIELD_CWD:
				ebpf_ctx.cwd = field->data;
				break;
			default:
				break;
			}
		}
		ebpf_ctx_to_task(&ebpf_ctx, &raw->task);
		break;
	}
	case EBPF_EVENT_PROCESS_EXIT: {
		struct ebpf_process_exit_event *exit;

		exit = (struct ebpf_process_exit_event *)ev;
		if ((raw = raw_event_alloc(RAW_EXIT_THREAD)) == NULL)
			goto bad;
		raw->pid = exit->pids.tgid;
		raw->time = ev->ts;
		ebpf_ctx.pids = &exit->pids;
		ebpf_ctx.creds = &exit->creds;
		ebpf_ctx.ctty = &exit->ctty;
		ebpf_ctx.comm = exit->comm;
		ebpf_ctx.ns = &exit->ns;
		ebpf_ctx.cwd = NULL;
		raw->task.exit_code = exit->exit_code;
		raw->task.exit_time_event = raw->time;
		ebpf_ctx_to_task(&ebpf_ctx, &raw->task);
		break;
	}
	case EBPF_EVENT_PROCESS_EXEC: {
		struct ebpf_process_exec_event *exec;

		exec = (struct ebpf_process_exec_event *)ev;
		if ((raw = raw_event_alloc(RAW_EXEC)) == NULL)
			goto bad;
		raw->pid = exec->pids.tgid;
		raw->time = ev->ts;
		raw->exec.flags |= RAW_EXEC_F_EXT;
		ebpf_ctx.pids = &exec->pids;
		ebpf_ctx.creds = &exec->creds;
		ebpf_ctx.ctty = &exec->ctty;
		ebpf_ctx.comm = exec->comm;
		ebpf_ctx.ns = &exec->ns;
		ebpf_ctx.cwd = NULL;

		FOR_EACH_VARLEN_FIELD(exec->vl_fields, field) {
			switch (field->type) {
			case EBPF_VL_FIELD_CWD:
				ebpf_ctx.cwd = field->data;
				break;
			case EBPF_VL_FIELD_FILENAME:
				raw->exec.filename = strdup(field->data);
				/* filename might still be NULL */
				break;
			case EBPF_VL_FIELD_ARGV:
				raw->exec.ext.args_len = field->size;
				if (raw->exec.ext.args_len == 0) {
					raw->exec.ext.args = NULL;
					break;
				}
				raw->exec.ext.args = malloc(field->size);
				if (raw->exec.ext.args == NULL)
					break;
				memcpy(raw->exec.ext.args, field->data,
				    field->size);
				raw->exec.ext.args[field->size - 1] = 0;
				break;
			default:
				break;
			}
		}
		ebpf_ctx_to_task(&ebpf_ctx, &raw->exec.ext.task);
		break;
	}
	case EBPF_EVENT_NETWORK_CONNECTION_ACCEPTED:
	case EBPF_EVENT_NETWORK_CONNECTION_ATTEMPTED:
	case EBPF_EVENT_NETWORK_CONNECTION_CLOSED: {
		struct ebpf_net_event	*net;
		struct raw_sock_conn	*conn;

		net = (struct ebpf_net_event *)ev;
		if ((raw = raw_event_alloc(RAW_SOCK_CONN)) == NULL)
			goto bad;
		raw->pid = net->pids.tgid;
		raw->time = ev->ts;

		conn = &raw->sock_conn;

		if (net->net.family == EBPF_NETWORK_EVENT_AF_INET) {
			conn->local.af = AF_INET;
			memcpy(&conn->local.addr4, net->net.saddr, 4);
			conn->remote.af = AF_INET;
			memcpy(&conn->remote.addr4, net->net.daddr, 4);
		} else if (net->net.family == EBPF_NETWORK_EVENT_AF_INET6) {
			conn->local.af = AF_INET6;
			memcpy(conn->local.addr6, net->net.saddr6, 16);
			conn->remote.af = AF_INET6;
			memcpy(conn->remote.addr6, net->net.daddr6, 16);
		} else
			goto bad;
		conn->local.port = htons(net->net.sport);
		conn->remote.port = htons(net->net.dport);

		switch (ev->type) {
		case EBPF_EVENT_NETWORK_CONNECTION_ACCEPTED:
			raw->sock_conn.conn = SOCK_CONN_ACCEPT;
			break;
		case EBPF_EVENT_NETWORK_CONNECTION_ATTEMPTED:
			raw->sock_conn.conn = SOCK_CONN_CONNECT;
			break;
		case EBPF_EVENT_NETWORK_CONNECTION_CLOSED:
			raw->sock_conn.conn = SOCK_CONN_CLOSE;
			break;
		default:
			goto bad;
		}

		break;
	}
	case EBPF_EVENT_NETWORK_DNS_PKT: {
		struct ebpf_dns_event	*dns;
		size_t			 cap_len;
		struct quark_packet	*packet;

		dns = (struct ebpf_dns_event *)ev;
		if ((raw = raw_event_alloc(RAW_PACKET)) == NULL)
			goto bad;
		raw->pid = dns->tgid;
		raw->time = ev->ts;
		cap_len = MIN(dns->cap_len, QUARK_MAX_PACKET);
		raw->packet.quark_packet = calloc(1,
		    sizeof(*raw->packet.quark_packet) + cap_len);
		if (raw->packet.quark_packet == NULL)
			goto bad;
		packet = raw->packet.quark_packet;

		switch (dns->direction) {
		case EBPF_NETWORK_DIR_EGRESS:
			packet->direction = QUARK_PACKET_DIR_EGRESS;
			break;
		case EBPF_NETWORK_DIR_INGRESS:
			packet->direction = QUARK_PACKET_DIR_INGRESS;
			break;
		default:
			goto bad;
		}
		packet->origin = QUARK_PACKET_ORIGIN_DNS;
		packet->orig_len = dns->orig_len;
		packet->cap_len = cap_len;

		FOR_EACH_VARLEN_FIELD(dns->vl_fields, field) {
			switch (field->type) {
			case EBPF_VL_FIELD_DNS_BODY:
				memcpy(packet->data, field->data, cap_len);
				break;
			default:
				qwarnx("unhandled field type %d", field->type);
				goto bad;
			}
		}

		break;
	}
	default:
		qwarnx("unhandled type %lu", ev->type);
		goto bad;
	}

	return (raw);

bad:
	if (raw != NULL)
		raw_event_free(raw);

	return (NULL);
}

/*
 * Ring buffer callback: in bypass mode hand the raw ebpf event straight to
 * event_storage, otherwise convert it and insert it into the queue.
 */
static int
bpf_ringbuf_cb(void *vqq, void *vdata, size_t len)
{
	struct quark_queue		*qq = vqq;
	struct ebpf_event_header	*ev = vdata;
	struct quark_event		*qev;
	struct raw_event		*raw;

	if (qq->flags & QQ_BYPASS) {
		qev = &qq->event_storage;
		qev->bypass = ev;
		qev->events = QUARK_EV_BYPASS;
	} else {
		raw = ebpf_events_to_raw(ev);
		if (raw != NULL && raw_event_insert(qq, raw) == -1)
			raw_event_free(raw);
	}

	return (0);
}

static int
relo_ret(struct btf *btf, int *loc, const char *func)
{
	*loc = btf_number_of_params(btf, func);
	if (*loc == -1)
		qwarnx("can't relocate return for %s", func);

	return (*loc);
}

static int
relo_param(struct btf *btf, int *loc, const char *func, const char *param)
{
	*loc = btf_index_of_param(btf, func, param);
	if (*loc == -1)
		warnx("can't relocate parameter %s on function %s", param, func);

	return (*loc);
}

static int
relo_member(struct btf *btf, int *loc, const char *struct_name,
    const char *member)
{
	char	dotname[512];

	*loc = -1;
	if (snprintf(dotname, sizeof(dotname), "%s.%s", struct_name,
	    member) >= (int)sizeof(dotname)) {
		qwarnx("buffer too small");
		return (-1);
	}
	*loc = btf_root_offset(btf, dotname, 0);

	return (*loc);
}

/*
 * Open, load and attach the probes skeleton; use_fentry selects fentry/fexit
 * probes over kprobes/kretprobes.
 */
static int
bpf_queue_open1(struct quark_queue *qq, int use_fentry)
{
	struct bpf_queue		*bqq;
	struct bpf_probes		*p;
	struct ring_buffer_opts		 ringbuf_opts;
	int				 cgroup_fd, i, off;
	struct bpf_prog_skeleton	*ps;
	struct btf			*btf;

	libbpf_set_print(libbpf_print_fn);

	if ((bqq = calloc(1, sizeof(*bqq))) == NULL)
		return (-1);
	qq->queue_be = bqq;
	cgroup_fd = -1;
	btf = NULL;

	bqq->probes = bpf_probes__open();
	if (bqq->probes == NULL) {
		qwarn("bpf_probes__open");
		goto fail;
	}
	p = bqq->probes;

	/*
	 * BTF used for relocations
	 */
	btf = btf__load_vmlinux_btf();
	if (btf == NULL) {
		qwarn("btf__load_vmlinux_btf");
		goto fail;
	}

	/*
	 * Unload everything since it has way more than we want
	 */
	for (i = 0; i < p->skeleton->prog_cnt; i++) {
		ps = &p->skeleton->progs[i];
		bpf_program__set_autoload(*ps->prog, 0);
	}

	/*
	 * Load just the bits we want
	 */
	bpf_program__set_autoload(p->progs.sched_process_fork, 1);
	bpf_program__set_autoload(p->progs.sched_process_exec, 1);
	bpf_program__set_autoload(p->progs.sched_process_exit, 1);
	if (use_fentry)
		bpf_program__set_autoload(p->progs.fentry__taskstats_exit, 1);
	else
		bpf_program__set_autoload(p->progs.kprobe__taskstats_exit, 1);

	/* Used in process probes, so always on */
	if (relo_member(btf, &off, "iov_iter", "__iov") != -1)
		p->rodata->off__iov_iter____iov__ = off;

	if (qq->flags & QQ_SOCK_CONN) {
		if (use_fentry) {
			bpf_program__set_autoload(p->progs.fexit__inet_csk_accept, 1);
			bpf_program__set_autoload(p->progs.fexit__tcp_v4_connect, 1);
			bpf_program__set_autoload(p->progs.fexit__tcp_v6_connect, 1);
			bpf_program__set_autoload(p->progs.fentry__tcp_close, 1);
		} else {
			bpf_program__set_autoload(p->progs.kretprobe__inet_csk_accept, 1);
			bpf_program__set_autoload(p->progs.kprobe__tcp_v4_connect, 1);
			bpf_program__set_autoload(p->progs.kretprobe__tcp_v4_connect, 1);
			bpf_program__set_autoload(p->progs.kprobe__tcp_v6_connect, 1);
			bpf_program__set_autoload(p->progs.kretprobe__tcp_v6_connect, 1);
			bpf_program__set_autoload(p->progs.kprobe__tcp_close, 1);
		}

		if (relo_ret(btf, &p->rodata->ret__inet_csk_accept__,
		    "inet_csk_accept") == -1)
			goto fail;
	}

	if (qq->flags & QQ_FILE) {
		if (use_fentry) {
			bpf_program__set_autoload(p->progs.fentry__do_renameat2, 1);
			bpf_program__set_autoload(p->progs.fentry__do_unlinkat, 1);
			bpf_program__set_autoload(p->progs.fentry__mnt_want_write, 1);
			bpf_program__set_autoload(p->progs.fentry__vfs_rename, 1);
			bpf_program__set_autoload(p->progs.fentry__vfs_unlink, 1);
			bpf_program__set_autoload(p->progs.fexit__chmod_common, 1);
			bpf_program__set_autoload(p->progs.fexit__chown_common, 1);
			bpf_program__set_autoload(p->progs.fexit__do_filp_open, 1);
			bpf_program__set_autoload(p->progs.fexit__do_truncate, 1);
			bpf_program__set_autoload(p->progs.fexit__vfs_rename, 1);
			bpf_program__set_autoload(p->progs.fexit__vfs_unlink, 1);
			bpf_program__set_autoload(p->progs.fexit__vfs_write, 1);
			bpf_program__set_autoload(p->progs.fexit__vfs_writev, 1);
		} else {
			bpf_program__set_autoload(p->progs.kprobe__chmod_common, 1);
			bpf_program__set_autoload(p->progs.kretprobe__chmod_common, 1);
			bpf_program__set_autoload(p->progs.kprobe__chown_common, 1);
			bpf_program__set_autoload(p->progs.kretprobe__chown_common, 1);
			bpf_program__set_autoload(p->progs.kprobe__do_truncate, 1);
			bpf_program__set_autoload(p->progs.kretprobe__do_truncate, 1);
			bpf_program__set_autoload(p->progs.kprobe__vfs_writev, 1);
			bpf_program__set_autoload(p->progs.kretprobe__vfs_writev, 1);
			bpf_program__set_autoload(p->progs.kprobe__vfs_rename, 1);
			bpf_program__set_autoload(p->progs.kretprobe__vfs_rename, 1);
			bpf_program__set_autoload(p->progs.kprobe__vfs_unlink, 1);
			bpf_program__set_autoload(p->progs.kretprobe__vfs_unlink, 1);
			bpf_program__set_autoload(p->progs.kprobe__vfs_write, 1);
			bpf_program__set_autoload(p->progs.kretprobe__vfs_write, 1);
			bpf_program__set_autoload(p->progs.kprobe__do_renameat2, 1);
			bpf_program__set_autoload(p->progs.kprobe__do_unlinkat, 1);
			bpf_program__set_autoload(p->progs.kprobe__mnt_want_write, 1);
			bpf_program__set_autoload(p->progs.kretprobe__do_filp_open, 1);
		}

		/* vfs_unlink() */
		if (relo_ret(btf, &p->rodata->ret__vfs_unlink__,
		    "vfs_unlink") == -1)
			goto fail;
		if (relo_param(btf, &p->rodata->arg__vfs_unlink__dentry__,
		    "vfs_unlink", "dentry") == -1)
			goto fail;
		/* vfs_rename() */
		if (relo_ret(btf, &p->rodata->ret__vfs_rename__,
		    "vfs_rename") == -1)
			goto fail;
		if (btf_index_of_param(btf, "vfs_rename", "rd") != -1) {
			p->rodata->exists__vfs_rename__rd__ = 1;
		} else {
			if (relo_param(btf, &p->rodata->arg__vfs_rename__old_dentry__,
			    "vfs_rename", "old_dentry") == -1)
				goto fail;
			if (relo_param(btf, &p->rodata->arg__vfs_rename__new_dentry__,
			    "vfs_rename", "new_dentry") == -1)
				goto fail;
		}
		/* do_truncate() */
		if (relo_ret(btf, &p->rodata->ret__do_truncate__,
		    "do_truncate") == -1)
			goto fail;
		if (relo_param(btf, &p->rodata->arg__do_truncate__filp__,
		    "do_truncate", "filp") == -1)
			goto fail;

		/*
		 * XXX We can likely get rid of all of those by using
		 * bpf_core_field_exists inside the probe
		 */
		if (relo_member(btf, &off, "inode", "__i_atime") != -1)
			p->rodata->off__inode____i_atime__ = off;
		if (relo_member(btf, &off, "inode", "__i_mtime") != -1)
			p->rodata->off__inode____i_mtime__ = off;
		if (relo_member(btf, &off, "inode", "__i_ctime") != -1)
			p->rodata->off__inode____i_ctime__ = off;
	}

	if (qq->flags & QQ_DNS) {
		cgroup_fd = open("/sys/fs/cgroup", O_RDONLY);
		if (cgroup_fd == -1) {
			qwarn("open cgroup");
			goto fail;
		}
		bpf_program__set_autoload(p->progs.skb_egress, 1);
		bpf_program__set_autoload(p->progs.skb_ingress, 1);
		bpf_program__set_autoload(p->progs.sock_create, 1);
		bpf_program__set_autoload(p->progs.sock_release, 1);
		bpf_program__set_autoload(p->progs.sendmsg4, 1);
		bpf_program__set_autoload(p->progs.connect4, 1);
		bpf_program__set_autoload(p->progs.recvmsg4, 1);
	}

	if (bpf_map__set_max_entries(p->maps.event_buffer_map,
	    get_nprocs_conf()) != 0) {
		qwarn("bpf_map__set_max_entries");
		goto fail;
	}

	if (bpf_probes__load(p) != 0) {
		qwarn("bpf_probes__load");
		goto fail;
	}

	if (cgroup_fd != -1) {
		for (i = 0; i < p->skeleton->prog_cnt; i++) {
			ps = &p->skeleton->progs[i];
			switch (bpf_program__get_type(*ps->prog)) {
			case BPF_PROG_TYPE_CGROUP_DEVICE:	/* FALLTHROUGH */
			case BPF_PROG_TYPE_CGROUP_SKB:		/* FALLTHROUGH */
			case BPF_PROG_TYPE_CGROUP_SOCK:		/* FALLTHROUGH */
			case BPF_PROG_TYPE_CGROUP_SOCKOPT:	/* FALLTHROUGH */
			case BPF_PROG_TYPE_CGROUP_SOCK_ADDR:	/* FALLTHROUGH */
			case BPF_PROG_TYPE_CGROUP_SYSCTL:
				break;
			default:
				continue;
			}

			*ps->link = bpf_program__attach_cgroup(*ps->prog,
			    cgroup_fd);
			if (*ps->link == NULL) {
				qwarn("bpf_program__attach_cgroup %s", ps->name);
				goto fail;
			}
		}

		close(cgroup_fd);
		cgroup_fd = -1;
	}

	if (bpf_probes__attach(p) != 0) {
		qwarn("bpf_probes__attach");
		goto fail;
	}

	ringbuf_opts.sz = sizeof(ringbuf_opts);
	bqq->ringbuf = ring_buffer__new(bpf_map__fd(p->maps.ringbuf),
	    bpf_ringbuf_cb, qq, &ringbuf_opts);
	if (bqq->ringbuf == NULL) {
		qwarn("ring_buffer__new");
		goto fail;
	}

	qq->epollfd = ring_buffer__epoll_fd(bqq->ringbuf);
	if (qq->epollfd < 0)
		goto fail;

	qq->queue_ops = &queue_ops_bpf;
	qq->stats.backend = QQ_EBPF;

	btf__free(btf);

	return (0);

fail:
	if (cgroup_fd != -1) {
		close(cgroup_fd);
		cgroup_fd = -1;
	}
	if (btf != NULL)
		btf__free(btf);
	bpf_queue_close(qq);

	return (-1);
}

int
bpf_queue_open(struct quark_queue *qq)
{
	if ((qq->flags & QQ_EBPF) == 0)
		return (errno = ENOTSUP, -1);

	if (bpf_queue_open1(qq, 1) == -1) {
		qwarn("bpf_queue_open failed with fentry, trying kprobe");
		return bpf_queue_open1(qq, 0);
	}

	return (0);
}

static int
bpf_queue_populate(struct quark_queue *qq)
{
	struct bpf_queue	*bqq = qq->queue_be;
	int			 npop, space_left;

	space_left = qq->flags & QQ_BYPASS ? 1 :
	    qq->length >= qq->max_length ? 0 :
	    qq->max_length - qq->length;
	if (space_left == 0)
		return (0);

	npop = ring_buffer__consume_n(bqq->ringbuf, space_left);

	return (npop < 0 ? -1 : npop);
}

static int
bpf_queue_update_stats(struct quark_queue *qq)
{
	struct bpf_queue	*bqq = qq->queue_be;
	struct ebpf_event_stats	 pcpu_ees[libbpf_num_possible_cpus()];
	u32			 zero = 0;
	int			 i;

	/* valgrind doesn't track that this will be updated below */
	bzero(pcpu_ees, sizeof(pcpu_ees));

	if (bpf_map__lookup_elem(bqq->probes->maps.ringbuf_stats, &zero,
	    sizeof(zero), pcpu_ees, sizeof(pcpu_ees), 0) != 0)
		return (-1);

	for (i = 0; i < libbpf_num_possible_cpus(); i++)
		qq->stats.lost = pcpu_ees[i].lost;

	return (0);
}

static void
bpf_queue_close(struct quark_queue *qq)
{
	struct bpf_queue	*bqq = qq->queue_be;

	if (bqq != NULL) {
		if (bqq->probes != NULL) {
			bpf_probes__destroy(bqq->probes);
			bqq->probes = NULL;
		}
		if (bqq->ringbuf != NULL) {
			ring_buffer__free(bqq->ringbuf);
			bqq->ringbuf = NULL;
		}
		free(bqq);
		bqq = NULL;
		qq->queue_be = NULL;
	}
	/* Closed in ring_buffer__free() */
	qq->epollfd = -1;
}