in pcap-cli/internal/transformer/json_translator.go [997:1203]
// finalize completes the JSON translation of a single packet: it fills in the
// `logging.googleapis.com/operation` object, computes the aggregated flow ID,
// renders the human readable summary into `message` and, for TCP packets when
// debug verbosity is enabled, performs per-flow (lock guarded) analysis.
//
// Parameters:
//   - ctx: provides `ContextID` and `ContextLogName`, used for the operation
//     `id` and `producer` respectively.
//   - ifaces, iface: forwarded to `checkL3Address`; `iface.Addrs` is also used
//     to decide whether the L3 source address is local.
//   - serial: pointer to the packet serial; a value of 1 marks the operation
//     as `first`.
//   - p: the packet being translated; used for connection tracking and to
//     fetch its application layer.
//   - conntrack: when true, `analyzeConnection` is invoked for TCP packets.
//   - packet: the translation built so far, converted via `asTranslation`.
//
// The returned error is always nil for the paths handled in this method; the
// application layer path returns whatever `addAppLayerData` returns.
//
// All values read back from the translation use the comma-ok form of type
// assertion, so a partially decoded packet (for example: the L3 protocol is
// TCP or ICMP but the upper layer was not decoded) yields zero values instead
// of a panic.
func (t *JSONPcapTranslator) finalize(
	ctx context.Context,
	ifaces netIfaceIndex,
	iface *PcapIface,
	serial *uint64,
	p *gopacket.Packet,
	conntrack bool,
	packet fmt.Stringer,
) (fmt.Stringer, error) {
	json := t.asTranslation(packet)

	data := make(map[string]any, 15)

	id := ctx.Value(ContextID)
	logName := ctx.Value(ContextLogName)

	operation, _ := json.Object("logging.googleapis.com/operation")
	operation.Set(logName, "producer")
	if *serial == 1 {
		operation.Set(true, "first")
	}

	data["ifaceIndex"] = t.iface.Index
	data["ifaceName"] = t.iface.Name
	data["serial"] = *serial

	flowIDstr, _ := json.S("meta", "flow").Data().(string) // this is always available
	flowID, _ := strconv.ParseUint(flowIDstr, 10, 64)

	l3Src, _ := json.S("L3", "src").Data().(net.IP)
	l3Dst, _ := json.S("L3", "dst").Data().(net.IP)

	// handle cases where L3 is not available
	if l3Src == nil && l3Dst == nil {
		if json.Exists("ARP") {
			arpSrcIP, _ := json.S("ARP", "src", "IP").Data().(string)
			l3Src = net.ParseIP(arpSrcIP)
			data["L3Src"] = arpSrcIP

			arpDstIP, _ := json.S("ARP", "dst", "IP").Data().(string)
			l3Dst = net.ParseIP(arpDstIP)
			data["L3Dst"] = arpDstIP

			t.checkL3Address(ctx, json, data, ifaces, iface, l3Src, l3Dst)

			if arpFlowIDstr, arpOK := json.S("ARP", "flow").Data().(string); arpOK {
				arpFlowID, _ := strconv.ParseUint(arpFlowIDstr, 10, 64)
				flowID = fnv1a.AddUint64(flowID, arpFlowID)
				flowIDstr = strconv.FormatUint(flowID, 10)
				data["flowID"] = flowIDstr
				json.Set(flowIDstr, "flow")
			} else {
				// NOTE(review): `flowIDstr` keeps the `meta.flow` value here, so the
				// operation `id` below differs from the "0" stored in `flow` — confirm intended.
				data["flowID"] = "0"
				json.Set("0", "flow")
			}

			operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "arp", flowIDstr), "id")
			json.Set(stringFormatter.FormatComplex(jsonTranslationSummaryARP, data), "message")
			return json, nil
		}

		// neither L3 nor ARP: summarize as a plain L2 frame
		operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "l2", flowIDstr), "id")
		json.Set(stringFormatter.FormatComplex(jsonTranslationTemplate, data), "message")
		return json, nil
	}

	data["L3Src"] = l3Src
	data["L3Dst"] = l3Dst

	// report complete interface details when capturing for `any` interface
	t.checkL3Address(ctx, json, data, ifaces, iface, l3Src, l3Dst)

	isSrcLocal := iface.Addrs.Contains(l3Src.String())

	// comma-ok: if the protocol number is missing, `proto` is the zero value,
	// which is none of TCP/UDP/ICMP, so the packet takes the "unhandled L3 protocol" path.
	proto, _ := json.S("L3", "proto", "num").Data().(layers.IPProtocol)
	isTCP := proto == layers.IPProtocolTCP
	isUDP := proto == layers.IPProtocolUDP
	isICMPv4 := proto == layers.IPProtocolICMPv4
	isICMPv6 := proto == layers.IPProtocolICMPv6

	// `flowID` is the unique ID of this conversation:
	// given by the 6-tuple: iface_index+protocol+src_ip+src_port+dst_ip+dst_port.
	// Addition is commutative, so after hashing `net.IP` bytes and L4 ports to `uint64`,
	// the same `uint64`/`flowID` is produced after adding everything up, no matter the order.
	// Using the same `flowID` will produce grouped logs in Cloud Logging.
	if l3FlowIDstr, l3OK := json.S("L3", "flow").Data().(string); l3OK {
		l3FlowID, _ := strconv.ParseUint(l3FlowIDstr, 10, 64)
		flowID = fnv1a.AddUint64(flowID, l3FlowID)
	}
	if l4FlowIDstr, l4OK := json.S("L4", "flow").Data().(string); l4OK {
		l4FlowID, _ := strconv.ParseUint(l4FlowIDstr, 10, 64)
		flowID = fnv1a.AddUint64(flowID, l4FlowID)
	} else {
		flowID = fnv1a.AddUint64(flowID, 255) // RESERVED (0xFF)
	}
	flowIDstr = strconv.FormatUint(flowID, 10)

	data["flowID"] = flowIDstr
	json.Set(flowIDstr, "flow")

	if !isTCP && !isUDP {
		if isICMPv4 || isICMPv6 {
			if isICMPv6 {
				data["icmpVersion"] = 6
			} else {
				data["icmpVersion"] = 4
			}
			// comma-ok: the ICMP layer may be absent even if the IP header announces ICMP
			icmpMessage, _ := json.S("ICMP", "msg").Data().(string)
			data["icmpMessage"] = icmpMessage
			operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "icmp", flowIDstr), "id")
			json.Set(stringFormatter.FormatComplex(jsonTranslationSummaryICMP, data), "message")
			return json, nil
		}

		// unhandled L3 protocol
		operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "l3", flowIDstr), "id")
		json.Set(stringFormatter.FormatComplex(jsonTranslationSummaryWithoutL4, data), "message")
		return json, nil
	}

	if isUDP {
		data["L4Proto"] = "UDP"
		srcPort, _ := json.S("L4", "src").Data().(layers.UDPPort)
		data["L4Src"] = uint16(srcPort)
		dstPort, _ := json.S("L4", "dst").Data().(layers.UDPPort)
		data["L4Dst"] = uint16(dstPort)
		isSrcLocal = isSrcLocal && !t.ephemerals.isEphemeralUDPPort(&srcPort)
		json.Set(isSrcLocal, "local")
		operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "udp", flowIDstr), "id")
		json.Set(stringFormatter.FormatComplex(jsonTranslationSummaryUDP, data), "message")
		return json, nil
	}

	data["L4Proto"] = "TCP"
	srcPort, _ := json.S("L4", "src").Data().(layers.TCPPort)
	data["L4Src"] = uint16(srcPort)
	dstPort, _ := json.S("L4", "dst").Data().(layers.TCPPort)
	data["L4Dst"] = uint16(dstPort)
	// comma-ok: the TCP layer may be absent even if the IP header announces TCP
	tcpFlags, _ := json.S("L4", "flags", "str").Data().(string)
	data["tcpFlags"] = tcpFlags
	seq, _ := json.S("L4", "seq").Data().(uint32)
	data["tcpSeq"] = seq
	ack, _ := json.S("L4", "ack").Data().(uint32)
	data["tcpAck"] = ack
	tcpLen, _ := json.S("L4", "len").Data().(string)
	data["tcpLen"] = tcpLen

	operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "tcp", flowIDstr), "id")

	message := stringFormatter.FormatComplex(jsonTranslationSummaryTCP, data)

	// local means: a service running within the sandbox
	//   - so it is not a client which created a socket to communicate with a remote host using an ephemeral port
	// this approach is best effort as a client may use a `not ephemeral port` to create a socket for egress networking.
	isSrcLocal = isSrcLocal && !t.ephemerals.isEphemeralTCPPort(&srcPort)
	json.Set(isSrcLocal, "local")

	if !isDebugVerbosity(ctx, t.pcapTranslator) {
		// skip advanced packet analysis
		json.Set(message, "message")
		return json, nil
	}

	setFlags, _ := json.S("L4", "flags", "dec").Data().(uint8)

	// `finalize` is invoked from a `worker` via a go-routine `pool`:
	//   - there are no guarantees about which packet will get `finalize`d 1st
	//   - there are no guarantees about which packet will get the `lock` next
	// minimize locking: lock per-flow instead of across-flows.
	// Locking is done in the name of troubleshoot-ability, so some contention at the flow level should be acceptable...
	lock, traceAndSpanProvider := t.fm.lock(ctx, serial, &flowID, &setFlags, &seq, &ack, isSrcLocal)

	if conntrack {
		t.analyzeConnection(p, &flowID, &setFlags, json)
	}

	// packets carrying application data and none of SYN/FIN/RST are handed over,
	// together with the `lock`, to `addAppLayerData`
	appLayer := (*p).ApplicationLayer()
	if ((tcpSyn|tcpFin|tcpRst)&setFlags == 0) && appLayer != nil {
		return t.addAppLayerData(ctx, p, lock, &flowID, &setFlags, &seq, &appLayer, json, &message, traceAndSpanProvider)
	}

	if !lock.IsHTTP2() {
		// most ingress traffic is HTTP/1.1 , so:
		//   - try to get trace tracking information using h1 stream id
		streamID := http11StreamID
		if ts, ok := traceAndSpanProvider(&streamID); ok {
			t.setTraceAndSpan(json, ts)
		}
	}

	json.Set(message, "message")

	// packet is not carrying any data, unlock using TCP flags
	_, lockLatency := lock.UnlockWithTCPFlags(ctx, &setFlags)
	json.Set(lockLatency.String(), "ll")

	return json, nil
}