x-pack/auditbeat/module/system/socket/state.go (900 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
//go:build (linux && 386) || (linux && amd64)
package socket
import (
"encoding/binary"
"errors"
"fmt"
"net"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
"golang.org/x/sys/unix"
"github.com/elastic/beats/v7/auditbeat/tracing"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/flowhash"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/x-pack/auditbeat/module/system/socket/dns"
"github.com/elastic/beats/v7/x-pack/auditbeat/module/system/socket/helper"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/go-libaudit/v2/aucoalesce"
)
const (
// how often to check for expired flows.
expireInterval = time.Second
// how often the state log generated (only in debug mode).
logInterval = time.Second * 30
)
var (
userCache = aucoalesce.NewUserCache(5 * time.Minute)
groupCache = aucoalesce.NewGroupCache(5 * time.Minute)
)
type kernelTime uint64
type flowProto uint8
const (
protoUnknown flowProto = 0
protoTCP flowProto = unix.IPPROTO_TCP
protoUDP flowProto = unix.IPPROTO_UDP
)
func (p flowProto) String() string {
switch p {
case protoTCP:
return "tcp"
case protoUDP:
return "udp"
}
return "unknown"
}
type inetType uint8
const (
inetTypeUnknown inetType = 0
inetTypeIPv4 inetType = unix.AF_INET
inetTypeIPv6 inetType = unix.AF_INET6
)
func (t inetType) String() string {
switch t {
case inetTypeIPv4:
return "ipv4"
case inetTypeIPv6:
return "ipv6"
}
return "unknown"
}
type flowDirection uint8
const (
directionUnknown flowDirection = iota
directionIngress
directionEgress
)
// String returns the textual representation of the flowDirection.
func (d flowDirection) String() string {
switch d {
case directionIngress:
return "ingress"
case directionEgress:
return "egress"
default:
return "unknown"
}
}
type endpoint struct {
addr net.TCPAddr
packets, bytes uint64
}
func (e *endpoint) updateWith(other endpoint) {
if e.addr.IP == nil {
e.addr.IP = other.addr.IP
e.addr.Port = other.addr.Port
}
e.packets += other.packets
e.bytes += other.bytes
}
// String returns the textual representation of the endpoint address:port.
func (e *endpoint) String() string {
if e.addr.IP != nil {
return e.addr.String()
}
return "(not bound)"
}
func newEndpointIPv4(beIP uint32, bePort uint16, pkts uint64, bytes uint64) (e endpoint) {
var buf [4]byte
e.packets = pkts
e.bytes = bytes
if bePort != 0 && beIP != 0 {
tracing.MachineEndian.PutUint16(buf[:], bePort)
port := binary.BigEndian.Uint16(buf[:])
tracing.MachineEndian.PutUint32(buf[:], beIP)
e.addr = net.TCPAddr{
IP: net.IPv4(buf[0], buf[1], buf[2], buf[3]),
Port: int(port),
}
}
return e
}
func newEndpointIPv6(beIPa uint64, beIPb uint64, bePort uint16, pkts uint64, bytes uint64) (e endpoint) {
e.packets = pkts
e.bytes = bytes
if bePort != 0 && (beIPa != 0 || beIPb != 0) {
addr := make([]byte, 16)
tracing.MachineEndian.PutUint16(addr[:], bePort)
port := binary.BigEndian.Uint16(addr[:])
tracing.MachineEndian.PutUint64(addr, beIPa)
tracing.MachineEndian.PutUint64(addr[8:], beIPb)
e.addr = net.TCPAddr{
IP: addr,
Port: int(port),
}
}
return e
}
type flow struct {
prev, next helper.LinkedElement
sock uintptr
inetType inetType
proto flowProto
dir flowDirection
created, lastSeen kernelTime
pid uint32
process *process
local, remote endpoint
complete bool
done bool
// these are automatically calculated by state from kernelTimes above
createdTime, lastSeenTime time.Time
}
// If this flow should be reported or only captured partial data
func (f *flow) isValid() bool {
return f.inetType != inetTypeUnknown && f.proto != protoUnknown && f.local.addr.IP != nil && f.remote.addr.IP != nil
}
// Prev returns the previous flow in a linked list of flows.
func (f *flow) Prev() helper.LinkedElement {
return f.prev
}
// Next returns the next flow in a linked list of flows.
func (f *flow) Next() helper.LinkedElement {
return f.next
}
// SetPrev sets previous flow in a linked list of flows.
func (f *flow) SetPrev(e helper.LinkedElement) {
f.prev = e
}
// SetNext sets the next flow in a linked list of flows.
func (f *flow) SetNext(e helper.LinkedElement) {
f.next = e
}
// Timestamp returns the time value used to expire this flow.
func (f *flow) Timestamp() time.Time {
return f.lastSeenTime
}
type process struct {
// RWMutex is used to arbitrate reads and writes to resolvedDomains.
sync.RWMutex
pid uint32
name, path string
args []string
created kernelTime
uid, gid, euid, egid uint32
hasCreds bool
// populated by state from created
createdTime time.Time
// populated after createdTime is adjusted.
entityID string
// populated by DNS enrichment.
resolvedDomains map[string]string
}
func (p *process) addTransaction(tr dns.Transaction) {
p.Lock()
defer p.Unlock()
if p.resolvedDomains == nil {
p.resolvedDomains = make(map[string]string)
}
for _, addr := range tr.Addresses {
p.resolvedDomains[addr.String()] = tr.Domain
}
}
// ResolveIP returns the domain associated with the given IP.
func (p *process) ResolveIP(ip net.IP) (domain string, found bool) {
p.RLock()
defer p.RUnlock()
domain, found = p.resolvedDomains[ip.String()]
return domain, found
}
type socket struct {
sock uintptr
flows map[string]*flow
// Sockets have direction if they have been connect()ed or accept()ed.
dir flowDirection
bound bool
pid uint32
process *process
// This signals that the socket is in the closeTimeout list.
closing bool
prev, next helper.LinkedElement
createdTime, lastSeenTime time.Time
}
// Prev returns the previous socket in the linked list.
func (s *socket) Prev() helper.LinkedElement {
return s.prev
}
// Next returns the next socket in the linked list.
func (s *socket) Next() helper.LinkedElement {
return s.next
}
// SetPrev sets the previous socket in the linked list.
func (s *socket) SetPrev(e helper.LinkedElement) {
s.prev = e
}
// SetNext sets the next socket in the linked list.
func (s *socket) SetNext(e helper.LinkedElement) {
s.next = e
}
// Timestamp returns the time reference used to expire sockets.
func (s *socket) Timestamp() time.Time {
return s.lastSeenTime
}
type dnsTracker struct {
// map[net.UDPAddr(string)][]dns.Transaction
transactionByClient *common.Cache
// map[net.UDPAddr(string)]*process
processByClient *common.Cache
}
func newDNSTracker(timeout time.Duration) dnsTracker {
return dnsTracker{
transactionByClient: common.NewCache(timeout, 8),
processByClient: common.NewCache(timeout, 8),
}
}
// AddTransaction registers a new DNS transaction.
func (dt *dnsTracker) AddTransaction(tr dns.Transaction) {
clientAddr := tr.Client.String()
if procIf := dt.processByClient.Get(clientAddr); procIf != nil {
if proc, ok := procIf.(*process); ok {
proc.addTransaction(tr)
return
}
}
var list []dns.Transaction
var ok bool
if prev := dt.transactionByClient.Get(clientAddr); prev != nil {
list, ok = prev.([]dns.Transaction)
if !ok {
return
}
}
list = append(list, tr)
dt.transactionByClient.Put(clientAddr, list)
}
// AddTransactionWithProcess registers a new DNS transaction for the given process.
func (dt *dnsTracker) AddTransactionWithProcess(tr dns.Transaction, proc *process) {
proc.addTransaction(tr)
}
// CleanUp removes expired entries from the maps.
func (dt *dnsTracker) CleanUp() {
dt.transactionByClient.CleanUp()
dt.processByClient.CleanUp()
}
// RegisterEndpoint registers a new local endpoint used for DNS queries
// to correlate captured DNS packets with their originator process.
func (dt *dnsTracker) RegisterEndpoint(addr net.UDPAddr, proc *process) {
key := addr.String()
dt.processByClient.Put(key, proc)
if listIf := dt.transactionByClient.Get(key); listIf != nil {
list, ok := listIf.([]dns.Transaction)
if !ok {
return
}
for _, tr := range list {
proc.addTransaction(tr)
}
}
}
type state struct {
sync.Mutex
// Used to convert kernel time to user time
kernelEpoch time.Time
reporter mb.PushReporterV2
log helper.Logger
processes map[uint32]*process
socks map[uintptr]*socket
threads map[uint32]event
numFlows uint64
// configuration
inactiveTimeout, closeTimeout, socketTimeout time.Duration
clockMaxDrift time.Duration
// lru used for flow expiration.
flowLRU helper.LinkedList
// lru used for socket expiration.
socketLRU helper.LinkedList
// holds sockets in closing state. This is to keep them around until their
// close timeout expires.
closing helper.LinkedList
dns dnsTracker
// Decouple time.Now()
clock func() time.Time
// currentPID is the PID of the beat.
currentPID int
}
func (s *state) getSocket(sock uintptr) *socket {
if socket, found := s.socks[sock]; found {
return socket
}
now := s.clock()
socket := &socket{
sock: sock,
createdTime: now,
lastSeenTime: now,
}
s.socks[sock] = socket
s.socketLRU.Add(socket)
return socket
}
var kernelProcess = process{
pid: 0,
name: "[kernel_task]",
}
func NewState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift time.Duration) *state {
s := makeState(r, log, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift)
go s.expireLoop()
go s.logStateLoop()
return s
}
func makeState(r mb.PushReporterV2, log helper.Logger, inactiveTimeout, socketTimeout, closeTimeout, clockMaxDrift time.Duration) *state {
return &state{
reporter: r,
log: log,
processes: make(map[uint32]*process),
socks: make(map[uintptr]*socket),
threads: make(map[uint32]event),
inactiveTimeout: inactiveTimeout,
socketTimeout: socketTimeout,
closeTimeout: closeTimeout,
clockMaxDrift: clockMaxDrift,
dns: newDNSTracker(inactiveTimeout * 2),
clock: time.Now,
currentPID: os.Getpid(),
}
}
var (
lastEvents uint64
lastTime time.Time
)
func (s *state) logState() {
s.Lock()
numFlows := s.numFlows
numSocks := len(s.socks)
numProcs := len(s.processes)
numThreads := len(s.threads)
flowLRUSize := s.flowLRU.Size()
closingSize := s.closing.Size()
events := atomic.LoadUint64(&eventCount)
s.Unlock()
now := s.clock()
took := now.Sub(lastTime)
newEvs := events - lastEvents
lastEvents = events
lastTime = now
var errs []string
if uint64(flowLRUSize) != numFlows {
errs = append(errs, "flow count mismatch")
}
msg := fmt.Sprintf("state flows=%d sockets=%d procs=%d threads=%d lru=%d closing=%d events=%d eps=%.1f",
numFlows, numSocks, numProcs, numThreads, flowLRUSize, closingSize, events,
float64(newEvs)*float64(time.Second)/float64(took))
if errs == nil {
s.log.Debugf("%s", msg)
} else {
s.log.Warnf("%s. Warnings: %v", msg, errs)
}
}
func (s *state) expireLoop() {
reportTicker := time.NewTicker(expireInterval)
defer reportTicker.Stop()
for {
select {
case <-s.reporter.Done():
return
case <-reportTicker.C:
s.ExpireFlows()
}
}
}
func (s *state) logStateLoop() {
logTicker := time.NewTicker(logInterval)
defer logTicker.Stop()
for {
select {
case <-s.reporter.Done():
return
case <-logTicker.C:
s.logState()
}
}
}
func (s *state) ExpireFlows() {
start := s.clock()
toReport := s.expireFlows()
if sent := s.reportFlows(&toReport); sent != 0 {
s.log.Debugf("ExpireOlder took %v reported=%d", s.clock().Sub(start), sent)
}
}
func (s *state) expireFlows() (toReport helper.LinkedList) {
s.Lock()
defer s.Unlock()
now := s.clock()
s.flowLRU.RemoveOlder(now.Add(-s.inactiveTimeout), func(e helper.LinkedElement) bool {
flow, ok := e.(*flow)
if ok {
flows := s.onFlowTerminated(flow)
toReport.Append(&flows)
}
return ok
})
s.socketLRU.RemoveOlder(now.Add(-s.socketTimeout), func(e helper.LinkedElement) bool {
sock, ok := e.(*socket)
if ok {
s.onSockDestroyed(sock.sock, sock, 0)
}
return ok
})
s.closing.RemoveOlder(now.Add(-s.closeTimeout), func(e helper.LinkedElement) bool {
sock, ok := e.(*socket)
if ok {
flows := s.onSockTerminated(sock)
toReport.Append(&flows)
}
return ok
})
// Expire cached DNS
s.dns.CleanUp()
return toReport
}
func (s *state) CreateProcess(p *process) error {
if p.pid == 0 {
return errors.New("can't create process with PID 0")
}
s.Lock()
defer s.Unlock()
s.processes[p.pid] = p
if p.createdTime == (time.Time{}) {
p.createdTime = s.kernTimestampToTime(p.created)
}
return nil
}
func (s *state) ForkProcess(parentPID, childPID uint32, ts kernelTime) error {
if parentPID == childPID {
return nil
}
s.Lock()
defer s.Unlock()
if _, found := s.processes[childPID]; found {
return errors.New("fork: child pid already registered to another process")
}
if parent, found := s.processes[parentPID]; found {
child := &process{
pid: childPID,
name: parent.name,
path: parent.path,
args: parent.args,
created: ts,
uid: parent.uid,
gid: parent.gid,
euid: parent.euid,
egid: parent.egid,
hasCreds: parent.hasCreds,
createdTime: s.kernTimestampToTime(ts),
}
child.resolvedDomains = make(map[string]string, len(parent.resolvedDomains))
for k, v := range parent.resolvedDomains {
child.resolvedDomains[k] = v
}
s.log.Debugf("forking process %d with %d associated domains", childPID, len(child.resolvedDomains))
s.processes[childPID] = child
}
return nil
}
func (s *state) TerminateProcess(pid uint32) error {
if pid == 0 {
return errors.New("can't terminate process with PID 0")
}
s.log.Debugf("terminating process %d", pid)
s.Lock()
defer s.Unlock()
delete(s.processes, pid)
return nil
}
func (s *state) processExists(pid uint32) bool {
s.Lock()
defer s.Unlock()
_, ok := s.processes[pid]
return ok
}
func (s *state) getProcess(pid uint32) *process {
if pid == 0 {
return &kernelProcess
}
return s.processes[pid]
}
type threadEnterError struct {
tid uint32
existing event
}
// Error is the error message string.
func (t threadEnterError) Error() string {
return fmt.Sprintf("thread already had an event. tid=%d existing=%v", t.tid, t.existing)
}
func (s *state) ThreadEnter(tid uint32, ev event) error {
s.Lock()
prev, hasPrev := s.threads[tid]
s.threads[tid] = ev
s.Unlock()
if hasPrev {
return threadEnterError{
tid: tid,
existing: prev,
}
}
return nil
}
func (s *state) ThreadLeave(tid uint32) (ev event, found bool) {
s.Lock()
defer s.Unlock()
if ev, found = s.threads[tid]; found {
delete(s.threads, tid)
}
return ev, found
}
func (s *state) onSockTerminated(sock *socket) (toReport helper.LinkedList) {
for _, f := range sock.flows {
flows := s.onFlowTerminated(f)
toReport.Append(&flows)
}
sock.flows = nil
delete(s.socks, sock.sock)
if sock.closing {
s.closing.Remove(sock)
} else {
s.moveToClosing(sock)
}
return toReport
}
// CreateSocket allocates a new sock in the system
func (s *state) CreateSocket(ref flow) error {
var toReport helper.LinkedList
// Send flows to the output as a deferred function to avoid
// holding on s mutex when there's backpressure from the output.
defer s.reportFlows(&toReport)
s.Lock()
defer s.Unlock()
ref.createdTime = s.kernTimestampToTime(ref.created)
ref.lastSeenTime = s.kernTimestampToTime(ref.lastSeen)
if prev, found := s.socks[ref.sock]; found {
// Fetch existing flow in case of TCP negotiation
if initial, found := prev.flows[ref.remote.String()]; found && ref.local.String() == initial.local.String() {
initial.dir = ref.dir
initial.pid = ref.pid
initial.process = ref.process
ref.updateWith(*initial, s)
delete(prev.flows, ref.remote.String())
}
// terminate existing if sock ptr is reused
toReport = s.onSockTerminated(prev)
}
return s.createFlow(ref)
}
func (s *state) OnDNSTransaction(tr dns.Transaction) error {
s.Lock()
defer s.Unlock()
s.log.Debugf("adding DNS transaction for domain %s for client %s", tr.Domain, tr.Client.String())
s.dns.AddTransaction(tr)
return nil
}
func (s *state) mutualEnrich(sock *socket, f *flow) {
// if the sock is not bound to a local address yet, update if possible
if !sock.bound && f.local.addr.IP != nil {
sock.bound = true
for _, flow := range sock.flows {
if flow.local.addr.IP == nil {
flow.local.addr = f.local.addr
}
}
}
if sockNoDir := sock.dir == directionUnknown; sockNoDir != (f.dir == directionUnknown) {
if sockNoDir {
sock.dir = f.dir
} else {
f.dir = sock.dir
}
}
if sock.pid == 0 {
sock.pid = f.pid
sock.process = f.process
}
if sock.pid == f.pid && sock.pid != 0 {
if sockNoProcess := sock.process == nil; sockNoProcess != (f.process == nil) {
if sockNoProcess {
sock.process = f.process
} else {
f.process = sock.process
}
} else if sock.process == nil && sock.pid != 0 {
sock.process = s.getProcess(sock.pid)
f.process = sock.process
}
}
if !sock.closing {
sock.lastSeenTime = s.clock()
s.socketLRU.Remove(sock)
s.socketLRU.Add(sock)
}
}
func (s *state) createFlow(ref flow) error {
if ref.process != nil {
s.log.Debugf("creating flow for pid %s", ref.process.pid)
}
// Get or create a socket for this flow
sock := s.getSocket(ref.sock)
ref.createdTime = ref.lastSeenTime
s.mutualEnrich(sock, &ref)
// don't create the flow yet if it doesn't have a populated remote address
if ref.remote.addr.IP == nil {
return nil
}
ptr := new(flow)
*ptr = ref
if sock.flows == nil {
sock.flows = make(map[string]*flow, 1)
}
sock.flows[ref.remote.addr.String()] = ptr
s.flowLRU.Add(ptr)
s.numFlows++
return nil
}
// OnSockDestroyed is called to signal that the given sock has been destroyed.
func (s *state) OnSockDestroyed(ptr uintptr, pid uint32) error {
s.Lock()
defer s.Unlock()
s.onSockDestroyed(ptr, nil, pid)
return nil
}
func (s *state) onSockDestroyed(ptr uintptr, sock *socket, pid uint32) {
var found bool
if sock == nil {
if sock, found = s.socks[ptr]; !found {
return
}
}
// Enrich with pid
if sock.pid == 0 && pid != 0 {
sock.pid = pid
}
if sock.process == nil && sock.pid != 0 {
sock.process = s.getProcess(pid)
}
// Keep the sock around in case it's a connected TCP socket, as still some
// packets can be received shortly after/during inet_release.
if !sock.closing {
s.moveToClosing(sock)
}
}
func (s *state) moveToClosing(sock *socket) {
sock.lastSeenTime = s.clock()
sock.closing = true
s.socketLRU.Remove(sock)
s.closing.Add(sock)
}
// UpdateFlow receives a partial flow and creates or updates an existing flow.
func (s *state) UpdateFlow(ref flow) error {
return s.UpdateFlowWithCondition(ref, nil)
}
// UpdateFlowWithCondition receives a partial flow and creates or updates an
// existing flow. The optional condition must be met before an existing flow is
// updated. Otherwise the update is ignored.
func (s *state) UpdateFlowWithCondition(ref flow, cond func(*flow) bool) error {
s.Lock()
defer s.Unlock()
ref.createdTime = s.kernTimestampToTime(ref.created)
ref.lastSeenTime = s.kernTimestampToTime(ref.lastSeen)
sock, found := s.socks[ref.sock]
if !found {
return s.createFlow(ref)
}
prev, found := sock.flows[ref.remote.addr.String()]
if !found {
// Sock has been already closed and it may be receiving a SYN for a different
// flow.
if sock.closing {
return nil
}
return s.createFlow(ref)
}
if cond != nil && !cond(prev) {
return nil
}
s.mutualEnrich(sock, &ref)
prev.updateWith(ref, s)
s.enrichDNS(prev)
s.flowLRU.Remove(prev)
s.flowLRU.Add(prev)
return nil
}
func (s *state) enrichDNS(f *flow) {
if f.remote.addr.Port == 53 && f.proto == protoUDP && f.pid != 0 && f.process != nil {
localUDP := net.UDPAddr{
IP: f.local.addr.IP,
Port: f.local.addr.Port,
}
if f.process != nil {
s.log.Debugf("registering endpoint %s for process %d", localUDP.String(), f.process.pid)
}
s.dns.RegisterEndpoint(localUDP, f.process)
}
}
func (f *flow) updateWith(ref flow, s *state) {
f.lastSeenTime = ref.lastSeenTime
if ref.inetType != f.inetType {
if f.inetType == inetTypeUnknown {
f.inetType = ref.inetType
}
}
if ref.proto != f.proto {
if f.proto == protoUnknown {
f.proto = ref.proto
}
}
if f.pid == 0 && ref.pid != 0 {
f.pid = ref.pid
f.process = ref.process
}
if f.process == nil {
if ref.process != nil && f.pid == ref.pid {
f.process = ref.process
} else {
f.process = s.getProcess(f.pid)
}
}
if f.dir == directionUnknown {
f.dir = ref.dir
}
if ref.complete {
f.complete = true
}
f.local.updateWith(ref.local)
f.remote.updateWith(ref.remote)
}
func (s *state) reportFlow(f *flow) (reported bool) {
if f != nil && f.isValid() && int(f.pid) != s.currentPID {
if ev, err := f.toEvent(true); err == nil {
reported = s.reporter.Event(ev)
} else {
s.log.Errorf("Failed to convert flow=%v err=%v", f, err)
}
}
return reported
}
func (s *state) reportFlows(l *helper.LinkedList) (count int) {
for item := l.Get(); item != nil; item = l.Get() {
if f, ok := item.(*flow); ok {
if s.reportFlow(f) {
count++
}
}
}
return count
}
func (s *state) onFlowTerminated(f *flow) (toReport helper.LinkedList) {
if f.done {
return toReport
}
s.flowLRU.Remove(f)
f.done = true
// Unbind this flow from its parent
if parent, found := s.socks[f.sock]; found {
delete(parent.flows, f.remote.addr.String())
}
s.numFlows--
toReport.Add(f)
return toReport
}
func (f *flow) toEvent(final bool) (ev mb.Event, err error) {
localAddr := f.local.addr
remoteAddr := f.remote.addr
local := mapstr.M{
"ip": localAddr.IP.String(),
"port": localAddr.Port,
"packets": f.local.packets,
"bytes": f.local.bytes,
}
remote := mapstr.M{
"ip": remoteAddr.IP.String(),
"port": remoteAddr.Port,
"packets": f.remote.packets,
"bytes": f.remote.bytes,
}
src, dst := local, remote
switch f.dir {
case directionIngress:
src, dst = dst, src
case directionUnknown:
// For some flows we can miss information to determine the source (dir=unknown).
// As a last resort, assume that the client side uses a higher port number
// than the server.
if localAddr.Port < remoteAddr.Port {
src, dst = dst, src
}
}
inetType := f.inetType
// Under Linux, a socket created as AF_INET6 can receive IPv4 connections
// and it will use the IPv4 stack.
// This results in src and dst address using IPv4 mapped addresses (which
// Golang converts to IPv4 automatically). It will be misleading to report
// network.type: ipv6 and have v4 addresses, so it's better to report
// a network.type of ipv4 (which also matches the actual stack used).
if inetType == inetTypeIPv6 && f.local.addr.IP.To4() != nil && f.remote.addr.IP.To4() != nil {
inetType = inetTypeIPv4
}
eventType := []string{"info"}
if inetType == inetTypeIPv6 || inetType == inetTypeIPv4 {
eventType = append(eventType, "connection")
}
root := mapstr.M{
"source": src,
"client": src,
"destination": dst,
"server": dst,
"network": mapstr.M{
"direction": f.dir.String(),
"type": inetType.String(),
"transport": f.proto.String(),
"packets": f.local.packets + f.remote.packets,
"bytes": f.local.bytes + f.remote.bytes,
},
"event": mapstr.M{
"kind": "event",
"action": "network_flow",
"category": []string{"network"},
"type": eventType,
"start": f.createdTime,
"end": f.lastSeenTime,
"duration": f.lastSeenTime.Sub(f.createdTime).Nanoseconds(),
},
"flow": mapstr.M{
"final": final,
"complete": f.complete,
},
}
if communityid := flowhash.CommunityID.Hash(flowhash.Flow{
SourceIP: localAddr.IP,
SourcePort: uint16(localAddr.Port),
DestinationIP: remoteAddr.IP,
DestinationPort: uint16(remoteAddr.Port),
Protocol: uint8(f.proto),
}); communityid != "" {
(root["network"].(mapstr.M))["community_id"] = communityid
}
var errs []error
rootPut := func(key string, value interface{}) {
if _, err := root.Put(key, value); err != nil {
errs = append(errs, err)
}
}
relatedIPs := []string{}
if len(localAddr.IP) != 0 {
relatedIPs = append(relatedIPs, localAddr.IP.String())
}
if len(localAddr.IP) > 0 {
relatedIPs = append(relatedIPs, remoteAddr.IP.String())
}
if len(relatedIPs) > 0 {
rootPut("related.ip", relatedIPs)
}
metricset := mapstr.M{
"kernel_sock_address": fmt.Sprintf("0x%x", f.sock),
}
if f.pid != 0 {
process := mapstr.M{
"pid": int(f.pid),
}
if f.process != nil {
process["name"] = f.process.name
process["args"] = f.process.args
process["executable"] = f.process.path
if f.process.createdTime != (time.Time{}) {
process["created"] = f.process.createdTime
}
if f.process.entityID != "" {
process["entity_id"] = f.process.entityID
}
if f.process.hasCreds {
uid := strconv.Itoa(int(f.process.uid))
gid := strconv.Itoa(int(f.process.gid))
rootPut("user.id", uid)
rootPut("group.id", gid)
if name := userCache.LookupID(uid); name != "" {
rootPut("user.name", name)
rootPut("related.user", []string{name})
}
if name := groupCache.LookupID(gid); name != "" {
rootPut("group.name", name)
}
metricset["uid"] = f.process.uid
metricset["gid"] = f.process.gid
metricset["euid"] = f.process.euid
metricset["egid"] = f.process.egid
}
if domain, found := f.process.ResolveIP(f.local.addr.IP); found {
local["domain"] = domain
}
if domain, found := f.process.ResolveIP(f.remote.addr.IP); found {
remote["domain"] = domain
}
}
root["process"] = process
}
return mb.Event{
RootFields: root,
MetricSetFields: metricset,
}, errors.Join(errs...)
}
func (s *state) SyncClocks(kernelNanos, userNanos uint64) error {
userTime := time.Unix(int64(time.Duration(userNanos)/time.Second), int64(time.Duration(userNanos)%time.Second))
bootTime := userTime.Add(-time.Duration(kernelNanos))
s.Lock()
if s.kernelEpoch == (time.Time{}) {
s.kernelEpoch = bootTime
s.Unlock()
return nil
}
drift := s.kernelEpoch.Sub(bootTime)
adjusted := drift < -s.clockMaxDrift || drift > s.clockMaxDrift
if adjusted {
s.kernelEpoch = bootTime
}
s.Unlock()
if adjusted {
s.log.Debugf("adjusted internal clock drift=%s", drift)
}
return nil
}
func (s *state) kernTimestampToTime(ts kernelTime) time.Time {
if ts == 0 {
return time.Time{}
}
if s.kernelEpoch == (time.Time{}) {
// This is the first event and time sync hasn't happened yet.
// Take a temporary epoch relative to current time.
now := s.clock()
s.kernelEpoch = now.Add(-time.Duration(ts))
return now
}
return s.kernelEpoch.Add(time.Duration(ts))
}