func UpdateMEMetricsProcessedCount()

in otelcollector/fluent-bit/src/telemetry.go [596:666]


func UpdateMEMetricsProcessedCount(records []map[interface{}]interface{}) int {
	for _, record := range records {
		var logEntry = ToString(record["message"])
		var metricScrapeInfoRegex = regexp.MustCompile(`\s*([^\s]+)\s*([^\s]+)\s*([^\s]+).*ProcessedCount: ([\d]+).*ProcessedBytes: ([\d]+).*SentToPublicationCount: ([\d]+).*SentToPublicationBytes: ([\d]+).*`)
		groupMatches := metricScrapeInfoRegex.FindStringSubmatch(logEntry)

		if len(groupMatches) > 7 {
			metricsProcessedCount, err := strconv.ParseFloat(groupMatches[4], 64)
			if err == nil {

				metricsAccountName := groupMatches[3]

				bytesProcessedCount, e := strconv.ParseFloat(groupMatches[5], 64)
				if e != nil {
					bytesProcessedCount = 0.0
				}

				metricsSentToPubCount, e := strconv.ParseFloat(groupMatches[6], 64)
				if e != nil {
					metricsSentToPubCount = 0.0
				}
				bytesSentToPubCount, e := strconv.ParseFloat(groupMatches[7], 64)
				if e != nil {
					bytesSentToPubCount = 0.0
				}

				//update map
				meMetricsProcessedCountMapMutex.Lock()

				ref, ok := meMetricsProcessedCountMap[metricsAccountName]

				if ok {
					ref.DimBytesProcessedCount += bytesProcessedCount
					ref.DimBytesSentToPubCount += bytesSentToPubCount
					ref.DimMetricsSentToPubCount += metricsSentToPubCount
					ref.Value += metricsProcessedCount

				} else {
					m := &meMetricsProcessedCount{
						DimBytesProcessedCount:   bytesProcessedCount,
						DimBytesSentToPubCount:   bytesSentToPubCount,
						DimMetricsSentToPubCount: metricsSentToPubCount,
						Value:                    metricsProcessedCount,
					}
					meMetricsProcessedCountMap[metricsAccountName] = m
				}
				meMetricsProcessedCountMapMutex.Unlock()
			}

			if strings.ToLower(os.Getenv(envPrometheusCollectorHealth)) == "true" {
				// Add to the total that PublishTimeseriesVolume() uses
				metricsSentToPubCount, err := strconv.ParseFloat(groupMatches[6], 64)
				if err == nil {
					TimeseriesVolumeMutex.Lock()
					TimeseriesSentTotal += metricsSentToPubCount
					TimeseriesVolumeMutex.Unlock()
				}

				// Add to the total that PublishTimeseriesVolume() uses
				bytesSentToPubCount, err := strconv.ParseFloat(groupMatches[7], 64)
				if err == nil {
					TimeseriesVolumeMutex.Lock()
					BytesSentTotal += bytesSentToPubCount
					TimeseriesVolumeMutex.Unlock()
				}
			}
		}

	}
	return output.FLB_OK
}