func()

in x-pack/auditbeat/module/system/socket/state.go [907:1058]


func (f *flow) toEvent(final bool) (ev mb.Event, err error) {
	localAddr := f.local.addr
	remoteAddr := f.remote.addr

	local := mapstr.M{
		"ip":      localAddr.IP.String(),
		"port":    localAddr.Port,
		"packets": f.local.packets,
		"bytes":   f.local.bytes,
	}

	remote := mapstr.M{
		"ip":      remoteAddr.IP.String(),
		"port":    remoteAddr.Port,
		"packets": f.remote.packets,
		"bytes":   f.remote.bytes,
	}

	src, dst := local, remote
	switch f.dir {
	case directionIngress:
		src, dst = dst, src
	case directionUnknown:
		// For some flows we can miss information to determine the source (dir=unknown).
		// As a last resort, assume that the client side uses a higher port number
		// than the server.
		if localAddr.Port < remoteAddr.Port {
			src, dst = dst, src
		}
	}

	inetType := f.inetType
	// Under Linux, a socket created as AF_INET6 can receive IPv4 connections
	// and it will use the IPv4 stack.
	// This results in src and dst address using IPv4 mapped addresses (which
	// Golang converts to IPv4 automatically). It will be misleading to report
	// network.type: ipv6 and have v4 addresses, so it's better to report
	// a network.type of ipv4 (which also matches the actual stack used).
	if inetType == inetTypeIPv6 && f.local.addr.IP.To4() != nil && f.remote.addr.IP.To4() != nil {
		inetType = inetTypeIPv4
	}
	eventType := []string{"info"}
	if inetType == inetTypeIPv6 || inetType == inetTypeIPv4 {
		eventType = append(eventType, "connection")
	}

	root := mapstr.M{
		"source":      src,
		"client":      src,
		"destination": dst,
		"server":      dst,
		"network": mapstr.M{
			"direction": f.dir.String(),
			"type":      inetType.String(),
			"transport": f.proto.String(),
			"packets":   f.local.packets + f.remote.packets,
			"bytes":     f.local.bytes + f.remote.bytes,
		},
		"event": mapstr.M{
			"kind":     "event",
			"action":   "network_flow",
			"category": []string{"network"},
			"type":     eventType,
			"start":    f.createdTime,
			"end":      f.lastSeenTime,
			"duration": f.lastSeenTime.Sub(f.createdTime).Nanoseconds(),
		},
		"flow": mapstr.M{
			"final":    final,
			"complete": f.complete,
		},
	}
	if communityid := flowhash.CommunityID.Hash(flowhash.Flow{
		SourceIP:        localAddr.IP,
		SourcePort:      uint16(localAddr.Port),
		DestinationIP:   remoteAddr.IP,
		DestinationPort: uint16(remoteAddr.Port),
		Protocol:        uint8(f.proto),
	}); communityid != "" {
		(root["network"].(mapstr.M))["community_id"] = communityid
	}

	var errs []error
	rootPut := func(key string, value interface{}) {
		if _, err := root.Put(key, value); err != nil {
			errs = append(errs, err)
		}
	}

	relatedIPs := []string{}
	if len(localAddr.IP) != 0 {
		relatedIPs = append(relatedIPs, localAddr.IP.String())
	}
	if len(localAddr.IP) > 0 {
		relatedIPs = append(relatedIPs, remoteAddr.IP.String())
	}
	if len(relatedIPs) > 0 {
		rootPut("related.ip", relatedIPs)
	}

	metricset := mapstr.M{
		"kernel_sock_address": fmt.Sprintf("0x%x", f.sock),
	}

	if f.pid != 0 {
		process := mapstr.M{
			"pid": int(f.pid),
		}
		if f.process != nil {
			process["name"] = f.process.name
			process["args"] = f.process.args
			process["executable"] = f.process.path
			if f.process.createdTime != (time.Time{}) {
				process["created"] = f.process.createdTime
			}
			if f.process.entityID != "" {
				process["entity_id"] = f.process.entityID
			}

			if f.process.hasCreds {
				uid := strconv.Itoa(int(f.process.uid))
				gid := strconv.Itoa(int(f.process.gid))
				rootPut("user.id", uid)
				rootPut("group.id", gid)
				if name := userCache.LookupID(uid); name != "" {
					rootPut("user.name", name)
					rootPut("related.user", []string{name})
				}
				if name := groupCache.LookupID(gid); name != "" {
					rootPut("group.name", name)
				}
				metricset["uid"] = f.process.uid
				metricset["gid"] = f.process.gid
				metricset["euid"] = f.process.euid
				metricset["egid"] = f.process.egid
			}

			if domain, found := f.process.ResolveIP(f.local.addr.IP); found {
				local["domain"] = domain
			}
			if domain, found := f.process.ResolveIP(f.remote.addr.IP); found {
				remote["domain"] = domain
			}
		}
		root["process"] = process
	}

	return mb.Event{
		RootFields:      root,
		MetricSetFields: metricset,
	}, errors.Join(errs...)
}