quark.go (273 lines of code) (raw):

// SPDX-License-Identifier: Apache-2.0 // Copyright (c) 2024 Elastic NV //go:build linux && (amd64 || arm64) package quark /* #cgo CFLAGS: -I${SRCDIR}/include #cgo amd64 LDFLAGS: -Wl,--wrap=fmemopen ${SRCDIR}/libquark_big_amd64.a #cgo arm64 LDFLAGS: -Wl,--wrap=fmemopen ${SRCDIR}/libquark_big_arm64.a #include <stdlib.h> #include "quark.h" __asm__(".symver fmemopen, fmemopen@GLIBC_2.2.5"); FILE * __wrap_fmemopen(void *buf, size_t size, const char *mode) { return fmemopen(buf, size, mode); } */ import "C" import ( "bytes" "errors" "strings" "syscall" "unsafe" ) // Proc carries data on the state of the process. Only vaid if `Valid` is set. type Proc struct { CapInheritable uint64 CapPermitted uint64 CapEffective uint64 CapBset uint64 CapAmbient uint64 TimeBoot uint64 Ppid uint32 Uid uint32 Gid uint32 Suid uint32 Sgid uint32 Euid uint32 Egid uint32 Pgid uint32 Sid uint32 EntryLeader uint32 EntryLeaderType uint32 TtyMajor uint32 TtyMinor uint32 UtsInonum uint32 IpcInonum uint32 MntInonum uint32 NetInonum uint32 Valid bool } // Exit carries data on the exit behavior of the process. Only valid if `Valid` is set. type Exit struct { ExitCode int32 ExitTimeProcess uint64 Valid bool } // Process represents a single process. type Process struct { Pid uint32 // Always present Proc Proc // Only meaningful if Proc.Valid (QUARK_F_PROC) Exit Exit // Only meaningful if Exit.Valid (QUARK_F_EXIT) Comm string // QUARK_F_COMM Filename string // QUARK_F_FILENAME Cmdline []string // QUARK_F_CMDLINE Cwd string // QUARK_F_CWD } // Events is a bitmask of QUARK_EV_* and expresses what triggered this // event, Process is the context of the Event. type Event struct { Events uint64 Process Process } // Queue holds the state of a quark instance. type Queue struct { quarkQueue *C.struct_quark_queue // pointer to the queue structure cEvents *C.struct_quark_event numCevents int epollFd int } const ( // quark_queue_attr{} flags QQ_THREAD_EVENTS = int(C.QQ_THREAD_EVENTS) QQ_KPROBE = int(C.QQ_KPROBE) QQ_EBPF = int(C.QQ_EBPF) QQ_NO_SNAPSHOT = int(C.QQ_NO_SNAPSHOT) QQ_MIN_AGG = int(C.QQ_MIN_AGG) QQ_ENTRY_LEADER = int(C.QQ_ENTRY_LEADER) QQ_ALL_BACKENDS = int(C.QQ_ALL_BACKENDS) // Event.events QUARK_EV_FORK = uint64(C.QUARK_EV_FORK) QUARK_EV_EXEC = uint64(C.QUARK_EV_EXEC) QUARK_EV_EXIT = uint64(C.QUARK_EV_EXIT) QUARK_EV_SETPROCTITLE = uint64(C.QUARK_EV_SETPROCTITLE) QUARK_EV_SNAPSHOT = uint64(C.QUARK_EV_SNAPSHOT) // EntryLeaderType QUARK_ELT_UNKNOWN = int(C.QUARK_ELT_UNKNOWN) QUARK_ELT_INIT = int(C.QUARK_ELT_INIT) QUARK_ELT_KTHREAD = int(C.QUARK_ELT_KTHREAD) QUARK_ELT_SSHD = int(C.QUARK_ELT_SSHD) QUARK_ELT_SSM = int(C.QUARK_ELT_SSM) QUARK_ELT_CONTAINER = int(C.QUARK_ELT_CONTAINER) QUARK_ELT_TERM = int(C.QUARK_ELT_TERM) QUARK_ELT_CONSOLE = int(C.QUARK_ELT_CONSOLE) ) // QueueAttr defines the attributes for the Quark queue. type QueueAttr struct { Flags int MaxLength int CacheGraceTime int HoldTime int } // Documented in https://elastic.github.io/quark/quark_queue_get_stats.3.html. type Stats struct { Insertions uint64 Removals uint64 Aggregations uint64 NonAggregations uint64 Lost uint64 Backend int } const ( QUARK_VL_SILENT = int(C.QUARK_VL_SILENT) QUARK_VL_WARN = int(C.QUARK_VL_WARN) QUARK_VL_DEBUG = int(C.QUARK_VL_DEBUG) ) var ErrUndefined = errors.New("undefined") func wrapErrno(err error) error { if err == nil { err = ErrUndefined } return err } // DefaultQueueAttr returns the default attributes for the queue. func DefaultQueueAttr() QueueAttr { var attr C.struct_quark_queue_attr C.quark_queue_default_attr(&attr) return QueueAttr{ Flags: int(attr.flags), MaxLength: int(attr.max_length), CacheGraceTime: int(attr.cache_grace_time), HoldTime: int(attr.hold_time), } } // OpenQueue opens a Quark Queue with the given attributes. func OpenQueue(attr QueueAttr, slots int) (*Queue, error) { var queue Queue p, err := C.calloc(C.size_t(1), C.sizeof_struct_quark_queue) if p == nil { return nil, wrapErrno(err) } queue.quarkQueue = (*C.struct_quark_queue)(p) p = nil cattr := C.struct_quark_queue_attr{ flags: C.int(attr.Flags), max_length: C.int(attr.MaxLength), cache_grace_time: C.int(attr.CacheGraceTime), hold_time: C.int(attr.HoldTime), } ok, err := C.quark_queue_open(queue.quarkQueue, &cattr) if ok == -1 { C.free(unsafe.Pointer(queue.quarkQueue)) return nil, wrapErrno(err) } p, err = C.calloc(C.size_t(slots), C.sizeof_struct_quark_event) if p == nil { C.quark_queue_close(queue.quarkQueue) C.free(unsafe.Pointer(queue.quarkQueue)) return nil, wrapErrno(err) } queue.cEvents = (*C.struct_quark_event)(p) queue.numCevents = slots p = nil queue.epollFd = int(C.quark_queue_get_epollfd(queue.quarkQueue)) return &queue, nil } // Close closes the queue. func (queue *Queue) Close() { C.quark_queue_close(queue.quarkQueue) C.free(unsafe.Pointer(queue.quarkQueue)) C.free(unsafe.Pointer(queue.cEvents)) queue.quarkQueue = nil queue.cEvents = nil } func eventOfIndex(cEvents *C.struct_quark_event, idx int) *C.struct_quark_event { return (*C.struct_quark_event)(unsafe.Add(unsafe.Pointer(cEvents), idx*C.sizeof_struct_quark_event)) } // GetEvents returns a number of events, up to a maximum of `slots` passed to OpenQueue. func (queue *Queue) GetEvents() ([]Event, error) { n, err := C.quark_queue_get_events(queue.quarkQueue, queue.cEvents, C.int(queue.numCevents)) if n == -1 { return nil, wrapErrno(err) } events := make([]Event, n) for i := range events { events[i] = eventToGo(eventOfIndex(queue.cEvents, i)) } return events, nil } // Lookup looks up for the Process associated with PID in quark's internal cache. func (queue *Queue) Lookup(pid int) (Process, bool) { process, _ := C.quark_process_lookup(queue.quarkQueue, C.int(pid)) if process == nil { return Process{}, false } return processToGo(process), true } // Block blocks until there are events or an undefined timeout // expires. GetEvents should be called once Block returns. func (queue *Queue) Block() error { event := make([]syscall.EpollEvent, 1) _, err := syscall.EpollWait(queue.epollFd, event, 100) if err != nil && errors.Is(err, syscall.EINTR) { err = nil } return err } // Snapshot returns a snapshot of all processes in the cache. func (queue *Queue) Snapshot() []Process { var processes []Process var iter C.struct_quark_process_iter var qp *C.struct_quark_process C.quark_process_iter_init(&iter, queue.quarkQueue) for qp = C.quark_process_iter_next(&iter); qp != nil; qp = C.quark_process_iter_next(&iter) { processes = append(processes, processToGo(qp)) } return processes } // Stats returns statistics of an active queue. func (queue *Queue) Stats() Stats { var stats Stats var cStats C.struct_quark_queue_stats C.quark_queue_get_stats(queue.quarkQueue, &cStats) stats.Insertions = uint64(cStats.insertions) stats.Removals = uint64(cStats.removals) stats.Aggregations = uint64(cStats.aggregations) stats.NonAggregations = uint64(cStats.non_aggregations) stats.Lost = uint64(cStats.lost) stats.Backend = int(cStats.backend) return stats } // Sets quark verbosity globally, not per queue. func SetVerbose(level int) { C.quark_verbose = C.int(level) } // processToGo converts the C process structure to a go process. func processToGo(cProcess *C.struct_quark_process) Process { var process Process if cProcess == nil { return Process{} } process.Pid = uint32(cProcess.pid) if cProcess.flags&C.QUARK_F_PROC != 0 { process.Proc = Proc{ CapInheritable: uint64(cProcess.proc_cap_inheritable), CapPermitted: uint64(cProcess.proc_cap_permitted), CapEffective: uint64(cProcess.proc_cap_effective), CapBset: uint64(cProcess.proc_cap_bset), CapAmbient: uint64(cProcess.proc_cap_ambient), TimeBoot: uint64(cProcess.proc_time_boot), Ppid: uint32(cProcess.proc_ppid), Uid: uint32(cProcess.proc_uid), Gid: uint32(cProcess.proc_gid), Suid: uint32(cProcess.proc_suid), Sgid: uint32(cProcess.proc_sgid), Euid: uint32(cProcess.proc_euid), Egid: uint32(cProcess.proc_egid), Pgid: uint32(cProcess.proc_pgid), Sid: uint32(cProcess.proc_sid), EntryLeader: uint32(cProcess.proc_entry_leader), EntryLeaderType: uint32(cProcess.proc_entry_leader_type), TtyMajor: uint32(cProcess.proc_tty_major), TtyMinor: uint32(cProcess.proc_tty_minor), UtsInonum: uint32(cProcess.proc_uts_inonum), IpcInonum: uint32(cProcess.proc_ipc_inonum), MntInonum: uint32(cProcess.proc_mnt_inonum), NetInonum: uint32(cProcess.proc_net_inonum), Valid: true, } } if cProcess.flags&C.QUARK_F_EXIT != 0 { process.Exit = Exit{ ExitCode: int32(cProcess.exit_code), ExitTimeProcess: uint64(cProcess.exit_time_event), Valid: true, } } if cProcess.flags&C.QUARK_F_COMM != 0 { process.Comm = C.GoString(&cProcess.comm[0]) } if cProcess.flags&C.QUARK_F_FILENAME != 0 { process.Filename = C.GoString(&cProcess.filename[0]) } if cProcess.flags&C.QUARK_F_CMDLINE != 0 { b := C.GoBytes(unsafe.Pointer(&cProcess.cmdline[0]), C.int(cProcess.cmdline_len)) nul := string(byte(0)) b = bytes.TrimRight(b, nul) process.Cmdline = strings.Split(string(b), nul) } if cProcess.flags&C.QUARK_F_CWD != 0 { process.Cwd = C.GoString(&cProcess.cwd[0]) } return process } func eventToGo(cEvent *C.struct_quark_event) Event { return Event{ Events: uint64(cEvent.events), Process: processToGo(cEvent.process), } }