func ParseMetricFamilies()

in metricbeat/helper/prometheus/textparse.go [482:751]


// ParseMetricFamilies parses a scraped metrics payload into metric families.
//
// b is the raw payload and contentType is the content type it was served
// with; the parser is created with ContentTypeTextFormat as the fallback
// protocol. ts is converted to a timestamp that is used as the default for
// samples and exemplars that carry no timestamp of their own. logger receives
// debug messages about entries that are skipped or that stop the parse.
//
// Samples are grouped by family name. For counter, info, summary and
// histogram types the family is looked up under the metric name with its
// type suffix removed, falling back to the full metric name when no such
// family was declared.
//
// The returned slice contains only families that received at least one
// metric; it is built by ranging over a map, so its order is unspecified.
// A non-nil error is returned only when the parser cannot be created. Errors
// hit while iterating are logged at debug level and end the parse, and the
// families collected up to that point are returned.
func ParseMetricFamilies(b []byte, contentType string, ts time.Time, logger *logp.Logger) ([]*MetricFamily, error) {
	parser, err := textparse.New(b, contentType, ContentTypeTextFormat, false, false, labels.NewSymbolTable()) // Fallback protocol set to ContentTypeTextFormat
	if err != nil {
		return nil, err
	}
	var (
		defTime              = timestamp.FromTime(ts)
		metricFamiliesByName = map[string]*MetricFamily{}
		summariesByName      = map[string]map[string]*OpenMetric{}
		histogramsByName     = map[string]map[string]*OpenMetric{}
		fam                  *MetricFamily
		// metricTypes stores the metric type for each metric name.
		metricTypes = make(map[string]model.MetricType)
	)

	for {
		var (
			et textparse.Entry
			ok bool
			e  exemplar.Exemplar
		)
		if et, err = parser.Next(); err != nil {
			if strings.HasPrefix(err.Error(), "invalid metric type") {
				logger.Debugf("Ignored invalid metric type : %v ", err)

				// NOTE: only "invalid metric type" errors are skipped, so that a single
				// unsupported TYPE line does not abort the whole parse.
				// if acceptHeader in the prometheus client is `Accept: text/plain; version=0.0.4`
				// any `info` metrics are not supported, and they are ignored here.
				// if acceptHeader in the prometheus client is `Accept: application/openmetrics-text; version=0.0.1`
				// any `info` metrics are supported, and they are parsed here.
				continue
			}

			// End of input, with or without the trailing `# EOF` marker, ends the parse silently.
			if errors.Is(err, io.EOF) || strings.HasPrefix(err.Error(), "data does not end with # EOF") {
				break
			}
			logger.Debugf("Error while parsing metrics: %v ", err)
			break
		}
		switch et {
		case textparse.EntryType:
			buf, t := parser.Type()
			s := string(buf)
			fam, ok = metricFamiliesByName[s]
			if !ok {
				fam = &MetricFamily{Name: &s, Type: t}
				metricFamiliesByName[s] = fam
			} else {
				fam.Type = t
			}
			// Store the metric type for each base metric name.
			metricTypes[s] = t
			continue
		case textparse.EntryHelp:
			buf, t := parser.Help()
			s := string(buf)
			h := string(t)
			// Assign the lookup result to fam so the help text is attached to the
			// family named on this line, not to the family touched last.
			fam, ok = metricFamiliesByName[s]
			if !ok {
				fam = &MetricFamily{Name: &s, Help: &h}
				metricFamiliesByName[s] = fam
			} else {
				fam.Help = &h
			}
			continue
		case textparse.EntryUnit:
			buf, t := parser.Unit()
			s := string(buf)
			u := string(t)
			// Same as for help: resolve the family named on this line.
			fam, ok = metricFamiliesByName[s]
			if !ok {
				fam = &MetricFamily{Name: &s, Unit: &u}
				metricFamiliesByName[s] = fam
			} else {
				fam.Unit = &u
			}
			continue
		case textparse.EntryComment:
			continue
		default:
		}

		// t starts as the default timestamp and is replaced by the sample's own
		// timestamp further down when one is present.
		t := defTime
		_, tp, v := parser.Series()

		var (
			lset labels.Labels
			mets string
		)

		mets = parser.Metric(&lset)

		if !lset.Has(labels.MetricName) {
			// missing metric name from labels.MetricName, skip this sample only.
			continue
		}

		// lbls concatenates every label except the metric name and the le/quantile
		// label; it is handed to the summary/histogram helpers together with the
		// per-name maps.
		var lbls strings.Builder
		lbls.Grow(len(mets))
		var labelPairs = []*labels.Label{}
		var qv string // value of le or quantile label
		for _, l := range lset.Copy() {
			if l.Name == labels.MetricName {
				continue
			}

			switch l.Name {
			case model.QuantileLabel:
				qv = lset.Get(model.QuantileLabel)
			case labels.BucketLabel:
				qv = lset.Get(labels.BucketLabel)
			default:
				lbls.WriteString(l.Name)
				lbls.WriteString(l.Value)
			}

			labelPairs = append(labelPairs, &labels.Label{
				Name:  l.Name,
				Value: l.Value,
			})
		}

		var metric *OpenMetric

		metricName := lset.Get(labels.MetricName)

		// lookupMetricName will have the suffixes removed
		lookupMetricName := metricName
		var exm *exemplar.Exemplar

		mt, ok := metricTypes[metricName]
		if !ok {
			// Dropping the last `_`-separated segment is necessary to find the base metric
			// name type in the metricTypes map. This allows us to group related metrics
			// together under the same base metric name.
			// For example, the metric family `summary_metric` can have the metrics
			// `summary_metric_count` and `summary_metric_sum`, all having the same metric type.
			var baseMetricNameKey string
			if i := strings.LastIndex(metricName, "_"); i >= 0 {
				baseMetricNameKey = metricName[:i]
			}

			// If the metric type is not found, default to unknown
			if metricTypeFound, found := metricTypes[baseMetricNameKey]; found {
				mt = metricTypeFound
			} else {
				mt = model.MetricTypeUnknown
			}
		}

		// Suffixes - https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#suffixes
		switch mt {
		case model.MetricTypeCounter:
			if contentType == OpenMetricsType && !isTotal(metricName) && !isCreated(metricName) {
				// Possible suffixes for counter in Open metrics are _created and _total.
				// Otherwise, ignore.
				// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#counter-1
				continue
			}

			var counter = &Counter{Value: &v}
			mn := lset.Get(labels.MetricName)
			metric = &OpenMetric{Name: &mn, Counter: counter, Label: labelPairs}
			if contentType == OpenMetricsType {
				// Remove the two possible suffixes, _created and _total
				if isTotal(metricName) {
					lookupMetricName = strings.TrimSuffix(metricName, suffixTotal)
				} else {
					lookupMetricName = strings.TrimSuffix(metricName, suffixCreated)
				}
			}
		case model.MetricTypeGauge:
			var gauge = &Gauge{Value: &v}
			metric = &OpenMetric{Name: &metricName, Gauge: gauge, Label: labelPairs}
		case model.MetricTypeInfo:
			// Info only exists for Openmetrics. It must have the suffix _info
			if !isInfo(metricName) {
				continue
			}
			lookupMetricName = strings.TrimSuffix(metricName, suffixInfo)
			value := int64(v)
			var info = &Info{Value: &value}
			metric = &OpenMetric{Name: &metricName, Info: info, Label: labelPairs}
		case model.MetricTypeSummary:
			lookupMetricName, metric = summaryMetricName(metricName, v, qv, lbls.String(), summariesByName)
			if metric == nil {
				// Guard against a helper that returns no metric, as the histogram cases do.
				continue
			}
			metric.Label = labelPairs
			if !isSum(metricName) {
				// Avoid registering the metric multiple times.
				continue
			}
		case model.MetricTypeHistogram:
			if hasExemplar := parser.Exemplar(&e); hasExemplar {
				exm = &e
			}
			lookupMetricName, metric = histogramMetricName(metricName, v, qv, lbls.String(), &t, false, exm, histogramsByName)
			if metric == nil {
				continue
			}
			metric.Label = labelPairs
			if !isSum(metricName) {
				// Avoid registering the metric multiple times.
				continue
			}
		case model.MetricTypeGaugeHistogram:
			if hasExemplar := parser.Exemplar(&e); hasExemplar {
				exm = &e
			}
			lookupMetricName, metric = histogramMetricName(metricName, v, qv, lbls.String(), &t, true, exm, histogramsByName)
			if metric == nil { // metric name does not have a suffix supported for the type gauge histogram
				continue
			}
			metric.Label = labelPairs
			metric.Histogram.IsGaugeHistogram = true
			if !isGSum(metricName) {
				// Avoid registering the metric multiple times.
				continue
			}
		case model.MetricTypeStateset:
			value := int64(v)
			var stateset = &Stateset{Value: &value}
			metric = &OpenMetric{Name: &metricName, Stateset: stateset, Label: labelPairs}
		case model.MetricTypeUnknown:
			var unknown = &Unknown{Value: &v}
			metric = &OpenMetric{Name: &metricName, Unknown: unknown, Label: labelPairs}
		default:
		}

		if metric == nil {
			// No metric was built for this sample (a type not handled above).
			// Skip it so that a nil entry is never appended to a family.
			continue
		}

		fam, ok = metricFamiliesByName[lookupMetricName]
		if !ok {
			// If the lookupMetricName is not in metricFamiliesByName, we check for metric name, in case
			// the removed suffix is part of the name.
			fam, ok = metricFamiliesByName[metricName]
			if !ok {
				// There is not any metadata for this metric. In this case, the name of the metric
				// will remain metricName instead of the possible modified lookupMetricName
				fam = &MetricFamily{Name: &metricName, Type: mt}
				metricFamiliesByName[metricName] = fam
			}
		}

		// Histogram exemplars are passed to histogramMetricName above; for the
		// other types the exemplar is attached to the metric itself.
		if hasExemplar := parser.Exemplar(&e); hasExemplar && mt != model.MetricTypeHistogram {
			if !e.HasTs {
				e.Ts = t
			}
			metric.Exemplar = &e
		}

		if tp != nil {
			t = *tp
			metric.TimestampMs = &t
		}

		fam.Metric = append(fam.Metric, metric)
	}

	// Families that were declared through metadata but never received a sample are dropped.
	families := make([]*MetricFamily, 0, len(metricFamiliesByName))
	for _, v := range metricFamiliesByName {
		if v.Metric != nil {
			families = append(families, v)
		}
	}
	return families, nil
}