in x-pack/auditbeat/module/system/socket/state.go [907:1058]
func (f *flow) toEvent(final bool) (ev mb.Event, err error) {
localAddr := f.local.addr
remoteAddr := f.remote.addr
local := mapstr.M{
"ip": localAddr.IP.String(),
"port": localAddr.Port,
"packets": f.local.packets,
"bytes": f.local.bytes,
}
remote := mapstr.M{
"ip": remoteAddr.IP.String(),
"port": remoteAddr.Port,
"packets": f.remote.packets,
"bytes": f.remote.bytes,
}
src, dst := local, remote
switch f.dir {
case directionIngress:
src, dst = dst, src
case directionUnknown:
// For some flows we can miss information to determine the source (dir=unknown).
// As a last resort, assume that the client side uses a higher port number
// than the server.
if localAddr.Port < remoteAddr.Port {
src, dst = dst, src
}
}
inetType := f.inetType
// Under Linux, a socket created as AF_INET6 can receive IPv4 connections
// and it will use the IPv4 stack.
// This results in src and dst address using IPv4 mapped addresses (which
// Golang converts to IPv4 automatically). It will be misleading to report
// network.type: ipv6 and have v4 addresses, so it's better to report
// a network.type of ipv4 (which also matches the actual stack used).
if inetType == inetTypeIPv6 && f.local.addr.IP.To4() != nil && f.remote.addr.IP.To4() != nil {
inetType = inetTypeIPv4
}
eventType := []string{"info"}
if inetType == inetTypeIPv6 || inetType == inetTypeIPv4 {
eventType = append(eventType, "connection")
}
root := mapstr.M{
"source": src,
"client": src,
"destination": dst,
"server": dst,
"network": mapstr.M{
"direction": f.dir.String(),
"type": inetType.String(),
"transport": f.proto.String(),
"packets": f.local.packets + f.remote.packets,
"bytes": f.local.bytes + f.remote.bytes,
},
"event": mapstr.M{
"kind": "event",
"action": "network_flow",
"category": []string{"network"},
"type": eventType,
"start": f.createdTime,
"end": f.lastSeenTime,
"duration": f.lastSeenTime.Sub(f.createdTime).Nanoseconds(),
},
"flow": mapstr.M{
"final": final,
"complete": f.complete,
},
}
if communityid := flowhash.CommunityID.Hash(flowhash.Flow{
SourceIP: localAddr.IP,
SourcePort: uint16(localAddr.Port),
DestinationIP: remoteAddr.IP,
DestinationPort: uint16(remoteAddr.Port),
Protocol: uint8(f.proto),
}); communityid != "" {
(root["network"].(mapstr.M))["community_id"] = communityid
}
var errs []error
rootPut := func(key string, value interface{}) {
if _, err := root.Put(key, value); err != nil {
errs = append(errs, err)
}
}
relatedIPs := []string{}
if len(localAddr.IP) != 0 {
relatedIPs = append(relatedIPs, localAddr.IP.String())
}
if len(localAddr.IP) > 0 {
relatedIPs = append(relatedIPs, remoteAddr.IP.String())
}
if len(relatedIPs) > 0 {
rootPut("related.ip", relatedIPs)
}
metricset := mapstr.M{
"kernel_sock_address": fmt.Sprintf("0x%x", f.sock),
}
if f.pid != 0 {
process := mapstr.M{
"pid": int(f.pid),
}
if f.process != nil {
process["name"] = f.process.name
process["args"] = f.process.args
process["executable"] = f.process.path
if f.process.createdTime != (time.Time{}) {
process["created"] = f.process.createdTime
}
if f.process.entityID != "" {
process["entity_id"] = f.process.entityID
}
if f.process.hasCreds {
uid := strconv.Itoa(int(f.process.uid))
gid := strconv.Itoa(int(f.process.gid))
rootPut("user.id", uid)
rootPut("group.id", gid)
if name := userCache.LookupID(uid); name != "" {
rootPut("user.name", name)
rootPut("related.user", []string{name})
}
if name := groupCache.LookupID(gid); name != "" {
rootPut("group.name", name)
}
metricset["uid"] = f.process.uid
metricset["gid"] = f.process.gid
metricset["euid"] = f.process.euid
metricset["egid"] = f.process.egid
}
if domain, found := f.process.ResolveIP(f.local.addr.IP); found {
local["domain"] = domain
}
if domain, found := f.process.ResolveIP(f.remote.addr.IP); found {
remote["domain"] = domain
}
}
root["process"] = process
}
return mb.Event{
RootFields: root,
MetricSetFields: metricset,
}, errors.Join(errs...)
}