packetbeat/sniffer/sniffer.go (398 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package sniffer import ( "context" "encoding/json" "fmt" "io" "os" "runtime" "strings" "sync/atomic" "time" "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" "github.com/google/gopacket/pcapgo" "golang.org/x/sync/errgroup" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/beats/v7/packetbeat/config" "github.com/elastic/beats/v7/packetbeat/decoder" ) // Sniffer provides packet sniffing capabilities, forwarding packets read // to a Worker. type Sniffer struct { sniffers []sniffer cancel func() log *logp.Logger } type sniffer struct { config config.InterfaceConfig state *atomic.Int32 // store snifferState // device is the first active device after calling New. // It is not updated by default route polling. device string // followDefault indicates that the sniffer has // been configured to follow the default route. followDefault bool // filter is the bpf filter program used by the sniffer. filter string // id and idx identify the sniffer for metric collection. id string idx int decoders Decoders log *logp.Logger } type snifferHandle interface { gopacket.PacketDataSource LinkType() layers.LinkType Close() } // sniffer state values const ( snifferInactive = 0 snifferClosing = 1 snifferActive = 2 ) // New create a new Sniffer instance. Settings are validated in a best effort // only, but no device is opened yet. Accessing and configuring the actual device // is done by the Run method. The id parameter is used to specify the metric // collection ID for AF_PACKET sniffers on Linux. func New(id string, testMode bool, _ string, decoders map[string]Decoders, interfaces []config.InterfaceConfig) (*Sniffer, error) { s := &Sniffer{ sniffers: make([]sniffer, len(interfaces)), log: logp.NewLogger("sniffer"), } for i, iface := range interfaces { dec, ok := decoders[iface.Device] if !ok { // This should never happen. return nil, fmt.Errorf("no decoder for %s", iface.Device) } child := sniffer{ state: &atomic.Int32{}, followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"), id: id, idx: i, decoders: dec, log: s.log, } child.state.Store(snifferInactive) s.log.Debugf("interface: %d, BPF filter: '%s'", i, iface.BpfFilter) // pre-check and normalize configuration: // - resolve potential device name // - check for file output // - set some defaults if iface.File != "" { s.log.Debugf("Reading from file: %s", iface.File) if iface.BpfFilter != "" { s.log.Warn("Packet filters are not applied to pcap files. Ignoring BFP filter.") } // we read file with the pcap provider iface.Type = "pcap" iface.Device = "" } else { // try to resolve device name (ignore error if testMode is enabled) if name, err := resolveDeviceName(iface.Device); err != nil { if !testMode { return nil, err } } else { child.device = name if name == "any" && !deviceAnySupported { return nil, fmt.Errorf("any interface is not supported on %s", runtime.GOOS) } if iface.Snaplen == 0 { iface.Snaplen = 65535 } if iface.BufferSizeMb <= 0 { iface.BufferSizeMb = 24 } if iface.MetricsInterval <= 0 { iface.MetricsInterval = 5 * time.Second } if t := iface.Type; t == "autodetect" || t == "" { iface.Type = "pcap" } s.log.Debugf("Sniffer type: %s device: %s", iface.Type, child.device) } } err := validateConfig(iface.BpfFilter, &iface) //nolint:gosec // Bad linter! validateConfig completes before the next iteration. if err != nil { cfg, _ := json.Marshal(iface) return nil, fmt.Errorf("validate: %w: %s", err, cfg) } child.config = iface child.filter = iface.BpfFilter s.sniffers[i] = child } return s, nil } func validateConfig(filter string, cfg *config.InterfaceConfig) error { if cfg.File == "" { if err := validatePcapFilter(filter); err != nil { return err } } switch cfg.Type { case "pcap": return nil case "af_packet": return validateAfPacketConfig(cfg) default: return fmt.Errorf("unknown sniffer type for %s: %q", cfg.Device, cfg.Type) } } func validatePcapFilter(expr string) error { if expr == "" { return nil } _, err := pcap.NewBPF(layers.LinkTypeEthernet, 65535, expr) return err } func validateAfPacketConfig(cfg *config.InterfaceConfig) error { _, _, _, err := afpacketComputeSize(cfg.BufferSizeMb, cfg.Snaplen, os.Getpagesize()) return err } // Run opens the sniffing device and processes packets being read from that device. // Worker instances are instantiated as needed. func (s *Sniffer) Run() error { ctx, cancel := context.WithCancel(context.Background()) s.cancel = cancel g, ctx := errgroup.WithContext(ctx) for i := range s.sniffers { c := &s.sniffers[i] g.Go(func() error { var ( defaultRoute chan string refresh chan struct{} ) if c.followDefault { defaultRoute = make(chan string) refresh = make(chan struct{}, 1) go c.pollDefaultRoute(ctx, defaultRoute, refresh) } if defaultRoute == nil { return c.sniffStatic(ctx, c.device) } return c.sniffDynamic(ctx, defaultRoute, refresh) }) } return g.Wait() } // pollDefaultRoute repeatedly polls the default route's device at intervals // specified in config.PollDefaultRoute. The poller is terminated by cancelling // the context and the device chan can be read for changes in the default route. // Changes in default route will put the Sniffer into the inactive state to // trigger a new sniffer connection. Termination of the sniffer is not under // the control of the poller. func (s *sniffer) pollDefaultRoute(ctx context.Context, device chan<- string, refresh <-chan struct{}) { go func() { s.log.Info("starting default route poller") // Prime the channel. current := s.device device <- current defaultRouteMetric.Set(current) tick := time.NewTicker(s.config.PollDefaultRoute) for { select { case <-tick.C: s.log.Debug("polling default route") current = s.poll(current, device) case <-refresh: s.log.Debug("requested new default route") current = s.poll(current, device) case <-ctx.Done(): s.log.Info("closing default route poller") close(device) tick.Stop() return } // Purge any unused refresh request. The chan has a cap // of one and the send is conditional so we don't need // to do this in a loop. select { case <-refresh: default: } } }() } // poll returns the current default route interface and sends it on device // if it has a change from the old default route interface. If device resolution // fails, the default route interface is left unchanged. func (s *sniffer) poll(old string, device chan<- string) (current string) { current, err := resolveDeviceName(s.config.Device) if err != nil { s.log.Warnf("sniffer failed to poll default route device: %v", err) return old } if current != old { s.log.Infof("sniffer changing default route device: %s -> %s", old, current) s.state.Store(snifferInactive) // Mark current device as stale. ¯\_(ツ)_/¯ device <- current // Pass the new device name. defaultRouteMetric.Set(current) } return current } // sniffStatic performs the sniffing work on a single static interface. func (s *sniffer) sniffStatic(ctx context.Context, device string) error { handle, err := s.open(device) if err != nil { return fmt.Errorf("failed to start sniffer: %w", err) } defer handle.Close() dec, cleanup, err := s.decoders(handle.LinkType(), device, s.idx) if err != nil { return err } if cleanup != nil { defer cleanup() } return s.sniffHandle(ctx, handle, dec, nil) } // sniffDynamic performs sniffing work on a stream of dynamic interfaces from // defaultRoute decoders are retained between successive interfaces if they are // the same link type. func (s *sniffer) sniffDynamic(ctx context.Context, defaultRoute <-chan string, refresh chan<- struct{}) error { var ( last layers.LinkType dec *decoder.Decoder ) for device := range defaultRoute { var err error last, dec, err = s.sniffOneDynamic(ctx, device, last, dec, refresh) if err != nil { return err } } return nil } // sniffOneDynamic handles sniffing a single device that may change link type. // If the link type associated with the device differs from the last link // type or dec is nil, a new decoder is returned. The link type associated // with the device is returned. func (s *sniffer) sniffOneDynamic(ctx context.Context, device string, last layers.LinkType, dec *decoder.Decoder, refresh chan<- struct{}) (layers.LinkType, *decoder.Decoder, error) { handle, err := s.open(device) if err != nil { return last, dec, fmt.Errorf("failed to start sniffer: %w", err) } defer handle.Close() linkType := handle.LinkType() if dec == nil || linkType != last { s.log.Infof("changing link type: %d -> %d", last, linkType) var cleanup func() dec, cleanup, err = s.decoders(linkType, device, s.idx) if err != nil { return linkType, dec, err } if cleanup != nil { defer cleanup() } } err = s.sniffHandle(ctx, handle, dec, refresh) return linkType, dec, err } // sniff performs the sniffing work and writing dump files if requested. func (s *sniffer) sniffHandle(ctx context.Context, handle snifferHandle, dec *decoder.Decoder, refresh chan<- struct{}) error { var w *pcapgo.Writer if s.config.Dumpfile != "" { const timeSuffixFormat = "20060102150405" filename := fmt.Sprintf("%s-%s.pcap", s.config.Dumpfile, time.Now().Format(timeSuffixFormat)) s.log.Infof("creating new dump file %s", filename) f, err := os.Create(filename) if err != nil { return err } defer f.Close() w = pcapgo.NewWriterNanos(f) err = w.WriteFileHeader(65535, handle.LinkType()) if err != nil { return fmt.Errorf("failed to write dump file header to %s: %w", s.config.Dumpfile, err) } } // Mark inactive sniffer as active. In case of the sniffer/packetbeat closing // before/while Run is executed, the state will be snifferClosing. // => return if state is already snifferClosing. if !s.state.CompareAndSwap(snifferInactive, snifferActive) { return nil } defer s.state.Store(snifferInactive) var ( packets int timeouts int ) for s.state.Load() == snifferActive { select { case <-ctx.Done(): s.log.Infof("sniffing cancelled: %q", s.config.Device) // Return nil since this must have been due to an errgroup // termination and any error that caused that will already // have been captured by the errgroup. return nil default: } if s.config.OneAtATime { fmt.Fprintln(os.Stdout, "Press enter to read packet") fmt.Scanln() } data, ci, err := handle.ReadPacketData() if err == pcap.NextErrorTimeoutExpired || isAfpacketErrTimeout(err) { //nolint:errorlint // pcap.NextErrorTimeoutExpired is not wrapped. // If we have timed out too many times, and we are following // a default route, request a new default route interface. const maxTimeouts = 10 // Place-holder until we have a sensible notion of how big this should be. timeouts++ if s.followDefault && timeouts > maxTimeouts { select { case refresh <- struct{}{}: default: // Don't request to refresh if already requested. } timeouts = 0 } continue } timeouts = 0 if err != nil { // ignore EOF, if sniffer was driven from file if err == io.EOF && s.config.File != "" { //nolint:errorlint // io.EOF should never be wrapped. return nil } // If we are following a default route, request an interface // refresh and log the error. if s.followDefault { select { case refresh <- struct{}{}: default: // Don't request to refresh if already requested. } s.log.Warnf("error during packet capture: %v", err) continue } s.state.Store(snifferInactive) return fmt.Errorf("sniffing error: %w", err) } if len(data) == 0 { // Empty packet, probably timeout from afpacket. continue } packets++ if w != nil { err = w.WritePacket(ci, data) if err != nil { return fmt.Errorf("failed to write packet %d: %w", packets, err) } } if s.config.OneAtATime { s.log.Debugw("Packet received.", "network.packets", packets) } dec.OnPacket(data, &ci) } return nil } func (s *sniffer) open(device string) (snifferHandle, error) { if s.config.File != "" { return newFileHandler(s.config.File, s.config.TopSpeed, s.config.Loop) } switch s.config.Type { case "pcap": return openPcap(device, s.filter, &s.config) case "af_packet": return openAFPacket(fmt.Sprintf("%s_%d", s.id, s.idx), device, s.filter, &s.config) default: return nil, fmt.Errorf("unknown sniffer type for %s: %q", device, s.config.Type) } } // Stop marks a sniffer as stopped. The Run method will return once the stop // signal has been given. func (s *Sniffer) Stop() { s.log.Debug("sending stop to all sniffers") for _, c := range s.sniffers { s.log.Debugf("sending closing to %s", c.config.Device) c.state.Store(snifferClosing) } if s.cancel != nil { s.log.Debug("cancelling sniffers") s.cancel() } } func openPcap(device, filter string, cfg *config.InterfaceConfig) (snifferHandle, error) { snaplen := int32(cfg.Snaplen) timeout := 500 * time.Millisecond h, err := pcap.OpenLive(device, snaplen, true, timeout) if err != nil { return nil, err } err = h.SetBPFFilter(filter) if err != nil { h.Close() return nil, err } return h, nil } func openAFPacket(id, device, filter string, cfg *config.InterfaceConfig) (snifferHandle, error) { szFrame, szBlock, numBlocks, err := afpacketComputeSize(cfg.BufferSizeMb, cfg.Snaplen, os.Getpagesize()) if err != nil { return nil, err } timeout := 500 * time.Millisecond h, err := newAfpacketHandle(afPacketConfig{ ID: id, Device: device, FrameSize: szFrame, BlockSize: szBlock, NumBlocks: numBlocks, PollTimeout: timeout, MetricsInterval: cfg.MetricsInterval, FanoutGroupID: cfg.FanoutGroup, Promiscuous: cfg.EnableAutoPromiscMode, }) if err != nil { return nil, err } err = h.SetBPFFilter(filter) if err != nil { h.Close() return nil, err } return h, nil }