func()

in plugins/serializers/prometheusremotewrite/prometheusremotewrite.go [55:245]


// SerializeBatch converts a batch of metrics into one Prometheus remote-write
// request: a prompb.WriteRequest that is protobuf-marshalled and then
// snappy-encoded.
//
// Every field of every metric is mapped to at most one time series entry,
// keyed by the MetricKey returned from getPromTS:
//
//   - Counter, Gauge and Untyped metrics produce one sample per field.
//   - Histogram metrics handle fields ending in "_bucket", "_sum" and
//     "_count"; other fields are ignored. A "_bucket" field seeds zero-valued
//     "_sum", "_count" and le="+Inf" bucket entries when they are not present
//     yet, and a "_count" field fills the le="+Inf" bucket when that entry is
//     missing or still zero.
//   - Summary metrics handle "_sum" and "_count" fields; any other field is
//     emitted with a "quantile" label taken from the metric's "quantile" tag.
//
// Fields whose name cannot be sanitized, whose value cannot be converted, or
// whose required "le"/"quantile" tag is missing or not a float are skipped.
// When several samples in the batch resolve to the same key, a sample older
// than the stored one is dropped; otherwise it replaces the stored one.
//
// If the serializer is configured with SortMetrics, the resulting series are
// ordered by label count and then by the first differing label name or value.
//
// An error is returned when a metric has an unknown type (the whole batch is
// rejected) or when the protobuf message cannot be marshalled.
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
	var buf bytes.Buffer
	entries := make(map[MetricKey]prompb.TimeSeries)
	for _, metric := range metrics {
		commonLabels := s.createLabels(metric)

		// withLabel returns a fresh copy of commonLabels with one extra label
		// appended, so commonLabels itself is never modified or aliased.
		withLabel := func(name, value string) []prompb.Label {
			labels := make([]prompb.Label, len(commonLabels), len(commonLabels)+1)
			copy(labels, commonLabels)
			return append(labels, prompb.Label{Name: name, Value: value})
		}

		var metrickey MetricKey
		var promts prompb.TimeSeries
		for _, field := range metric.FieldList() {
			metricName := prometheus.MetricName(metric.Name(), field.Key, metric.Type())
			metricName, ok := prometheus.SanitizeMetricName(metricName)
			if !ok {
				continue
			}

			switch metric.Type() {
			case telegraf.Counter, telegraf.Gauge, telegraf.Untyped:
				value, ok := prometheus.SampleValue(field.Value)
				if !ok {
					continue
				}
				metrickey, promts = getPromTS(metricName, commonLabels, value, metric.Time())
			case telegraf.Histogram:
				switch {
				case strings.HasSuffix(field.Key, "_bucket"):
					// A batch may carry only bucket fields, so make sure the
					// sum, count and +Inf bucket series exist. Entries that
					// are already present are left untouched.
					sumKey, sumTS := getPromTS(metricName+"_sum", commonLabels, float64(0), metric.Time())
					if _, found := entries[sumKey]; !found {
						entries[sumKey] = sumTS
					}
					countKey, countTS := getPromTS(metricName+"_count", commonLabels, float64(0), metric.Time())
					if _, found := entries[countKey]; !found {
						entries[countKey] = countTS
					}
					infKey, infTS := getPromTS(metricName+"_bucket", withLabel("le", "+Inf"), float64(0), metric.Time())
					if _, found := entries[infKey]; !found {
						entries[infKey] = infTS
					}

					// The bucket itself needs a numeric "le" tag and a
					// countable value; otherwise only the placeholders above
					// are kept for this field.
					le, ok := metric.GetTag("le")
					if !ok {
						continue
					}
					bound, err := strconv.ParseFloat(le, 64)
					if err != nil {
						continue
					}
					count, ok := prometheus.SampleCount(field.Value)
					if !ok {
						continue
					}

					// The label value is the re-formatted parsed bound, not
					// the raw tag text.
					metrickey, promts = getPromTS(
						metricName+"_bucket",
						withLabel("le", fmt.Sprint(bound)),
						float64(count),
						metric.Time(),
					)
				case strings.HasSuffix(field.Key, "_sum"):
					sum, ok := prometheus.SampleSum(field.Value)
					if !ok {
						continue
					}

					metrickey, promts = getPromTS(metricName+"_sum", commonLabels, sum, metric.Time())
				case strings.HasSuffix(field.Key, "_count"):
					count, ok := prometheus.SampleCount(field.Value)
					if !ok {
						continue
					}

					// Without an explicit +Inf bucket (or with only the
					// zero-valued placeholder) the total count is used as
					// the +Inf bucket value.
					infKey, infTS := getPromTS(metricName+"_bucket", withLabel("le", "+Inf"), float64(count), metric.Time())
					if existing, found := entries[infKey]; !found || existing.Samples[0].Value == 0 {
						entries[infKey] = infTS
					}

					metrickey, promts = getPromTS(metricName+"_count", commonLabels, float64(count), metric.Time())
				default:
					continue
				}
			case telegraf.Summary:
				switch {
				case strings.HasSuffix(field.Key, "_sum"):
					sum, ok := prometheus.SampleSum(field.Value)
					if !ok {
						continue
					}

					metrickey, promts = getPromTS(metricName+"_sum", commonLabels, sum, metric.Time())
				case strings.HasSuffix(field.Key, "_count"):
					count, ok := prometheus.SampleCount(field.Value)
					if !ok {
						continue
					}

					metrickey, promts = getPromTS(metricName+"_count", commonLabels, float64(count), metric.Time())
				default:
					// Any other summary field is a quantile value and needs
					// a numeric "quantile" tag.
					quantileTag, ok := metric.GetTag("quantile")
					if !ok {
						continue
					}
					quantile, err := strconv.ParseFloat(quantileTag, 64)
					if err != nil {
						continue
					}
					value, ok := prometheus.SampleValue(field.Value)
					if !ok {
						continue
					}

					metrickey, promts = getPromTS(metricName, withLabel("quantile", fmt.Sprint(quantile)), value, metric.Time())
				}
			default:
				return nil, fmt.Errorf("unknown type %v", metric.Type())
			}

			// A batch of metrics can contain multiple values for a single
			// Prometheus sample. If this metric is older than the existing
			// sample then we can skip over it. The stored timestamp is scaled
			// by 1e6 to obtain the nanoseconds expected by time.Unix.
			if existing, found := entries[metrickey]; found {
				if metric.Time().Before(time.Unix(0, existing.Samples[0].Timestamp*1_000_000)) {
					continue
				}
			}
			entries[metrickey] = promts
		}
	}

	// Flatten the map; iteration order is random, so the result is only
	// deterministic when sorting is enabled below.
	promTS := make([]prompb.TimeSeries, 0, len(entries))
	for _, series := range entries {
		promTS = append(promTS, series)
	}

	if s.config.MetricSortOrder == SortMetrics {
		sort.Slice(promTS, func(i, j int) bool {
			lhs := promTS[i].Labels
			rhs := promTS[j].Labels
			if len(lhs) != len(rhs) {
				return len(lhs) < len(rhs)
			}

			for index := range lhs {
				l := lhs[index]
				r := rhs[index]

				if l.Name != r.Name {
					return l.Name < r.Name
				}

				if l.Value != r.Value {
					return l.Value < r.Value
				}
			}

			return false
		})
	}

	pb := &prompb.WriteRequest{Timeseries: promTS}
	data, err := pb.Marshal()
	if err != nil {
		// Wrap with %w so callers can inspect the underlying error.
		return nil, fmt.Errorf("unable to marshal protobuf: %w", err)
	}
	encoded := snappy.Encode(nil, data)
	buf.Write(encoded) //nolint:revive // from buffer.go: "err is always nil"
	return buf.Bytes(), nil
}