packetbeat/decoder/decoder.go
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
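
// Package decoder turns raw packet data captured by Packetbeat into decoded
// protocol layers, reassembles IPv4 fragments, updates flow statistics, and
// hands transport payloads to the configured protocol processors.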
package decoder
import (
"fmt"
"sort"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/elastic/beats/v7/packetbeat/flows"
"github.com/elastic/beats/v7/packetbeat/protos"
"github.com/elastic/beats/v7/packetbeat/protos/icmp"
"github.com/elastic/beats/v7/packetbeat/protos/tcp"
"github.com/elastic/beats/v7/packetbeat/protos/udp"
"github.com/elastic/elastic-agent-libs/logp"
)
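
// Decoder decodes raw packet data layer by layer, starting at the configured
// link layer. It maintains per-flow statistics and forwards ICMP, TCP and UDP
// payloads to the registered protocol processors.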
type Decoder struct {
decoders map[gopacket.LayerType]gopacket.DecodingLayer
linkLayerDecoder gopacket.DecodingLayer
linkLayerType gopacket.LayerType
sll layers.LinuxSLL
lo layers.Loopback
eth layers.Ethernet
d1q [2]layers.Dot1Q
ip4 [2]layers.IPv4
ip6 [2]layers.IPv6
icmp4 layers.ICMPv4
icmp6 layers.ICMPv6
tcp layers.TCP
udp layers.UDP
truncated bool
fragments fragmentCache
stD1Q, stIP4, stIP6 multiLayer
icmp4Proc icmp.ICMPv4Processor
icmp6Proc icmp.ICMPv6Processor
tcpProc tcp.Processor
udpProc udp.Processor
flows *flows.Flows
statPackets *flows.Uint
statBytes *flows.Uint
icmpV4TypeCode *flows.Uint
icmpV6TypeCode *flows.Uint
// hold current flow ID
flowID *flows.FlowID // buffer flowID among many calls
flowIDBufferBacking [flows.SizeFlowIDMax]byte
logger *logp.Logger
}
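
// Names of the per-flow metrics registered with the flows module.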
const (
netPacketsTotalCounter = "packets"
netBytesTotalCounter = "bytes"
icmpV4TypeCodeValue = "icmpV4TypeCode"
icmpV6TypeCodeValue = "icmpV6TypeCode"
)
// New creates and initializes a new packet decoder.
func New(f *flows.Flows, datalink layers.LinkType, icmp4 icmp.ICMPv4Processor, icmp6 icmp.ICMPv6Processor, tcp tcp.Processor, udp udp.Processor) (*Decoder, error) {
d := Decoder{
flows: f,
decoders: make(map[gopacket.LayerType]gopacket.DecodingLayer),
icmp4Proc: icmp4, icmp6Proc: icmp6, tcpProc: tcp, udpProc: udp,
fragments: fragmentCache{collected: make(map[uint16]fragments)},
logger: logp.NewLogger("decoder"),
}
d.stD1Q.init(&d.d1q[0], &d.d1q[1])
d.stIP4.init(&d.ip4[0], &d.ip4[1])
d.stIP6.init(&d.ip6[0], &d.ip6[1])
if f != nil {
var err error
d.statPackets, err = f.NewUint(netPacketsTotalCounter)
if err != nil {
return nil, err
}
d.statBytes, err = f.NewUint(netBytesTotalCounter)
if err != nil {
return nil, err
}
d.icmpV4TypeCode, err = f.NewUint(icmpV4TypeCodeValue)
if err != nil {
return nil, err
}
d.icmpV6TypeCode, err = f.NewUint(icmpV6TypeCodeValue)
if err != nil {
return nil, err
}
d.flowID = &flows.FlowID{}
}
d.AddLayers([]gopacket.DecodingLayer{
&d.sll, // LinuxSLL
&d.eth, // Ethernet
&d.lo, // loopback on OS X
&d.stD1Q, // VLAN
&d.stIP4, &d.stIP6, // IP
&d.icmp4, &d.icmp6, // ICMP
&d.tcp, &d.udp, // TCP/UDP
})
d.logger.Debugf("Layer type: %s", datalink)
switch datalink {
case layers.LinkTypeLinuxSLL:
d.linkLayerDecoder = &d.sll
d.linkLayerType = layers.LayerTypeLinuxSLL
case layers.LinkTypeEthernet:
d.linkLayerDecoder = &d.eth
d.linkLayerType = layers.LayerTypeEthernet
case layers.LinkTypeNull: // loopback on OS X
d.linkLayerDecoder = &d.lo
d.linkLayerType = layers.LayerTypeLoopback
default:
return nil, fmt.Errorf("unsupported link type: %s", datalink)
}
return &d, nil
}
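
// SetTruncated marks the packet currently being decoded as truncated by the
// capture. It implements gopacket.DecodeFeedback, which DecodeFromBytes uses
// to report truncation.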
func (d *Decoder) SetTruncated() {
d.truncated = true
}
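
// AddLayer registers layer as the decoder for every layer type it can decode.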
func (d *Decoder) AddLayer(layer gopacket.DecodingLayer) {
for _, typ := range layer.CanDecode().LayerTypes() {
d.decoders[typ] = layer
}
}
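
// AddLayers registers each of the given decoding layers with the decoder.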
func (d *Decoder) AddLayers(layers []gopacket.DecodingLayer) {
for _, layer := range layers {
d.AddLayer(layer)
}
}
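
// OnPacket decodes a single captured packet. It walks the protocol layers
// starting at the configured link layer, reassembling IPv4 fragments when
// necessary, dispatches the decoded layers to the protocol processors via
// process, and accounts the packet in the flow statistics.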
func (d *Decoder) OnPacket(data []byte, ci *gopacket.CaptureInfo) {
d.truncated = false
current := d.linkLayerDecoder
currentType := d.linkLayerType
packet := protos.Packet{Ts: ci.Timestamp}
d.logger.Debug("decode packet data")
if d.flowID != nil {
d.flowID.Reset(d.flowIDBufferBacking[:0])
// suppress flow stats snapshots while processing packet
d.flows.Lock()
defer d.flows.Unlock()
}
d.stD1Q.i = 0
d.stIP4.i = 0
d.stIP6.i = 0
for len(data) != 0 {
err := current.DecodeFromBytes(data, d)
if err != nil {
d.logger.Infof("packet decode failed with: %v", err)
break
}
nextType := current.NextLayerType()
data = current.LayerPayload()
if nextType == gopacket.LayerTypeFragment {
ipv4, ok := ipv4Layer(current)
if !ok {
// This should never happen. Log the issue and attempt to continue.
// The process logic below will handle this if it can.
d.logger.Warn("no IPv4 layer for fragment")
} else {
now := time.Now()
const offsetMask = 1<<13 - 1 // https://datatracker.ietf.org/doc/html/rfc791#section-3.1
f := fragment{
id: ipv4.Id,
offset: int(ipv4.FragOffset&offsetMask) * 8,
data: append(data[:0:0], data...), // Ensure that we are not aliasing data.
more: ipv4.Flags&layers.IPv4MoreFragments != 0,
expire: now.Add(time.Duration(ipv4.TTL) * time.Second),
}
var more bool
data, more, err = d.fragments.add(now, f)
if err != nil {
d.logger.Warnf("%v src=%s dst=%s", err, ipv4.SrcIP, ipv4.DstIP)
return
}
if more {
return
}
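// All fragments have been collected: account for the IPv4 layer now and
// continue decoding the reassembled payload with the protocol carried by
// the IPv4 header.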
d.process(&packet, currentType)
currentType = ipv4.Protocol.LayerType()
current, ok = d.decoders[currentType]
if !ok {
d.logger.Debugf("no layer decoder for reconstructed fragments: %v (%[1]d)", currentType)
break
}
continue
}
}
done := d.process(&packet, currentType)
if done {
d.logger.Debugf("processed")
break
}
// choose next decoding layer
next, ok := d.decoders[nextType]
if !ok {
d.logger.Debugf("no next type: %v (%[1]d)", nextType)
break
}
// jump to next layer
current = next
currentType = nextType
}
// add flow stats
if d.flowID != nil {
d.logger.Debugf("flow id flags: %v", d.flowID.Flags())
}
if d.flowID != nil && d.flowID.Flags() != 0 {
flow := d.flows.Get(d.flowID)
d.statPackets.Add(flow, 1)
d.statBytes.Add(flow, uint64(ci.Length))
}
}
// fragmentCache is a TTL aware cache of IPv4 fragments to reassemble.
type fragmentCache struct {
// oldest is the expiry time of the oldest fragment.
oldest time.Time
// collected holds the collections of fragments keyed on their
// IPv4 packet ID field.
collected map[uint16]fragments
}
// maxReassemble is the maximum size that a collection of fragmented
// packets will be reassembled to.
const maxReassemble = 1e5
// add adds a new fragment to the cache. The value of now is used to expire fragments
// and collections of fragments. If the fragment completes a set of fragments for
// reassembly, the payload of the reassembled packet is returned in data.
// If more packets are required to complete the reassembly for the fragment's
// ID, more is returned as true. Expiries and oversize reassemblies are
// signaled via the returned error.
// The cache is purged of expired collections before add returns.
func (c *fragmentCache) add(now time.Time, f fragment) (data []byte, more bool, err error) {
defer c.purge(now)
collected, ok := c.collected[f.id]
if ok && !collected.expire.IsZero() && now.After(collected.expire) {
delete(c.collected, f.id)
return nil, false, fmt.Errorf("fragments expired before reassembly ID=%d", f.id)
}
if c.oldest.After(f.expire) {
c.oldest = f.expire
}
if collected.expire.IsZero() || collected.expire.After(f.expire) {
collected.expire = f.expire
}
collected.fragments = append(collected.fragments, f)
// Check whether we have all the fragments we need to do a reassembly.
// Do the least amount of work possible.
if !f.more {
collected.haveFinal = true
}
more = !collected.haveFinal
if collected.haveFinal {
sort.Slice(collected.fragments, func(i, j int) bool {
return collected.fragments[i].offset < collected.fragments[j].offset
})
more = collected.fragments[0].offset != 0
if !more {
n := len(collected.fragments[0].data)
for _, f := range collected.fragments[1:] {
if f.offset != n {
more = true
break
}
n += len(f.data)
}
}
}
if more {
c.collected[f.id] = collected
return nil, true, nil
}
// Drop the fragments and do the reassembly.
delete(c.collected, f.id)
data = collected.fragments[0].data
for _, f := range collected.fragments[1:] {
if len(data)+len(f.data) > maxReassemble {
return nil, false, fmt.Errorf("packet reconstruction would exceed limit ID=%d", f.id)
}
data = append(data, f.data...)
}
return data, false, nil
}
// purge performs a cache expiry purge, removing all collected fragments
// that expired before now.
func (c *fragmentCache) purge(now time.Time) {
if c.oldest.After(now) {
return
}
c.oldest = now
for id, coll := range c.collected {
if now.After(coll.expire) {
delete(c.collected, id)
continue
}
if c.oldest.After(coll.expire) {
c.oldest = coll.expire
}
}
}
// fragments holds a collection of fragmented packets sharing an IPv4 packet ID.
type fragments struct {
expire time.Time
fragments []fragment
haveFinal bool
}
// fragment is an IPv4 packet fragment.
type fragment struct {
id uint16
offset int
data []byte
more bool
expire time.Time
}
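
// process records the decoded layer in the packet tuple and flow ID and, for
// transport and ICMP layers, forwards the packet to the matching processor.
// It returns true once the packet has been fully handled.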
func (d *Decoder) process(packet *protos.Packet, layerType gopacket.LayerType) (done bool) {
withFlow := d.flowID != nil
switch layerType {
case layers.LayerTypeEthernet:
if withFlow {
d.flowID.AddEth(d.eth.SrcMAC, d.eth.DstMAC)
}
case layers.LayerTypeDot1Q:
d1q := &d.d1q[d.stD1Q.i]
d.stD1Q.next()
if withFlow {
d.flowID.AddVLan(d1q.VLANIdentifier)
}
case layers.LayerTypeIPv4:
d.logger.Debugf("IPv4 packet")
ip4 := &d.ip4[d.stIP4.i]
d.stIP4.next()
if withFlow {
d.flowID.AddIPv4(ip4.SrcIP, ip4.DstIP)
}
packet.Tuple.SrcIP = ip4.SrcIP
packet.Tuple.DstIP = ip4.DstIP
packet.Tuple.IPLength = 4
case layers.LayerTypeIPv6:
d.logger.Debugf("IPv6 packet")
ip6 := &d.ip6[d.stIP6.i]
d.stIP6.next()
if withFlow {
d.flowID.AddIPv6(ip6.SrcIP, ip6.DstIP)
}
packet.Tuple.SrcIP = ip6.SrcIP
packet.Tuple.DstIP = ip6.DstIP
packet.Tuple.IPLength = 16
case layers.LayerTypeICMPv4:
d.logger.Debugf("ICMPv4 packet")
d.onICMPv4(packet)
return true
case layers.LayerTypeICMPv6:
d.logger.Debugf("ICMPv6 packet")
d.onICMPv6(packet)
return true
case layers.LayerTypeUDP:
d.logger.Debugf("UDP packet")
d.onUDP(packet)
return true
case layers.LayerTypeTCP:
d.logger.Debugf("TCP packet")
d.onTCP(packet)
return true
}
return false
}
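
// onICMPv4 records the ICMPv4 type and code in the flow and forwards the
// packet to the ICMPv4 processor.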
func (d *Decoder) onICMPv4(packet *protos.Packet) {
if d.flowID != nil {
flow := d.flows.Get(d.flowID)
d.icmpV4TypeCode.Set(flow, uint64(d.icmp4.TypeCode))
}
if d.icmp4Proc != nil {
packet.Payload = d.icmp4.Payload
packet.Tuple.ComputeHashables()
d.icmp4Proc.ProcessICMPv4(d.flowID, &d.icmp4, packet)
}
}
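
// onICMPv6 records the ICMPv6 type and code in the flow and forwards the
// packet to the ICMPv6 processor.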
func (d *Decoder) onICMPv6(packet *protos.Packet) {
if d.flowID != nil {
flow := d.flows.Get(d.flowID)
d.icmpV6TypeCode.Set(flow, uint64(d.icmp6.TypeCode))
}
if d.icmp6Proc != nil {
// google/gopacket treats the first four bytes
// after the type, code and checksum as part of
// the payload. So drop those bytes.
// See https://github.com/google/gopacket/pull/423/
d.icmp6.Payload = d.icmp6.Payload[4:]
packet.Payload = d.icmp6.Payload
packet.Tuple.ComputeHashables()
d.icmp6Proc.ProcessICMPv6(d.flowID, &d.icmp6, packet)
}
}
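
// onUDP adds the UDP ports to the flow ID and packet tuple and forwards the
// packet to the UDP processor.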
func (d *Decoder) onUDP(packet *protos.Packet) {
src := uint16(d.udp.SrcPort)
dst := uint16(d.udp.DstPort)
id := d.flowID
if id != nil {
d.flowID.AddUDP(src, dst)
}
packet.Tuple.SrcPort = src
packet.Tuple.DstPort = dst
packet.Payload = d.udp.Payload
packet.Tuple.ComputeHashables()
d.udpProc.Process(id, packet)
}
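
// onTCP adds the TCP ports to the flow ID and packet tuple and forwards the
// packet to the TCP processor. Empty, non-FIN segments are dropped when flow
// accounting is disabled.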
func (d *Decoder) onTCP(packet *protos.Packet) {
src := uint16(d.tcp.SrcPort)
dst := uint16(d.tcp.DstPort)
id := d.flowID
if id != nil {
id.AddTCP(src, dst)
}
packet.Tuple.SrcPort = src
packet.Tuple.DstPort = dst
packet.Payload = d.tcp.Payload
if id == nil && len(packet.Payload) == 0 && !d.tcp.FIN {
// We have no use for this atm.
d.logger.Debugf("Ignore empty non-FIN packet")
return
}
packet.Tuple.ComputeHashables()
d.tcpProc.Process(id, &d.tcp, packet)
}