in packetbeat/flows/worker.go [292:528]
func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, enableDeltaFlowReporting bool) beat.Event {
timestamp := ts
event := mapstr.M{
"start": common.Time(f.createTS),
"end": common.Time(f.ts),
"duration": f.ts.Sub(f.createTS),
"dataset": "flow",
"kind": "event",
"category": []string{"network"},
"action": "network_flow",
}
eventType := []string{"connection"}
if isOver {
eventType = append(eventType, "end")
}
event["type"] = eventType
flow := mapstr.M{
"id": common.NetString(f.id.Serialize()),
"final": isOver,
}
fields := mapstr.M{
"event": event,
"flow": flow,
"type": "flow",
}
network := mapstr.M{}
source := mapstr.M{}
dest := mapstr.M{}
tuple := common.IPPortTuple{}
var communityID flowhash.Flow
var proto applayer.Transport
// add ethernet layer meta data
if src, dst, ok := f.id.EthAddr(); ok {
source["mac"] = formatHardwareAddr(net.HardwareAddr(src))
dest["mac"] = formatHardwareAddr(net.HardwareAddr(dst))
}
// add vlan
if vlan := f.id.OutterVLan(); vlan != nil {
vlanID := uint64(binary.LittleEndian.Uint16(vlan))
putOrAppendUint64(flow, "vlan", vlanID)
}
if vlan := f.id.VLan(); vlan != nil {
vlanID := uint64(binary.LittleEndian.Uint16(vlan))
putOrAppendUint64(flow, "vlan", vlanID)
}
// ipv4 layer meta data
if src, dst, ok := f.id.OutterIPv4Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
source["ip"] = srcIP.String()
dest["ip"] = dstIP.String()
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 4
network["type"] = "ipv4"
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
}
if src, dst, ok := f.id.IPv4Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
putOrAppendString(source, "ip", srcIP.String())
putOrAppendString(dest, "ip", dstIP.String())
// Save IPs for process matching if an outer layer was not present
if tuple.IPLength == 0 {
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 4
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
network["type"] = "ipv4"
}
}
// ipv6 layer meta data
if src, dst, ok := f.id.OutterIPv6Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
putOrAppendString(source, "ip", srcIP.String())
putOrAppendString(dest, "ip", dstIP.String())
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 6
network["type"] = "ipv6"
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
}
if src, dst, ok := f.id.IPv6Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
putOrAppendString(source, "ip", srcIP.String())
putOrAppendString(dest, "ip", dstIP.String())
// Save IPs for process matching if an outer layer was not present
if tuple.IPLength == 0 {
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 6
communityID.SourceIP = srcIP
communityID.DestinationIP = dstIP
network["type"] = "ipv6"
}
}
// udp layer meta data
if src, dst, ok := f.id.UDPAddr(); ok {
tuple.SrcPort = binary.LittleEndian.Uint16(src)
tuple.DstPort = binary.LittleEndian.Uint16(dst)
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
network["transport"] = "udp"
proto = applayer.TransportUDP
communityID.SourcePort = tuple.SrcPort
communityID.DestinationPort = tuple.DstPort
communityID.Protocol = 17
}
// tcp layer meta data
if src, dst, ok := f.id.TCPAddr(); ok {
tuple.SrcPort = binary.LittleEndian.Uint16(src)
tuple.DstPort = binary.LittleEndian.Uint16(dst)
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
network["transport"] = "tcp"
proto = applayer.TransportTCP
communityID.SourcePort = tuple.SrcPort
communityID.DestinationPort = tuple.DstPort
communityID.Protocol = 6
}
var totalBytes, totalPackets uint64
if f.stats[0] != nil {
// Source stats.
stats := encodeStats(f.stats[0], intNames, uintNames, floatNames, enableDeltaFlowReporting)
for k, v := range stats {
switch k {
case "icmpV4TypeCode":
if typeCode, ok := v.(uint64); ok && typeCode > 0 {
network["transport"] = "icmp"
communityID.Protocol = 1
communityID.ICMP.Type = uint8(typeCode >> 8)
communityID.ICMP.Code = uint8(typeCode)
}
case "icmpV6TypeCode":
if typeCode, ok := v.(uint64); ok && typeCode > 0 {
network["transport"] = "ipv6-icmp"
communityID.Protocol = 58
communityID.ICMP.Type = uint8(typeCode >> 8)
communityID.ICMP.Code = uint8(typeCode)
}
default:
source[k] = v
}
}
if v, found := stats["bytes"]; found {
//nolint:errcheck // ignore
totalBytes += v.(uint64)
}
if v, found := stats["packets"]; found {
//nolint:errcheck // ignore
totalPackets += v.(uint64)
}
}
if f.stats[1] != nil {
// Destination stats.
stats := encodeStats(f.stats[1], intNames, uintNames, floatNames, enableDeltaFlowReporting)
for k, v := range stats {
switch k {
case "icmpV4TypeCode", "icmpV6TypeCode":
default:
dest[k] = v
}
}
if v, found := stats["bytes"]; found {
//nolint:errcheck // ignore
totalBytes += v.(uint64)
}
if v, found := stats["packets"]; found {
//nolint:errcheck // ignore
totalPackets += v.(uint64)
}
}
if communityID.Protocol > 0 && len(communityID.SourceIP) > 0 && len(communityID.DestinationIP) > 0 {
if hash := flowhash.CommunityID.Hash(communityID); hash != "" {
network["community_id"] = hash
}
}
network["bytes"] = totalBytes
network["packets"] = totalPackets
fields["network"] = network
// Set process information if it's available
if tuple.IPLength != 0 && tuple.SrcPort != 0 {
if proc := watcher.FindProcessesTuple(&tuple, proto); proc != nil {
if proc.Src.PID > 0 {
p := mapstr.M{
"pid": proc.Src.PID,
"name": proc.Src.Name,
"args": proc.Src.Args,
"ppid": proc.Src.PPID,
"executable": proc.Src.Exe,
"start": proc.Src.StartTime,
"working_directory": proc.Src.CWD,
}
if proc.Src.CWD != "" {
p["working_directory"] = proc.Src.CWD
}
source["process"] = p
fields["process"] = p
}
if proc.Dst.PID > 0 {
p := mapstr.M{
"pid": proc.Dst.PID,
"name": proc.Dst.Name,
"args": proc.Dst.Args,
"ppid": proc.Dst.PPID,
"executable": proc.Dst.Exe,
"start": proc.Dst.StartTime,
"working_directory": proc.Src.CWD,
}
if proc.Dst.CWD != "" {
p["working_directory"] = proc.Dst.CWD
}
dest["process"] = p
fields["process"] = p
}
}
}
fields["source"] = source
fields["destination"] = dest
return beat.Event{
Timestamp: timestamp,
Fields: fields,
}
}