exporter/splunkhecexporter/metricdata_to_splunk.go (271 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter" import ( "hash/fnv" "math" "strconv" "strings" "github.com/goccy/go-json" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk" ) const ( // unknownHostName is the default host name when no hostname label is passed. unknownHostName = "unknown" // splunkMetricTypeKey is the key which maps to the type of the metric. splunkMetricTypeKey = "metric_type" // splunkMetricValue is the splunk metric value prefix. splunkMetricValue = "metric_name" // countSuffix is the count metric value suffix. countSuffix = "_count" // sumSuffix is the sum metric value suffix. sumSuffix = "_sum" // bucketSuffix is the bucket metric value suffix. bucketSuffix = "_bucket" // nanValue is the string representation of a NaN value in HEC events nanValue = "NaN" // plusInfValue is the string representation of a +Inf value in HEC events plusInfValue = "+Inf" // minusInfValue is the string representation of a -Inf value in HEC events minusInfValue = "-Inf" ) func sanitizeFloat(value float64) any { if math.IsNaN(value) { return nanValue } if math.IsInf(value, 1) { return plusInfValue } if math.IsInf(value, -1) { return minusInfValue } return value } func mapMetricToSplunkEvent(res pcommon.Resource, m pmetric.Metric, config *Config, logger *zap.Logger) []*splunk.Event { sourceKey := config.HecToOtelAttrs.Source sourceTypeKey := config.HecToOtelAttrs.SourceType indexKey := config.HecToOtelAttrs.Index hostKey := config.HecToOtelAttrs.Host host := unknownHostName source := config.Source sourceType := config.SourceType index := config.Index commonFields := map[string]any{} for k, v := range res.Attributes().All() { switch k { case hostKey: host = v.Str() case sourceKey: source = v.Str() case sourceTypeKey: sourceType = v.Str() case indexKey: index = v.Str() case splunk.HecTokenLabel: // ignore default: commonFields[k] = v.AsString() } } metricFieldName := splunkMetricValue + ":" + m.Name() //exhaustive:enforce switch m.Type() { case pmetric.MetricTypeGauge: pts := m.Gauge().DataPoints() splunkMetrics := make([]*splunk.Event, pts.Len()) for gi := 0; gi < pts.Len(); gi++ { dataPt := pts.At(gi) fields := cloneMap(commonFields) populateAttributes(fields, dataPt.Attributes()) switch dataPt.ValueType() { case pmetric.NumberDataPointValueTypeInt: fields[metricFieldName] = dataPt.IntValue() case pmetric.NumberDataPointValueTypeDouble: fields[metricFieldName] = sanitizeFloat(dataPt.DoubleValue()) } fields[splunkMetricTypeKey] = pmetric.MetricTypeGauge.String() splunkMetrics[gi] = createEvent(dataPt.Timestamp(), host, source, sourceType, index, fields) } return splunkMetrics case pmetric.MetricTypeHistogram: pts := m.Histogram().DataPoints() var splunkMetrics []*splunk.Event for gi := 0; gi < pts.Len(); gi++ { dataPt := pts.At(gi) bounds := dataPt.ExplicitBounds() counts := dataPt.BucketCounts() // first, add one event for sum, and one for count if dataPt.HasSum() && !math.IsNaN(dataPt.Sum()) { fields := cloneMap(commonFields) populateAttributes(fields, dataPt.Attributes()) fields[metricFieldName+sumSuffix] = dataPt.Sum() fields[splunkMetricTypeKey] = pmetric.MetricTypeHistogram.String() splunkMetrics = append(splunkMetrics, createEvent(dataPt.Timestamp(), host, source, sourceType, index, fields)) } { fields := cloneMap(commonFields) populateAttributes(fields, dataPt.Attributes()) fields[metricFieldName+countSuffix] = dataPt.Count() fields[splunkMetricTypeKey] = pmetric.MetricTypeHistogram.String() splunkMetrics = append(splunkMetrics, createEvent(dataPt.Timestamp(), host, source, sourceType, index, fields)) } // Spec says counts is optional but if present it must have one more // element than the bounds array. if counts.Len() == 0 || counts.Len() != bounds.Len()+1 { continue } value := uint64(0) // now create buckets for each bound. for bi := 0; bi < bounds.Len(); bi++ { fields := cloneMap(commonFields) populateAttributes(fields, dataPt.Attributes()) fields["le"] = float64ToDimValue(bounds.At(bi)) value += counts.At(bi) fields[metricFieldName+bucketSuffix] = value fields[splunkMetricTypeKey] = pmetric.MetricTypeHistogram.String() sm := createEvent(dataPt.Timestamp(), host, source, sourceType, index, fields) splunkMetrics = append(splunkMetrics, sm) } // add an upper bound for +Inf { fields := cloneMap(commonFields) populateAttributes(fields, dataPt.Attributes()) fields["le"] = float64ToDimValue(math.Inf(1)) fields[metricFieldName+bucketSuffix] = value + counts.At(counts.Len()-1) fields[splunkMetricTypeKey] = pmetric.MetricTypeHistogram.String() sm := createEvent(dataPt.Timestamp(), host, source, sourceType, index, fields) splunkMetrics = append(splunkMetrics, sm) } } return splunkMetrics case pmetric.MetricTypeSum: pts := m.Sum().DataPoints() splunkMetrics := make([]*splunk.Event, pts.Len()) for gi := 0; gi < pts.Len(); gi++ { dataPt := pts.At(gi) fields := cloneMap(commonFields) populateAttributes(fields, dataPt.Attributes()) switch dataPt.ValueType() { case pmetric.NumberDataPointValueTypeInt: fields[metricFieldName] = dataPt.IntValue() case pmetric.NumberDataPointValueTypeDouble: fields[metricFieldName] = sanitizeFloat(dataPt.DoubleValue()) } fields[splunkMetricTypeKey] = pmetric.MetricTypeSum.String() sm := createEvent(dataPt.Timestamp(), host, source, sourceType, index, fields) splunkMetrics[gi] = sm } return splunkMetrics case pmetric.MetricTypeSummary: pts := m.Summary().DataPoints() var splunkMetrics []*splunk.Event for gi := 0; gi < pts.Len(); gi++ { dataPt := pts.At(gi) // first, add one event for sum, and one for count if !math.IsNaN(dataPt.Sum()) { fields := cloneMap(commonFields) populateAttributes(fields, dataPt.Attributes()) fields[metricFieldName+sumSuffix] = dataPt.Sum() fields[splunkMetricTypeKey] = pmetric.MetricTypeSummary.String() sm := createEvent(dataPt.Timestamp(), host, source, sourceType, index, fields) splunkMetrics = append(splunkMetrics, sm) } { fields := cloneMap(commonFields) populateAttributes(fields, dataPt.Attributes()) fields[metricFieldName+countSuffix] = dataPt.Count() fields[splunkMetricTypeKey] = pmetric.MetricTypeSummary.String() sm := createEvent(dataPt.Timestamp(), host, source, sourceType, index, fields) splunkMetrics = append(splunkMetrics, sm) } // now create values for each quantile. for bi := 0; bi < dataPt.QuantileValues().Len(); bi++ { fields := cloneMap(commonFields) populateAttributes(fields, dataPt.Attributes()) dp := dataPt.QuantileValues().At(bi) fields["qt"] = float64ToDimValue(dp.Quantile()) fields[metricFieldName+"_"+strconv.FormatFloat(dp.Quantile(), 'f', -1, 64)] = sanitizeFloat(dp.Value()) fields[splunkMetricTypeKey] = pmetric.MetricTypeSummary.String() sm := createEvent(dataPt.Timestamp(), host, source, sourceType, index, fields) splunkMetrics = append(splunkMetrics, sm) } } return splunkMetrics case pmetric.MetricTypeExponentialHistogram: logger.Warn( "Point with unsupported type ExponentialHistogram", zap.Any("metric", m)) return nil case pmetric.MetricTypeEmpty: return nil default: logger.Warn( "Point with unsupported type", zap.Any("metric", m)) return nil } } func createEvent(timestamp pcommon.Timestamp, host string, source string, sourceType string, index string, fields map[string]any) *splunk.Event { return &splunk.Event{ Time: timestampToSecondsWithMillisecondPrecision(timestamp), Host: host, Source: source, SourceType: sourceType, Index: index, Event: splunk.HecEventMetricType, Fields: fields, } } func copyEventWithoutValues(event *splunk.Event) *splunk.Event { return &splunk.Event{ Time: event.Time, Host: event.Host, Source: event.Source, SourceType: event.SourceType, Index: event.Index, Event: event.Event, Fields: cloneMapWithSelector(event.Fields, func(key string) bool { return !strings.HasPrefix(key, splunkMetricValue) }), } } func populateAttributes(fields map[string]any, attributeMap pcommon.Map) { for k, v := range attributeMap.All() { fields[k] = v.AsString() } } func cloneMap(fields map[string]any) map[string]any { newFields := make(map[string]any, len(fields)) for k, v := range fields { newFields[k] = v } return newFields } func cloneMapWithSelector(fields map[string]any, selector func(string) bool) map[string]any { newFields := make(map[string]any, len(fields)) for k, v := range fields { if selector(k) { newFields[k] = v } } return newFields } func timestampToSecondsWithMillisecondPrecision(ts pcommon.Timestamp) float64 { return math.Round(float64(ts)/1e6) / 1e3 } func float64ToDimValue(f float64) string { return strconv.FormatFloat(f, 'g', -1, 64) } // merge metric events to adhere to the multimetric format event. func mergeEventsToMultiMetricFormat(events []*splunk.Event) ([]*splunk.Event, error) { hashes := map[uint32]*splunk.Event{} hasher := fnv.New32a() var merged []*splunk.Event for _, e := range events { cloned := copyEventWithoutValues(e) data, err := json.Marshal(cloned) if err != nil { return nil, err } _, err = hasher.Write(data) if err != nil { return nil, err } hashed := hasher.Sum32() hasher.Reset() src, ok := hashes[hashed] if !ok { hashes[hashed] = e merged = append(merged, e) } else { for field, value := range e.Fields { if strings.HasPrefix(field, splunkMetricValue) { src.Fields[field] = value } } } } return merged, nil }