func flowToBeatEvent()

in x-pack/filebeat/input/netflow/convert.go [136:356]


// flowToBeatEvent builds the beat.Event for a single flow record.
//
// It starts from the event returned by toBeatEventCommon and adds, when the
// corresponding data is present in the record:
//
//   - event.start, event.end and event.duration, derived from
//     flowStartSysUpTime / flowEndSysUpTime or from flowDurationMilliseconds.
//     event.duration is always stored as int64 nanoseconds;
//   - source and destination (ip, locality, port, mac, bytes, packets),
//     seeded from the WLAN station/WTP fields and then overridden by the
//     regular source*/destination* fields;
//   - flow.id and flow.locality;
//   - network (transport, iana_number, bytes, packets, direction, name,
//     community_id);
//   - client, server and event.category when biflowDirection is present;
//   - related.ip, holding the regular source/destination addresses.
//
// internalNetworks is forwarded unchanged to getIPLocality to classify
// addresses.
func flowToBeatEvent(flow record.Record, internalNetworks []string) beat.Event {
	event := toBeatEventCommon(flow)

	// Reuse the "event" map when the common conversion already provides one,
	// otherwise attach a fresh map so the writes below land in the event.
	ecsEvent, ok := event.Fields["event"].(mapstr.M)
	if !ok {
		ecsEvent = mapstr.M{}
		event.Fields["event"] = ecsEvent
	}

	// ECS Fields -- event.start, event.end & event.duration
	sysUptime, hasSysUptime := getKeyUint64(flow.Exporter, "uptimeMillis")
	if !hasSysUptime || sysUptime == 0 {
		// Alternative uptime source.
		// NOTE(review): the field name suggests an initialization timestamp
		// rather than an uptime -- confirm it is comparable with the
		// flow*SysUpTime values used below.
		sysUptime, hasSysUptime = getKeyUint64(flow.Fields, "systemInitTimeMilliseconds")
	}
	startUptime, hasStartUptime := getKeyUint64(flow.Fields, "flowStartSysUpTime")
	endUptime, hasEndUptime := getKeyUint64(flow.Fields, "flowEndSysUpTime")
	if hasSysUptime {
		// Can't convert uptime values to absolute time if sysUptime is bogus:
		// it would result in a flow that starts and ends in the future.
		hasStartUptime = hasStartUptime && startUptime <= sysUptime
		hasEndUptime = hasEndUptime && endUptime <= sysUptime

		// The uptime values are <= sysUptime at this point, so subtract in
		// this order to keep the unsigned difference from wrapping around,
		// then move backwards from the record timestamp.
		var start, end time.Time
		if hasStartUptime {
			startOffset := time.Duration(sysUptime-startUptime) * time.Millisecond
			start = flow.Timestamp.Add(-startOffset)
			ecsEvent["start"] = start
		}
		if hasEndUptime {
			endOffset := time.Duration(sysUptime-endUptime) * time.Millisecond
			end = flow.Timestamp.Add(-endOffset)
			ecsEvent["end"] = end
		}
		if hasStartUptime && hasEndUptime {
			ecsEvent["duration"] = end.Sub(start).Nanoseconds()
		}
	}
	if ecsEvent["duration"] == nil {
		if durationMillis, found := getKeyUint64(flow.Fields, "flowDurationMilliseconds"); found {
			duration := time.Duration(durationMillis) * time.Millisecond
			// Store int64 nanoseconds, the same representation used when the
			// duration is computed from start and end above.
			ecsEvent["duration"] = duration.Nanoseconds()

			// Reaching this point means start and end were not both derived
			// above; fill in the missing one when the other is available.
			if start := ecsEvent["start"]; start != nil {
				ecsEvent["end"] = start.(time.Time).Add(duration)
			} else if end := ecsEvent["end"]; end != nil {
				ecsEvent["start"] = end.(time.Time).Add(-duration)
			}
		}
	}

	flowDirection, hasFlowDirection := getKeyUint64(flow.Fields, "flowDirection")
	// ECS Fields -- source, destination & related.ip
	ecsSource := mapstr.M{}
	ecsDest := mapstr.M{}
	var relatedIP []net.IP
	// Narrowed copies of the transport ports, used for the flow ID and the
	// community ID further down.
	var srcPort, dstPort uint16

	// Populate first with WLAN fields; the regular address fields below take
	// precedence and overwrite these values when present.
	if hasFlowDirection {
		staIP, _ := getKeyIP(flow.Fields, "staIPv4Address")
		staMac, hasStaMac := getKeyString(flow.Fields, "staMacAddress")
		wtpMac, hasWtpMac := getKeyString(flow.Fields, "wtpMacAddress")
		if hasStaMac && hasWtpMac {
			// By default the station is the source and the WTP, for which no
			// IP is read, is the destination. Outbound flows swap the roles.
			srcMac := staMac
			srcIP := staIP
			dstMac := wtpMac
			var dstIP net.IP
			if Direction(flowDirection) == DirectionOutbound {
				srcMac, dstMac = dstMac, srcMac
				srcIP, dstIP = dstIP, srcIP
			}
			if srcIP != nil {
				ecsSource["ip"] = srcIP
				ecsSource["locality"] = getIPLocality(internalNetworks, srcIP).String()
			}
			ecsSource["mac"] = srcMac
			if dstIP != nil {
				ecsDest["ip"] = dstIP
				ecsDest["locality"] = getIPLocality(internalNetworks, dstIP).String()
			}
			ecsDest["mac"] = dstMac
		}
	}

	// Regular IPv4/IPv6 fields -- source. IPv4 wins when both are present.
	if ip, found := getKeyIP(flow.Fields, "sourceIPv4Address"); found {
		ecsSource["ip"] = ip
		relatedIP = append(relatedIP, ip)
		ecsSource["locality"] = getIPLocality(internalNetworks, ip).String()
	} else if ip, found := getKeyIP(flow.Fields, "sourceIPv6Address"); found {
		ecsSource["ip"] = ip
		relatedIP = append(relatedIP, ip)
		ecsSource["locality"] = getIPLocality(internalNetworks, ip).String()
	}
	if sourcePort, found := getKeyUint64(flow.Fields, "sourceTransportPort"); found {
		ecsSource["port"] = sourcePort
		srcPort = uint16(sourcePort)
	}
	if mac, found := getKeyString(flow.Fields, "sourceMacAddress"); found {
		ecsSource["mac"] = mac
	}

	// ECS Fields -- destination. IPv4 wins when both are present.
	if ip, found := getKeyIP(flow.Fields, "destinationIPv4Address"); found {
		ecsDest["ip"] = ip
		relatedIP = append(relatedIP, ip)
		ecsDest["locality"] = getIPLocality(internalNetworks, ip).String()
	} else if ip, found := getKeyIP(flow.Fields, "destinationIPv6Address"); found {
		ecsDest["ip"] = ip
		relatedIP = append(relatedIP, ip)
		ecsDest["locality"] = getIPLocality(internalNetworks, ip).String()
	}
	if destPort, found := getKeyUint64(flow.Fields, "destinationTransportPort"); found {
		ecsDest["port"] = destPort
		dstPort = uint16(destPort)
	}
	if mac, found := getKeyString(flow.Fields, "destinationMacAddress"); found {
		ecsDest["mac"] = mac
	}

	// ECS Fields -- Flow
	ecsFlow := mapstr.M{}
	// Read the addresses back from the ECS maps so that whichever source
	// (WLAN or regular fields) ended up in source.ip / destination.ip is used.
	var srcIP, dstIP net.IP
	if ip, found := getKeyIP(record.Map(ecsSource), "ip"); found {
		srcIP = ip
	}
	if ip, found := getKeyIP(record.Map(ecsDest), "ip"); found {
		dstIP = ip
	}
	proto, hasProto := getKeyUint64(flow.Fields, "protocolIdentifier")
	var protocol IPProtocol
	if hasProto {
		protocol = IPProtocol(proto)
	}
	// Fall back to 0.0.0.0 so the flow ID, the flow locality and the
	// community ID are always computed from non-nil addresses.
	if srcIP == nil {
		srcIP = net.IPv4(0, 0, 0, 0).To4()
	}
	if dstIP == nil {
		dstIP = net.IPv4(0, 0, 0, 0).To4()
	}
	ecsFlow["id"] = flowID(srcIP, dstIP, srcPort, dstPort, uint8(protocol))
	ecsFlow["locality"] = getIPLocality(internalNetworks, srcIP, dstIP).String()

	// ECS Fields -- network
	ecsNetwork := mapstr.M{}
	if hasProto {
		ecsNetwork["transport"] = protocol.String()
		ecsNetwork["iana_number"] = proto
	}
	countBytes, hasBytes := getKeyUint64Alternatives(flow.Fields, "octetDeltaCount", "octetTotalCount", "initiatorOctets")
	countPkts, hasPkts := getKeyUint64Alternatives(flow.Fields, "packetDeltaCount", "packetTotalCount", "initiatorPackets")
	revBytes, hasRevBytes := getKeyUint64Alternatives(flow.Fields, "reverseOctetDeltaCount", "reverseOctetTotalCount", "responderOctets")
	revPkts, hasRevPkts := getKeyUint64Alternatives(flow.Fields, "reversePacketDeltaCount", "reversePacketTotalCount", "responderPackets")

	// Reverse counters are attributed to the destination.
	if hasRevBytes {
		ecsDest["bytes"] = revBytes
	}

	if hasRevPkts {
		ecsDest["packets"] = revPkts
	}

	// Forward counters are attributed to the source; the network totals are
	// the forward counters plus the reverse ones when those are available.
	if hasBytes {
		ecsSource["bytes"] = countBytes
		if hasRevBytes {
			countBytes += revBytes
		}
		ecsNetwork["bytes"] = countBytes
	}
	if hasPkts {
		ecsSource["packets"] = countPkts
		if hasRevPkts {
			countPkts += revPkts
		}
		ecsNetwork["packets"] = countPkts
	}

	if biflowDir, isBiflow := getKeyUint64(flow.Fields, "biflowDirection"); isBiflow && len(ecsSource) > 0 && len(ecsDest) > 0 {
		// swap source and destination if biflowDirection is reverseInitiator.
		// The swap happens before the maps are attached to the event, so it
		// also affects the "source" and "destination" fields set below.
		if biflowDir == 2 {
			ecsDest, ecsSource = ecsSource, ecsDest
		}
		ecsEvent["category"] = []string{"network", "session"}

		// Assume source is the client in biflows.
		event.Fields["client"] = ecsSource
		event.Fields["server"] = ecsDest
	}

	ecsNetwork["direction"] = "unknown"
	if hasFlowDirection {
		ecsNetwork["direction"] = Direction(flowDirection).String()
	}
	if ssid, found := getKeyString(flow.Fields, "wlanSSID"); found {
		ecsNetwork["name"] = ssid
	}

	if communityid := flowhash.CommunityID.Hash(flowhash.Flow{
		SourceIP:        srcIP,
		SourcePort:      srcPort,
		DestinationIP:   dstIP,
		DestinationPort: dstPort,
		Protocol:        uint8(protocol),
	}); communityid != "" {
		ecsNetwork["community_id"] = communityid
	}

	// Only attach the groups that ended up with at least one field.
	if len(ecsFlow) > 0 {
		event.Fields["flow"] = ecsFlow
	}
	if len(ecsSource) > 0 {
		event.Fields["source"] = ecsSource
	}
	if len(ecsDest) > 0 {
		event.Fields["destination"] = ecsDest
	}
	if len(ecsNetwork) > 0 {
		event.Fields["network"] = ecsNetwork
	}
	if len(relatedIP) > 0 {
		event.Fields["related"] = mapstr.M{"ip": uniqueIPs(relatedIP)}
	}
	return event
}