receiver/statsdreceiver/internal/parser/statsd_parser.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package parser // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/internal/parser"

import (
	"errors"
	"fmt"
	"net"
	"strconv"
	"strings"
	"time"

	"github.com/lightstep/go-expohisto/structure"
	"go.opentelemetry.io/collector/client"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	semconv "go.opentelemetry.io/collector/semconv/v1.22.0"
	"go.opentelemetry.io/otel/attribute"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver/protocol"
)

var (
	errEmptyMetricName  = errors.New("empty metric name")
	errEmptyMetricValue = errors.New("empty metric value")
)

type MetricType string // From the statsd line e.g., "c", "g", "h"

const (
	tagMetricType = "metric_type"

	CounterType      MetricType = "c"
	GaugeType        MetricType = "g"
	HistogramType    MetricType = "h"
	TimingType       MetricType = "ms"
	DistributionType MetricType = "d"

	receiverName = "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver"
)

type ObserverCategory struct {
	method             protocol.ObserverType
	histogramConfig    structure.Config
	summaryPercentiles []float64
}

var defaultObserverCategory = ObserverCategory{
	method: protocol.DefaultObserverType,
}

// StatsDParser parses StatsD messages with tags and aggregates them per metric
// description and client address.
type StatsDParser struct {
	instrumentsByAddress    map[netAddr]*instruments
	enableMetricType        bool
	enableSimpleTags        bool
	isMonotonicCounter      bool
	enableIPOnlyAggregation bool
	timerEvents             ObserverCategory
	histogramEvents         ObserverCategory
	lastIntervalTime        time.Time
	BuildInfo               component.BuildInfo
}

type instruments struct {
	addr                   net.Addr
	gauges                 map[statsDMetricDescription]pmetric.ScopeMetrics
	counters               map[statsDMetricDescription]pmetric.ScopeMetrics
	summaries              map[statsDMetricDescription]summaryMetric
	histograms             map[statsDMetricDescription]histogramMetric
	timersAndDistributions []pmetric.ScopeMetrics
}

func newInstruments(addr net.Addr) *instruments {
	return &instruments{
		addr:       addr,
		gauges:     make(map[statsDMetricDescription]pmetric.ScopeMetrics),
		counters:   make(map[statsDMetricDescription]pmetric.ScopeMetrics),
		summaries:  make(map[statsDMetricDescription]summaryMetric),
		histograms: make(map[statsDMetricDescription]histogramMetric),
	}
}

type sampleValue struct {
	value float64
	count float64
}

type summaryMetric struct {
	points      []float64
	weights     []float64
	percentiles []float64
}

type histogramStructure = structure.Histogram[float64]

type histogramMetric struct {
	agg *histogramStructure
}

type statsDMetric struct {
	description statsDMetricDescription
	asFloat     float64
	addition    bool
	unit        string
	sampleRate  float64
	timestamp   uint64
}

type statsDMetricDescription struct {
	name       string
	metricType MetricType
	attrs      attribute.Set
}

func (t MetricType) FullName() protocol.TypeName {
	switch t {
	case GaugeType:
		return protocol.GaugeTypeName
	case CounterType:
		return protocol.CounterTypeName
	case TimingType:
		return protocol.TimingTypeName
	case HistogramType:
		return protocol.HistogramTypeName
	case DistributionType:
		return protocol.DistributionTypeName
	}
	return protocol.TypeName(fmt.Sprintf("unknown(%s)", t))
}

func (p *StatsDParser) resetState(when time.Time) {
	p.lastIntervalTime = when
	p.instrumentsByAddress = make(map[netAddr]*instruments)
}
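// Initialize wires the parser's observer categories from the configured
// timer/histogram mappings. For illustration only (the literal below is an
// assumption built from the field names Initialize reads, not taken from the
// receiver's real configuration), a mapping that routes timings to the summary
// observer and histograms/distributions to the exponential-histogram observer
// could look like:
//
//	[]protocol.TimerHistogramMapping{
//		{StatsdType: protocol.TimingTypeName, ObserverType: protocol.SummaryObserver},
//		{StatsdType: protocol.HistogramTypeName, ObserverType: protocol.HistogramObserver},
//	}
//
// In the collector this slice is normally produced from the receiver's
// configuration rather than constructed by hand.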
func (p *StatsDParser) Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, enableIPOnlyAggregation bool, sendTimerHistogram []protocol.TimerHistogramMapping) error {
	p.resetState(timeNowFunc())

	p.histogramEvents = defaultObserverCategory
	p.timerEvents = defaultObserverCategory
	p.enableMetricType = enableMetricType
	p.enableSimpleTags = enableSimpleTags
	p.isMonotonicCounter = isMonotonicCounter
	p.enableIPOnlyAggregation = enableIPOnlyAggregation

	// Note: validation occurs in ("../".Config).validate()
	for _, eachMap := range sendTimerHistogram {
		switch eachMap.StatsdType {
		case protocol.HistogramTypeName, protocol.DistributionTypeName:
			p.histogramEvents.method = eachMap.ObserverType
			p.histogramEvents.histogramConfig = expoHistogramConfig(eachMap.Histogram)
			p.histogramEvents.summaryPercentiles = eachMap.Summary.Percentiles
		case protocol.TimingTypeName, protocol.TimingAltTypeName:
			p.timerEvents.method = eachMap.ObserverType
			p.timerEvents.histogramConfig = expoHistogramConfig(eachMap.Histogram)
			p.timerEvents.summaryPercentiles = eachMap.Summary.Percentiles
		case protocol.CounterTypeName, protocol.GaugeTypeName:
		}
	}
	return nil
}

func expoHistogramConfig(opts protocol.HistogramConfig) structure.Config {
	var r []structure.Option
	if opts.MaxSize >= structure.MinSize {
		r = append(r, structure.WithMaxSize(opts.MaxSize))
	}
	return structure.NewConfig(r...)
}

// GetMetrics returns the metrics accumulated since the last flush, prepared for
// export, and resets the parser state.
func (p *StatsDParser) GetMetrics() []BatchMetrics {
	batchMetrics := make([]BatchMetrics, 0, len(p.instrumentsByAddress))
	now := timeNowFunc()
	for _, instrument := range p.instrumentsByAddress {
		batch := BatchMetrics{
			Info: client.Info{
				Addr: instrument.addr,
			},
			Metrics: pmetric.NewMetrics(),
		}
		rm := batch.Metrics.ResourceMetrics().AppendEmpty()

		for _, metric := range instrument.gauges {
			p.copyMetricAndScope(rm, metric)
		}

		for _, metric := range instrument.timersAndDistributions {
			p.copyMetricAndScope(rm, metric)
		}

		for _, metric := range instrument.counters {
			setTimestampsForCounterMetric(metric, p.lastIntervalTime, now)
			p.copyMetricAndScope(rm, metric)
		}

		for desc, summaryMetric := range instrument.summaries {
			ilm := rm.ScopeMetrics().AppendEmpty()
			p.setVersionAndNameScope(ilm.Scope())

			percentiles := summaryMetric.percentiles
			if len(summaryMetric.percentiles) == 0 {
				percentiles = statsDDefaultPercentiles
			}

			buildSummaryMetric(
				desc,
				summaryMetric,
				p.lastIntervalTime,
				now,
				percentiles,
				ilm,
			)
		}

		for desc, histogramMetric := range instrument.histograms {
			ilm := rm.ScopeMetrics().AppendEmpty()
			p.setVersionAndNameScope(ilm.Scope())

			buildHistogramMetric(
				desc,
				histogramMetric,
				p.lastIntervalTime,
				now,
				ilm,
			)
		}
		batchMetrics = append(batchMetrics, batch)
	}
	p.resetState(now)
	return batchMetrics
}

func (p *StatsDParser) copyMetricAndScope(rm pmetric.ResourceMetrics, metric pmetric.ScopeMetrics) {
	ilm := rm.ScopeMetrics().AppendEmpty()
	metric.CopyTo(ilm)
	p.setVersionAndNameScope(ilm.Scope())
}

func (p *StatsDParser) setVersionAndNameScope(ilm pcommon.InstrumentationScope) {
	ilm.SetVersion(p.BuildInfo.Version)
	ilm.SetName(receiverName)
}

var timeNowFunc = time.Now

func (p *StatsDParser) observerCategoryFor(t MetricType) ObserverCategory {
	switch t {
	case HistogramType, DistributionType:
		return p.histogramEvents
	case TimingType:
		return p.timerEvents
	case CounterType, GaugeType:
	}
	return defaultObserverCategory
}
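// The helper below is a minimal sketch, not part of the receiver: it shows the
// typical drive cycle for this parser (Initialize once, Aggregate every line,
// GetMetrics on each flush tick). The function name and the error handling are
// assumptions added for illustration; the real transport and flush wiring
// lives outside this file.
func aggregateAndFlushSketch(p *StatsDParser, lines []string, addr net.Addr) []BatchMetrics {
	for _, line := range lines {
		// The real receiver reports malformed lines and keeps going (assumption);
		// this sketch simply drops the error.
		_ = p.Aggregate(line, addr)
	}
	// Collect everything aggregated since the last flush and reset the state.
	return p.GetMetrics()
}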
// Aggregate parses a single metric line and merges it into the per-client
// aggregation state.
func (p *StatsDParser) Aggregate(line string, addr net.Addr) error {
	parsedMetric, err := parseMessageToMetric(line, p.enableMetricType, p.enableSimpleTags)
	if err != nil {
		return err
	}

	addrKey := newNetAddr(addr)
	if p.enableIPOnlyAggregation {
		addrKey = newIPOnlyNetAddr(addr)
	}

	instrument, ok := p.instrumentsByAddress[addrKey]
	if !ok {
		instrument = newInstruments(addr)
		p.instrumentsByAddress[addrKey] = instrument
	}

	switch parsedMetric.description.metricType {
	case GaugeType:
		_, ok := instrument.gauges[parsedMetric.description]
		if !ok {
			instrument.gauges[parsedMetric.description] = buildGaugeMetric(parsedMetric, timeNowFunc())
		} else {
			if parsedMetric.addition {
				point := instrument.gauges[parsedMetric.description].Metrics().At(0).Gauge().DataPoints().At(0)
				point.SetDoubleValue(point.DoubleValue() + parsedMetric.gaugeValue())
			} else {
				instrument.gauges[parsedMetric.description] = buildGaugeMetric(parsedMetric, timeNowFunc())
			}
		}

	case CounterType:
		_, ok := instrument.counters[parsedMetric.description]
		if !ok {
			instrument.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, p.isMonotonicCounter)
		} else {
			point := instrument.counters[parsedMetric.description].Metrics().At(0).Sum().DataPoints().At(0)
			point.SetIntValue(point.IntValue() + parsedMetric.counterValue())
		}

	case TimingType, HistogramType, DistributionType:
		category := p.observerCategoryFor(parsedMetric.description.metricType)
		switch category.method {
		case protocol.GaugeObserver:
			instrument.timersAndDistributions = append(instrument.timersAndDistributions, buildGaugeMetric(parsedMetric, timeNowFunc()))
		case protocol.SummaryObserver:
			raw := parsedMetric.sampleValue()
			if existing, ok := instrument.summaries[parsedMetric.description]; !ok {
				instrument.summaries[parsedMetric.description] = summaryMetric{
					points:      []float64{raw.value},
					weights:     []float64{raw.count},
					percentiles: category.summaryPercentiles,
				}
			} else {
				instrument.summaries[parsedMetric.description] = summaryMetric{
					points:      append(existing.points, raw.value),
					weights:     append(existing.weights, raw.count),
					percentiles: category.summaryPercentiles,
				}
			}
		case protocol.HistogramObserver:
			raw := parsedMetric.sampleValue()
			var agg *histogramStructure
			if existing, ok := instrument.histograms[parsedMetric.description]; ok {
				agg = existing.agg
			} else {
				agg = new(histogramStructure)
				agg.Init(category.histogramConfig)
				instrument.histograms[parsedMetric.description] = histogramMetric{
					agg: agg,
				}
			}
			agg.UpdateByIncr(
				raw.value,
				uint64(raw.count), // Note! Rounding float64 to uint64 here.
			)
		case protocol.DisableObserver:
			// No action.
		}
	}
	return nil
}
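// parseMessageToMetric parses one StatsD/DogStatsD line. The accepted shape,
// reconstructed from the parsing logic below (the DogStatsD datagram docs
// linked in the code are the authoritative reference), is roughly:
//
//	<name>:<value>|<type>[|@<sample-rate>][|#<tag1:value1,tag2,...>][|c:<container-id>][|T<unix-seconds>]
//
// Illustrative lines (metric names and values are made up):
//
//	"page.views:1|c"                  (counter increment)
//	"fuel.level:0.5|g"                (gauge)
//	"song.length:240|h|@0.5"          (histogram, sampled at 50%)
//	"users.online:1|c|#country:china" (counter with one dimension tag)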
func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags bool) (statsDMetric, error) {
	result := statsDMetric{}

	nameValue, rest, foundName := strings.Cut(line, "|")
	if !foundName {
		return result, fmt.Errorf("invalid message format: %s", line)
	}

	name, valueStr, foundValue := strings.Cut(nameValue, ":")
	if !foundValue {
		return result, fmt.Errorf("invalid <name>:<value> format: %s", nameValue)
	}

	if name == "" {
		return result, errEmptyMetricName
	}
	result.description.name = name
	if valueStr == "" {
		return result, errEmptyMetricValue
	}
	if strings.HasPrefix(valueStr, "-") || strings.HasPrefix(valueStr, "+") {
		result.addition = true
	}

	metricType, additionalParts, _ := strings.Cut(rest, "|")
	inType := MetricType(metricType)
	switch inType {
	case CounterType, GaugeType, HistogramType, TimingType, DistributionType:
		result.description.metricType = inType
	default:
		return result, fmt.Errorf("unsupported metric type: %s", inType)
	}

	var kvs []attribute.KeyValue

	var part string
	part, additionalParts, _ = strings.Cut(additionalParts, "|")
	for ; len(part) > 0; part, additionalParts, _ = strings.Cut(additionalParts, "|") {
		switch {
		case strings.HasPrefix(part, "@"):
			sampleRateStr := strings.TrimPrefix(part, "@")

			f, err := strconv.ParseFloat(sampleRateStr, 64)
			if err != nil {
				return result, fmt.Errorf("parse sample rate: %s", sampleRateStr)
			}
			result.sampleRate = f
		case strings.HasPrefix(part, "#"):
			tagsStr := strings.TrimPrefix(part, "#")

			// Handle an empty tag set where the tags part was still sent
			// (some clients do this).
			if len(tagsStr) == 0 {
				continue
			}

			var tagSet string
			tagSet, tagsStr, _ = strings.Cut(tagsStr, ",")
			for ; len(tagSet) > 0; tagSet, tagsStr, _ = strings.Cut(tagsStr, ",") {
				k, v, _ := strings.Cut(tagSet, ":")
				if k == "" {
					return result, fmt.Errorf("invalid tag format: %q", tagSet)
				}

				// Support both simple tags (w/o value) and dimension tags (w/ value).
				// DogStatsD notably allows simple tags.
				if v == "" && !enableSimpleTags {
					return result, fmt.Errorf("invalid tag format: %q", tagSet)
				}

				kvs = append(kvs, attribute.String(k, v))
			}
		case strings.HasPrefix(part, "c:"):
			// As per DogStatsD protocol v1.2:
			// https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#dogstatsd-protocol-v12
			containerID := strings.TrimPrefix(part, "c:")
			if containerID != "" {
				kvs = append(kvs, attribute.String(semconv.AttributeContainerID, containerID))
			}
		case strings.HasPrefix(part, "T"):
			// As per DogStatsD protocol v1.3:
			// https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#dogstatsd-protocol-v13
			if inType != CounterType && inType != GaugeType {
				return result, errors.New("only GAUGE and COUNT metrics support a timestamp")
			}
			timestampStr := strings.TrimPrefix(part, "T")
			timestampSeconds, err := strconv.ParseUint(timestampStr, 10, 64)
			if err != nil {
				return result, fmt.Errorf("invalid timestamp: %s", timestampStr)
			}
			result.timestamp = timestampSeconds * 1e9 // Convert seconds to nanoseconds.
		default:
			return result, fmt.Errorf("unrecognized message part: %s", part)
		}
	}

	var err error
	result.asFloat, err = strconv.ParseFloat(valueStr, 64)
	if err != nil {
		return result, fmt.Errorf("parse metric value string: %s", valueStr)
	}

	// Add the metric_type dimension for all metrics.
	if enableMetricType {
		metricType := string(result.description.metricType.FullName())
		kvs = append(kvs, attribute.String(tagMetricType, metricType))
	}

	if len(kvs) != 0 {
		result.description.attrs = attribute.NewSet(kvs...)
	}

	return result, nil
}
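// netAddr is the map key used to group aggregation state per client. When
// enableIPOnlyAggregation is set, the port is dropped via newIPOnlyNetAddr, so
// lines arriving from, for example, "127.0.0.1:52000" and "127.0.0.1:52001"
// share the single "127.0.0.1" key (the addresses here are illustrative only).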
type netAddr struct {
	Network string
	String  string
}

func newNetAddr(addr net.Addr) netAddr {
	return netAddr{addr.Network(), addr.String()}
}

func newIPOnlyNetAddr(addr net.Addr) netAddr {
	host, _, err := net.SplitHostPort(addr.String())
	if err != nil {
		// If there is an error, use the original address.
		return netAddr{addr.Network(), addr.String()}
	}
	return netAddr{addr.Network(), host}
}