packetbeat/protos/udp/udp.go (152 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package udp
import (
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/rcrowley/go-metrics"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
"github.com/elastic/beats/v7/packetbeat/flows"
"github.com/elastic/beats/v7/packetbeat/protos"
)
type Processor interface {
Process(id *flows.FlowID, pkt *protos.Packet)
}
type UDP struct {
protocols protos.Protocols
portMap map[uint16]protos.Protocol
metrics *inputMetrics
}
// NewUDP creates and returns a new UDP.
func NewUDP(p protos.Protocols, id, device string, idx int) (*UDP, error) {
portMap, err := buildPortsMap(p.GetAllUDP())
if err != nil {
return nil, err
}
udp := &UDP{
protocols: p,
portMap: portMap,
metrics: newInputMetrics(fmt.Sprintf("%s_%d", id, idx), device, portMap),
}
logp.Debug("udp", "Port map: %v", portMap)
return udp, nil
}
// buildPortsMap creates a mapping of port numbers to protocol identifiers. If
// any two UdpProtocolPlugins operate on the same port number then an error
// will be returned.
func buildPortsMap(plugins map[protos.Protocol]protos.UDPPlugin) (map[uint16]protos.Protocol, error) {
res := map[uint16]protos.Protocol{}
for proto, protoPlugin := range plugins {
for _, port := range protoPlugin.GetPorts() {
oldProto, exists := res[uint16(port)]
if exists {
if oldProto == proto {
continue
}
return nil, fmt.Errorf("duplicate port (%d) exists in %s and %s protocols",
port, oldProto, proto)
}
res[uint16(port)] = proto
}
}
return res, nil
}
// Process handles UDP packets that have been received. It attempts to
// determine the protocol type and then invokes the associated
// UdpProtocolPlugin's ParseUDP method. If the protocol cannot be determined
// or the payload is empty then the method is a noop.
func (udp *UDP) Process(id *flows.FlowID, pkt *protos.Packet) {
protocol := udp.decideProtocol(&pkt.Tuple)
if protocol == protos.UnknownProtocol {
logp.Debug("udp", "unknown protocol")
return
}
plugin := udp.protocols.GetUDP(protocol)
if plugin == nil {
logp.Debug("udp", "Ignoring protocol for which we have no module loaded: %s", protocol)
return
}
if len(pkt.Payload) > 0 {
logp.Debug("udp", "Parsing packet from %v of length %d.",
pkt.Tuple.String(), len(pkt.Payload))
plugin.ParseUDP(pkt)
udp.metrics.log(pkt)
}
}
// decideProtocol determines the protocol based on the source and destination
// ports. If the protocol cannot be determined then protos.UnknownProtocol
// is returned.
func (udp *UDP) decideProtocol(tuple *common.IPPortTuple) protos.Protocol {
protocol, exists := udp.portMap[tuple.SrcPort]
if exists {
return protocol
}
protocol, exists = udp.portMap[tuple.DstPort]
if exists {
return protocol
}
return protos.UnknownProtocol
}
func (udp *UDP) Close() {
if udp.metrics == nil {
return
}
udp.metrics.close()
}
// inputMetrics handles the input's metric reporting.
type inputMetrics struct {
unregister func()
lastPacket time.Time
device *monitoring.String // name of the device being monitored
packets *monitoring.Uint // number of packets processed
bytes *monitoring.Uint // number of bytes processed
arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals
processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication
}
// newInputMetrics returns an input metric for the UDP processor. If id or
// device is empty a nil inputMetric is returned.
func newInputMetrics(id, device string, ports map[uint16]protos.Protocol) *inputMetrics {
if id == "" || device == "" {
// An empty id signals to not record metrics,
// while an empty device means we are reading
// from a pcap file and no metrics are needed.
return nil
}
devID := fmt.Sprintf("%s-udp%s::%s", id, portList(ports), device)
reg, unreg := inputmon.NewInputRegistry("udp", devID, nil)
out := &inputMetrics{
unregister: unreg,
device: monitoring.NewString(reg, "device"),
packets: monitoring.NewUint(reg, "received_events_total"),
bytes: monitoring.NewUint(reg, "received_bytes_total"),
arrivalPeriod: metrics.NewUniformSample(1024),
processingTime: metrics.NewUniformSample(1024),
}
_ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.arrivalPeriod))
_ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.processingTime))
out.device.Set(device)
return out
}
// portList returns a dash-separated list of port numbers sorted ascending. A leading
// dash is prepended to the list if it is not empty.
func portList(m map[uint16]protos.Protocol) string {
if len(m) == 0 {
return ""
}
ports := make([]int, 0, len(m))
for p := range m {
ports = append(ports, int(p))
}
sort.Ints(ports)
s := make([]string, len(ports)+1)
for i, p := range ports {
s[i+1] = strconv.FormatInt(int64(p), 10)
}
return strings.Join(s, "-")
}
// log logs metric for the given packet.
func (m *inputMetrics) log(pkt *protos.Packet) {
if m == nil {
return
}
m.processingTime.Update(time.Since(pkt.Ts).Nanoseconds())
m.packets.Add(1)
m.bytes.Add(uint64(len(pkt.Payload)))
if !m.lastPacket.IsZero() {
m.arrivalPeriod.Update(pkt.Ts.Sub(m.lastPacket).Nanoseconds())
}
m.lastPacket = pkt.Ts
}
func (m *inputMetrics) close() {
if m == nil {
return
}
m.unregister()
}