in otelcollector/fluent-bit/src/telemetry.go [596:666]
func UpdateMEMetricsProcessedCount(records []map[interface{}]interface{}) int {
for _, record := range records {
var logEntry = ToString(record["message"])
var metricScrapeInfoRegex = regexp.MustCompile(`\s*([^\s]+)\s*([^\s]+)\s*([^\s]+).*ProcessedCount: ([\d]+).*ProcessedBytes: ([\d]+).*SentToPublicationCount: ([\d]+).*SentToPublicationBytes: ([\d]+).*`)
groupMatches := metricScrapeInfoRegex.FindStringSubmatch(logEntry)
if len(groupMatches) > 7 {
metricsProcessedCount, err := strconv.ParseFloat(groupMatches[4], 64)
if err == nil {
metricsAccountName := groupMatches[3]
bytesProcessedCount, e := strconv.ParseFloat(groupMatches[5], 64)
if e != nil {
bytesProcessedCount = 0.0
}
metricsSentToPubCount, e := strconv.ParseFloat(groupMatches[6], 64)
if e != nil {
metricsSentToPubCount = 0.0
}
bytesSentToPubCount, e := strconv.ParseFloat(groupMatches[7], 64)
if e != nil {
bytesSentToPubCount = 0.0
}
//update map
meMetricsProcessedCountMapMutex.Lock()
ref, ok := meMetricsProcessedCountMap[metricsAccountName]
if ok {
ref.DimBytesProcessedCount += bytesProcessedCount
ref.DimBytesSentToPubCount += bytesSentToPubCount
ref.DimMetricsSentToPubCount += metricsSentToPubCount
ref.Value += metricsProcessedCount
} else {
m := &meMetricsProcessedCount{
DimBytesProcessedCount: bytesProcessedCount,
DimBytesSentToPubCount: bytesSentToPubCount,
DimMetricsSentToPubCount: metricsSentToPubCount,
Value: metricsProcessedCount,
}
meMetricsProcessedCountMap[metricsAccountName] = m
}
meMetricsProcessedCountMapMutex.Unlock()
}
if strings.ToLower(os.Getenv(envPrometheusCollectorHealth)) == "true" {
// Add to the total that PublishTimeseriesVolume() uses
metricsSentToPubCount, err := strconv.ParseFloat(groupMatches[6], 64)
if err == nil {
TimeseriesVolumeMutex.Lock()
TimeseriesSentTotal += metricsSentToPubCount
TimeseriesVolumeMutex.Unlock()
}
// Add to the total that PublishTimeseriesVolume() uses
bytesSentToPubCount, err := strconv.ParseFloat(groupMatches[7], 64)
if err == nil {
TimeseriesVolumeMutex.Lock()
BytesSentTotal += bytesSentToPubCount
TimeseriesVolumeMutex.Unlock()
}
}
}
}
return output.FLB_OK
}