func (t *transaction) Append()

in otelcollector/prometheusreceiver/internal/transaction.go [99:191]

func (t *transaction) Append(_ storage.SeriesRef, ls labels.Labels, atMs int64, val float64) (storage.SeriesRef, error) {
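	// Abort early if the scrape context has already been cancelled; the empty
	// default case keeps this check non-blocking.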
	select {
	case <-t.ctx.Done():
		return 0, errTransactionAborted
	default:
	}

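	// Merge the configured external labels into the scraped label set;
	// Builder.Set overwrites any colliding label name.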
	if t.externalLabels.Len() != 0 {
		b := labels.NewBuilder(ls)
		t.externalLabels.Range(func(l labels.Label) {
			b.Set(l.Name, l.Value)
		})
		ls = b.Labels()
	}

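	// Resolve the resource this series belongs to, initializing per-scrape
	// transaction state on the first sample.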
	rKey, err := t.initTransaction(ls)
	if err != nil {
		return 0, err
	}

	// Any datapoint with duplicate labels MUST be rejected per:
	// * https://github.com/open-telemetry/wg-prometheus/issues/44
	// * https://github.com/open-telemetry/opentelemetry-collector/issues/3407
	// Prometheus itself has rejected such samples since version 2.16.0, released on 2020-02-13.
	if dupLabel, hasDup := ls.HasDuplicateLabelNames(); hasDup {
		return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel)
	}

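	// The metric name travels in the __name__ label; a sample without one is invalid.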
	metricName := ls.Get(model.MetricNameLabel)
	if metricName == "" {
		return 0, errMetricNameNotFound
	}

	// See https://www.prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
	// up: 1 if the instance is healthy, i.e. reachable, or 0 if the scrape failed.
	// But it can also be a staleNaN, which is inserted when the target goes away.
	if metricName == scrapeUpMetricName && val != 1.0 && !value.IsStaleNaN(val) {
		if val == 0.0 {
			t.logger.Warn("Failed to scrape Prometheus endpoint",
				zap.Int64("scrape_timestamp", atMs),
				zap.Stringer("target_labels", ls))
		} else {
			t.logger.Warn("The 'up' metric contains invalid value",
				zap.Float64("value", val),
				zap.Int64("scrape_timestamp", atMs),
				zap.Stringer("target_labels", ls))
		}
	}

	// For the `target_info` metric we need to convert it to resource attributes.
	if metricName == prometheus.TargetInfoMetricName {
		t.AddTargetInfo(*rKey, ls)
		return 0, nil
	}

	// For the `otel_scope_info` metric we need to convert it to scope attributes.
	if metricName == prometheus.ScopeInfoMetricName {
		t.addScopeInfo(*rKey, ls)
		return 0, nil
	}

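	// Look up (or create) the metric family this series belongs to, keyed by
	// resource, scope and metric name.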
	curMF, existing := t.getOrCreateMetricFamily(*rKey, getScopeID(ls), metricName)

	if t.enableNativeHistograms && curMF.mtype == pmetric.MetricTypeExponentialHistogram {
		// If a histogram has both classic and native version, the native histogram is scraped
		// first. Getting a float sample for the same series means that `scrape_classic_histogram`
		// is set to true in the scrape config. In this case, we should ignore the native histogram.
		curMF.mtype = pmetric.MetricTypeHistogram
	}

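	// Hash the label set (scoped by metric type) into a stable series reference
	// and record the float sample in the metric family.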
	seriesRef := t.getSeriesRef(ls, curMF.mtype)
	err = curMF.addSeries(seriesRef, metricName, ls, atMs, val)
	if err != nil {
		// Handle special case of float sample indicating staleness of native
		// histogram. This is similar to how Prometheus handles it, but we
		// don't have access to the previous value so we're applying some
		// heuristics to figure out if this is native histogram or not.
		// The metric type will indicate histogram, but presumably there will be no
		// _bucket, _count, _sum suffix or `le` label, which makes addSeries fail
		// with errEmptyLeLabel.
		if t.enableNativeHistograms && errors.Is(err, errEmptyLeLabel) && !existing && value.IsStaleNaN(val) && curMF.mtype == pmetric.MetricTypeHistogram {
			mg := curMF.loadMetricGroupOrCreate(seriesRef, ls, atMs)
			curMF.mtype = pmetric.MetricTypeExponentialHistogram
			mg.mtype = pmetric.MetricTypeExponentialHistogram
			_ = curMF.addExponentialHistogramSeries(seriesRef, metricName, ls, atMs, &histogram.Histogram{Sum: math.Float64frombits(value.StaleNaN)}, nil)
			// ignore errors here, this is best effort.
		} else {
			t.logger.Warn("failed to add datapoint", zap.Error(err), zap.String("metric_name", metricName), zap.Any("labels", ls))
		}
	}

	return 0, nil // never propagate per-sample errors from here, as that would fail the whole scrape
}
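
For reference, here is a minimal standalone sketch of the label handling Append performs before any OTLP conversion: merging external labels via labels.Builder, rejecting duplicate label names, reading the metric name from __name__, and distinguishing the staleness marker from an ordinary NaN. It uses only the Prometheus model packages the function itself relies on; the main package and the label values are illustrative, not part of the receiver.

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/value"
)

func main() {
	// A scraped series plus externally configured labels, merged the same way
	// Append does: Builder.Set overwrites colliding label names.
	ls := labels.FromStrings(model.MetricNameLabel, "up", "job", "demo", "instance", "host:9090")
	external := labels.FromStrings("cluster", "prod")

	b := labels.NewBuilder(ls)
	external.Range(func(l labels.Label) {
		b.Set(l.Name, l.Value)
	})
	ls = b.Labels()

	// Duplicate label names invalidate the sample, mirroring Prometheus itself.
	if dup, hasDup := ls.HasDuplicateLabelNames(); hasDup {
		fmt.Printf("rejected: non-unique label name %q\n", dup)
		return
	}

	// The metric name travels in the __name__ label.
	fmt.Println("metric:", ls.Get(model.MetricNameLabel), "labels:", ls)

	// A staleness marker is one specific NaN bit pattern; an ordinary NaN is not stale.
	stale := math.Float64frombits(value.StaleNaN)
	fmt.Println("stale marker:", value.IsStaleNaN(stale), "plain NaN:", value.IsStaleNaN(math.NaN()))
}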