func createEvent()

in packetbeat/flows/worker.go [292:528]


func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, enableDeltaFlowReporting bool) beat.Event {
	timestamp := ts

	event := mapstr.M{
		"start":    common.Time(f.createTS),
		"end":      common.Time(f.ts),
		"duration": f.ts.Sub(f.createTS),
		"dataset":  "flow",
		"kind":     "event",
		"category": []string{"network"},
		"action":   "network_flow",
	}
	eventType := []string{"connection"}
	if isOver {
		eventType = append(eventType, "end")
	}
	event["type"] = eventType

	flow := mapstr.M{
		"id":    common.NetString(f.id.Serialize()),
		"final": isOver,
	}
	fields := mapstr.M{
		"event": event,
		"flow":  flow,
		"type":  "flow",
	}
	network := mapstr.M{}
	source := mapstr.M{}
	dest := mapstr.M{}
	tuple := common.IPPortTuple{}
	var communityID flowhash.Flow
	var proto applayer.Transport

	// add ethernet layer meta data
	if src, dst, ok := f.id.EthAddr(); ok {
		source["mac"] = formatHardwareAddr(net.HardwareAddr(src))
		dest["mac"] = formatHardwareAddr(net.HardwareAddr(dst))
	}

	// add vlan
	if vlan := f.id.OutterVLan(); vlan != nil {
		vlanID := uint64(binary.LittleEndian.Uint16(vlan))
		putOrAppendUint64(flow, "vlan", vlanID)
	}
	if vlan := f.id.VLan(); vlan != nil {
		vlanID := uint64(binary.LittleEndian.Uint16(vlan))
		putOrAppendUint64(flow, "vlan", vlanID)
	}

	// ipv4 layer meta data
	if src, dst, ok := f.id.OutterIPv4Addr(); ok {
		srcIP, dstIP := net.IP(src), net.IP(dst)
		source["ip"] = srcIP.String()
		dest["ip"] = dstIP.String()
		tuple.SrcIP = srcIP
		tuple.DstIP = dstIP
		tuple.IPLength = 4
		network["type"] = "ipv4"
		communityID.SourceIP = srcIP
		communityID.DestinationIP = dstIP
	}
	if src, dst, ok := f.id.IPv4Addr(); ok {
		srcIP, dstIP := net.IP(src), net.IP(dst)
		putOrAppendString(source, "ip", srcIP.String())
		putOrAppendString(dest, "ip", dstIP.String())
		// Save IPs for process matching if an outer layer was not present
		if tuple.IPLength == 0 {
			tuple.SrcIP = srcIP
			tuple.DstIP = dstIP
			tuple.IPLength = 4
			communityID.SourceIP = srcIP
			communityID.DestinationIP = dstIP
			network["type"] = "ipv4"
		}
	}

	// ipv6 layer meta data
	if src, dst, ok := f.id.OutterIPv6Addr(); ok {
		srcIP, dstIP := net.IP(src), net.IP(dst)
		putOrAppendString(source, "ip", srcIP.String())
		putOrAppendString(dest, "ip", dstIP.String())
		tuple.SrcIP = srcIP
		tuple.DstIP = dstIP
		tuple.IPLength = 6
		network["type"] = "ipv6"
		communityID.SourceIP = srcIP
		communityID.DestinationIP = dstIP
	}
	if src, dst, ok := f.id.IPv6Addr(); ok {
		srcIP, dstIP := net.IP(src), net.IP(dst)
		putOrAppendString(source, "ip", srcIP.String())
		putOrAppendString(dest, "ip", dstIP.String())
		// Save IPs for process matching if an outer layer was not present
		if tuple.IPLength == 0 {
			tuple.SrcIP = srcIP
			tuple.DstIP = dstIP
			tuple.IPLength = 6
			communityID.SourceIP = srcIP
			communityID.DestinationIP = dstIP
			network["type"] = "ipv6"
		}
	}

	// udp layer meta data
	if src, dst, ok := f.id.UDPAddr(); ok {
		tuple.SrcPort = binary.LittleEndian.Uint16(src)
		tuple.DstPort = binary.LittleEndian.Uint16(dst)
		source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
		network["transport"] = "udp"
		proto = applayer.TransportUDP
		communityID.SourcePort = tuple.SrcPort
		communityID.DestinationPort = tuple.DstPort
		communityID.Protocol = 17
	}

	// tcp layer meta data
	if src, dst, ok := f.id.TCPAddr(); ok {
		tuple.SrcPort = binary.LittleEndian.Uint16(src)
		tuple.DstPort = binary.LittleEndian.Uint16(dst)
		source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
		network["transport"] = "tcp"
		proto = applayer.TransportTCP
		communityID.SourcePort = tuple.SrcPort
		communityID.DestinationPort = tuple.DstPort
		communityID.Protocol = 6
	}

	var totalBytes, totalPackets uint64
	if f.stats[0] != nil {
		// Source stats.
		stats := encodeStats(f.stats[0], intNames, uintNames, floatNames, enableDeltaFlowReporting)
		for k, v := range stats {
			switch k {
			case "icmpV4TypeCode":
				if typeCode, ok := v.(uint64); ok && typeCode > 0 {
					network["transport"] = "icmp"
					communityID.Protocol = 1
					communityID.ICMP.Type = uint8(typeCode >> 8)
					communityID.ICMP.Code = uint8(typeCode)
				}
			case "icmpV6TypeCode":
				if typeCode, ok := v.(uint64); ok && typeCode > 0 {
					network["transport"] = "ipv6-icmp"
					communityID.Protocol = 58
					communityID.ICMP.Type = uint8(typeCode >> 8)
					communityID.ICMP.Code = uint8(typeCode)
				}
			default:
				source[k] = v
			}
		}

		if v, found := stats["bytes"]; found {
			//nolint:errcheck // ignore
			totalBytes += v.(uint64)
		}
		if v, found := stats["packets"]; found {
			//nolint:errcheck // ignore
			totalPackets += v.(uint64)
		}
	}
	if f.stats[1] != nil {
		// Destination stats.
		stats := encodeStats(f.stats[1], intNames, uintNames, floatNames, enableDeltaFlowReporting)
		for k, v := range stats {
			switch k {
			case "icmpV4TypeCode", "icmpV6TypeCode":
			default:
				dest[k] = v
			}
		}

		if v, found := stats["bytes"]; found {
			//nolint:errcheck // ignore
			totalBytes += v.(uint64)
		}
		if v, found := stats["packets"]; found {
			//nolint:errcheck // ignore
			totalPackets += v.(uint64)
		}
	}
	if communityID.Protocol > 0 && len(communityID.SourceIP) > 0 && len(communityID.DestinationIP) > 0 {
		if hash := flowhash.CommunityID.Hash(communityID); hash != "" {
			network["community_id"] = hash
		}
	}
	network["bytes"] = totalBytes
	network["packets"] = totalPackets
	fields["network"] = network

	// Set process information if it's available
	if tuple.IPLength != 0 && tuple.SrcPort != 0 {
		if proc := watcher.FindProcessesTuple(&tuple, proto); proc != nil {
			if proc.Src.PID > 0 {
				p := mapstr.M{
					"pid":               proc.Src.PID,
					"name":              proc.Src.Name,
					"args":              proc.Src.Args,
					"ppid":              proc.Src.PPID,
					"executable":        proc.Src.Exe,
					"start":             proc.Src.StartTime,
					"working_directory": proc.Src.CWD,
				}
				if proc.Src.CWD != "" {
					p["working_directory"] = proc.Src.CWD
				}
				source["process"] = p
				fields["process"] = p
			}
			if proc.Dst.PID > 0 {
				p := mapstr.M{
					"pid":               proc.Dst.PID,
					"name":              proc.Dst.Name,
					"args":              proc.Dst.Args,
					"ppid":              proc.Dst.PPID,
					"executable":        proc.Dst.Exe,
					"start":             proc.Dst.StartTime,
					"working_directory": proc.Src.CWD,
				}
				if proc.Dst.CWD != "" {
					p["working_directory"] = proc.Dst.CWD
				}
				dest["process"] = p
				fields["process"] = p
			}
		}
	}

	fields["source"] = source
	fields["destination"] = dest

	return beat.Event{
		Timestamp: timestamp,
		Fields:    fields,
	}
}