func()

in pcap-cli/internal/transformer/json_translator.go [997:1203]


// finalize completes the JSON translation of a single packet.
//
// It enriches the translation produced by `asTranslation` with:
//   - the Cloud Logging operation (`logging.googleapis.com/operation`): `producer`,
//     a `first` marker for serial 1, and a flow-scoped `id`,
//   - interface details, the packet `serial` and the conversation `flowID`,
//   - a human-readable `message` rendered from a per-protocol template
//     (ARP, L2, ICMP, other L3, UDP, TCP),
//   - for TCP at debug verbosity only: per-flow analysis performed while holding a per-flow lock.
//
// Every path visible here returns a nil error, except the one delegating to
// `addAppLayerData`, whose result is returned as-is.
func (t *JSONPcapTranslator) finalize(
	ctx context.Context,
	ifaces netIfaceIndex,
	iface *PcapIface,
	serial *uint64,
	p *gopacket.Packet,
	conntrack bool,
	packet fmt.Stringer,
) (fmt.Stringer, error) {
	json := t.asTranslation(packet)

	data := make(map[string]any, 15)

	id := ctx.Value(ContextID)
	logName := ctx.Value(ContextLogName)

	operation, _ := json.Object("logging.googleapis.com/operation")
	operation.Set(logName, "producer")
	if *serial == 1 {
		operation.Set(true, "first")
	}

	data["ifaceIndex"] = t.iface.Index
	data["ifaceName"] = t.iface.Name

	data["serial"] = *serial

	flowIDstr, _ := json.S("meta", "flow").Data().(string) // this is always available
	flowID, _ := strconv.ParseUint(flowIDstr, 10, 64)

	l3Src, _ := json.S("L3", "src").Data().(net.IP)
	l3Dst, _ := json.S("L3", "dst").Data().(net.IP)

	// handle cases where L3 is not available
	if l3Src == nil && l3Dst == nil {
		if json.Exists("ARP") {
			arpSrcIP, _ := json.S("ARP", "src", "IP").Data().(string)
			l3Src = net.ParseIP(arpSrcIP)
			data["L3Src"] = arpSrcIP

			arpDstIP, _ := json.S("ARP", "dst", "IP").Data().(string)
			l3Dst = net.ParseIP(arpDstIP)
			data["L3Dst"] = arpDstIP

			t.checkL3Address(ctx, json, data, ifaces, iface, l3Src, l3Dst)

			if arpFlowIDstr, arpOK := json.S("ARP", "flow").Data().(string); arpOK {
				arpFlowID, _ := strconv.ParseUint(arpFlowIDstr, 10, 64)
				flowID = fnv1a.AddUint64(flowID, arpFlowID)
				flowIDstr = strconv.FormatUint(flowID, 10)

				data["flowID"] = flowIDstr
				json.Set(flowIDstr, "flow")
			} else {
				data["flowID"] = "0"
				json.Set("0", "flow")
			}

			operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "arp", flowIDstr), "id")
			json.Set(stringFormatter.FormatComplex(jsonTranslationSummaryARP, data), "message")

			return json, nil
		}

		operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "l2", flowIDstr), "id")
		json.Set(stringFormatter.FormatComplex(jsonTranslationTemplate, data), "message")

		return json, nil
	}

	data["L3Src"] = l3Src
	data["L3Dst"] = l3Dst

	// report complete interface details when capturing for `any` interface
	t.checkL3Address(ctx, json, data, ifaces, iface, l3Src, l3Dst)

	isSrcLocal := iface.Addrs.Contains(l3Src.String())

	// comma-ok: a missing/unexpected `L3.proto.num` must not panic the worker;
	// the zero value matches none of TCP/UDP/ICMPv4/ICMPv6, so such packets
	// take the "unhandled L3 protocol" path below.
	proto, _ := json.S("L3", "proto", "num").Data().(layers.IPProtocol)
	isTCP := proto == layers.IPProtocolTCP
	isUDP := proto == layers.IPProtocolUDP
	isICMPv4 := proto == layers.IPProtocolICMPv4
	isICMPv6 := proto == layers.IPProtocolICMPv6

	// `flowID` is the unique ID of this conversation:
	// given by the 6-tuple: iface_index+protocol+src_ip+src_port+dst_ip+dst_port.
	// Addition is commutative, so after hashing `net.IP` bytes and L4 ports to `uint64`,
	// the same `uint64`/`flowID` is produced after adding everything up, no matter the order.
	// Using the same `flowID` will produce grouped logs in Cloud Logging.
	if l3FlowIDstr, l3OK := json.S("L3", "flow").Data().(string); l3OK {
		l3FlowID, _ := strconv.ParseUint(l3FlowIDstr, 10, 64)
		flowID = fnv1a.AddUint64(flowID, l3FlowID)
	}
	if l4FlowIDstr, l4OK := json.S("L4", "flow").Data().(string); l4OK {
		l4FlowID, _ := strconv.ParseUint(l4FlowIDstr, 10, 64)
		flowID = fnv1a.AddUint64(flowID, l4FlowID)
	} else {
		flowID = fnv1a.AddUint64(flowID, 255) // RESERVED (0xFF)
	}
	flowIDstr = strconv.FormatUint(flowID, 10)

	data["flowID"] = flowIDstr
	json.Set(flowIDstr, "flow")

	if !isTCP && !isUDP {
		if isICMPv4 || isICMPv6 {
			if isICMPv6 {
				data["icmpVersion"] = 6
			} else {
				data["icmpVersion"] = 4
			}
			// comma-ok: L3 may report ICMP even when the ICMP layer could not be
			// translated (e.g. truncated packet); fall back to an empty message.
			icmpMessage, _ := json.S("ICMP", "msg").Data().(string)
			data["icmpMessage"] = icmpMessage

			operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "icmp", flowIDstr), "id")
			json.Set(stringFormatter.FormatComplex(jsonTranslationSummaryICMP, data), "message")

			return json, nil
		}

		// unhandled L3 protocol
		operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "l3", flowIDstr), "id")
		json.Set(stringFormatter.FormatComplex(jsonTranslationSummaryWithoutL4, data), "message")

		return json, nil
	}

	if isUDP {
		data["L4Proto"] = "UDP"
		srcPort, _ := json.S("L4", "src").Data().(layers.UDPPort)
		data["L4Src"] = uint16(srcPort)
		dstPort, _ := json.S("L4", "dst").Data().(layers.UDPPort)
		data["L4Dst"] = uint16(dstPort)

		isSrcLocal = isSrcLocal && !t.ephemerals.isEphemeralUDPPort(&srcPort)
		json.Set(isSrcLocal, "local")

		operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "udp", flowIDstr), "id")
		json.Set(stringFormatter.FormatComplex(jsonTranslationSummaryUDP, data), "message")
		return json, nil
	}

	data["L4Proto"] = "TCP"
	srcPort, _ := json.S("L4", "src").Data().(layers.TCPPort)
	data["L4Src"] = uint16(srcPort)
	dstPort, _ := json.S("L4", "dst").Data().(layers.TCPPort)
	data["L4Dst"] = uint16(dstPort)

	// comma-ok: a missing `L4.flags.str` must not panic the worker.
	tcpFlags, _ := json.S("L4", "flags", "str").Data().(string)
	data["tcpFlags"] = tcpFlags

	seq, _ := json.S("L4", "seq").Data().(uint32)
	data["tcpSeq"] = seq
	ack, _ := json.S("L4", "ack").Data().(uint32)
	data["tcpAck"] = ack
	tcpLen, _ := json.S("L4", "len").Data().(string)
	data["tcpLen"] = tcpLen

	operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "tcp", flowIDstr), "id")

	message := stringFormatter.FormatComplex(jsonTranslationSummaryTCP, data)

	// local means: a service running within the sandbox
	//   - so it is not a client which created a socket to communicate with a remote host using an ephemeral port
	// this approach is best effort as a client may use a `not ephemeral port` to create a socket for egress networking.
	isSrcLocal = isSrcLocal && !t.ephemerals.isEphemeralTCPPort(&srcPort)
	json.Set(isSrcLocal, "local")

	if !isDebugVerbosity(ctx, t.pcapTranslator) {
		// skip advanced packet analysis
		json.Set(message, "message")
		return json, nil
	}

	setFlags, _ := json.S("L4", "flags", "dec").Data().(uint8)

	// `finalize` is invoked from a `worker` via a go-routine `pool`:
	//   - there are no guarantees about which packet will get `finalize`d 1st
	//   - there are no guarantees about which packet will get the `lock` next
	// minimize locking: lock per-flow instead of across-flows.
	// Locking is done in the name of troubleshoot-ability, so some contention at the flow level should be acceptable...
	lock, traceAndSpanProvider := t.fm.lock(ctx, serial, &flowID, &setFlags, &seq, &ack, isSrcLocal)

	if conntrack {
		t.analyzeConnection(p, &flowID, &setFlags, json)
	}

	// From here on, `lock` is either handed over to `addAppLayerData`
	// (NOTE(review): presumably it releases the lock — verify) or released
	// below via `UnlockWithTCPFlags`.
	appLayer := (*p).ApplicationLayer()
	if ((tcpSyn|tcpFin|tcpRst)&setFlags == 0) && appLayer != nil {
		return t.addAppLayerData(ctx, p, lock, &flowID, &setFlags, &seq, &appLayer, json, &message, traceAndSpanProvider)
	}

	if !lock.IsHTTP2() {
		// most ingress traffic is HTTP/1.1 , so:
		//   - try to get trace tracking information using h1 stream id
		streamID := http11StreamID
		if ts, ok := traceAndSpanProvider(&streamID); ok {
			t.setTraceAndSpan(json, ts)
		}
	}

	json.Set(message, "message")

	// packet is not carrying any data, unlock using TCP flags
	_, lockLatency := lock.UnlockWithTCPFlags(ctx, &setFlags)
	json.Set(lockLatency.String(), "ll")

	return json, nil
}