packetbeat/procs/procs.go (242 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package procs import ( "net" "strings" "sync" "time" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/packetbeat/protos/applayer" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-sysinfo" ) // This controls how often process info for a running process is reloaded // A big value means less unnecessary refreshes at a higher risk of missing // a PID being recycled by the OS const processCacheExpiration = 30 * time.Second var ( anyIPv4 = net.IPv4zero.String() anyIPv6 = net.IPv6unspecified.String() ) // ProcessWatcher implements process enrichment for network traffic. type ProcessesWatcher struct { mu sync.Mutex portProcMap map[applayer.Transport]map[endpoint]portProcMapping localAddrs []net.IP // localAddrs lists IP addresses that are to be treated as local. processCache map[int]*process // processCache is a time-expiration cache of process details keyed on PID. enabled bool // enabled specifier whether the ProcessWatcher will be active. monitored []ProcConfig // monitored is the set of processes that are monitored by the ProcessWatcher. // watcher is the OS-dependent engine for the ProcessWatcher. watcher processWatcher } // endpoint is a network address/port number complex. type endpoint struct { address string port uint16 } // portProcMapping is an association between an endpoint and a process. type portProcMapping struct { endpoint endpoint // FIXME: This is never used. pid int proc *process expires time.Time } // process describes an OS process. type process struct { pid, ppid int name, exe, cwd string args []string startTime time.Time // expires is the time at which the process will be dropped // from the cache during enrichment queries. expires time.Time } // Init initializes the ProcessWatcher with the provided configuration. func (proc *ProcessesWatcher) Init(config ProcsConfig) error { return proc.init(config, proc) } // processWatcher allows the OS-dependent implementation to be replaced by a mock for testing type processWatcher interface { // GetLocalPortToPIDMapping returns the list of local port numbers and the PID // that owns them. GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error) // GetProcess returns the process metadata. GetProcess(pid int) *process // GetLocalIPs returns the list of local addresses. If the returned error // is non-nil, the IP slice is nil. GetLocalIPs() ([]net.IP, error) } // init sets up the necessary data structures for the ProcessWatcher. func (proc *ProcessesWatcher) init(config ProcsConfig, watcher processWatcher) error { proc.watcher = watcher proc.portProcMap = map[applayer.Transport]map[endpoint]portProcMapping{ applayer.TransportUDP: make(map[endpoint]portProcMapping), applayer.TransportTCP: make(map[endpoint]portProcMapping), } proc.processCache = make(map[int]*process) proc.enabled = config.Enabled if proc.enabled { logp.Info("Process watcher enabled") } else { logp.Info("Process watcher disabled") } // Read the local IP addresses. var err error proc.localAddrs, err = watcher.GetLocalIPs() if err != nil { logp.Err("Error getting local IP addresses: %s", err) } proc.monitored = config.Monitored return nil } // FindProcessesTupleTCP looks up local process information for the source and // destination addresses of TCP tuple func (proc *ProcessesWatcher) FindProcessesTupleTCP(tuple *common.IPPortTuple) (procTuple *common.ProcessTuple) { return proc.FindProcessesTuple(tuple, applayer.TransportTCP) } // FindProcessesTupleUDP looks up local process information for the source and // destination addresses of UDP tuple func (proc *ProcessesWatcher) FindProcessesTupleUDP(tuple *common.IPPortTuple) (procTuple *common.ProcessTuple) { return proc.FindProcessesTuple(tuple, applayer.TransportUDP) } // FindProcessesTuple looks up local process information for the source and // destination addresses of a tuple for the given transport protocol func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple, transport applayer.Transport) *common.ProcessTuple { var procTuple common.ProcessTuple if !proc.enabled { return &procTuple } proc.enrich(&procTuple.Src, tuple.SrcIP, tuple.SrcPort, transport) proc.enrich(&procTuple.Dst, tuple.DstIP, tuple.DstPort, transport) return &procTuple } // enrich adds process information to dst for the process associated with the given IP, port and // transport if the IP is not local and the information is available to the ProcessWatcher. func (proc *ProcessesWatcher) enrich(dst *common.Process, ip net.IP, port uint16, transport applayer.Transport) { if !proc.isLocalIP(ip) { return } p := proc.findProc(ip, port, transport) if p == nil { return } dst.PID = p.pid dst.PPID = p.ppid dst.Name = p.name dst.Args = p.args dst.Exe = p.exe dst.StartTime = p.startTime if logp.IsDebug("procs") { logp.Debug("procs", "Found process '%s' (pid=%d) for %s:%d/%s", p.name, p.pid, ip, port, transport) } } func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool { if ip.IsLoopback() { return true } for _, addr := range proc.localAddrs { if ip.Equal(addr) { return true } } return false } func (proc *ProcessesWatcher) findProc(address net.IP, port uint16, transport applayer.Transport) *process { proc.mu.Lock() defer proc.mu.Unlock() procMap, ok := proc.portProcMap[transport] if !ok { return nil } p, exists := lookupMapping(address, port, procMap) if exists { return p.proc } proc.updateMap(transport) p, exists = lookupMapping(address, port, procMap) if exists { return p.proc } return nil } // proc.mu must be locked func lookupMapping(address net.IP, port uint16, procMap map[endpoint]portProcMapping) (portProcMapping, bool) { now := time.Now() key := endpoint{address.String(), port} p, found := procMap[key] // Precedence when one socket is bound to a specific IP:port and another one // to INADDR_ANY and same port is not clear. Seems that the last one to bind // takes precedence, and we don't have a way to tell. // This function takes the naive approach of giving precedence to the more // specific address and then to INADDR_ANY. if !found { if address.To4() != nil { key.address = anyIPv4 } else { key.address = anyIPv6 } p, found = procMap[key] } // We can't guarantee `p` doesn't point to an old entry, since // we never remove entries from `procMap`, we only overwrite // them, but we only overwrite them once an unrelated packet // that doesn't have an entry on `procMap` ends up rebuilding // the whole map. // // We take a conservative approach by discarding the entry if // it's old enough. When we fail the first time here, our caller // updates all maps and calls us again. if found && now.After(p.expires) { logp.Debug("procs", "PID %d (%s) port %d is too old, discarding", p.pid, p.proc.name, port) delete(procMap, key) p = portProcMapping{} found = false } return p, found } // proc.mu must be locked func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) { if logp.HasSelector("procsdetailed") { start := time.Now() defer func() { logp.Debug("procsdetailed", "updateMap() took %v", time.Since(start)) }() } endpoints, err := proc.watcher.GetLocalPortToPIDMapping(transport) if err != nil { logp.Err("unable to list local ports: %v", err) } proc.expireProcessCache() for e, pid := range endpoints { proc.updateMappingEntry(transport, e, pid) } } // proc.mu must be locked func (proc *ProcessesWatcher) expireProcessCache() { now := time.Now() for pid, info := range proc.processCache { if now.After(info.expires) { delete(proc.processCache, pid) } } } // proc.mu must be locked func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, e endpoint, pid int) { prev, ok := proc.portProcMap[transport][e] if ok && prev.pid == pid { // This port->pid mapping already exists return } p := proc.getProcessInfo(pid) if p == nil { return } // We overwrite previous entries here, and they expire in // lookupMapping() if they are deemed old enough. // // Map size is bound by the number of ports: ~65k, so it's // fine to have old entries lingering, as long as we don't // trust them on subsequent connections. // // If the source port is re-used within the hardcoded 10 // seconds window, we might end up hitting an old mapping. proc.portProcMap[transport][e] = portProcMapping{ endpoint: e, pid: pid, proc: p, expires: time.Now().Add(10 * time.Second), } if logp.IsDebug("procsdetailed") { logp.Debug("procsdetailed", "updateMappingEntry(): local=%s:%d/%s pid=%d process='%s'", e.address, e.port, transport, pid, p.name) } } // getProcessInfo returns a potentially cached process corresponding to the // provided process ID. // // If any part of the process's argv contains a substring in proc.monitored.CmdlineGrep, // the name of the process is replaced with the corresponding proc.monitored.Process. // This behaviour is not recommended to be used and is not available to integrations // packages by design. func (proc *ProcessesWatcher) getProcessInfo(pid int) *process { if p, ok := proc.processCache[pid]; ok { return p } // Not in cache, resolve process info p := proc.watcher.GetProcess(pid) if p == nil { return nil } // The packetbeat.procs.monitored*.cmdline_grep allows you to overwrite // the process name with an alias. for _, match := range proc.monitored { if strings.Contains(strings.Join(p.args, " "), match.CmdlineGrep) { p.name = match.Process break } } proc.processCache[pid] = p return p } // GetProcess returns the process metadata. func (proc *ProcessesWatcher) GetProcess(pid int) *process { if pid <= 0 { return nil } p, err := sysinfo.Process(pid) if err != nil { logp.Err("Unable to get command-line for PID %d: %v", pid, err) return nil } info, err := p.Info() if err != nil { logp.Err("Unable to get command-line for PID %d: %v", pid, err) return nil } return &process{ pid: info.PID, ppid: info.PPID, name: procName(info), exe: info.Exe, cwd: info.CWD, args: info.Args, startTime: info.StartTime, expires: time.Now().Add(processCacheExpiration), } } // GetLocalIPs returns the list of local addresses. func (proc *ProcessesWatcher) GetLocalIPs() ([]net.IP, error) { return common.LocalIPAddrs() }