input/otlp/metrics.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Portions copied from OpenTelemetry Collector (contrib), from the
// elastic exporter.
//
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package otlp

import (
	"context"
	"math"
	"strings"
	"sync/atomic"
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/elastic/apm-data/model/modelpb"
)

// ConsumeMetricsResult contains the number of rejected data points and an
// error message for a partial success response.
type ConsumeMetricsResult struct {
	ErrorMessage       string
	RejectedDataPoints int64
}

// ConsumeMetrics calls ConsumeMetricsWithResult but ignores the result.
// It exists to satisfy the go.opentelemetry.io/collector/consumer.Metrics interface.
func (c *Consumer) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error {
	_, err := c.ConsumeMetricsWithResult(ctx, metrics)
	return err
}

// ConsumeMetricsWithResult consumes OpenTelemetry metrics data, converting it
// into the Elastic APM metrics model and sending it to the reporter.
func (c *Consumer) ConsumeMetricsWithResult(ctx context.Context, metrics pmetric.Metrics) (ConsumeMetricsResult, error) {
	totalDataPoints := int64(metrics.DataPointCount())
	totalMetrics := int64(metrics.MetricCount())
	if err := semAcquire(ctx, c.sem, 1); err != nil {
		return ConsumeMetricsResult{}, err
	}
	defer c.sem.Release(1)

	remainingDataPoints := totalDataPoints
	remainingMetrics := totalMetrics
	receiveTimestamp := time.Now()
	batch := c.handleMetrics(metrics, receiveTimestamp, &remainingDataPoints, &remainingMetrics)
	if remainingMetrics > 0 {
		// Some metrics remained after conversion, meaning that they were dropped.
		atomic.AddInt64(&c.stats.unsupportedMetricsDropped, remainingMetrics)
	}
	if err := c.config.Processor.ProcessBatch(ctx, batch); err != nil {
		return ConsumeMetricsResult{}, err
	}

	var errMsg string
	if remainingDataPoints > 0 {
		errMsg = "unsupported data points"
	}
	return ConsumeMetricsResult{
		RejectedDataPoints: remainingDataPoints,
		ErrorMessage:       errMsg,
	}, nil
}
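
// Illustrative caller sketch (hypothetical, not part of this package): assuming
// a configured *Consumer `c` and an incoming pmetric.Metrics payload `metrics`,
// a caller could forward the data and report partial success from the result:
//
//	result, err := c.ConsumeMetricsWithResult(ctx, metrics)
//	if err != nil {
//		return err
//	}
//	if result.RejectedDataPoints > 0 {
//		// e.g. translate into an OTLP partial success response.
//		log.Printf("rejected %d data points: %s", result.RejectedDataPoints, result.ErrorMessage)
//	}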

func (c *Consumer) handleMetrics(
	metrics pmetric.Metrics,
	receiveTimestamp time.Time,
	remainingDataPoints, remainingMetrics *int64,
) (batch *modelpb.Batch) {
	batch = &modelpb.Batch{}
	resourceMetrics := metrics.ResourceMetrics()
	for i := 0; i < resourceMetrics.Len(); i++ {
		c.handleResourceMetrics(resourceMetrics.At(i), receiveTimestamp, batch, remainingDataPoints, remainingMetrics)
	}
	return
}

func (c *Consumer) handleResourceMetrics(
	resourceMetrics pmetric.ResourceMetrics,
	receiveTimestamp time.Time,
	out *modelpb.Batch,
	remainingDataPoints, remainingMetrics *int64,
) (droppedDataPoints, droppedMetrics int64) {
	baseEvent := modelpb.APMEvent{}
	baseEvent.Event = &modelpb.Event{}
	baseEvent.Event.Received = modelpb.FromTime(receiveTimestamp)

	var timeDelta time.Duration
	resource := resourceMetrics.Resource()
	translateResourceMetadata(resource, &baseEvent)
	if exportTimestamp, ok := exportTimestamp(resource); ok {
		timeDelta = receiveTimestamp.Sub(exportTimestamp)
	}
	scopeMetrics := resourceMetrics.ScopeMetrics()
	for i := 0; i < scopeMetrics.Len(); i++ {
		c.handleScopeMetrics(scopeMetrics.At(i), resource, &baseEvent, timeDelta, out, remainingDataPoints, remainingMetrics)
	}
	return
}

func (c *Consumer) handleScopeMetrics(
	in pmetric.ScopeMetrics,
	resource pcommon.Resource,
	baseEvent *modelpb.APMEvent,
	timeDelta time.Duration,
	out *modelpb.Batch,
	remainingDataPoints, remainingMetrics *int64,
) {
	ms := make(metricsets)

	// Add the original otel metrics to the metricset.
	otelMetrics := in.Metrics()
	for i := 0; i < otelMetrics.Len(); i++ {
		c.addMetric(otelMetrics.At(i), ms, remainingDataPoints, remainingMetrics)
	}

	// Handle remapping if any. Remapped metrics will be added to a new
	// metric slice and then processed as any other metric in the scope.
	if len(c.remappers) > 0 {
		remappedMetrics := pmetric.NewMetricSlice()
		for _, r := range c.remappers {
			r.Remap(in, remappedMetrics, resource)
		}
		*remainingDataPoints += int64(dataPointsCount(remappedMetrics))
		*remainingMetrics += int64(remappedMetrics.Len())
		for i := 0; i < remappedMetrics.Len(); i++ {
			c.addMetric(remappedMetrics.At(i), ms, remainingDataPoints, remainingMetrics)
		}
	}

	// Process all the metrics added to the metricset.
	for key, ms := range ms {
		event := baseEvent.CloneVT()
		translateScopeMetadata(in.Scope(), event)
		event.Timestamp = modelpb.FromTime(key.timestamp.Add(timeDelta))
		metrs := make([]*modelpb.MetricsetSample, 0, len(ms.samples))
		for _, s := range ms.samples {
			metrs = append(metrs, s)
		}
		event.Metricset = &modelpb.Metricset{}
		event.Metricset.Samples = metrs
		event.Metricset.Name = "app"
		if ms.attributes.Len() > 0 {
			initEventLabels(event)
			ms.attributes.Range(func(k string, v pcommon.Value) bool {
				switch k {
				// data_stream.*
				case attributeDataStreamDataset:
					if event.DataStream == nil {
						event.DataStream = &modelpb.DataStream{}
					}
					event.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
				case attributeDataStreamNamespace:
					if event.DataStream == nil {
						event.DataStream = &modelpb.DataStream{}
					}
					event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())
				// The fields below are required by the Processes tab of the
				// curated Kibana hostmetrics UI. They are produced by
				// opentelemetry-lib, added to the remapped OTel metrics data
				// points as attributes, and are not OTel semconv fields.
				case "system.process.cpu.start_time":
					if event.System == nil {
						event.System = &modelpb.System{}
					}
					if event.System.Process == nil {
						event.System.Process = &modelpb.SystemProcess{}
					}
					if event.System.Process.Cpu == nil {
						event.System.Process.Cpu = &modelpb.SystemProcessCPU{}
					}
					event.System.Process.Cpu.StartTime = v.Str()
				// `system.process.cmdline` is the same as the ECS field
				// `process.command_line`; however, Kibana's curated UIs require
				// this field to work. In addition, the current Kibana code will
				// not work if this field is added to documents with
				// `system.process.memory.rss.pct` and other metrics required in
				// the Processes tab of the Kibana hostmetrics UI. Due to this,
				// we have to process the data point field added by
				// opentelemetry-lib instead of directly processing the OTel
				// semconv resource attribute `process.command_line`.
				case "system.process.cmdline":
					if event.System == nil {
						event.System = &modelpb.System{}
					}
					if event.System.Process == nil {
						event.System.Process = &modelpb.SystemProcess{}
					}
					event.System.Process.Cmdline = truncate(v.Str())
				case "system.process.state":
					if event.System == nil {
						event.System = &modelpb.System{}
					}
					if event.System.Process == nil {
						event.System.Process = &modelpb.SystemProcess{}
					}
					event.System.Process.State = v.Str()
				case "system.filesystem.mount_point":
					if event.System == nil {
						event.System = &modelpb.System{}
					}
					if event.System.Filesystem == nil {
						event.System.Filesystem = &modelpb.SystemFilesystem{}
					}
					event.System.Filesystem.MountPoint = truncate(v.Str())
				case "event.dataset":
					if event.Event == nil {
						event.Event = &modelpb.Event{}
					}
					event.Event.Dataset = v.Str()
				case "event.module":
					if event.Event == nil {
						event.Event = &modelpb.Event{}
					}
					event.Event.Module = v.Str()
				case "user.name":
					if event.User == nil {
						event.User = &modelpb.User{}
					}
					event.User.Name = truncate(v.Str())
				default:
					setLabel(k, event, v)
				}
				return true
			})
			if len(event.Labels) == 0 {
				event.Labels = nil
			}
			if len(event.NumericLabels) == 0 {
				event.NumericLabels = nil
			}
		}
		*out = append(*out, event)
	}
}
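
// For illustration only (hypothetical attribute values): a remapped data point
// carrying the attributes
//
//	data_stream.dataset:    "system.process"
//	system.process.state:   "Running"
//	system.process.cmdline: "/usr/bin/example"
//	custom.attribute:       "foo"
//
// would populate event.DataStream.Dataset, event.System.Process.State and
// event.System.Process.Cmdline via the switch above, while the unrecognised
// "custom.attribute" key falls through to setLabel and is stored as a label.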

func (c *Consumer) addMetric(metric pmetric.Metric, ms metricsets, remainingDataPoints, remainingMetrics *int64) {
	var anyDropped bool
	// TODO(axw) support units
	switch metric.Type() {
	case pmetric.MetricTypeGauge:
		dps := metric.Gauge().DataPoints()
		for i := 0; i < dps.Len(); i++ {
			dp := dps.At(i)
			if sample, ok := numberSample(dp, modelpb.MetricType_METRIC_TYPE_GAUGE); ok {
				sample.Name = metric.Name()
				ms.upsert(dp.Timestamp().AsTime(), dp.Attributes(), sample)
				*remainingDataPoints--
			} else {
				anyDropped = true
			}
		}
	case pmetric.MetricTypeSum:
		dps := metric.Sum().DataPoints()
		for i := 0; i < dps.Len(); i++ {
			dp := dps.At(i)
			if sample, ok := numberSample(dp, modelpb.MetricType_METRIC_TYPE_COUNTER); ok {
				sample.Name = metric.Name()
				ms.upsert(dp.Timestamp().AsTime(), dp.Attributes(), sample)
				*remainingDataPoints--
			} else {
				anyDropped = true
			}
		}
	case pmetric.MetricTypeHistogram:
		dps := metric.Histogram().DataPoints()
		for i := 0; i < dps.Len(); i++ {
			dp := dps.At(i)
			if sample, ok := histogramSample(dp.BucketCounts(), dp.ExplicitBounds()); ok {
				sample.Name = metric.Name()
				ms.upsert(dp.Timestamp().AsTime(), dp.Attributes(), sample)
				*remainingDataPoints--
			} else {
				anyDropped = true
			}
		}
	case pmetric.MetricTypeSummary:
		dps := metric.Summary().DataPoints()
		for i := 0; i < dps.Len(); i++ {
			dp := dps.At(i)
			sample := summarySample(dp)
			sample.Name = metric.Name()
			ms.upsert(dp.Timestamp().AsTime(), dp.Attributes(), sample)
			*remainingDataPoints--
		}
	default:
		// Unsupported metric: it will be recorded as dropped, since
		// remainingDataPoints and remainingMetrics are not decreased.
		anyDropped = true
	}
	if !anyDropped {
		*remainingMetrics--
	}
}
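
// A minimal sketch (hypothetical, e.g. in a test; not part of this package) of
// how a gauge metric flows through addMetric, assuming a *Consumer `c`:
//
//	m := pmetric.NewMetric()
//	m.SetName("system.memory.usage")
//	dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
//	dp.SetDoubleValue(0.42)
//	dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
//
//	ms := make(metricsets)
//	remainingDataPoints, remainingMetrics := int64(1), int64(1)
//	c.addMetric(m, ms, &remainingDataPoints, &remainingMetrics)
//	// Both counters are now 0: the data point was converted and the
//	// metric type is supported.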

func numberSample(dp pmetric.NumberDataPoint, metricType modelpb.MetricType) (*modelpb.MetricsetSample, bool) {
	var value float64
	switch dp.ValueType() {
	case pmetric.NumberDataPointValueTypeInt:
		value = float64(dp.IntValue())
	case pmetric.NumberDataPointValueTypeDouble:
		value = dp.DoubleValue()
		if math.IsNaN(value) || math.IsInf(value, 0) {
			return nil, false
		}
	default:
		return nil, false
	}
	ms := modelpb.MetricsetSample{}
	ms.Type = metricType
	ms.Value = value
	return &ms, true
}

func summarySample(dp pmetric.SummaryDataPoint) *modelpb.MetricsetSample {
	ms := modelpb.MetricsetSample{}
	ms.Type = modelpb.MetricType_METRIC_TYPE_SUMMARY
	ms.Summary = &modelpb.SummaryMetric{}
	ms.Summary.Count = uint64(dp.Count())
	ms.Summary.Sum = dp.Sum()
	return &ms
}

func histogramSample(bucketCounts pcommon.UInt64Slice, explicitBounds pcommon.Float64Slice) (*modelpb.MetricsetSample, bool) {
	// (From opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto)
	//
	// This defines size(explicit_bounds) + 1 (= N) buckets. The boundaries for
	// bucket at index i are:
	//
	// (-infinity, explicit_bounds[i]] for i == 0
	// (explicit_bounds[i-1], explicit_bounds[i]] for 0 < i < N-1
	// (explicit_bounds[i], +infinity) for i == N-1
	//
	// The values in the explicit_bounds array must be strictly increasing.
	//
	if bucketCounts.Len() != explicitBounds.Len()+1 || explicitBounds.Len() == 0 {
		return &modelpb.MetricsetSample{}, false
	}

	// For the bucket values, we follow the approach described by Prometheus's
	// histogram_quantile function (https://prometheus.io/docs/prometheus/latest/querying/functions/#histogram_quantile)
	// to achieve consistent percentile aggregation results:
	//
	// "The histogram_quantile() function interpolates quantile values by assuming a linear
	// distribution within a bucket. (...) If a quantile is located in the highest bucket,
	// the upper bound of the second highest bucket is returned. A lower limit of the lowest
	// bucket is assumed to be 0 if the upper bound of that bucket is greater than 0. In that
	// case, the usual linear interpolation is applied within that bucket. Otherwise, the upper
	// bound of the lowest bucket is returned for quantiles located in the lowest bucket."
	values := make([]float64, 0, bucketCounts.Len())
	counts := make([]uint64, 0, bucketCounts.Len())
	for i := 0; i < bucketCounts.Len(); i++ {
		count := bucketCounts.At(i)
		if count == 0 {
			continue
		}

		var value float64
		switch i {
		// (-infinity, explicit_bounds[i]]
		case 0:
			value = explicitBounds.At(i)
			if value > 0 {
				value /= 2
			}

		// (explicit_bounds[i], +infinity)
		case bucketCounts.Len() - 1:
			value = explicitBounds.At(i - 1)

		// [explicit_bounds[i-1], explicit_bounds[i])
		default:
			// Use the midpoint between the boundaries.
			value = explicitBounds.At(i-1) + (explicitBounds.At(i)-explicitBounds.At(i-1))/2.0
		}

		counts = append(counts, uint64(count))
		values = append(values, value)
	}
	ms := modelpb.MetricsetSample{}
	ms.Type = modelpb.MetricType_METRIC_TYPE_HISTOGRAM
	ms.Histogram = &modelpb.Histogram{}
	ms.Histogram.Counts = counts
	ms.Histogram.Values = values
	return &ms, true
}
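
// Worked example (hypothetical values) of the bucket-value derivation above:
// with explicit_bounds = [1, 5, 10] and bucket_counts = [2, 0, 3, 1],
//
//	bucket 0 (count 2): value = 1 / 2        = 0.5  (lowest bucket, upper bound > 0)
//	bucket 1 (count 0): skipped
//	bucket 2 (count 3): value = 5 + (10-5)/2 = 7.5  (midpoint of its boundaries)
//	bucket 3 (count 1): value = 10                  (upper bound of the second-highest bucket)
//
// so the resulting sample has Histogram.Values = [0.5, 7.5, 10] and
// Histogram.Counts = [2, 3, 1].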

type metricsets map[metricsetKey]metricset

type metricsetKey struct {
	timestamp time.Time
	signature string // combination of all attributes
}

type metricset struct {
	attributes pcommon.Map
	samples    map[string]*modelpb.MetricsetSample
}

// upsert searches for an existing metricset with the given timestamp and labels,
// and appends the sample to it. If there is no such existing metricset, a new one
// is created.
func (ms metricsets) upsert(timestamp time.Time, attributes pcommon.Map, sample *modelpb.MetricsetSample) {
	// We always record metrics as they are given. We also copy some
	// well-known OpenTelemetry metrics to their Elastic APM equivalents.
	ms.upsertOne(timestamp, attributes, sample)
}

func (ms metricsets) upsertOne(timestamp time.Time, attributes pcommon.Map, sample *modelpb.MetricsetSample) {
	var signatureBuilder strings.Builder
	attributes.Range(func(k string, v pcommon.Value) bool {
		signatureBuilder.WriteString(k)
		signatureBuilder.WriteString(v.AsString())
		return true
	})
	key := metricsetKey{timestamp: timestamp, signature: signatureBuilder.String()}

	m, ok := ms[key]
	if !ok {
		m = metricset{
			attributes: attributes,
			samples:    make(map[string]*modelpb.MetricsetSample),
		}
		ms[key] = m
	}
	m.samples[sample.Name] = sample
}

func dataPointsCount(ms pmetric.MetricSlice) (count int) {
	for i := 0; i < ms.Len(); i++ {
		m := ms.At(i)
		switch m.Type() {
		case pmetric.MetricTypeGauge:
			count += m.Gauge().DataPoints().Len()
		case pmetric.MetricTypeSum:
			count += m.Sum().DataPoints().Len()
		case pmetric.MetricTypeHistogram:
			count += m.Histogram().DataPoints().Len()
		case pmetric.MetricTypeExponentialHistogram:
			count += m.ExponentialHistogram().DataPoints().Len()
		case pmetric.MetricTypeSummary:
			count += m.Summary().DataPoints().Len()
		}
	}
	return
}
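
// Grouping sketch for upsertOne (hypothetical values): metricsets are keyed by
// timestamp plus a signature built from the data point attributes, so two calls
// such as
//
//	ms.upsertOne(t, attrs, &modelpb.MetricsetSample{Name: "system.cpu.time"})
//	ms.upsertOne(t, attrs, &modelpb.MetricsetSample{Name: "system.memory.usage"})
//
// with the same timestamp t and attribute map attrs end up in the same
// metricset (and ultimately the same metricset event), while a sample with a
// different timestamp or attribute set starts a new one. A later sample with
// the same name overwrites the earlier entry in samples.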