receiver/netflowreceiver/receiver.go (114 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"
import (
"context"
"errors"
"fmt"
"net"
"github.com/netsampler/goflow2/v2/decoders/netflow"
protoproducer "github.com/netsampler/goflow2/v2/producer/proto"
"github.com/netsampler/goflow2/v2/utils"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
)
// Compile-time assertion that *dropHandler satisfies utils.ReceiverCallback.
var _ utils.ReceiverCallback = (*dropHandler)(nil)
// dropHandler is the callback handed to the goflow2 UDP receiver; its Dropped
// method logs a warning for each message it is given.
type dropHandler struct {
// logger receives the warning emitted for every dropped message.
logger *zap.Logger
}
// Dropped logs a warning that carries the dropped message as a structured field.
func (h dropHandler) Dropped(msg utils.Message) {
	dropped := zap.Any("msg", msg)
	h.logger.Warn("Dropped netflow message", dropped)
}
// netflowReceiver is the logs receiver of this package: it owns a goflow2 UDP
// receiver and hands decoded flow messages to a logs consumer.
type netflowReceiver struct {
// config holds the receiver settings (scheme, hostname, port, sockets, workers, queue size).
config Config
// logger is used for lifecycle and error reporting.
logger *zap.Logger
// udpReceiver is created by the constructor, started in Start and stopped in Shutdown.
udpReceiver *utils.UDPReceiver
// logConsumer is passed to the OpenTelemetry logs producer built in buildDecodeFunc.
logConsumer consumer.Logs
}
// newNetflowLogsReceiver creates the goflow2 UDP receiver from cfg and wraps it,
// together with the logger and the logs consumer, in a netflowReceiver.
// The UDP receiver is only constructed here; it is started later by Start.
// Any error from utils.NewUDPReceiver is returned unchanged.
func newNetflowLogsReceiver(params receiver.Settings, cfg Config, consumer consumer.Logs) (receiver.Logs, error) {
	// Callback that logs every message reported as dropped.
	onDrop := &dropHandler{logger: params.Logger}

	listener, err := utils.NewUDPReceiver(&utils.UDPReceiverConfig{
		Sockets:          cfg.Sockets,
		Workers:          cfg.Workers,
		QueueSize:        cfg.QueueSize,
		Blocking:         false,
		ReceiverCallback: onDrop,
	})
	if err != nil {
		return nil, err
	}

	return &netflowReceiver{
		config:      cfg,
		logger:      params.Logger,
		udpReceiver: listener,
		logConsumer: consumer,
	}, nil
}
// Start builds the packet decoder for the configured scheme, starts the UDP
// receiver on the configured hostname and port, and launches the goroutine
// that drains the receiver's error channel.
// It returns the error from building the decoder or from starting the
// receiver, whichever occurs first.
func (nr *netflowReceiver) Start(_ context.Context, _ component.Host) error {
	// Decoder invoked for every packet received.
	decoder, buildErr := nr.buildDecodeFunc()
	if buildErr != nil {
		return buildErr
	}

	cfg := &nr.config
	nr.logger.Info("Starting UDP listener", zap.String("scheme", cfg.Scheme), zap.Int("port", cfg.Port))

	startErr := nr.udpReceiver.Start(cfg.Hostname, cfg.Port, decoder)
	if startErr != nil {
		return startErr
	}

	// Runs until the receiver is stopped, consuming from its error channel.
	go nr.handleErrors()
	return nil
}
// Shutdown stops the UDP receiver if one exists.
// A failure to stop is logged as a warning and not propagated: the method
// always returns nil.
func (nr *netflowReceiver) Shutdown(context.Context) error {
	if nr.udpReceiver != nil {
		if stopErr := nr.udpReceiver.Stop(); stopErr != nil {
			nr.logger.Warn("Error stopping UDP receiver", zap.Error(stopErr))
		}
	}
	return nil
}
// buildDecodeFunc returns the function that is invoked for every flow packet
// received. Which goflow2 pipe backs it depends on the configured scheme:
// "sflow" or "netflow". Any other scheme yields an error.
func (nr *netflowReceiver) buildDecodeFunc() (utils.DecoderFunc, error) {
	// An empty producer configuration for now; eventually this can be used to
	// configure mappings. Compile turns it into the form the protobuf producer
	// expects.
	compiled, err := (&protoproducer.ProducerConfig{}).Compile()
	if err != nil {
		return nil, err
	}

	// goflow2 proto producer: produces messages in protobuf format.
	inner, err := protoproducer.CreateProtoProducer(compiled, protoproducer.CreateSamplingSystem)
	if err != nil {
		return nil, err
	}

	// The OpenTelemetry logs producer wraps the protobuf producer and converts
	// its messages into logs for the consumer.
	pipeCfg := &utils.PipeConfig{
		Producer: newOtelLogsProducer(inner, nr.logConsumer, nr.logger),
	}

	var pipe utils.FlowPipe
	switch scheme := nr.config.Scheme; scheme {
	case "netflow":
		pipe = utils.NewNetFlowPipe(pipeCfg)
	case "sflow":
		pipe = utils.NewSFlowPipe(pipeCfg)
	default:
		return nil, fmt.Errorf("scheme does not exist: %s", scheme)
	}

	return pipe.DecodeFlow, nil
}
// handleErrors drains the UDP receiver's error channel until the channel is
// closed or a net.ErrClosed error is seen.
// Errors are only logged: the receiver must keep running when a single packet
// fails to process.
//
//   - net.ErrClosed: the listener was closed; log at info level and exit.
//   - netflow.ErrorTemplateNotFound: expected from time to time until the
//     device sends a template; log at warn level.
//   - anything else: log at error level.
func (nr *netflowReceiver) handleErrors() {
	for err := range nr.udpReceiver.Errors() {
		switch {
		case errors.Is(err, net.ErrClosed):
			nr.logger.Info("UDP receiver closed, exiting error handler")
			return
		case errors.Is(err, netflow.ErrorTemplateNotFound):
			nr.logger.Warn("we could not find a template for a flow message, this error is expected from time to time until the device sends a template", zap.Error(err))
		default:
			// Every error that is neither a closed listener nor a missing template.
			nr.logger.Error("received a generic error while processing a flow message via GoFlow2 for the netflow receiver", zap.Error(err))
		}
	}
}