// packetbeat/flows/worker.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package flows

import (
"encoding/binary"
"errors"
"net"
"sync"
"time"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/flowhash"
"github.com/elastic/beats/v7/packetbeat/procs"
"github.com/elastic/beats/v7/packetbeat/protos/applayer"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

var (
ErrInvalidTimeout = errors.New("timeout must be >= 1s")
ErrInvalidPeriod = errors.New("report period must be -1 or >= 1s")
)

// worker is a generic asynchronous function processor.
type worker struct {
wg sync.WaitGroup
done chan struct{}
run func(*worker)
}

// newWorker returns a handle to a worker to run fn.
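//
// A minimal usage sketch (all names are from this file):
//
//	w := newWorker(func(w *worker) {
//	    for w.sleep(time.Second) {
//	        // do periodic work
//	    }
//	})
//	w.start()
//	// ...
//	w.stop()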
func newWorker(fn func(w *worker)) *worker {
return &worker{
done: make(chan struct{}),
run: fn,
}
}

// start starts execution of the worker function.
func (w *worker) start() {
debugf("start flows worker")
w.wg.Add(1)
go func() {
defer w.finished()
w.run(w)
}()
}

// finished decrements the worker's running function count. finished
// must be called the same number of times as start over the lifetime
// of the worker.
func (w *worker) finished() {
w.wg.Done()
logp.Info("flows worker loop stopped")
}

// stop terminates the function and waits until processing is complete.
// stop may only be called once.
func (w *worker) stop() {
debugf("stop flows worker")
close(w.done)
w.wg.Wait()
debugf("stopped flows worker")
}

// sleep will sleep for the provided duration unless the worker has been
// stopped. sleep returns whether the worker can continue processing.
func (w *worker) sleep(d time.Duration) bool {
select {
case <-w.done:
return false
case <-time.After(d):
return true
}
}

// tick will sleep until the provided ticker fires unless the worker has been
// stopped. tick returns whether the worker can continue processing.
func (w *worker) tick(t *time.Ticker) bool {
select {
case <-w.done:
return false
case <-t.C:
return true
}
}

// periodically will execute fn each tick duration until the worker has been
// stopped or fn returns a non-nil error.
func (w *worker) periodically(tick time.Duration, fn func() error) {
defer debugf("stop periodic loop")
ticker := time.NewTicker(tick)
defer ticker.Stop()
for {
cont := w.tick(ticker)
if !cont {
return
}
err := fn()
if err != nil {
return
}
}
}

// newFlowsWorker returns a worker with a flow lifetime specified by timeout
// and a reporting interval specified by period. If period is less than or
// equal to zero, reporting is done only when a flow's lifetime ends.
// Flows are published via the pub Reporter after being enriched with process
// information by watcher.
func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration, enableDeltaFlowReports bool) (*worker, error) {
if timeout < time.Second {
return nil, ErrInvalidTimeout
}
if 0 < period && period < time.Second {
return nil, ErrInvalidPeriod
}
tick := timeout
ticksTimeout := 1
ticksPeriod := -1
if period > 0 {
tick = gcd(timeout, period)
if tick < time.Second {
tick = time.Second
}
ticksTimeout = int(timeout / tick)
if ticksTimeout == 0 {
ticksTimeout = 1
}
ticksPeriod = int(period / tick)
if ticksPeriod == 0 {
ticksPeriod = 1
}
}
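	// Example: timeout=30s and period=10s yield tick=10s, ticksTimeout=3 and
	// ticksPeriod=1, so flow timeouts are checked every third tick and
	// reports are generated on every tick.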
debugf("new flows worker. timeout=%v, period=%v, tick=%v, ticksTO=%v, ticksP=%v",
timeout, period, tick, ticksTimeout, ticksPeriod)
defaultBatchSize := 1024
processor := &flowsProcessor{
table: table,
watcher: watcher,
counters: counters,
timeout: timeout,
enableDeltaFlowReporting: enableDeltaFlowReports,
}
processor.spool.init(pub, defaultBatchSize)
return makeWorker(processor, tick, ticksTimeout, ticksPeriod, 10)
}

// gcd returns the greatest common divisor of a and b.
func gcd(a, b time.Duration) time.Duration {
for b != 0 {
a, b = b, a%b
}
return a
}

// makeWorker returns a worker that runs processor.execute on each tick.
// Every timeout'th tick the worker checks for flow timeouts, and every
// period'th tick it reports flow events to be published.
func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period int, align int64) (*worker, error) {
return newWorker(func(w *worker) {
defer processor.execute(w, false, true, true)
if align > 0 {
			// Wait until the current time, rounded up to the nearest align seconds.
aligned := time.Unix(((time.Now().Unix()+(align-1))/align)*align, 0)
waitStart := time.Until(aligned)
debugf("worker wait start(%v): %v", aligned, waitStart)
if cont := w.sleep(waitStart); !cont {
return
}
}
nTimeout := timeout
nPeriod := period
reportPeriodically := period > 0
debugf("start flows worker loop")
w.periodically(tick, func() error {
nTimeout--
nPeriod--
debugf("worker tick, nTimeout=%v, nPeriod=%v", nTimeout, nPeriod)
handleTimeout := nTimeout == 0
if handleTimeout {
nTimeout = timeout
}
handleReports := reportPeriodically && nPeriod == 0
if nPeriod <= 0 {
nPeriod = period
}
processor.execute(w, handleTimeout, handleReports, false)
return nil
})
}), nil
}
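
// flowsProcessor scans the flow table, expiring and reporting flows. It holds
// the state shared between worker ticks: the flow table, the process watcher
// used for enrichment, the counter registry and the event spool.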
type flowsProcessor struct {
spool spool
watcher *procs.ProcessesWatcher
table *flowMetaTable
counters *counterReg
timeout time.Duration
enableDeltaFlowReporting bool
}
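
// execute performs a single processing pass over the flow table. If
// checkTimeout is set, flows idle for longer than the configured timeout are
// killed, removed from the table and reported as final. If handleReports is
// set, the remaining flows are reported as well. lastReport marks reported
// flows as final; it is set on worker shutdown.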
func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastReport bool) {
if !checkTimeout && !handleReports {
return
}
debugf("exec tick, timeout=%v, report=%v", checkTimeout, handleReports)
	// Take a snapshot of the counter names in case reports must be generated.
fw.counters.mutex.Lock()
intNames := fw.counters.ints.getNames()
uintNames := fw.counters.uints.getNames()
floatNames := fw.counters.floats.getNames()
fw.counters.mutex.Unlock()
fw.table.Lock()
defer fw.table.Unlock()
ts := time.Now()
	// TODO: create the snapshot inside flows/tables so that deletion of
	// timed-out flows and reporting of flow stats can run more concurrently
	// with packet processing.
for table := fw.table.tables.head; table != nil; table = table.next {
var next *biFlow
for flow := table.flows.head; flow != nil; flow = next {
next = flow.next
debugf("handle flow: %v, %v", flow.id.flowIDMeta, flow.id.flowID)
reportFlow := handleReports
isOver := lastReport
if checkTimeout {
if ts.Sub(flow.ts) > fw.timeout {
debugf("kill flow")
reportFlow = true
flow.kill() // mark flow as killed
isOver = true
table.remove(flow)
}
}
if reportFlow {
debugf("report flow")
fw.report(w, ts, flow, isOver, intNames, uintNames, floatNames)
}
}
}
fw.spool.flush()
}
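
// report converts the flow into a beat.Event and queues it on the spool.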
func (fw *flowsProcessor) report(w *worker, ts time.Time, flow *biFlow, isOver bool, intNames, uintNames, floatNames []string) {
event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames, fw.enableDeltaFlowReporting)
debugf("add event: %v", event)
fw.spool.publish(event)
}
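
// createEvent builds the beat.Event for a single bi-directional flow,
// populating the ECS-style event, flow, network, source and destination
// fields from the flow ID layers and the collected counters.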
func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, enableDeltaFlowReporting bool) beat.Event {
timestamp := ts
event := mapstr.M{
"start": common.Time(f.createTS),
"end": common.Time(f.ts),
"duration": f.ts.Sub(f.createTS),
"dataset": "flow",
"kind": "event",
"category": []string{"network"},
"action": "network_flow",
}
eventType := []string{"connection"}
if isOver {
eventType = append(eventType, "end")
}
event["type"] = eventType
flow := mapstr.M{
"id": common.NetString(f.id.Serialize()),
"final": isOver,
}
fields := mapstr.M{
"event": event,
"flow": flow,
"type": "flow",
}
network := mapstr.M{}
source := mapstr.M{}
dest := mapstr.M{}
tuple := common.IPPortTuple{}
var communityID flowhash.Flow
var proto applayer.Transport
// add ethernet layer meta data
if src, dst, ok := f.id.EthAddr(); ok {
source["mac"] = formatHardwareAddr(net.HardwareAddr(src))
dest["mac"] = formatHardwareAddr(net.HardwareAddr(dst))
}
// add vlan
if vlan := f.id.OutterVLan(); vlan != nil {
vlanID := uint64(binary.LittleEndian.Uint16(vlan))
putOrAppendUint64(flow, "vlan", vlanID)
}
if vlan := f.id.VLan(); vlan != nil {
vlanID := uint64(binary.LittleEndian.Uint16(vlan))
putOrAppendUint64(flow, "vlan", vlanID)
}
// ipv4 layer meta data
if src, dst, ok := f.id.OutterIPv4Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
source["ip"] = srcIP.String()
dest["ip"] = dstIP.String()
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 4
network["type"] = "ipv4"
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
}
if src, dst, ok := f.id.IPv4Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
putOrAppendString(source, "ip", srcIP.String())
putOrAppendString(dest, "ip", dstIP.String())
// Save IPs for process matching if an outer layer was not present
if tuple.IPLength == 0 {
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 4
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
network["type"] = "ipv4"
}
}
// ipv6 layer meta data
if src, dst, ok := f.id.OutterIPv6Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
putOrAppendString(source, "ip", srcIP.String())
putOrAppendString(dest, "ip", dstIP.String())
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 6
network["type"] = "ipv6"
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
}
if src, dst, ok := f.id.IPv6Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
putOrAppendString(source, "ip", srcIP.String())
putOrAppendString(dest, "ip", dstIP.String())
// Save IPs for process matching if an outer layer was not present
if tuple.IPLength == 0 {
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 6
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
network["type"] = "ipv6"
}
}
// udp layer meta data
if src, dst, ok := f.id.UDPAddr(); ok {
tuple.SrcPort = binary.LittleEndian.Uint16(src)
tuple.DstPort = binary.LittleEndian.Uint16(dst)
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
network["transport"] = "udp"
proto = applayer.TransportUDP
communityID.SourcePort = tuple.SrcPort
communityID.DestinationPort = tuple.DstPort
communityID.Protocol = 17
}
// tcp layer meta data
if src, dst, ok := f.id.TCPAddr(); ok {
tuple.SrcPort = binary.LittleEndian.Uint16(src)
tuple.DstPort = binary.LittleEndian.Uint16(dst)
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
network["transport"] = "tcp"
proto = applayer.TransportTCP
communityID.SourcePort = tuple.SrcPort
communityID.DestinationPort = tuple.DstPort
communityID.Protocol = 6
}
var totalBytes, totalPackets uint64
if f.stats[0] != nil {
// Source stats.
stats := encodeStats(f.stats[0], intNames, uintNames, floatNames, enableDeltaFlowReporting)
for k, v := range stats {
switch k {
case "icmpV4TypeCode":
if typeCode, ok := v.(uint64); ok && typeCode > 0 {
network["transport"] = "icmp"
communityID.Protocol = 1
communityID.ICMP.Type = uint8(typeCode >> 8)
communityID.ICMP.Code = uint8(typeCode)
}
case "icmpV6TypeCode":
if typeCode, ok := v.(uint64); ok && typeCode > 0 {
network["transport"] = "ipv6-icmp"
communityID.Protocol = 58
communityID.ICMP.Type = uint8(typeCode >> 8)
communityID.ICMP.Code = uint8(typeCode)
}
default:
source[k] = v
}
}
if v, found := stats["bytes"]; found {
//nolint:errcheck // ignore
totalBytes += v.(uint64)
}
if v, found := stats["packets"]; found {
//nolint:errcheck // ignore
totalPackets += v.(uint64)
}
}
if f.stats[1] != nil {
// Destination stats.
stats := encodeStats(f.stats[1], intNames, uintNames, floatNames, enableDeltaFlowReporting)
for k, v := range stats {
switch k {
case "icmpV4TypeCode", "icmpV6TypeCode":
default:
dest[k] = v
}
}
if v, found := stats["bytes"]; found {
//nolint:errcheck // ignore
totalBytes += v.(uint64)
}
if v, found := stats["packets"]; found {
//nolint:errcheck // ignore
totalPackets += v.(uint64)
}
}
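
	// Compute the Community ID flow hash when enough of the flow tuple is
	// known (https://github.com/corelight/community-id-spec).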
if communityID.Protocol > 0 && len(communityID.SourceIP) > 0 && len(communityID.DestinationIP) > 0 {
if hash := flowhash.CommunityID.Hash(communityID); hash != "" {
network["community_id"] = hash
}
}
network["bytes"] = totalBytes
network["packets"] = totalPackets
fields["network"] = network
// Set process information if it's available
if tuple.IPLength != 0 && tuple.SrcPort != 0 {
if proc := watcher.FindProcessesTuple(&tuple, proto); proc != nil {
if proc.Src.PID > 0 {
p := mapstr.M{
"pid": proc.Src.PID,
"name": proc.Src.Name,
"args": proc.Src.Args,
"ppid": proc.Src.PPID,
"executable": proc.Src.Exe,
"start": proc.Src.StartTime,
"working_directory": proc.Src.CWD,
}
if proc.Src.CWD != "" {
p["working_directory"] = proc.Src.CWD
}
source["process"] = p
fields["process"] = p
}
if proc.Dst.PID > 0 {
p := mapstr.M{
"pid": proc.Dst.PID,
"name": proc.Dst.Name,
"args": proc.Dst.Args,
"ppid": proc.Dst.PPID,
"executable": proc.Dst.Exe,
"start": proc.Dst.StartTime,
"working_directory": proc.Src.CWD,
}
if proc.Dst.CWD != "" {
p["working_directory"] = proc.Dst.CWD
}
dest["process"] = p
fields["process"] = p
}
}
}
fields["source"] = source
fields["destination"] = dest
return beat.Event{
Timestamp: timestamp,
Fields: fields,
}
}

// formatHardwareAddr formats hardware addresses according to the ECS spec.
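// For example, the address 00:1b:44:11:3a:b7 is rendered as
// "00-1B-44-11-3A-B7".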
func formatHardwareAddr(addr net.HardwareAddr) string {
buf := make([]byte, 0, len(addr)*3-1)
for _, b := range addr {
if len(buf) != 0 {
buf = append(buf, '-')
}
const hexDigit = "0123456789ABCDEF"
buf = append(buf, hexDigit[b>>4], hexDigit[b&0xf])
}
return string(buf)
}
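
// encodeStats flattens the set counters of stats into a map keyed by the
// counter names in ints, uints and floats. The *Flags fields are bitmasks
// with one bit per registered counter; only counters whose bit is set are
// included. When enableDeltaFlowReporting is set, the "bytes" and "packets"
// counters are reset after being read so that each report carries per-period
// deltas.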
func encodeStats(stats *flowStats, ints, uints, floats []string, enableDeltaFlowReporting bool) map[string]interface{} {
report := make(map[string]interface{})
i := 0
for _, mask := range stats.intFlags {
for m := mask; m != 0; m >>= 1 {
if (m & 1) == 1 {
report[ints[i]] = stats.ints[i]
}
i++
}
}
i = 0
for _, mask := range stats.uintFlags {
for m := mask; m != 0; m >>= 1 {
if (m & 1) == 1 {
report[uints[i]] = stats.uints[i]
if enableDeltaFlowReporting && (uints[i] == "bytes" || uints[i] == "packets") {
// If Delta Flow Reporting is enabled, reset bytes and packets at each period.
// Only the bytes and packets received during the flow period will be reported.
// This should be thread safe as it is called under the flowmetadatatable lock.
stats.uints[i] = 0
}
}
i++
}
}
i = 0
for _, mask := range stats.floatFlags {
for m := mask; m != 0; m >>= 1 {
if (m & 1) == 1 {
report[floats[i]] = stats.floats[i]
}
i++
}
}
return report
}
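
// putOrAppendString stores value under key in m, accumulating repeated keys
// into a []string. For example, putting "b" when m[key] is "a" leaves
// m[key] == []string{"a", "b"}.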
func putOrAppendString(m mapstr.M, key, value string) {
old, found := m[key]
if !found {
m[key] = value
return
}
if old != nil {
switch v := old.(type) {
case string:
m[key] = []string{v, value}
case []string:
m[key] = append(v, value)
}
}
}
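
// putOrAppendUint64 stores value under key in m, widening any existing
// unsigned integer entry and accumulating repeated keys into a []uint64.
// It is used above to collect outer and inner VLAN IDs under "vlan".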
func putOrAppendUint64(m mapstr.M, key string, value uint64) {
old, found := m[key]
if !found {
m[key] = value
return
}
if old != nil {
switch v := old.(type) {
case uint8:
m[key] = []uint64{uint64(v), value}
case uint16:
m[key] = []uint64{uint64(v), value}
case uint32:
m[key] = []uint64{uint64(v), value}
case uint64:
m[key] = []uint64{v, value}
case []uint64:
m[key] = append(v, value)
}
}
}

// spool is an event publisher spool.
type spool struct {
pub Reporter
events []beat.Event
}

// init sets the destination and spool size.
func (s *spool) init(pub Reporter, sz int) {
s.pub = pub
s.events = make([]beat.Event, 0, sz)
}

// publish queues the event for publication, flushing to the destination
// if the spool is full.
func (s *spool) publish(event beat.Event) {
s.events = append(s.events, event)
if len(s.events) == cap(s.events) {
s.flush()
}
}

// flush sends the spooled events to the destination and clears them
// from the spool.
func (s *spool) flush() {
if len(s.events) == 0 {
return
}
s.pub(s.events)
// A newly allocated spool is created since the
// elements of s.events are no longer owned by s
// during testing and mutating them causes a panic.
//
// The beat.Client interface which Reporter is
// derived from is silent on whether the caller
// is allowed to modify elements of the slice
// after the call to the PublishAll method returns.
s.events = make([]beat.Event, 0, cap(s.events))
}