receiver/statsdreceiver/receiver.go (158 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package statsdreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver" import ( "context" "errors" "fmt" "net" "strings" "time" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/parser" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/transport" ) var _ receiver.Metrics = (*statsdReceiver)(nil) // statsdReceiver implements the receiver.Metrics for StatsD protocol. type statsdReceiver struct { settings receiver.Settings config *Config server transport.Server reporter *reporter obsrecv *receiverhelper.ObsReport parser parser.Parser nextConsumer consumer.Metrics cancel context.CancelFunc } // newReceiver creates the StatsD receiver with the given parameters. func newReceiver( set receiver.Settings, config Config, nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport))) if config.NetAddr.Endpoint == "" { if trans == transport.UDS { config.NetAddr.Endpoint = "/var/run/statsd-receiver.sock" } else { config.NetAddr.Endpoint = "localhost:8125" } } rep, err := newReporter(set) if err != nil { return nil, err } obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ LongLivedCtx: true, ReceiverID: set.ID, ReceiverCreateSettings: set, Transport: trans.String(), }) if err != nil { return nil, err } r := &statsdReceiver{ settings: set, config: &config, nextConsumer: nextConsumer, obsrecv: obsrecv, reporter: rep, parser: &parser.StatsDParser{ BuildInfo: set.BuildInfo, }, } return r, nil } func buildTransportServer(config Config) (transport.Server, error) { trans := transport.NewTransport(strings.ToLower(string(config.NetAddr.Transport))) switch trans { case transport.UDP, transport.UDP4, transport.UDP6: return transport.NewUDPServer(trans, config.NetAddr.Endpoint) case transport.TCP, transport.TCP4, transport.TCP6: return transport.NewTCPServer(trans, config.NetAddr.Endpoint) case transport.UDS: return transport.NewUDSServer(trans, config.NetAddr.Endpoint) } return nil, fmt.Errorf("unsupported transport %q", string(config.NetAddr.Transport)) } // Start starts a UDP server that can process StatsD messages. func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error { ctx, r.cancel = context.WithCancel(ctx) server, err := buildTransportServer(*r.config) if err != nil { return err } r.server = server transferChan := make(chan transport.Metric, 10) ticker := time.NewTicker(r.config.AggregationInterval) err = r.parser.Initialize( r.config.EnableMetricType, r.config.EnableSimpleTags, r.config.IsMonotonicCounter, r.config.EnableIPOnlyAggregation, r.config.TimerHistogramMapping, ) if err != nil { return err } go func() { if err := r.server.ListenAndServe(r.nextConsumer, r.reporter, transferChan); err != nil { if !errors.Is(err, net.ErrClosed) { componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) } } }() go func() { var failCnt, successCnt int64 for { select { case <-ticker.C: batchMetrics := r.parser.GetMetrics() for _, batch := range batchMetrics { batchCtx := client.NewContext(ctx, batch.Info) numPoints := batch.Metrics.DataPointCount() flushCtx := r.obsrecv.StartMetricsOp(batchCtx) err := r.Flush(flushCtx, batch.Metrics, r.nextConsumer) if err != nil { r.reporter.OnDebugf("Error flushing metrics", zap.Error(err)) } r.obsrecv.EndMetricsOp(flushCtx, metadata.Type.String(), numPoints, err) } case metric := <-transferChan: err := r.parser.Aggregate(metric.Raw, metric.Addr) if err != nil { failCnt++ if failCnt%100 == 0 { r.reporter.RecordParseFailure() failCnt = 0 } r.reporter.OnDebugf("Error aggregating pmetric", zap.Error(err)) } else { successCnt++ // Record every 100 to reduce overhead if successCnt%100 == 0 { r.reporter.RecordParseSuccess(successCnt) successCnt = 0 } } case <-ctx.Done(): ticker.Stop() return } } }() return nil } // Shutdown stops the StatsD receiver. func (r *statsdReceiver) Shutdown(context.Context) error { if r.cancel == nil || r.server == nil { return nil } err := r.server.Close() r.cancel() return err } func (r *statsdReceiver) Flush(ctx context.Context, metrics pmetric.Metrics, nextConsumer consumer.Metrics) error { return nextConsumer.ConsumeMetrics(ctx, metrics) }