packetbeat/sniffer/afpacket_linux.go (202 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//go:build linux
package sniffer
import (
"fmt"
"syscall"
"time"
"unsafe"
"github.com/google/gopacket"
"github.com/google/gopacket/afpacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"golang.org/x/net/bpf"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)
type afpacketHandle struct {
TPacket *afpacket.TPacket
frameSize int
promiscPreviousState bool
promiscPreviousStateDetected bool
device string
log *logp.Logger
metrics *metrics
}
func newAfpacketHandle(c afPacketConfig) (*afpacketHandle, error) {
var err error
var promiscEnabled bool
log := logp.NewLogger("sniffer")
if c.Promiscuous {
promiscEnabled, err = isPromiscEnabled(c.Device)
if err != nil {
log.Errorf("Failed to get promiscuous mode for device '%s': %v", c.Device, err)
}
if !promiscEnabled {
if setPromiscErr := setPromiscMode(c.Device, true); setPromiscErr != nil {
log.Warnf("Failed to set promiscuous mode for device '%s'. "+
"Packetbeat may be unable to see any network traffic. Please follow packetbeat "+
"FAQ to learn about mitigation: Error: %v", c.Device, err)
}
}
}
h := &afpacketHandle{
promiscPreviousState: promiscEnabled,
frameSize: c.FrameSize,
device: c.Device,
promiscPreviousStateDetected: c.Promiscuous && err == nil,
log: log,
}
if c.Device == "any" {
h.TPacket, err = afpacket.NewTPacket(
afpacket.OptFrameSize(c.FrameSize),
afpacket.OptBlockSize(c.BlockSize),
afpacket.OptNumBlocks(c.NumBlocks),
afpacket.OptPollTimeout(c.PollTimeout))
} else {
h.TPacket, err = afpacket.NewTPacket(
afpacket.OptInterface(c.Device),
afpacket.OptFrameSize(c.FrameSize),
afpacket.OptBlockSize(c.BlockSize),
afpacket.OptNumBlocks(c.NumBlocks),
afpacket.OptPollTimeout(c.PollTimeout))
}
if err != nil {
return nil, fmt.Errorf("failed creating af_packet socket: %w", err)
}
h.metrics = newMetrics(c.ID, c.Device, c.MetricsInterval, h.TPacket, log)
if c.FanoutGroupID != nil {
if err = h.TPacket.SetFanout(afpacket.FanoutHashWithDefrag, *c.FanoutGroupID); err != nil {
return nil, fmt.Errorf("failed setting af_packet fanout group: %w", err)
}
log.Infof("Joined af_packet fanout group %v", *c.FanoutGroupID)
}
return h, nil
}
func (h *afpacketHandle) ReadPacketData() (data []byte, ci gopacket.CaptureInfo, err error) {
return h.TPacket.ReadPacketData()
}
func (h *afpacketHandle) SetBPFFilter(expr string) error {
prog, err := pcap.CompileBPFFilter(layers.LinkTypeEthernet, h.frameSize, expr)
if err != nil {
return err
}
p := make([]bpf.RawInstruction, len(prog))
for i, ins := range prog {
p[i] = bpf.RawInstruction{
Op: ins.Code,
Jt: ins.Jt,
Jf: ins.Jf,
K: ins.K,
}
}
return h.TPacket.SetBPF(p)
}
func (h *afpacketHandle) LinkType() layers.LinkType {
return layers.LinkTypeEthernet
}
func (h *afpacketHandle) Close() {
h.metrics.close()
h.TPacket.Close()
// previous state detected only if auto mode was on
if h.promiscPreviousStateDetected {
if err := setPromiscMode(h.device, h.promiscPreviousState); err != nil {
h.log.Warnf("Failed to reset promiscuous mode for device '%s'. Your device might be in promiscuous mode.: %v", h.device, err)
}
}
}
func isPromiscEnabled(device string) (bool, error) {
if device == "any" {
return false, nil
}
s, e := syscall.Socket(syscall.AF_INET, syscall.SOCK_DGRAM, 0)
if e != nil {
return false, e
}
defer syscall.Close(s)
var ifreq struct {
name [syscall.IFNAMSIZ]byte
flags uint16
}
copy(ifreq.name[:], device)
_, _, ep := syscall.Syscall(syscall.SYS_IOCTL, uintptr(s), syscall.SIOCGIFFLAGS, uintptr(unsafe.Pointer(&ifreq)))
if ep != 0 {
return false, fmt.Errorf("ioctl command SIOCGIFFLAGS failed to get device flags for %v: return code %d", device, ep)
}
return ifreq.flags&uint16(syscall.IFF_PROMISC) != 0, nil
}
// setPromiscMode enables promisc mode if configured. This is a no-op when device is 'any'.
func setPromiscMode(device string, enabled bool) error {
if device == "any" {
logp.L().Named("sniffer").Warn("Cannot set promiscuous mode for device 'any'")
return nil
}
// SetLsfPromisc is marked as deprecated but used to improve readability (bpf)
// and avoid Cgo (pcap)
// TODO: replace with x/net/bpf or pcap
return syscall.SetLsfPromisc(device, enabled)
}
// isAfpacketErrTimeout returns whether err is afpacket.ErrTimeout.
func isAfpacketErrTimeout(err error) bool {
return err == afpacket.ErrTimeout
}
type metrics struct {
unregister func()
done chan struct{} // used to signal to polling goroutine to stop
device *monitoring.String // name of the device being monitored
socketPackets *monitoring.Uint // number of packets delivered by kernel
socketDrops *monitoring.Uint // number of packets dropped by kernel (i.e., buffer full)
socketQueueFreezes *monitoring.Uint // number of queue freezes
packets *monitoring.Uint // number of packets read off buffer by packetbeat
polls *monitoring.Uint // number of blocking syscalls made by packetbeat waiting for packets
}
func (m *metrics) close() {
if m == nil {
return
}
m.unregister()
if m.done != nil {
close(m.done)
m.done = nil
}
}
func newMetrics(id, device string, interval time.Duration, handle *afpacket.TPacket, log *logp.Logger) *metrics {
devID := fmt.Sprintf("%s-af_packet::%s", id, device)
reg, unreg := inputmon.NewInputRegistry("af_packet", devID, nil)
out := &metrics{
unregister: unreg,
device: monitoring.NewString(reg, "device"),
socketPackets: monitoring.NewUint(reg, "socket_packets"),
socketDrops: monitoring.NewUint(reg, "socket_drops"),
socketQueueFreezes: monitoring.NewUint(reg, "socket_queue_freezes"),
packets: monitoring.NewUint(reg, "packets"),
polls: monitoring.NewUint(reg, "polls"),
done: make(chan struct{}),
}
out.device.Set(device)
go func() {
log.Debug("Starting stats collection goroutine, collection interval: %v", interval)
ticker := time.NewTicker(interval)
defer ticker.Stop()
if err := handle.InitSocketStats(); err != nil {
log.Errorw("Failed to init socket stats", "error", err)
}
for {
select {
case <-out.done:
log.Debug("Shutting down stats collection goroutine")
return
case <-ticker.C:
_, sockStats, err := handle.SocketStats()
if err != nil {
log.Debugw("Error getting socket stats", "error", err)
} else {
out.socketPackets.Set(uint64(sockStats.Packets()))
out.socketDrops.Set(uint64(sockStats.Drops()))
out.socketQueueFreezes.Set(uint64(sockStats.QueueFreezes()))
}
stats, err := handle.Stats()
if err != nil {
log.Debugw("Error getting packetbeat stats", "error", err)
} else {
out.packets.Set(uint64(stats.Packets))
out.polls.Set(uint64(stats.Polls))
}
}
}
}()
return out
}