pcap-cli/internal/transformer/translator_worker.go (458 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package transformer import ( "context" "fmt" "net/netip" "runtime/debug" "sync" "time" mapset "github.com/deckarep/golang-set/v2" "github.com/google/gopacket" "github.com/google/gopacket/layers" ) type ( pcapTranslatorWorker struct { ifaces netIfaceIndex iface *PcapIface filters PcapFilters serial *uint64 packet *gopacket.Packet translator PcapTranslator conntrack bool compat bool loggerPrefix *string } packetLayerTranslator = func(context.Context, *pcapTranslatorWorker, bool) fmt.Stringer layersTranslators = map[gopacket.LayerType]packetLayerTranslator httpRequest struct { timestamp *time.Time url, method *string } traceAndSpan struct { traceID, spanID *string streamID *uint32 } ) var ( // alternatives per layer; there can only be one! packetLayerTranslators = [][]packetLayerTranslator{ // [0]: L2 { // [0][0] func(ctx context.Context, w *pcapTranslatorWorker, deep bool) fmt.Stringer { return w.translateEthernetLayer(ctx, deep) }, }, // [1]: L3 { // [1][0] func(ctx context.Context, w *pcapTranslatorWorker, deep bool) fmt.Stringer { return w.translateIPv4Layer(ctx, deep) }, // [1][1] func(ctx context.Context, w *pcapTranslatorWorker, deep bool) fmt.Stringer { return w.translateIPv6Layer(ctx, deep) }, }, // [2]: L4 { // ICMP layers // - ICMP is not a transport layer protocol, but if it is present // - then actual transport layer protocol translations are disabled; // - thus, TCP/UDP translations are causally dependant on the lack of ICMP. // [2][0] func(ctx context.Context, w *pcapTranslatorWorker, deep bool) fmt.Stringer { return w.translateICMPv4Layer(ctx, deep) }, // [2][1] func(ctx context.Context, w *pcapTranslatorWorker, deep bool) fmt.Stringer { return w.translateICMPv6Layer(ctx, deep) }, // non-ICMP layers // [2][2] func(ctx context.Context, w *pcapTranslatorWorker, deep bool) fmt.Stringer { return w.translateTCPLayer(ctx, deep) }, // [2][3] func(ctx context.Context, w *pcapTranslatorWorker, deep bool) fmt.Stringer { return w.translateUDPLayer(ctx, deep) }, }, // [3]: L7 { // [3][0] func(ctx context.Context, w *pcapTranslatorWorker, deep bool) fmt.Stringer { return w.translateDNSLayer(ctx, deep) }, // [3][1] func(ctx context.Context, w *pcapTranslatorWorker, deep bool) fmt.Stringer { return w.translateTLSLayer(ctx, deep) }, }, } packetLayerTranslatorsSize = len(packetLayerTranslators) packetLayerTranslatorsMap layersTranslators = map[gopacket.LayerType]packetLayerTranslator{ layers.LayerTypeEthernet: packetLayerTranslators[0][0], layers.LayerTypeIPv4: packetLayerTranslators[1][0], layers.LayerTypeIPv6: packetLayerTranslators[1][1], layers.LayerTypeICMPv4: packetLayerTranslators[2][0], layers.LayerTypeICMPv6: packetLayerTranslators[2][1], layers.LayerTypeTCP: packetLayerTranslators[2][2], layers.LayerTypeUDP: packetLayerTranslators[2][3], layers.LayerTypeDNS: packetLayerTranslators[3][0], layers.LayerTypeTLS: packetLayerTranslators[3][1], layers.LayerTypeARP: func( ctx context.Context, w *pcapTranslatorWorker, deep bool, ) fmt.Stringer { return w.translateARPLayer(ctx, deep) }, layers.LayerTypeICMPv6Echo: func( ctx context.Context, w *pcapTranslatorWorker, deep bool, ) fmt.Stringer { return w.translateICMPv6EchoLayer(ctx, deep) }, layers.LayerTypeICMPv6Redirect: func( ctx context.Context, w *pcapTranslatorWorker, deep bool, ) fmt.Stringer { return w.translateICMPv6RedirectLayer(ctx, deep) }, gopacket.LayerTypeDecodeFailure: func( ctx context.Context, w *pcapTranslatorWorker, deep bool, ) fmt.Stringer { return w.translateErrorLayer(ctx, deep) }, } skippedLayersList = []gopacket.LayerType{ gopacket.LayerTypePayload, gopacket.LayerTypeDecodeFailure, layers.LayerTypeLinuxSLL, } skippedLayers = mapset.NewSet(skippedLayersList...) ) func (w pcapTranslatorWorker) pkt(ctx context.Context) gopacket.Packet { return *w.packet } func (w *pcapTranslatorWorker) asLayer(ctx context.Context, layer gopacket.LayerType) gopacket.Layer { // https://github.com/google/gopacket/blob/master/packet.go#L568-L585 // https://github.com/google/gopacket/blob/master/packet.go#L476-L483 return w.pkt(ctx).Layer(layer) } func (w *pcapTranslatorWorker) translateLayer( ctx context.Context, layer gopacket.LayerType, deep bool, ) fmt.Stringer { // confirm that the packet actually contains the requested layer l := w.asLayer(ctx, layer) if l == nil { return nil } switch lType := l.(type) { default: return nil case *layers.Ethernet: return w.translator.translateEthernetLayer(ctx, lType) case *layers.ARP: return w.translator.translateARPLayer(ctx, lType) case *layers.IPv4: return w.translator.translateIPv4Layer(ctx, lType) case *layers.IPv6: return w.translator.translateIPv6Layer(ctx, lType) case *layers.ICMPv4: return w.translator.translateICMPv4Layer(ctx, lType) case *layers.ICMPv6: icmp6 := w.translator.translateICMPv6Layer(ctx, lType) // [ToDo]: handle layers.ICMPv6TypePacketTooBig if lType.TypeCode.Type() == layers.ICMPv6TypeDestinationUnreachable || lType.TypeCode.Type() == layers.ICMPv6TypeTimeExceeded { return w.translator.translateICMPv6L3HeaderLayer(ctx, icmp6, lType) } if !deep { return icmp6 } _l := w.asLayer(ctx, lType.NextLayerType()) switch _lType := _l.(type) { default: return icmp6 case *layers.ICMPv6Echo: return w.translator.translateICMPv6EchoLayer(ctx, icmp6, _lType) case *layers.ICMPv6Redirect: return w.translator.translateICMPv6RedirectLayer(ctx, icmp6, _lType) } case *layers.ICMPv6Echo: return w.translator.translateICMPv6EchoLayer(ctx, nil, lType) case *layers.ICMPv6Redirect: return w.translator.translateICMPv6RedirectLayer(ctx, nil, lType) case *layers.TCP: return w.translator.translateTCPLayer(ctx, lType) case *layers.UDP: return w.translator.translateUDPLayer(ctx, lType) case *layers.DNS: return w.translator.translateDNSLayer(ctx, lType) case *layers.TLS: return w.translator.translateTLSLayer(ctx, lType) case *gopacket.DecodeFailure: // see: https://github.com/google/gopacket/blob/v1.1.19/decode.go#L118-L126 return w.translator.translateErrorLayer(ctx, lType) } } func (w pcapTranslatorWorker) translateEthernetLayer(ctx context.Context, deep bool) fmt.Stringer { return w.translateLayer(ctx, layers.LayerTypeEthernet, deep) } func (w pcapTranslatorWorker) translateARPLayer(ctx context.Context, deep bool) fmt.Stringer { return w.translateLayer(ctx, layers.LayerTypeARP, deep) } func (w *pcapTranslatorWorker) translateIPv4Layer(ctx context.Context, deep bool) fmt.Stringer { return w.translateLayer(ctx, layers.LayerTypeIPv4, deep) } func (w *pcapTranslatorWorker) translateIPv6Layer(ctx context.Context, deep bool) fmt.Stringer { return w.translateLayer(ctx, layers.LayerTypeIPv6, deep) } func (w *pcapTranslatorWorker) translateICMPv4Layer(ctx context.Context, deep bool) fmt.Stringer { return w.translateLayer(ctx, layers.LayerTypeICMPv4, deep) } func (w *pcapTranslatorWorker) translateICMPv6Layer(ctx context.Context, deep bool) fmt.Stringer { return w.translateLayer(ctx, layers.LayerTypeICMPv6, deep) } func (w *pcapTranslatorWorker) translateICMPv6EchoLayer(ctx context.Context, deep bool) fmt.Stringer { return w.translateLayer(ctx, layers.LayerTypeICMPv6Echo, deep) } func (w *pcapTranslatorWorker) translateICMPv6RedirectLayer(ctx context.Context, deep bool) fmt.Stringer { return w.translateLayer(ctx, layers.LayerTypeICMPv6Redirect, deep) } func (w *pcapTranslatorWorker) translateTCPLayer(ctx context.Context, deep bool) fmt.Stringer { return w.translateLayer(ctx, layers.LayerTypeTCP, deep) } func (w *pcapTranslatorWorker) translateUDPLayer(ctx context.Context, deep bool) fmt.Stringer { return w.translateLayer(ctx, layers.LayerTypeUDP, deep) } func (w *pcapTranslatorWorker) translateDNSLayer(ctx context.Context, deep bool) fmt.Stringer { return w.translateLayer(ctx, layers.LayerTypeDNS, deep) } func (w *pcapTranslatorWorker) translateTLSLayer(ctx context.Context, deep bool) fmt.Stringer { /* packet := w.pkt(ctx) if packet.ApplicationLayer() != nil { var tls layers.TLS var decoded []gopacket.LayerType parser := gopacket.NewDecodingLayerParser(layers.LayerTypeTLS, &tls) err := parser.DecodeLayers(packet.ApplicationLayer().LayerContents(), &decoded) if err == nil { for _, layerType := range decoded { switch layerType { case layers.LayerTypeTLS: return w.translator.translateTLSLayer(ctx, &tls) } } } } */ return w.translateLayer(ctx, layers.LayerTypeTLS, deep) } func (w *pcapTranslatorWorker) translateErrorLayer(ctx context.Context, deep bool) fmt.Stringer { return w.translateLayer(ctx, gopacket.LayerTypeDecodeFailure, deep) } func (w *pcapTranslatorWorker) toIP4Addrs( ip4 *layers.IPv4, ) (*netip.Addr, *netip.Addr) { var ip4Bytes [4]byte copy(ip4Bytes[:], ip4.SrcIP.To4()) srcAddr := netip.AddrFrom4(ip4Bytes) copy(ip4Bytes[:], ip4.DstIP.To4()) dstAddr := netip.AddrFrom4(ip4Bytes) return &srcAddr, &dstAddr } func (w *pcapTranslatorWorker) toIP6Addrs( ip6 *layers.IPv6, ) (*netip.Addr, *netip.Addr) { var ip6Bytes [16]byte copy(ip6Bytes[:], ip6.SrcIP.To16()) srcAddr := netip.AddrFrom16(ip6Bytes) copy(ip6Bytes[:], ip6.DstIP.To16()) dstAddr := netip.AddrFrom16(ip6Bytes) return &srcAddr, &dstAddr } func (w *pcapTranslatorWorker) isIPv4Allowed( ctx context.Context, ip4 *layers.IPv4, ) (*netip.Addr, *netip.Addr, bool) { src, dst := w.toIP4Addrs(ip4) if w.filters.HasL3Protos() && !w.filters.AllowsIPv4() { // fail fast: nothing to verify return src, dst, false } if !w.filters.HasIPv4s() { // fail open: ALL IPv4s are allowed return src, dst, true } if !w.filters.AllowsIPv4Addr(src) { // fail fast: if SRC is not allowed, skip checking DST return src, dst, false } return src, dst, w.filters.AllowsIPv4Addr(dst) } func (w *pcapTranslatorWorker) isIPv6Allowed( ctx context.Context, ip6 *layers.IPv6, ) (*netip.Addr, *netip.Addr, bool) { src, dst := w.toIP6Addrs(ip6) if w.filters.HasL3Protos() && !w.filters.AllowsIPv6() { // fail fast: nothing to verify return src, dst, false } if !w.filters.HasIPv6s() { // fail open: ALL IPv6s are allowed return src, dst, true } if !w.filters.AllowsIPv6Addr(src) { // fail fast: if SRC is not allowed, skip checking DST return src, src, false } return src, dst, w.filters.AllowsIPv6Addr(dst) } func (w *pcapTranslatorWorker) isL3Allowed( ctx context.Context, ) (*netip.Addr, *netip.Addr, bool) { layer := w.asLayer(ctx, layers.LayerTypeIPv4) isIPv6 := false if layer == nil { if layer = w.asLayer(ctx, layers.LayerTypeIPv6); layer == nil { // the packet does not contain IP layer information // fail open: nothing to verify return nil, nil, true } isIPv6 = true } if isIPv6 { ip6 := layer.(*layers.IPv6) return w.isIPv6Allowed(ctx, ip6) } ip4 := layer.(*layers.IPv4) return w.isIPv4Allowed(ctx, ip4) } func (w *pcapTranslatorWorker) arePortsAllowed( ctx context.Context, src *uint16, dst *uint16, ) (*uint16, *uint16, bool) { if w.filters.AllowsAnyL4Addr(*src, *dst) { return src, dst, true } // fail open return src, dst, false } func (w *pcapTranslatorWorker) isL4Allowed( ctx context.Context, ) (*uint16, *uint16, bool) { isProtosFilterAvailable := w.filters.HasL4Protos() isTCPflagsFilterAvailable := w.filters.HasTCPflags() isL4AddrsFilterAvailable := w.filters.HasL4Addrs() layer := w.asLayer(ctx, layers.LayerTypeTCP) if layer != nil { tcp := layer.(*layers.TCP) srcPort := uint16(tcp.SrcPort) dstPort := uint16(tcp.DstPort) if isProtosFilterAvailable && !w.filters.AllowsTCP() { // fail fast: if TCP is not allowed, do not check ports return &srcPort, &dstPort, false } if isTCPflagsFilterAvailable { // fail fast & open: if this it TCP, then flags cannot be 0; some flag must be set if flags := parseTCPflags(tcp); !w.filters.AllowsAnyTCPflags(&flags) { return &srcPort, &dstPort, false } } if isL4AddrsFilterAvailable { return w.arePortsAllowed(ctx, &srcPort, &dstPort) } return &srcPort, &dstPort, true } layer = w.asLayer(ctx, layers.LayerTypeUDP) if layer == nil { // the packet does not contain TCP/UDP information // fail open return nil, nil, true } udp := layer.(*layers.UDP) srcPort := uint16(udp.SrcPort) dstPort := uint16(udp.DstPort) if isProtosFilterAvailable && !w.filters.AllowsUDP() { // fail fast: if UDP is not allowed, do not check ports return &srcPort, &dstPort, false } if isL4AddrsFilterAvailable { return w.arePortsAllowed(ctx, &srcPort, &dstPort) } return &srcPort, &dstPort, true } func (w *pcapTranslatorWorker) isSocketAllowed( srcAddr *netip.Addr, srcPort *uint16, dstAddr *netip.Addr, dstPort *uint16, ) bool { if srcAddr == nil || srcPort == nil || dstAddr == nil || dstPort == nil { // fail open: if any input is `nil`, skip socket filter return true } return w.filters.AllowsSocket(srcAddr, srcPort, dstAddr, dstPort) } func (w *pcapTranslatorWorker) shouldTranslate(ctx context.Context) bool { srcAddr, dstAddr, l3Allowed := w.isL3Allowed(ctx) srcPort, dstPort, l4Allowed := w.isL4Allowed(ctx) if l3Allowed && l4Allowed { // only enforce sockets if everything else is allowed return w.isSocketAllowed(srcAddr, srcPort, dstAddr, dstPort) } return false } func (w *pcapTranslatorWorker) translate( ctx context.Context, index int, layer gopacket.Layer, translations chan<- fmt.Stringer, wg *sync.WaitGroup, ) { layerType := layer.LayerType() defer func(index int, layer gopacket.Layer, wg *sync.WaitGroup) { if r := recover(); r != nil { translations <- w.translator.translateLayerError(ctx, layerType, fmt.Errorf("%v: %s", r, string(debug.Stack()))) } wg.Done() }(index, layer, wg) if translator, ok := packetLayerTranslatorsMap[layerType]; ok { if t := translator(ctx, w, false /* deep */); t == nil { translations <- w.translator.translateLayerError(ctx, layerType, fmt.Errorf("unavailable@%d", index)) } else { translations <- t } return } // translator does not have an implementation to handle this layer type switch layer.(type) { default: if !skippedLayers.Contains(layerType) { translations <- w.translator.translateLayerError(ctx, layerType, fmt.Errorf("unimplemented@%d", index)) } } } // The work that needs to be performed // The input type should implement the WorkFunction interface func (w *pcapTranslatorWorker) Run(ctx context.Context) (buffer interface{}) { defer func() { if r := recover(); r != nil { transformerLogger.Printf("%s @translator | panic: %s\n%s\n", *w.loggerPrefix, r, string(debug.Stack())) buffer = nil } }() // fail open: // - if there aren't any filters, continue with translation. // fail fast: // - do not translate any layers before enforcing filters. if w.filters != nil && !w.shouldTranslate(ctx) { return nil } var _buffer fmt.Stringer = nil select { case <-ctx.Done(): _buffer = nil default: _buffer = w.translator.next(ctx, w.iface, w.serial, w.packet) } if _buffer == nil { transformerLogger.Printf("%s @translator | failed", *w.loggerPrefix) buffer = nil return nil } translations := make(chan fmt.Stringer, packetLayerTranslatorsSize) var wg sync.WaitGroup // number of layers to be translated packetLayers := w.pkt(ctx).Layers() wg.Add(len(packetLayers)) go func(wg *sync.WaitGroup) { wg.Wait() close(translations) }(&wg) // O(N); N is the number of layers available in the packet // this is a faster implementation as there is no layer discovery; // layers are translated on-demand based on the packet's contents. for i, l := range packetLayers { // translate layers concurrently: // - layers must know nothing about each other go w.translate(ctx, i, l, translations, &wg) } for translation := range translations { // translations are `nil` if layer is not available if translation != nil { // see: https://github.com/Jeffail/gabs?tab=readme-ov-file#merge-two-containers _buffer, _ = w.translator.merge(ctx, _buffer, translation) } } select { case <-ctx.Done(): // skip `finalize` deliver translation as-is transformerLogger.Printf("%s @translator | incomplete", *w.loggerPrefix) default: // `finalize` is the only method that is allowed to work across layers _buffer, _ = w.translator.finalize(ctx, w.ifaces, w.iface, w.serial, w.packet, w.conntrack, _buffer) } buffer = &_buffer return &_buffer } func newPcapTranslatorWorker( ifaces netIfaceIndex, iface *PcapIface, filters PcapFilters, serial *uint64, packet *gopacket.Packet, translator PcapTranslator, connTrack bool, compat bool, ) *pcapTranslatorWorker { loggerPrefix := fmt.Sprintf("[%d/%s] - #:%d |", iface.Index, iface.Name, *serial) worker := &pcapTranslatorWorker{ filters: filters, ifaces: ifaces, iface: iface, serial: serial, packet: packet, translator: translator, conntrack: connTrack, compat: compat, loggerPrefix: &loggerPrefix, } return worker }