pcap-cli/internal/transformer/json_translator.go (1,350 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build json
package transformer
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"net"
"net/http"
"net/netip"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/Jeffail/gabs/v2"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/pkg/errors"
"github.com/segmentio/fasthash/fnv1a"
"github.com/wissance/stringFormatter"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"github.com/alphadose/haxmap"
mapset "github.com/deckarep/golang-set/v2"
)
type (
JSONPcapTranslator struct {
*pcapTranslator
fm *flowMutex
traceToHttpRequestMap *haxmap.Map[string, *httpRequest]
flowToStreamToSequenceMap FTSTSM
}
)
const (
jsonTranslationFlowTemplate = "{0}/iface/{1}/flow/{2}:{3}"
jsonTranslationTemplate = "#{serial} | @{ifaceIndex}/{ifaceName}"
jsonTranslationSummary = jsonTranslationTemplate + " | flow:{flowID}"
jsonTranslationSummaryWithoutL4 = jsonTranslationSummary + " | {L3Src} > {L3Dst}"
jsonTranslationSummaryARP = jsonTranslationSummary + " | ARP | {L3Src} > {L3Dst}"
jsonTranslationSummaryICMP = jsonTranslationSummary + " | ICMPv{icmpVersion} | {L3Src} > {L3Dst} | {icmpMessage}"
jsonTranslationSummaryUDP = jsonTranslationSummary + " | {L4Proto} | {L3Src}:{L4Src} > {L3Dst}:{L4Dst}"
jsonTranslationSummaryTCP = jsonTranslationSummaryUDP + " | [{tcpFlags}] | len/seq/ack:{tcpLen}/{tcpSeq}/{tcpAck}"
)
func init() {
registerTranslatorFactory(JSON, newJSONPcapTranslator)
}
func (t *JSONPcapTranslator) translate(_ *gopacket.Packet) error {
return fmt.Errorf("not implemented")
}
func (t *JSONPcapTranslator) done(ctx context.Context) {
t.fm.MutexMap.ForEach(func(flowID uint64, lock *flowLockCarrier) bool {
if lock.mu.TryLock() {
t.fm.untrackConnection(ctx, &flowID, lock)
transformerLogger.Printf("[%d/%s] – untracked flow: %d\n", t.iface.Index, t.iface.Name, flowID)
lock.mu.Unlock()
}
return true
})
t.fm.MutexMap.Clear()
t.flowToStreamToSequenceMap.Clear()
t.traceToHttpRequestMap.Clear()
}
// return pointer to `struct` `gabs.Container`
func (t *JSONPcapTranslator) next(
ctx context.Context,
nic *PcapIface,
serial *uint64,
packet *gopacket.Packet,
) fmt.Stringer {
flowID := fnv1a.AddUint64(fnv1a.Init64, uint64(t.iface.Index))
flowIDstr := strconv.FormatUint(flowID, 10)
json := gabs.New()
id := ctx.Value(ContextID)
logName := ctx.Value(ContextLogName)
pcap, _ := json.Object("pcap")
serialStr := strconv.FormatUint(*serial, 10)
pcap.Set(serialStr, "num")
labels, _ := json.Object("logging.googleapis.com/labels")
labels.Set("pcap-sidecar", "run.googleapis.com/tool")
metadata := (*packet).Metadata()
info := metadata.CaptureInfo
meta, _ := json.Object("meta")
meta.Set(flowIDstr, "flow")
meta.Set(metadata.Truncated, "trunc")
timestamp, _ := json.Object("timestamp")
timestamp.Set(info.Timestamp.Unix(), "seconds")
timestamp.Set(info.Timestamp.Nanosecond(), "nanos")
atDebugVerbosity(ctx,
t.pcapTranslator,
func(
ctx context.Context,
translator *pcapTranslator,
) {
pcap.Set(id, "id")
labels.Set(id, "run.googleapis.com/pcap/id")
pcap.Set(logName, "ctx")
labels.Set(logName, "run.googleapis.com/pcap/name")
labels.Set(t.iface.Name, "run.googleapis.com/pcap/iface")
meta.Set(info.Length, "len")
meta.Set(info.CaptureLength, "cap_len")
meta.Set(info.Timestamp.Format(time.RFC3339Nano), "timestamp")
netIface := *nic
iface, _ := json.Object("iface")
iface.Set(netIface.Index, "index")
iface.Set(netIface.Name, "name")
if sizeOfAddrs := nic.Addrs.Cardinality(); sizeOfAddrs > 0 {
addrs, _ := iface.ArrayOfSize(sizeOfAddrs, "addrs")
netIface.Addrs.Each(func(IP string) bool {
sizeOfAddrs -= 1
addrs.SetIndex(IP, sizeOfAddrs)
return false
})
}
})
return json
}
func (t *JSONPcapTranslator) asTranslation(buffer fmt.Stringer) *gabs.Container {
if buffer == nil {
return nil
}
return buffer.(*gabs.Container)
}
func newError(
ctx context.Context,
err error,
) (fmt.Stringer, *gabs.Container) {
json := gabs.New()
errors, _ := json.ArrayOfSize(1, "err")
errJSON, _ := errors.ObjectI(0)
errJSON.Set(err.Error(), "msg")
json.Set("ERROR", "severity")
// return only the error for caller to hydrate.
return json, errJSON
}
func (t *JSONPcapTranslator) translateErrorLayer(
ctx context.Context,
err *gopacket.DecodeFailure,
) fmt.Stringer {
json, errJSON := newError(ctx, err.Error())
errJSON.Set(err.Dump(), "trace")
return json
}
func (t *JSONPcapTranslator) translateLayerError(
ctx context.Context,
lType gopacket.LayerType,
err error,
) fmt.Stringer {
json, errJSON := newError(ctx, err)
errJSON.Set(lType.String(), "layer")
return json
}
func (t *JSONPcapTranslator) translateError(
ctx context.Context,
err error,
) fmt.Stringer {
json, _ := newError(ctx, err)
return json
}
func (t *JSONPcapTranslator) translateEthernetLayer(ctx context.Context, eth *layers.Ethernet) fmt.Stringer {
json := gabs.New()
L2, _ := json.Object("L2")
L2.Set(eth.EthernetType.String(), "type")
L2.Set(eth.SrcMAC.String(), "src")
L2.Set(eth.DstMAC.String(), "dst")
return json
}
func (t *JSONPcapTranslator) translateARPLayer(ctx context.Context, arp *layers.ARP) fmt.Stringer {
json := gabs.New()
arpJSON, _ := json.Object("ARP")
arpJSON.Set(arp.Operation, "op")
var ipBytes [4]byte
copy(ipBytes[:], arp.SourceProtAddress)
ip4 := netip.AddrFrom4(ipBytes)
mac := net.HardwareAddr(arp.SourceHwAddress[:])
srcJSON, _ := arpJSON.Object("src")
srcJSON.Set(ip4.String(), "IP")
srcJSON.Set(mac.String(), "MAC")
copy(ipBytes[:], arp.DstProtAddress)
ip4 = netip.AddrFrom4(ipBytes)
mac = net.HardwareAddr(arp.DstHwAddress[:])
dstJSON, _ := arpJSON.Object("dst")
dstJSON.Set(ip4.String(), "IP")
dstJSON.Set(mac.String(), "MAC")
flowID := fnv1a.HashUint64(fnv1a.HashBytes64(arp.SourceProtAddress) + fnv1a.HashBytes64(arp.DstProtAddress))
flowIDstr := strconv.FormatUint(flowID, 10)
arpJSON.Set(flowIDstr, "flow")
return json
}
func (t *JSONPcapTranslator) addEndpoints(
json *gabs.Container,
flow *gopacket.Flow,
) {
flows, _ := json.Object("endpoints")
flows.Set(flow.Src().String(), "src")
flows.Set(flow.Dst().String(), "dst")
flows.Set(flow.String(), "fwd")
flows.Set(flow.Reverse().String(), "bwd")
flows.Set(strconv.FormatUint(flow.FastHash(), 10), "hash")
}
func (t *JSONPcapTranslator) translateIPv4Layer(
ctx context.Context,
ip4 *layers.IPv4,
) fmt.Stringer {
json := gabs.New()
// https://github.com/google/gopacket/blob/master/layers/ip4.go#L43
L3, _ := json.Object("L3")
L3.Set(ip4.Version, "v")
L3.Set(ip4.SrcIP, "src")
L3.Set(ip4.DstIP, "dst")
L3.Set(ip4.Id, "id")
L3.Set(ip4.IHL, "ihl")
L3.Set(ip4.TTL, "ttl")
L3.Set(ip4.TOS, "tos")
L3.Set(ip4.Length, "len")
L3.Set(ip4.FragOffset, "foff")
L3.Set(ip4.Checksum, "xsum")
proto, _ := L3.Object("proto")
proto.Set(ip4.Protocol, "num")
if sizeOfOptions := len(ip4.Options); sizeOfOptions > 0 {
opts, _ := L3.ArrayOfSize(sizeOfOptions, "opts")
for i, opt := range ip4.Options {
o, _ := opts.ObjectI(i)
o.Set(string(opt.OptionData), "data")
o.Set(opt.OptionType, "type")
}
}
// hashing bytes yields `uint64`, and addition is commutative:
// - so hashing the IP byte array representations and then adding then resulting `uint64`s is a commutative operation as well.
flowID := fnv1a.HashUint64(uint64(4) + fnv1a.HashBytes64(ip4.SrcIP.To4()) + fnv1a.HashBytes64(ip4.DstIP.To4()))
flowIDstr := strconv.FormatUint(flowID, 10)
L3.Set(flowIDstr, "flow") // IPv4(4) (0x04)
atDebugVerbosity(ctx,
t.pcapTranslator,
func(
ctx context.Context,
translator *pcapTranslator,
) {
proto.Set(ip4.Protocol.String(), "name")
// https://github.com/google/gopacket/blob/master/layers/ip4.go#L28-L40
L3.Set(strings.Split(ip4.Flags.String(), "|"), "flags")
networkFlow := ip4.NetworkFlow()
t.addEndpoints(L3, &networkFlow)
})
return json
}
func (t *JSONPcapTranslator) translateIPv6Layer(
ctx context.Context,
ip6 *layers.IPv6,
) fmt.Stringer {
json := gabs.New()
// https://github.com/google/gopacket/blob/master/layers/ip6.go#L28-L43
L3, _ := json.Object("L3")
L3.Set(ip6.Version, "v")
L3.Set(ip6.SrcIP, "src")
L3.Set(ip6.DstIP, "dst")
L3.Set(ip6.Length, "len")
L3.Set(ip6.TrafficClass, "cls")
L3.Set(ip6.FlowLabel, "lbl")
L3.Set(ip6.HopLimit, "ttl")
proto, _ := L3.Object("proto")
proto.Set(ip6.NextHeader, "num")
// hashing bytes yields `uint64`, and addition is commutative:
// - so hashing the IP byte array representations and then adding then resulting `uint64`s is a commutative operation as well.
flowID := fnv1a.HashUint64(uint64(41) + fnv1a.HashBytes64(ip6.SrcIP.To16()) + fnv1a.HashBytes64(ip6.DstIP.To16()))
flowIDstr := strconv.FormatUint(flowID, 10)
L3.Set(flowIDstr, "flow") // IPv6(41) (0x29)
atDebugVerbosity(ctx,
t.pcapTranslator,
func(
ctx context.Context,
translator *pcapTranslator,
) {
proto.Set(ip6.NextHeader.String(), "name")
networkFlow := ip6.NetworkFlow()
t.addEndpoints(L3, &networkFlow)
})
// missing `HopByHop`: https://github.com/google/gopacket/blob/master/layers/ip6.go#L40
return json
}
func (t *JSONPcapTranslator) translateICMPv4Layer(ctx context.Context, icmp4 *layers.ICMPv4) fmt.Stringer {
// see: https://github.com/google/gopacket/blob/master/layers/icmp4.go#L208-L215
json := gabs.New()
ICMP4, _ := json.Object("ICMP")
ICMP4.Set(icmp4.TypeCode.Type(), "type")
ICMP4.Set(icmp4.TypeCode.Code(), "code")
ICMP4.Set(icmp4.Checksum, "xsum")
// see: https://github.com/google/gopacket/blob/master/layers/icmp4.go#L78-L153
ICMP4.Set(icmp4.TypeCode.String(), "msg")
switch icmp4.TypeCode.Type() {
case layers.ICMPv4TypeEchoRequest, layers.ICMPv4TypeEchoReply:
ICMP4.Set(icmp4.Id, "id")
ICMP4.Set(icmp4.Seq, "seq")
case layers.ICMPv4TypeTimeExceeded, layers.ICMPv4TypeDestinationUnreachable, layers.ICMPv4TypeRedirect:
IPv4, _ := ICMP4.Object("IPv4")
// original IPv4 header starts from offset 8
// reference:
// - https://en.wikipedia.org/wiki/Internet_Control_Message_Protocol#Time_exceeded
// - https://en.wikipedia.org/wiki/Internet_Control_Message_Protocol#Destination_unreachable
ipHeader := icmp4.LayerPayload()
IPv4.Set(binary.BigEndian.Uint16(ipHeader[4:6]), "id")
IPv4.Set(uint8(ipHeader[8]), "ttl")
IPv4.Set(uint8(ipHeader[9]), "proto")
IPv4.Set(binary.BigEndian.Uint16(ipHeader[10:12]), "xsum")
// IP addresses are represented as bigendian []byte slices in Go
var ipBytes [4]byte
copy(ipBytes[:], ipHeader[12:16])
srcIP := netip.AddrFrom4(ipBytes)
IPv4.Set(srcIP.String(), "src")
copy(ipBytes[:], ipHeader[16:20])
dstIP := netip.AddrFrom4(ipBytes)
IPv4.Set(dstIP.String(), "dst")
if icmp4.TypeCode.Type() == layers.ICMPv4TypeRedirect {
// see: https://github.com/google/gopacket/blob/master/layers/icmp4.go#L230
copy(ipBytes[:], icmp4.LayerContents()[4:8])
ICMP4.Set(netip.AddrFrom4(ipBytes).String(), "tgt")
}
}
return json
}
func (t *JSONPcapTranslator) translateICMPv6Layer(ctx context.Context, icmp6 *layers.ICMPv6) fmt.Stringer {
// see: https://github.com/google/gopacket/blob/master/layers/icmp6.go#L174-L183
json := gabs.New()
ICMP6, _ := json.Object("ICMP")
ICMP6.Set(icmp6.TypeCode.Type(), "type")
ICMP6.Set(icmp6.TypeCode.Code(), "code")
ICMP6.Set(icmp6.Checksum, "xsum")
ICMP6.Set(icmp6.TypeCode.String(), "msg")
return json
}
func (t *JSONPcapTranslator) asICMPv6(
ctx context.Context,
buffer fmt.Stringer,
) (*gabs.Container, *gabs.Container) {
var ICMPv6 *gabs.Container
json := t.asTranslation(buffer)
if json == nil {
json = gabs.New()
ICMPv6, _ = json.Object("ICMP")
} else if json.Exists("ICMP") {
ICMPv6 = json.S("ICMP")
} else {
ICMPv6, _ = json.Object("ICMP")
}
return json, ICMPv6
}
func (t *JSONPcapTranslator) translateICMPv6EchoLayer(
ctx context.Context, json fmt.Stringer, icmp6 *layers.ICMPv6Echo,
) fmt.Stringer {
// see: https://github.com/google/gopacket/blob/master/layers/icmp6msg.go#L57-L62
_json, ICMP6 := t.asICMPv6(ctx, json)
ICMP6.Set(icmp6.Identifier, "id")
ICMP6.Set(icmp6.SeqNumber, "seq")
return _json
}
func (t *JSONPcapTranslator) translateICMPv6RedirectLayer(
ctx context.Context, json fmt.Stringer, icmp6 *layers.ICMPv6Redirect,
) fmt.Stringer {
// see: https://github.com/google/gopacket/blob/master/layers/icmp6msg.go#L97-L104
_json, ICMP6 := t.asICMPv6(ctx, json)
ICMP6.Set(icmp6.TargetAddress, "tgt")
ICMP6.Set(icmp6.DestinationAddress, "dst")
return _json
}
func (t *JSONPcapTranslator) translateICMPv6L3HeaderLayer(
ctx context.Context, json fmt.Stringer, icmp6 *layers.ICMPv6,
) fmt.Stringer {
// see: https://github.com/google/gopacket/blob/master/layers/icmp6msg.go#L97-L104
_json, ICMP6 := t.asICMPv6(ctx, json)
IPv6, _ := ICMP6.Object("IPv6")
ipHeader := icmp6.LayerPayload()[4:]
// IPv6 header 1st 32 bits ( 4 bytes )
ipHeaderBytes0to3 := binary.BigEndian.Uint32(ipHeader[:4])
// Version: from bit 0 to 3 ( 4 bits )
// - bin mask: 11110000000000000000000000000000
// - hex mask: 0xF0000000
// - must be shifted 28 positions to the right to discard `TrafficClass` (8 bits) and `FlowLabel` (20 bits)
version := ipHeaderBytes0to3 & uint32(0xF0000000) >> 28
IPv6.Set(version, "v")
// FlowLabel: from bit 12 to 31 ( 20 bits )
// - bin mask: 00000000000011111111111111111111
// - hex mask: 0x000FFFFF
flowLabel := ipHeaderBytes0to3 & uint32(0x000FFFFF)
IPv6.Set(flowLabel, "lbl")
// TrafficClass: from bit 4 to 11 ( 6+2 bits )
// - bin mask: 00001111111100000000000000000000
// - hex mask: 0x0FF00000
// - must be shifted 20 positions to the right to discard `FlowLabel` bits
trafficClass := (ipHeaderBytes0to3 & uint32(0x0FF00000)) >> 20
// The six most-significant bits hold the differentiated services field
// - DS field mask: `11111100` or `0xFC`
// - must be shifted 2 bits to the right to remove bits from ECN
IPv6.Set((trafficClass&0xFC)>>2, "dsf")
// The remaining two bits are used for Explicit Congestion Notification
// - ECN mask: `00000011` or `0x03`
IPv6.Set((trafficClass & 0x03), "ecn")
// HopLimit (aka TTL): 8 bits, 7th byte
IPv6.Set(uint32(ipHeader[7]), "ttl")
var ipBytes [16]byte
copy(ipBytes[:], ipHeader[8:24])
srcIP := netip.AddrFrom16(ipBytes)
IPv6.Set(srcIP.String(), "src")
copy(ipBytes[:], ipHeader[24:40])
dstIP := netip.AddrFrom16(ipBytes)
IPv6.Set(dstIP.String(), "dst")
atDebugVerbosity(ctx,
t.pcapTranslator,
func(
ctx context.Context,
translator *pcapTranslator,
) {
nextHeader := uint8(ipHeader[6])
switch nextHeader {
default:
IPv6.Set(nextHeader, "proto")
case 0x06: // TCP
IPv6.Set("TCP", "proto")
case 0x11: // UDP
IPv6.Set("UDP", "proto")
}
})
return _json
}
func (t *JSONPcapTranslator) translateUDPLayer(ctx context.Context, udp *layers.UDP) fmt.Stringer {
json := gabs.New()
// https://github.com/google/gopacket/blob/master/layers/udp.go#L17-L25
L4, _ := json.Object("L4")
transportFlow := udp.TransportFlow()
t.addEndpoints(L4, &transportFlow)
L4.Set(len(udp.Payload), "size")
L4.Set(udp.Checksum, "xsum")
L4.Set(udp.Length, "len")
L4.Set(udp.SrcPort, "src")
if name, ok := layers.UDPPortNames[udp.SrcPort]; ok {
L4.Set(name, "sproto")
}
L4.SetP(udp.DstPort, "dst")
if name, ok := layers.UDPPortNames[udp.DstPort]; ok {
L4.Set(name, "dproto")
}
// UDP(17) (0x11) | `SrcPort` and `DstPort` are `uint8`
flowID := fnv1a.HashUint64(uint64(17) + uint64(udp.SrcPort) + uint64(udp.DstPort))
flowIDstr := strconv.FormatUint(flowID, 10)
L4.Set(flowIDstr, "flow")
return json
}
func (t *JSONPcapTranslator) addTCPwindowScale(
ctx context.Context,
tcp *layers.TCP,
optKey, optHexVal *string,
optJSON, L4 *gabs.Container,
) {
winScalePowerOf2, winScaleErr := strconv.ParseUint(*optHexVal, 0, 16)
if winScaleErr != nil {
optJSON.ArrayAppend(*optHexVal, *optKey)
return
}
winScaleMultiplier := uint64(2 << (winScalePowerOf2 - 1))
realWindowSizeStr := strconv.FormatUint(uint64(tcp.Window)*winScaleMultiplier, 10)
atDebugVerbosity(ctx,
t.pcapTranslator,
func(
ctx context.Context,
translator *pcapTranslator,
) {
winScale := gabs.New()
winScale.Set(optHexVal, "hex")
winScale.Set(winScalePowerOf2, "dec")
winScale.Set(strconv.FormatUint(winScaleMultiplier, 10), "scale")
winScale.Set(realWindowSizeStr, "win")
optJSON.ArrayAppend(winScale, *optKey)
})
L4.Set(realWindowSizeStr, "xwin")
}
func (t *JSONPcapTranslator) addTCPoptions(
ctx context.Context,
tcp *layers.TCP,
L4 *gabs.Container,
) {
opts, _ := L4.ArrayOfSize(len(tcp.Options), "opts")
for i, tcpOpt := range tcp.Options {
// see: https://github.com/google/gopacket/blob/master/layers/tcp.go#L104C9-L128
if o := tcpOptionRgx.FindStringSubmatch(tcpOpt.String()); o != nil {
tcpOptVal := strings.TrimSpace(o[2])
if tcpOptVal == "" {
opts.SetIndex(o[1], i)
continue
}
opt, _ := opts.ObjectI(i)
optKey := strings.TrimSpace(o[1])
optVals := strings.Split(tcpOptVal, " ")
opt.Array(optKey)
for _, optVal := range optVals {
optVal = strings.TrimSpace(optVal)
// see: https://github.com/google/gopacket/blob/master/layers/tcp.go#L37-L57
// [ToDo] – handle: SACK
if optVal == "" {
continue
} else if strings.HasPrefix(optVal, "0x") {
optHexVal := strings.TrimRight(optVal, "0")
switch tcpOpt.OptionType {
case 3: // WindowScale
t.addTCPwindowScale(ctx, tcp, &optKey, &optHexVal, opt, L4)
default:
opt.ArrayAppend(optHexVal, optKey)
}
} else {
switch tcpOpt.OptionType {
case 8: // Timestamps
atDebugVerbosity(ctx,
t.pcapTranslator,
func(
ctx context.Context,
translator *pcapTranslator,
) {
for _, ts := range strings.Split(optVal, "/") {
opt.ArrayAppend(strings.TrimSpace(ts), optKey)
}
})
default:
opt.ArrayAppend(optVal, optKey)
}
}
}
}
}
}
func (t *JSONPcapTranslator) addTCPflags(
ctx context.Context,
tcp *layers.TCP,
flags *gabs.Container,
) {
flagsStr := make([]string, 0, len(tcpFlags))
for key := range tcpFlags {
switch key {
case tcpSynStr:
if tcp.SYN {
flagsStr = append(flagsStr, tcpSynStr)
}
case tcpAckStr:
if tcp.ACK {
flagsStr = append(flagsStr, tcpAckStr)
}
case tcpPshStr:
if tcp.PSH {
flagsStr = append(flagsStr, tcpPshStr)
}
case tcpFinStr:
if tcp.FIN {
flagsStr = append(flagsStr, tcpFinStr)
}
case tcpRstStr:
if tcp.RST {
flagsStr = append(flagsStr, tcpRstStr)
}
case tcpUrgStr:
if tcp.URG {
flagsStr = append(flagsStr, tcpUrgStr)
}
case tcpEceStr:
if tcp.ECE {
flagsStr = append(flagsStr, tcpEceStr)
}
case tcpCwrStr:
if tcp.CWR {
flagsStr = append(flagsStr, tcpCwrStr)
}
}
}
flags.Set(strings.Join(flagsStr, "|"), "str")
}
func (t *JSONPcapTranslator) translateTCPLayer(
ctx context.Context,
tcp *layers.TCP,
) fmt.Stringer {
json := gabs.New()
// https://github.com/google/gopacket/blob/master/layers/tcp.go#L19-L35
L4, _ := json.Object("L4")
L4.Set(strconv.FormatInt(int64(len(tcp.Payload)), 10), "len")
L4.Set(tcp.Seq, "seq")
L4.Set(tcp.Ack, "ack")
L4.Set(tcp.DataOffset, "off")
L4.Set(tcp.Window, "win")
L4.Set(tcp.Checksum, "xsum")
L4.Set(tcp.Urgent, "urg")
flags, _ := L4.Object("flags")
setFlags := parseTCPflags(tcp)
flags.Set(setFlags, "dec")
if flagsStr, ok := tcpFlagsStr[setFlags]; ok {
flags.Set(flagsStr, "str")
} else {
// this scenario is slow, but it should also be exceedingly rare
t.addTCPflags(ctx, tcp, flags)
}
t.addTCPoptions(ctx, tcp, L4)
L4.Set(tcp.SrcPort, "src")
L4.Set(tcp.DstPort, "dst")
// TCP(6) (0x06) | `SrcPort` and `DstPort` are `uint8`
flowID := fnv1a.HashUint64(uint64(6) + uint64(tcp.SrcPort) + uint64(tcp.DstPort))
flowIDstr := strconv.FormatUint(flowID, 10)
L4.Set(flowIDstr, "flow")
atDebugVerbosity(ctx,
t.pcapTranslator,
func(
ctx context.Context,
translator *pcapTranslator,
) {
if name, ok := layers.TCPPortNames[tcp.SrcPort]; ok {
L4.Set(name, "sproto")
}
if name, ok := layers.TCPPortNames[tcp.DstPort]; ok {
L4.Set(name, "dproto")
}
transportFlow := tcp.TransportFlow()
t.addEndpoints(L4, &transportFlow)
flagsMap, _ := flags.Object("map")
flagsMap.Set(tcp.SYN, "SYN")
flagsMap.Set(tcp.ACK, "ACK")
flagsMap.Set(tcp.PSH, "PSH")
flagsMap.Set(tcp.FIN, "FIN")
flagsMap.Set(tcp.RST, "RST")
flagsMap.Set(tcp.URG, "URG")
flagsMap.Set(tcp.ECE, "ECE")
flagsMap.Set(tcp.CWR, "CWR")
flagsMap.Set(tcp.NS, "NS")
flags.Set("0b"+strconv.FormatUint(uint64(setFlags), 2), "bin")
flags.Set("0x"+strconv.FormatUint(uint64(setFlags), 16), "hex")
})
return json
}
func (t *JSONPcapTranslator) translateTLSLayer(ctx context.Context, tls *layers.TLS) fmt.Stringer {
json := gabs.New()
if !isDebugVerbosity(ctx, t.pcapTranslator) {
return json
}
TLS, _ := json.Object("TLS")
// disabled until memory leak is fixed
// [TODO]: fix memory leak...
// t.decodeTLSRecords(1, tls.Contents, TLS)
if len(tls.ChangeCipherSpec) > 0 {
t.translateTLSLayer_ChangeCipherSpec(ctx, TLS, tls)
}
if len(tls.Handshake) > 0 {
t.translateTLSLayer_Handshake(ctx, TLS, tls)
}
if len(tls.AppData) > 0 {
t.translateTLSLayer_AppData(ctx, TLS, tls)
}
return json
}
func (t *JSONPcapTranslator) translateDNSquestions(
ctx context.Context,
dns *layers.DNS,
size *int,
domain *gabs.Container,
) {
questions, _ := domain.ArrayOfSize(*size, "questions")
for i, question := range dns.Questions {
q, _ := questions.ObjectI(i)
q.Set(string(question.Name), "name")
q.Set(question.Type.String(), "type")
q.Set(question.Class.String(), "class")
}
}
func (t *JSONPcapTranslator) translateDNSanswers(
ctx context.Context,
dns *layers.DNS,
size *int,
domain *gabs.Container,
) {
answers, _ := domain.ArrayOfSize(*size, "answers")
for i, answer := range dns.Answers {
a, _ := answers.ObjectI(i)
// Header
a.Set(string(answer.Name), "name")
a.Set(answer.Type.String(), "type")
a.Set(answer.Class.String(), "class")
a.Set(answer.TTL, "ttl")
switch answer.Type {
case layers.DNSTypeA:
// see: https://github.com/google/gopacket/blob/master/layers/dns.go#L908-L909
a.Set(answer.IP.String(), "A")
case layers.DNSTypeAAAA:
// see: https://github.com/google/gopacket/blob/master/layers/dns.go#L910-L911
a.Set(answer.IP.String(), "AAAA")
case layers.DNSTypeNS:
// see: https://github.com/google/gopacket/blob/master/layers/dns.go#L919-L924
a.Set(string(answer.NS), "NS")
case layers.DNSTypeCNAME:
// see: https://github.com/google/gopacket/blob/master/layers/dns.go#L925-L930
a.Set(string(answer.CNAME), "CNAME")
case layers.DNSTypePTR:
// see: https://github.com/google/gopacket/blob/master/layers/dns.go#L931-L936
a.Set(string(answer.PTR), "PTR")
case layers.DNSTypeSOA:
// see: https://github.com/google/gopacket/blob/master/layers/dns.go#L937-L955
soaJSON, _ := a.Object("SOA")
soaJSON.Set(string(answer.SOA.MName), "mname")
soaJSON.Set(string(answer.SOA.RName), "rname")
soaJSON.Set(answer.SOA.Serial, "serial")
soaJSON.Set(answer.SOA.Expire, "expire")
soaJSON.Set(answer.SOA.Refresh, "refresh")
soaJSON.Set(answer.SOA.Retry, "retry")
case layers.DNSTypeSRV:
// see: https://github.com/google/gopacket/blob/master/layers/dns.go#L973-L984
srvJSON, _ := a.Object("SRV")
srvJSON.SetP(string(answer.SRV.Name), "name")
srvJSON.SetP(answer.SRV.Port, "port")
srvJSON.SetP(answer.SRV.Weight, "weight")
srvJSON.SetP(answer.SRV.Priority, "priority")
case layers.DNSTypeMX:
// see: https://github.com/google/gopacket/blob/master/layers/dns.go#L956-L965
mxJSON, _ := a.Object("MX")
mxJSON.SetP(string(answer.MX.Name), "name")
mxJSON.SetP(answer.MX.Preference, "preference")
case layers.DNSTypeURI:
// see: https://github.com/google/gopacket/blob/master/layers/dns.go#L966-L972
uriJSON, _ := a.Object("URI")
uriJSON.SetP(string(answer.URI.Target), "target")
uriJSON.SetP(answer.URI.Priority, "priority")
uriJSON.SetP(answer.URI.Weight, "weight")
// see: https://github.com/google/gopacket/blob/master/layers/dns.go#L912-L918
case layers.DNSTypeTXT:
// see: https://github.com/google/gopacket/blob/master/layers/dns.go#L864-L875
txts, _ := a.ArrayOfSize(len(answer.TXTs))
for i, txt := range answer.TXTs {
txts.SetIndex(string(txt), i)
}
// see: https://github.com/google/gopacket/blob/master/layers/dns.go#L985-L990
case layers.DNSTypeOPT:
// see: https://github.com/google/gopacket/blob/master/layers/dns.go#L877-L903
opts, _ := a.ArrayOfSize(len(answer.OPT), "OPT")
for i, opt := range answer.OPT {
o, _ := opts.ObjectI(i)
o.Set(opt.Code.String(), "code")
o.Set(string(opt.Data), "data")
}
}
}
}
func (t *JSONPcapTranslator) translateDNSLayer(ctx context.Context, dns *layers.DNS) fmt.Stringer {
json := gabs.New()
domain, _ := json.Object("DNS")
domain.Set(dns.ID, "id")
domain.Set(dns.OpCode.String(), "op")
domain.Set(dns.ResponseCode.String(), "response_code")
/*
json.SetP(dns.QR, "DNS.QR")
json.SetP(dns.AA, "DNS.AA")
json.SetP(dns.TC, "DNS.TC")
json.SetP(dns.RD, "DNS.RD")
json.SetP(dns.RA, "DNS.RA")
*/
/*
json.SetP(dns.NSCount, "DNS.authorities_count")
json.SetP(dns.ARCount, "DNS.additionals_count")
*/
if sizeOfQuestions := len(dns.Questions); sizeOfQuestions > 0 {
t.translateDNSquestions(ctx, dns, &sizeOfQuestions, domain)
}
if sizeOfAnswers := len(dns.Answers); sizeOfAnswers > 0 {
t.translateDNSanswers(ctx, dns, &sizeOfAnswers, domain)
}
atDebugVerbosity(ctx,
t.pcapTranslator,
func(
ctx context.Context,
translator *pcapTranslator,
) {
domain.Set(dns.QDCount, "questions_count")
domain.Set(dns.ANCount, "answers_count")
})
return json
}
func (t *JSONPcapTranslator) merge(ctx context.Context, tgt fmt.Stringer, src fmt.Stringer) (fmt.Stringer, error) {
return tgt, t.asTranslation(tgt).Merge(t.asTranslation(src))
}
// for JSON translator, this mmethod generates:
// - the `flowID` for any 6-tuple conversation
// - the summary line at {`message`: $summary}
func (t *JSONPcapTranslator) finalize(
ctx context.Context,
ifaces netIfaceIndex,
iface *PcapIface,
serial *uint64,
p *gopacket.Packet,
conntrack bool,
packet fmt.Stringer,
) (fmt.Stringer, error) {
json := t.asTranslation(packet)
data := make(map[string]any, 15)
id := ctx.Value(ContextID)
logName := ctx.Value(ContextLogName)
operation, _ := json.Object("logging.googleapis.com/operation")
operation.Set(logName, "producer")
if *serial == 1 {
operation.Set(true, "first")
}
data["ifaceIndex"] = t.iface.Index
data["ifaceName"] = t.iface.Name
data["serial"] = *serial
flowIDstr, _ := json.S("meta", "flow").Data().(string) // this is always available
flowID, _ := strconv.ParseUint(flowIDstr, 10, 64)
l3Src, _ := json.S("L3", "src").Data().(net.IP)
l3Dst, _ := json.S("L3", "dst").Data().(net.IP)
// handle cases where there is L3 is not available
if l3Src == nil && l3Dst == nil {
if json.Exists("ARP") {
arpSrcIP, _ := json.S("ARP", "src", "IP").Data().(string)
l3Src = net.ParseIP(arpSrcIP)
data["L3Src"] = arpSrcIP
arpDstIP, _ := json.S("ARP", "dst", "IP").Data().(string)
l3Dst = net.ParseIP(arpDstIP)
data["L3Dst"] = arpDstIP
t.checkL3Address(ctx, json, data, ifaces, iface, l3Src, l3Dst)
if arpFlowIDstr, arpOK := json.S("ARP", "flow").Data().(string); arpOK {
arpFlowID, _ := strconv.ParseUint(arpFlowIDstr, 10, 64)
flowID = fnv1a.AddUint64(flowID, arpFlowID)
flowIDstr = strconv.FormatUint(flowID, 10)
data["flowID"] = flowIDstr
json.Set(flowIDstr, "flow")
} else {
data["flowID"] = "0"
json.Set("0", "flow")
}
operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "arp", flowIDstr), "id")
json.Set(stringFormatter.FormatComplex(jsonTranslationSummaryARP, data), "message")
return json, nil
}
operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "l2", flowIDstr), "id")
json.Set(stringFormatter.FormatComplex(jsonTranslationTemplate, data), "message")
return json, nil
}
data["L3Src"] = l3Src
data["L3Dst"] = l3Dst
// report complete interface details when capturing for `any` interface
t.checkL3Address(ctx, json, data, ifaces, iface, l3Src, l3Dst)
isSrcLocal := iface.Addrs.Contains(l3Src.String())
proto := json.S("L3", "proto", "num").Data().(layers.IPProtocol)
isTCP := proto == layers.IPProtocolTCP
isUDP := proto == layers.IPProtocolUDP
isICMPv4 := proto == layers.IPProtocolICMPv4
isICMPv6 := proto == layers.IPProtocolICMPv6
// `flowID` is the unique ID of this conversation:
// given by the 6-tuple: iface_index+protocol+src_ip+src_port+dst_ip+dst_port.
// Addition is commutative, so after hashing `net.IP` bytes and L4 ports to `uint64`,
// the same `uint64`/`flowID` is produced after adding everything up, no matter the order.
// Using the same `flowID` will produce grouped logs in Cloud Logging.
if l3FlowIDstr, l3OK := json.S("L3", "flow").Data().(string); l3OK {
l3FlowID, _ := strconv.ParseUint(l3FlowIDstr, 10, 64)
flowID = fnv1a.AddUint64(flowID, l3FlowID)
}
if l4FlowIDstr, l4OK := json.S("L4", "flow").Data().(string); l4OK {
l4FlowID, _ := strconv.ParseUint(l4FlowIDstr, 10, 64)
flowID = fnv1a.AddUint64(flowID, l4FlowID)
} else {
flowID = fnv1a.AddUint64(flowID, 255) // RESERVED (0xFF)
}
flowIDstr = strconv.FormatUint(flowID, 10)
data["flowID"] = flowIDstr
json.Set(flowIDstr, "flow")
if !isTCP && !isUDP {
if isICMPv4 || isICMPv6 {
if isICMPv6 {
data["icmpVersion"] = 6
} else {
data["icmpVersion"] = 4
}
data["icmpMessage"] = json.S("ICMP", "msg").Data().(string)
operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "icmp", flowIDstr), "id")
json.Set(stringFormatter.FormatComplex(jsonTranslationSummaryICMP, data), "message")
return json, nil
}
// unhandled L3 protocol
operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "l3", flowIDstr), "id")
json.Set(stringFormatter.FormatComplex(jsonTranslationSummaryWithoutL4, data), "message")
return json, nil
}
if isUDP {
data["L4Proto"] = "UDP"
srcPort, _ := json.S("L4", "src").Data().(layers.UDPPort)
data["L4Src"] = uint16(srcPort)
dstPort, _ := json.S("L4", "dst").Data().(layers.UDPPort)
data["L4Dst"] = uint16(dstPort)
isSrcLocal = isSrcLocal && !t.ephemerals.isEphemeralUDPPort(&srcPort)
json.Set(isSrcLocal, "local")
operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "udp", flowIDstr), "id")
json.Set(stringFormatter.FormatComplex(jsonTranslationSummaryUDP, data), "message")
return json, nil
}
data["L4Proto"] = "TCP"
srcPort, _ := json.S("L4", "src").Data().(layers.TCPPort)
data["L4Src"] = uint16(srcPort)
dstPort, _ := json.S("L4", "dst").Data().(layers.TCPPort)
data["L4Dst"] = uint16(dstPort)
data["tcpFlags"] = json.S("L4", "flags", "str").Data().(string)
seq, _ := json.S("L4", "seq").Data().(uint32)
data["tcpSeq"] = seq
ack, _ := json.S("L4", "ack").Data().(uint32)
data["tcpAck"] = ack
tcpLen, _ := json.S("L4", "len").Data().(string)
data["tcpLen"] = tcpLen
operation.Set(stringFormatter.Format(jsonTranslationFlowTemplate, id, t.iface.Name, "tcp", flowIDstr), "id")
message := stringFormatter.FormatComplex(jsonTranslationSummaryTCP, data)
// local means: a service running within the sandbox
// - so it is not a client which created a socket to communicate with a remote host using an ephemeral port
// this approach is best effort as a client may use a `not ephemeral port` to create a socket for egress networking.
isSrcLocal = isSrcLocal && !t.ephemerals.isEphemeralTCPPort(&srcPort)
json.Set(isSrcLocal, "local")
if !isDebugVerbosity(ctx, t.pcapTranslator) {
// skip advanced packet analysis
json.Set(message, "message")
return json, nil
}
setFlags, _ := json.S("L4", "flags", "dec").Data().(uint8)
// `finalize` is invoked from a `worker` via a go-routine `pool`:
// - there are no guarantees about which packet will get `finalize`d 1st
// - there are no guarantees about about which packet will get the `lock` next
// minimize locking: lock per-flow instead of across-flows.
// Locking is done in the name of throubleshoot-ability, so some contention at the flow level should be acceptable...
lock, traceAndSpanProvider := t.fm.lock(ctx, serial, &flowID, &setFlags, &seq, &ack, isSrcLocal)
if conntrack {
t.analyzeConnection(p, &flowID, &setFlags, json)
}
appLayer := (*p).ApplicationLayer()
if ((tcpSyn|tcpFin|tcpRst)&setFlags == 0) && appLayer != nil {
return t.addAppLayerData(ctx, p, lock, &flowID, &setFlags, &seq, &appLayer, json, &message, traceAndSpanProvider)
}
if !lock.IsHTTP2() {
// most ingress traffic is HTTP/1.1 , so:
// - try to get trace tracking information using h1 stream id
streamID := http11StreamID
if ts, ok := traceAndSpanProvider(&streamID); ok {
t.setTraceAndSpan(json, ts)
}
}
json.Set(message, "message")
// packet is not carrying any data, unlock using TCP flags
_, lockLatency := lock.UnlockWithTCPFlags(ctx, &setFlags)
json.Set(lockLatency.String(), "ll")
return json, nil
}
func (t *JSONPcapTranslator) checkL3Address(
ctx context.Context,
json *gabs.Container,
data map[string]any,
ifaces netIfaceIndex,
iface *PcapIface,
srcIP, dstIP net.IP,
) {
if iface.Index != 0 {
return
}
// O(1) interface lookups by IP
_iface, ok := ifaces[srcIP.String()]
if !ok {
_iface, ok = ifaces[dstIP.String()]
}
if !ok {
// this should never happen
return
}
data["ifaceIndex"] = _iface.Index
data["ifaceName"] = _iface.Name
ifaceJSON := json.S("iface")
ifaceJSON.Set(_iface.Index, "index")
ifaceJSON.Set(_iface.Name, "name")
if sizeOfAddrs := _iface.Addrs.Cardinality(); sizeOfAddrs > 0 {
addrs, _ := ifaceJSON.ArrayOfSize(sizeOfAddrs, "addrs")
_iface.Addrs.Each(func(IP string) bool {
sizeOfAddrs -= 1
addrs.SetIndex(IP, sizeOfAddrs)
return false
})
}
}
func (t *JSONPcapTranslator) analyzeConnection(
_ *gopacket.Packet,
_ *uint64, /* flowID */
_ *uint8, /* TCP flags */
_ *gabs.Container, /* JSON object */
) {
// implement connection tracking
}
func (t *JSONPcapTranslator) addAppLayerData(
ctx context.Context,
packet *gopacket.Packet,
lock *flowLock,
flowID *uint64,
tcpFlags *uint8,
sequence *uint32,
appLayer *gopacket.ApplicationLayer,
json *gabs.Container,
message *string,
tsp TraceAndSpanProvider,
) (*gabs.Container, error) {
appLayerData := (*appLayer).LayerContents()
sizeOfAppLayerData := len(appLayerData)
if sizeOfAppLayerData == 0 {
_, lockLatency := lock.UnlockWithTCPFlags(ctx, tcpFlags)
json.Set(lockLatency.String(), "ll")
return json, errors.New("AppLayer is empty")
}
if L7, handled, isHTTP2 := t.trySetHTTP(ctx, packet, lock, flowID,
tcpFlags, sequence, appLayerData, json, message, tsp); handled {
// this `size` is not the same as `length`:
// - `size` includes everything, not only the HTTP `payload`
L7.Set(sizeOfAppLayerData, "size")
// HTTP/2.0 is binary so not showing it raw
if !isHTTP2 && sizeOfAppLayerData > 512 {
L7.Set(string(appLayerData[:512-3])+"...", "raw")
} else if !isHTTP2 {
L7.Set(string(appLayerData), "raw")
}
return json, nil
}
// best-effort to add some information about L7
json.Set(stringFormatter.Format("{0} | size:{1}",
*message, sizeOfAppLayerData), "message")
L7, _ := json.Object("L7")
L7.Set(sizeOfAppLayerData, "length")
if sizeOfAppLayerData > 128 {
L7.Set(string(appLayerData[:128-3])+"...", "sample")
} else {
L7.Set(string(appLayerData), "content")
}
_, lockLatency := lock.UnlockWithTCPFlags(ctx, tcpFlags)
json.Set(lockLatency.String(), "ll")
return json, nil
}
func (t *JSONPcapTranslator) trySetHTTP(
ctx context.Context,
packet *gopacket.Packet,
lock *flowLock,
flowID *uint64,
tcpFlags *uint8,
sequence *uint32,
appLayerData []byte,
json *gabs.Container,
message *string,
tsp TraceAndSpanProvider,
) (*gabs.Container, bool /* handled */, bool /* isHTTP2 */) {
isHTTP11Request := http11RequestPayloadRegex.Match(appLayerData)
isHTTP11Response := !isHTTP11Request && http11ResponsePayloadRegex.Match(appLayerData)
isHTTP2 := !isHTTP11Request && !isHTTP11Response && http2PrefaceRegex.Match(appLayerData)
framer := http2.NewFramer(io.Discard, bytes.NewReader(appLayerData))
frame, frameErr := framer.ReadFrame()
// if content is not HTTP in clear text, abort
if !isHTTP11Request && !isHTTP11Response && !isHTTP2 && frame == nil {
return json, false, false
}
// SETs are used to avoid duplicates
streams := mapset.NewThreadUnsafeSet[uint32]()
requestStreams := mapset.NewThreadUnsafeSet[uint32]()
responseStreams := mapset.NewThreadUnsafeSet[uint32]()
dataStreams := mapset.NewThreadUnsafeSet[uint32]()
requestTS := make(map[uint32]*traceAndSpan)
responseTS := make(map[uint32]*traceAndSpan)
// making at least 1 big assumption:
// HTTP request/status line and headers fit in 1 packet ( TCP segment )
// which is not always the case when fragmentation occurs
L7, _ := json.Object("HTTP")
defer func() {
var lockLatency *time.Duration = nil
if requestStreams.Cardinality() > 0 ||
responseStreams.Cardinality() > 0 {
_, lockLatency = lock.UnlockWithTraceAndSpan(
ctx, tcpFlags, isHTTP2,
requestStreams.ToSlice(),
responseStreams.ToSlice(),
requestTS, responseTS,
)
} else {
_, lockLatency = lock.UnlockWithTCPFlags(ctx, tcpFlags)
}
json.Set(lockLatency.String(), "ll")
}()
if isHTTP2 {
L7.Set(true, "preface")
h2cData := http2PrefaceRegex.ReplaceAll(appLayerData, nil)
if len(h2cData) == 0 {
L7.Set("h2c", "proto")
L7.Set(string(appLayerData), "raw")
return L7, true, true
}
framer = http2.NewFramer(io.Discard, bytes.NewReader(h2cData))
frame, frameErr = framer.ReadFrame()
}
isHTTP2 = (isHTTP2 || frame != nil)
// handle h2c traffic
if isHTTP2 {
L7.Set("h2c", "proto")
streamsJSON, _ := L7.Object("streams")
// multple h2 frames ( from multiple streams ) may be delivered by the same packet
for frame != nil {
isRequest := false
isResponse := false
frameHeader := frame.Header()
// h2 is multiplexed, `StreamID` allows to link HTTP conversations
// - see: https://datatracker.ietf.org/doc/html/rfc9113#name-stream-identifiers
// - Streams initiated by a client MUST use odd-numbered stream identifiers
// - Streams initiated by the server MUST use even-numbered stream identifiers
// - A stream identifier of zero (0x00) is used for connection control messages
// - Stream identifiers cannot be reused.
// A stream is equal to a single HTTP conversation: request and response.
StreamID := frameHeader.StreamID
StreamIDstr := strconv.FormatUint(uint64(StreamID), 10)
streams.Add(StreamID)
ts, traced := tsp(&StreamID)
var stream, frames *gabs.Container
if stream = streamsJSON.S(StreamIDstr); stream == nil {
stream, _ = streamsJSON.Object(StreamIDstr)
_, _ = stream.Array("frames")
stream.Set(StreamID, "id")
} else if frames = stream.S("frames"); frames == nil {
_, _ = stream.Array("frames")
}
frameJSON := gabs.New()
stream.ArrayAppend(frameJSON, "frames")
if m := http2RawFrameRegex.
FindStringSubmatch(frameHeader.String()); len(m) > 0 {
frameJSON.Set(m[1], "raw")
}
sizeOfFrame := frameHeader.Length /* uint32 */
frameJSON.Set(sizeOfFrame, "len")
// see: https://pkg.go.dev/golang.org/x/net/http2#Flags
flagsJSON, _ := frameJSON.Object("flags")
flagsJSON.Set("0b"+strconv.FormatUint(uint64(frameHeader.Flags /* uint8 */), 2), "bin")
flagsJSON.Set("0x"+strconv.FormatUint(uint64(frameHeader.Flags /* uint8 */), 16), "hex")
flagsJSON.Set(strconv.FormatUint(uint64(frameHeader.Flags /* uint8 */), 10), "dec")
var _ts *traceAndSpan = nil
switch frame := frame.(type) {
case *http2.GoAwayFrame:
frameJSON.Set("goaway", "type")
case *http2.RSTStreamFrame:
frameJSON.Set("rst", "type")
case *http2.PingFrame:
frameJSON.Set("ping", "type")
frameJSON.Set(frame.IsAck(), "ack")
frameJSON.Set(string(frame.Data[:]), "data")
case *http2.SettingsFrame:
frameJSON.Set("settings", "type")
settings, _ := frameJSON.Object("settings")
frame.ForeachSetting(func(s http2.Setting) error {
// see: https://pkg.go.dev/golang.org/x/net/http2#SettingID
settings.Set(strconv.FormatUint(uint64(s.Val), 10),
"0x"+strconv.FormatUint(uint64(s.ID), 16))
return nil
})
frameJSON.Set(frame.IsAck(), "ack")
case *http2.HeadersFrame:
frameJSON.Set("headers", "type")
decoder := hpack.NewDecoder(2048, nil)
hf, _ := decoder.DecodeFull(frame.HeaderBlockFragment())
headers := http.Header{}
for _, header := range hf {
isRequest = (isRequest || (header.Name == ":method"))
isResponse = (isResponse || (header.Name == ":status"))
// `Add(...)` internally applies `http.CanonicalHeaderKey(...)`
headers.Add(header.Name, header.Value)
}
decoder.Close()
if _ts = t.addHTTPHeaders(frameJSON, &headers); _ts != nil {
_ts.streamID = &StreamID
if isRequest {
requestTS[StreamID] = _ts
} else if isResponse {
responseTS[StreamID] = _ts
}
} else if traced && isResponse {
responseTS[StreamID] = ts
}
case *http2.MetaHeadersFrame:
frameJSON.Set("metadata", "type")
mdJSON, _ := frameJSON.Object("metadata")
for _, md := range frame.Fields {
mdJSON.Set(md.Value, md.Name)
}
case *http2.DataFrame:
dataStreams.Add(StreamID)
frameJSON.Set("data", "type")
data := frame.Data()
sizeOfData := int64(sizeOfFrame)
t.addHTTPBodyDetails(frameJSON, &sizeOfData, bytes.NewReader(data))
}
if isRequest {
requestStreams.Add(StreamID)
frameJSON.Set("request", "kind")
} else if isResponse {
responseStreams.Add(StreamID)
frameJSON.Set("response", "kind")
}
// multiple streams with frames for req/res
// might arrive within the same TCP segment
if _ts != nil {
t.setTraceAndSpan(frameJSON, _ts)
} else if traced {
t.setTraceAndSpan(frameJSON, ts)
}
// read next frame
frame, frameErr = framer.ReadFrame()
}
if frameErr != nil && frameErr != io.EOF && frameErr != io.ErrUnexpectedEOF {
errorJSON, _ := L7.Object("error")
errorJSON.Set("INVALID_HTTP2_FRAME", "code")
errorJSON.Set(frameErr.Error(), "info")
}
streamsJSONbytes, err := streams.MarshalJSON()
if err == nil {
L7.Set(string(streamsJSONbytes), "includes")
} else {
L7.Set(streams.ToSlice(), "includes")
}
sizeOfStreams := streams.Cardinality()
if (sizeOfStreams == 1 && streams.Contains(0)) || sizeOfStreams > 10 {
json.Set(stringFormatter.Format("{0} | {1}", *message, "h2c"), "message")
} else {
json.Set(stringFormatter.Format("{0} | {1} | streams:{2} | req:{3} | res:{4} | data:{5}", *message, "h2c",
streams.ToSlice(), requestStreams.ToSlice(), responseStreams.ToSlice(), dataStreams.ToSlice()), "message")
}
return L7, true, true
}
// HTTP/1.1 is not multiplexed, so `StreamID` is always `1`
StreamID := http11StreamID
ts, traced := tsp(&StreamID)
streams.Add(StreamID)
fragmented := false // stop tracking is the default behavior
defer func() {
// some HTTP Servers split headers and body by flushing immediately after headers,
// so if this packet is carrying an HTTP Response, stop trace-tracking if:
// - the packet contains the full HTTP Response body, or more specifically:
// - if the `Content-Length` header value is equal to the observed `size-of-payload`:
// - which means that the HTTP Response is not fragmented.
L7.Set(fragmented, "fragmented")
}()
// L7 is a quasi-RAW representation of the HTTP message.
// see: https://www.rfc-editor.org/rfc/rfc7540#section-8.1.3
dataBytes := bytes.SplitN(appLayerData, http11BodySeparator, 2)
// `parts` is everything before HTTP payload separator (`2*line-break`)
// - it includes: the HTTP line, and HTTP headers
parts := bytes.Split(dataBytes[0], http11Separator)
meta, _ := json.Object("L7") // `parts[0]` is the HTTP/1.1 preface
meta.Set("HTTP", "proto")
meta.Set(string(parts[0]), "preface")
metaHeaders, _ := meta.ArrayOfSize(len(parts)-1, "headers")
// HTTP headers starts at `parts[1]`
for i, header := range parts[1:] {
if len(header) > 128 {
metaHeaders.SetIndex(string(header[:128-3]), i)
} else if len(header) > 0 {
metaHeaders.SetIndex(string(header), i)
} else {
metaHeaders.SetIndex("<EMPTY>", i)
}
}
if len(dataBytes) > 1 {
parts = bytes.Split(dataBytes[1], http11Separator)
body, _ := meta.ArrayOfSize(len(parts), "body")
for i, line := range parts {
if len(line) > 128 {
body.SetIndex(string(line[:128-3])+"...", i)
} else if len(line) > 0 {
body.SetIndex(string(line), i)
} else {
body.SetIndex("<EMPTY>", i)
}
}
}
httpDataReader := bufio.NewReaderSize(bytes.NewReader(appLayerData), len(appLayerData))
// attempt to parse HTTP/1.1 request
if isHTTP11Request {
requestStreams.Add(StreamID)
L7.Set("request", "kind")
request, err := http.ReadRequest(httpDataReader)
if (err != nil && err != io.EOF && err != io.ErrUnexpectedEOF) || request == nil {
errorJSON, _ := L7.Object("error")
errorJSON.Set("INVALID_HTTP11_REQUEST", "code")
if err != nil {
errorJSON.Set(err.Error(), "info")
}
errorJSON.Set(request != nil, "parsed")
L7.Set("HTTP/1.1", "proto")
json.Set(stringFormatter.Format("{0} | {1}: {2}",
*message, "INVALID_HTTP11_REQUEST", err.Error()), "message")
return L7, true, false
}
url := ""
if _url := request.URL; _url != nil {
url = _url.String()
}
if url == "" {
if parts := http11RequestPayloadRegex.
FindSubmatch(appLayerData); len(parts) >= 3 {
url = string(parts[2])
L7.Set(url, "url")
L7.Set("HTTP/1.1", "proto")
}
// abort, not safe to continue,
// the "quasi-RAW" will tell...
return L7, true, false
}
L7.Set(url, "url")
L7.Set(request.Proto, "proto")
L7.Set(request.Method, "method")
if _ts := t.addHTTPHeaders(L7, &request.Header); _ts != nil {
_ts.streamID = &StreamID
requestTS[StreamID] = _ts
// include trace and span id for traceability
t.setTraceAndSpan(json, _ts)
t.recordHTTP11Request(packet, flowID, sequence, _ts, &request.Method, &request.Host, &url)
}
sizeOfBody := t.addHTTPBodyDetails(L7, &request.ContentLength, request.Body)
if sizeOfBody > 0 {
dataStreams.Add(StreamID)
}
if cl, clErr := strconv.ParseUint(request.Header.Get(httpContentLengthHeader), 10, 64); clErr == nil {
fragmented = cl > sizeOfBody
}
json.Set(stringFormatter.Format("{0} | {1} {2} {3}", *message, request.Proto, request.Method, url), "message")
return L7, true, false
}
// attempt to parse HTTP/1.1 response
if isHTTP11Response {
responseStreams.Add(StreamID)
L7.Set("response", "kind")
// Go's `http` implementation may miss the `Transfer-Encoding` header
// - see: https://github.com/golang/go/issues/27061
response, err := http.ReadResponse(httpDataReader, nil)
if (err != nil && err != io.EOF && err != io.ErrUnexpectedEOF) || response == nil {
errorJSON, _ := L7.Object("error")
errorJSON.Set("INVALID_HTTP11_RESPONSE", "code")
if err != nil {
errorJSON.Set(err.Error(), "info")
}
errorJSON.Set(response != nil, "parsed")
L7.Set("HTTP/1.1", "proto")
json.Set(stringFormatter.Format("{0} | {1}: {2}",
*message, "INVALID_HTTP11_RESPONSE", err.Error()), "message")
return L7, true, false
}
L7.Set(response.Proto, "proto")
L7.Set(response.StatusCode, "code")
L7.Set(response.Status, "status")
if _ts := t.addHTTPHeaders(L7, &response.Header); _ts != nil {
_ts.streamID = &StreamID
responseTS[StreamID] = _ts
// include trace and span id for traceability
t.setTraceAndSpan(json, _ts)
if linkErr := t.linkHTTP11ResponseToRequest(packet, flowID, L7, _ts); linkErr != nil {
io.WriteString(os.Stderr, linkErr.Error()+"\n")
}
} else if traced {
responseTS[StreamID] = ts
t.setTraceAndSpan(json, ts)
t.linkHTTP11ResponseToRequest(packet, flowID, L7, ts)
}
sizeOfBody := t.addHTTPBodyDetails(L7, &response.ContentLength, response.Body)
if sizeOfBody > 0 {
dataStreams.Add(StreamID)
}
if cl, clErr := strconv.ParseUint(response.Header.Get(httpContentLengthHeader), 10, 64); clErr == nil {
// if content-length is greater than the size of body:
// - this HTTP message is fragmented and so there's more to come
fragmented = cl > sizeOfBody
}
json.Set(stringFormatter.Format("{0} | {1} {2}", *message, response.Proto, response.Status), "message")
return L7, true, false
}
return json, true, false
}
func (t *JSONPcapTranslator) addHTTPBodyDetails(L7 *gabs.Container, contentLength *int64, body io.Reader) uint64 {
bodyBytes, err := io.ReadAll(body)
if err != nil {
return uint64(0)
}
bodyJSON, _ := L7.Object("body")
sizeOfBody := uint64(len(bodyBytes))
bodyLengthJSON, _ := bodyJSON.ArrayOfSize(2, "length")
bodyLengthJSON.SetIndex(strconv.FormatUint(sizeOfBody, 10), 0)
bodyLengthJSON.SetIndex(strconv.FormatInt(*contentLength, 10), 1)
if sizeOfBody > 512 {
bodyJSON.Set(string(bodyBytes[:512-3])+"...", "sample")
} else if sizeOfBody > 0 {
bodyJSON.Set(string(bodyBytes), "data")
}
return sizeOfBody
}
func (t *JSONPcapTranslator) recordHTTP11Request(
packet *gopacket.Packet,
_ *uint64, /* flowID */
_ *uint32, /* TCP sequence */
ts *traceAndSpan,
method, host, url *string,
) {
fullURL := stringFormatter.Format("{0}{1}", *host, *url)
_httpRequest := &httpRequest{
timestamp: &(*packet).Metadata().Timestamp,
method: method,
url: &fullURL,
}
t.traceToHttpRequestMap.Set(*ts.traceID, _httpRequest)
}
func (t *JSONPcapTranslator) linkHTTP11ResponseToRequest(
packet *gopacket.Packet,
_ *uint64, /* flowID */
response *gabs.Container,
ts *traceAndSpan,
) error {
jsonTranslatorRequest, ok := t.traceToHttpRequestMap.Get(*ts.traceID)
if !ok {
return errors.New(stringFormatter.Format("no request found for trace-id: {0}", *ts.traceID))
}
translatorRequest := *jsonTranslatorRequest
// hydrate response with information from request
request, _ := response.Object("request")
request.Set(*translatorRequest.method, "method")
request.Set(*translatorRequest.url, "url")
requestTimestamp := *translatorRequest.timestamp
responseTimestamp := (*packet).Metadata().Timestamp
latency := responseTimestamp.Sub(requestTimestamp)
request.Set(requestTimestamp.Format(time.RFC3339Nano), "timestamp")
request.Set(latency.Milliseconds(), "latency")
// intentionally not removing from `traceToHttpRequestMap`:
// - it will be done by `untrackConnection` on `RST` or `FIN+ACK`
// - allows to link multiple `traceID`s with the same flow
return nil
}
func (t *JSONPcapTranslator) addHTTPHeaders(L7 *gabs.Container, headers *http.Header) *traceAndSpan {
jsonHeaders, _ := L7.Object("headers")
var traceAndSpan *traceAndSpan = nil
for key, value := range *headers {
jsonHeaders.Set(value, key)
for headerStr, headerRgx := range traceAndSpanRegex {
if strings.EqualFold(key, headerStr) {
traceAndSpan = t.getTraceAndSpan(headerRgx, &value[0])
}
}
}
return traceAndSpan
}
func (t *JSONPcapTranslator) getTraceAndSpan(
headerRgx *regexp.Regexp,
rawTraceAndSpan *string,
) *traceAndSpan {
if ts := headerRgx.FindStringSubmatch(*rawTraceAndSpan); ts != nil {
return &traceAndSpan{traceID: &ts[1], spanID: &ts[2]}
}
return nil
}
func (t *JSONPcapTranslator) setTraceAndSpan(json *gabs.Container, ts *traceAndSpan) bool {
if ts == nil {
json.Set(false, "logging.googleapis.com/trace_sampled")
return false
}
json.Set(cloudTracePrefix+*ts.traceID, "logging.googleapis.com/trace")
json.Set(*ts.spanID, "logging.googleapis.com/spanId")
json.Set(true, "logging.googleapis.com/trace_sampled")
return true
}
func (t *JSONPcapTranslator) toJSONBytes(packet *fmt.Stringer) (int, []byte, error) {
translation, err := t.asTranslation(*packet).MarshalJSON()
if err != nil {
return 0, nil, errors.Wrap(err, "JSON translation failed")
}
lineBreak := []byte("\n")
b := make([]byte, len(lineBreak)+len(translation))
return copy(b[copy(b[0:], translation):], lineBreak), b, nil
}
func (t *JSONPcapTranslator) write(ctx context.Context, writer io.Writer, packet *fmt.Stringer) (int, error) {
bytesCount, translationBytes, err := t.toJSONBytes(packet)
if err != nil {
return 0, errors.Wrap(err, "JSON translation failed")
}
writtenBytes, err := writer.Write(translationBytes)
if err != nil {
return 0, errors.Wrap(err, "failed to write JSON translation")
}
if bytesCount != writtenBytes {
return writtenBytes, errors.New("translationBytes(" + strconv.Itoa(bytesCount) + ") != writtenBytes(" + strconv.Itoa(writtenBytes) + ")")
}
return writtenBytes, nil
}
func newJSONPcapTranslator(
ctx context.Context,
debug bool,
verbosity PcapVerbosity,
iface *PcapIface,
ephemerals *PcapEphemeralPorts,
) PcapTranslator {
flowToStreamToSequenceMap := haxmap.New[uint64, STSM]()
traceToHttpRequestMap := haxmap.New[string, *httpRequest]()
flowMutex := newFlowMutex(ctx, debug, flowToStreamToSequenceMap, traceToHttpRequestMap)
translator := newPcapTranslator(ctx, debug, verbosity, iface, ephemerals)
return &JSONPcapTranslator{
pcapTranslator: translator,
fm: flowMutex,
traceToHttpRequestMap: traceToHttpRequestMap,
flowToStreamToSequenceMap: flowToStreamToSequenceMap,
}
}