in x-pack/filebeat/input/netflow/convert.go [136:356]
func flowToBeatEvent(flow record.Record, internalNetworks []string) beat.Event {
event := toBeatEventCommon(flow)
ecsEvent, ok := event.Fields["event"].(mapstr.M)
if !ok {
ecsEvent = mapstr.M{}
event.Fields["event"] = ecsEvent
}
sysUptime, hasSysUptime := getKeyUint64(flow.Exporter, "uptimeMillis")
if !hasSysUptime || sysUptime == 0 {
// Alternative update
sysUptime, hasSysUptime = getKeyUint64(flow.Fields, "systemInitTimeMilliseconds")
}
startUptime, hasStartUptime := getKeyUint64(flow.Fields, "flowStartSysUpTime")
endUptime, hasEndUptime := getKeyUint64(flow.Fields, "flowEndSysUpTime")
if hasSysUptime {
// Can't convert uptime values to absolute time if sysUptime is bogus
// It will result on a flow that starts and ends in the future.
hasStartUptime = hasStartUptime && startUptime <= sysUptime
hasEndUptime = hasEndUptime && endUptime <= sysUptime
if hasStartUptime {
ecsEvent["start"] = flow.Timestamp.Add(time.Duration(startUptime-sysUptime) * time.Millisecond)
}
if hasEndUptime {
ecsEvent["end"] = flow.Timestamp.Add(time.Duration(endUptime-sysUptime) * time.Millisecond)
}
if hasStartUptime && hasEndUptime {
ecsEvent["duration"] = ecsEvent["end"].(time.Time).Sub(ecsEvent["start"].(time.Time)).Nanoseconds()
}
}
if ecsEvent["duration"] == nil {
if durationMillis, found := getKeyUint64(flow.Fields, "flowDurationMilliseconds"); found {
duration := time.Duration(durationMillis) * time.Millisecond
ecsEvent["duration"] = duration
// Here we're missing at least one of (start, end)
if start := ecsEvent["start"]; start != nil {
ecsEvent["end"] = start.(time.Time).Add(duration)
} else if end := ecsEvent["end"]; end != nil {
ecsEvent["start"] = end.(time.Time).Add(-duration)
}
}
}
flowDirection, hasFlowDirection := getKeyUint64(flow.Fields, "flowDirection")
// ECS Fields -- source, destination & related.ip
ecsSource := mapstr.M{}
ecsDest := mapstr.M{}
var relatedIP []net.IP
// Populate first with WLAN fields
if hasFlowDirection {
staIP, _ := getKeyIP(flow.Fields, "staIPv4Address")
staMac, hasStaMac := getKeyString(flow.Fields, "staMacAddress")
wtpMac, hasWtpMac := getKeyString(flow.Fields, "wtpMacAddress")
if hasStaMac && hasWtpMac {
srcMac := staMac
srcIP := staIP
dstMac := wtpMac
var dstIP net.IP = nil
if Direction(flowDirection) == DirectionOutbound {
srcMac, dstMac = dstMac, srcMac
srcIP, dstIP = dstIP, srcIP
}
if srcIP != nil {
ecsSource["ip"] = srcIP
ecsSource["locality"] = getIPLocality(internalNetworks, srcIP).String()
}
ecsSource["mac"] = srcMac
if dstIP != nil {
ecsDest["ip"] = dstIP
ecsDest["locality"] = getIPLocality(internalNetworks, dstIP).String()
}
ecsDest["mac"] = dstMac
}
}
// Regular IPv4 fields
if ip, found := getKeyIP(flow.Fields, "sourceIPv4Address"); found {
ecsSource["ip"] = ip
relatedIP = append(relatedIP, ip)
ecsSource["locality"] = getIPLocality(internalNetworks, ip).String()
} else if ip, found := getKeyIP(flow.Fields, "sourceIPv6Address"); found {
ecsSource["ip"] = ip
relatedIP = append(relatedIP, ip)
ecsSource["locality"] = getIPLocality(internalNetworks, ip).String()
}
if sourcePort, found := getKeyUint64(flow.Fields, "sourceTransportPort"); found {
ecsSource["port"] = sourcePort
}
if mac, found := getKeyString(flow.Fields, "sourceMacAddress"); found {
ecsSource["mac"] = mac
}
// ECS Fields -- destination
if ip, found := getKeyIP(flow.Fields, "destinationIPv4Address"); found {
ecsDest["ip"] = ip
relatedIP = append(relatedIP, ip)
ecsDest["locality"] = getIPLocality(internalNetworks, ip).String()
} else if ip, found := getKeyIP(flow.Fields, "destinationIPv6Address"); found {
ecsDest["ip"] = ip
relatedIP = append(relatedIP, ip)
ecsDest["locality"] = getIPLocality(internalNetworks, ip).String()
}
if destPort, found := getKeyUint64(flow.Fields, "destinationTransportPort"); found {
ecsDest["port"] = destPort
}
if mac, found := getKeyString(flow.Fields, "destinationMacAddress"); found {
ecsDest["mac"] = mac
}
// ECS Fields -- Flow
ecsFlow := mapstr.M{}
var srcIP, dstIP net.IP
var srcPort, dstPort uint16
var protocol IPProtocol
if ip, found := getKeyIP(record.Map(ecsSource), "ip"); found {
srcIP = ip
}
if ip, found := getKeyIP(record.Map(ecsDest), "ip"); found {
dstIP = ip
}
if port, found := getKeyUint64(flow.Fields, "sourceTransportPort"); found {
srcPort = uint16(port)
}
if port, found := getKeyUint64(flow.Fields, "destinationTransportPort"); found {
dstPort = uint16(port)
}
if proto, found := getKeyUint64(flow.Fields, "protocolIdentifier"); found {
protocol = IPProtocol(proto)
}
if srcIP == nil {
srcIP = net.IPv4(0, 0, 0, 0).To4()
}
if dstIP == nil {
dstIP = net.IPv4(0, 0, 0, 0).To4()
}
ecsFlow["id"] = flowID(srcIP, dstIP, srcPort, dstPort, uint8(protocol))
ecsFlow["locality"] = getIPLocality(internalNetworks, srcIP, dstIP).String()
// ECS Fields -- network
ecsNetwork := mapstr.M{}
if proto, found := getKeyUint64(flow.Fields, "protocolIdentifier"); found {
ecsNetwork["transport"] = IPProtocol(proto).String()
ecsNetwork["iana_number"] = proto
}
countBytes, hasBytes := getKeyUint64Alternatives(flow.Fields, "octetDeltaCount", "octetTotalCount", "initiatorOctets")
countPkts, hasPkts := getKeyUint64Alternatives(flow.Fields, "packetDeltaCount", "packetTotalCount", "initiatorPackets")
revBytes, hasRevBytes := getKeyUint64Alternatives(flow.Fields, "reverseOctetDeltaCount", "reverseOctetTotalCount", "responderOctets")
revPkts, hasRevPkts := getKeyUint64Alternatives(flow.Fields, "reversePacketDeltaCount", "reversePacketTotalCount", "responderPackets")
if hasRevBytes {
ecsDest["bytes"] = revBytes
}
if hasRevPkts {
ecsDest["packets"] = revPkts
}
if hasBytes {
ecsSource["bytes"] = countBytes
if hasRevBytes {
countBytes += revBytes
}
ecsNetwork["bytes"] = countBytes
}
if hasPkts {
ecsSource["packets"] = countPkts
if hasRevPkts {
countPkts += revPkts
}
ecsNetwork["packets"] = countPkts
}
if biflowDir, isBiflow := getKeyUint64(flow.Fields, "biflowDirection"); isBiflow && len(ecsSource) > 0 && len(ecsDest) > 0 {
// swap source and destination if biflowDirection is reverseInitiator
if biflowDir == 2 {
ecsDest, ecsSource = ecsSource, ecsDest
}
ecsEvent["category"] = []string{"network", "session"}
// Assume source is the client in biflows.
event.Fields["client"] = ecsSource
event.Fields["server"] = ecsDest
}
ecsNetwork["direction"] = "unknown"
if hasFlowDirection {
ecsNetwork["direction"] = Direction(flowDirection).String()
}
if ssid, found := getKeyString(flow.Fields, "wlanSSID"); found {
ecsNetwork["name"] = ssid
}
if communityid := flowhash.CommunityID.Hash(flowhash.Flow{
SourceIP: srcIP,
SourcePort: srcPort,
DestinationIP: dstIP,
DestinationPort: dstPort,
Protocol: uint8(protocol),
}); communityid != "" {
ecsNetwork["community_id"] = communityid
}
if len(ecsFlow) > 0 {
event.Fields["flow"] = ecsFlow
}
if len(ecsSource) > 0 {
event.Fields["source"] = ecsSource
}
if len(ecsDest) > 0 {
event.Fields["destination"] = ecsDest
}
if len(ecsNetwork) > 0 {
event.Fields["network"] = ecsNetwork
}
if len(relatedIP) > 0 {
event.Fields["related"] = mapstr.M{"ip": uniqueIPs(relatedIP)}
}
return event
}