pcap-cli/pkg/pcap/gopacket_engine.go (187 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pcap import ( "context" "fmt" "io" "log" "os" "strconv" "strings" "sync/atomic" "time" "github.com/GoogleCloudPlatform/pcap-sidecar/pcap-cli/internal/transformer" mapset "github.com/deckarep/golang-set/v2" "github.com/google/gopacket" "github.com/google/gopacket/pcap" ) var gopacketLogger = log.New(os.Stderr, "[gopacket] - ", log.LstdFlags) func (p *Pcap) IsActive() bool { return p.isActive.Load() } func (p *Pcap) newPcap(ctx context.Context) (*pcap.InactiveHandle, error) { cfg := *p.config var err error inactiveHandle, err := pcap.NewInactiveHandle(cfg.Iface) if err != nil { gopacketLogger.Printf("could not create: %v\n", err) } if err = inactiveHandle.SetSnapLen(cfg.Snaplen); err != nil { gopacketLogger.Printf("could not set snap length: %v\n", err) return nil, err } if err = inactiveHandle.SetPromisc(cfg.Promisc); err != nil { gopacketLogger.Printf("could not set promisc mode: %v\n", err) return nil, err } // [TODO]: make handle timeout dynamic if err = inactiveHandle.SetTimeout(100 * time.Millisecond); err != nil { gopacketLogger.Printf("could not set timeout: %v\n", err) return nil, err } if cfg.TsType != "" { if t, err := pcap.TimestampSourceFromString(cfg.TsType); err != nil { gopacketLogger.Printf("Supported timestamp types: %v\n", inactiveHandle.SupportedTimestamps()) return nil, err } else if err := inactiveHandle.SetTimestampSource(t); err != nil { gopacketLogger.Printf("Supported timestamp types: %v\n", inactiveHandle.SupportedTimestamps()) return nil, err } } p.inactiveHandle = inactiveHandle return inactiveHandle, nil } func (p *Pcap) Start( ctx context.Context, writers []PcapWriter, stopDeadline <-chan *time.Duration, ) error { // atomically activate the packet capture if !p.isActive.CompareAndSwap(false, true) { return fmt.Errorf("already started") } var err error var handle *pcap.Handle inactiveHandle, err := p.newPcap(ctx) if err != nil { return err } defer inactiveHandle.CleanUp() if handle, err = inactiveHandle.Activate(); err != nil { p.isActive.Store(false) return fmt.Errorf("failed to activate: %s", err) } defer handle.Close() p.activeHandle = handle cfg := *p.config debug := cfg.Debug compat := cfg.Compat device := cfg.Device var iface *transformer.PcapIface if device != nil { // `device` is not safe to use outside this branch addrs := mapset.NewSetWithSize[string](len(device.Addresses)) for _, addr := range device.Addresses { // [ToDo]: use `net.IP` instead of `string` addrs.Add(addr.IP.String()) } iface = &transformer.PcapIface{ Index: uint8(device.NetInterface.Index), Name: device.Name, Addrs: addrs, } } else { iface = &transformer.PcapIface{ Index: anyDeviceIndex, Name: anyDeviceName, Addrs: mapset.NewThreadUnsafeSetWithSize[string](0), } } loggerPrefix := fmt.Sprintf("[%d/%s]", iface.Index, iface.Name) if !compat { // set packet capture filter; i/e: `tcp port 8080` if filter := providePcapFilter(ctx, &cfg.Filter, cfg.Filters); *filter != "" { if err = handle.SetBPFFilter(*filter); err != nil { gopacketLogger.Printf("%s - BPF filter error: [%s] => %+v\n", loggerPrefix, *filter, err) return fmt.Errorf("BPF filter error: %s", err) } gopacketLogger.Printf("%s - filter: %s\n", loggerPrefix, *filter) } } gopacketLogger.Printf("%s - starting packet capture\n", loggerPrefix) source := gopacket.NewPacketSource(handle, handle.LinkType()) // https://github.com/google/gopacket/blob/master/packet.go#L660-L680 source.Lazy = true // https://github.com/google/gopacket/blob/master/packet.go#L655-L659 source.NoCopy = true source.SkipDecodeRecovery = false source.DecodeStreamsAsDatagrams = true // `io.Writer` is what `fmt.Fprintf` requires ioWriters := make([]io.Writer, len(writers)) for i, writer := range writers { ioWriters[i] = writer } format := cfg.Format compatFilters, ok := cfg.CompatFilters.(transformer.PcapFilters) if !ok { compatFilters = nil } // create new transformer for the specified output format if cfg.Ordered { p.fn, err = transformer.NewOrderedTransformer(ctx, cfg.Verbosity, iface, cfg.Ephemerals, compatFilters, ioWriters, &format, debug, compat) } else if cfg.ConnTrack { p.fn, err = transformer.NewConnTrackTransformer(ctx, cfg.Verbosity, iface, cfg.Ephemerals, compatFilters, ioWriters, &format, debug, compat) } else { p.fn, err = transformer.NewTransformer(ctx, cfg.Verbosity, iface, cfg.Ephemerals, compatFilters, ioWriters, &format, debug, compat) } if err != nil { return fmt.Errorf("invalid format: %s", err) } if firstPacket, err := source.NextPacket(); err == nil && firstPacket != nil { serial := uint64(0) if err = p.fn.Apply(ctx, &firstPacket, &serial); err != nil { gopacketLogger.Printf("%s - #:0 | failed to translate 1st packet: %v\n", loggerPrefix, err) } } else { gopacketLogger.Printf("%s - #:0 | error: %v\n", loggerPrefix, err) } gopacketLogger.Printf("%s - translating packets\n", loggerPrefix) var packetsCounter atomic.Uint64 var ctxDoneTS time.Time for p.isActive.Load() { select { case <-ctx.Done(): if p.isActive.CompareAndSwap(true, false) { ctxDoneTS = time.Now() gopacketLogger.Printf("%s - stopping packet capture\n", loggerPrefix) } case packet := <-source.Packets(): serial := packetsCounter.Add(1) // non-blocking operation if err = p.fn.Apply(ctx, &packet, &serial); err != nil && p.isActive.Load() { gopacketLogger.Printf("%s - #:%d | failed to translate: %v\n", loggerPrefix, serial, err) } } } gopacketLogger.Printf("%s - stopping packet capture\n", loggerPrefix) engineStopDeadline := <-stopDeadline deadline := *engineStopDeadline - time.Since(ctxDoneTS) p.fn.WaitDone(ctx, &deadline) gopacketLogger.Printf("%s – total packets: %d\n", loggerPrefix, packetsCounter.Load()) return ctx.Err() } func NewPcap(config *PcapConfig) (PcapEngine, error) { var isActive atomic.Bool isActive.Store(false) debug := config.Debug if debugEnvVar, err := strconv.ParseBool(os.Getenv("PCAP_DEBUG")); err == nil { config.Debug = debug || debugEnvVar } // `config.Ephemerals` is already a safe type, // here the validation only enforces correctness of port range. if config.Ephemerals == nil || config.Ephemerals.Min < pcap_min_ephemeral_port || config.Ephemerals.Min >= config.Ephemerals.Max { config.Ephemerals = &PcapEphemeralPorts{ Min: PCAP_MIN_EPHEMERAL_PORT, Max: PCAP_MAX_EPHEMERAL_PORT, } } pcap := Pcap{config: config, isActive: &isActive} if strings.EqualFold(config.Iface, anyDeviceName) { config.Device = nil } else { devices, err := FindDevicesByName(&config.Iface) if err == nil { config.Device = devices[0] } } return &pcap, nil }