components/otelopscol/processor/agentmetricsprocessor/agentmetricsprocessor.go (83 lines of code) (raw):
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agentmetricsprocessor
import (
"context"
"regexp"
"sync"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"
"go.uber.org/zap"
)
// metricPostfixRegex matches the string after the last "." (or the whole
// string if there is no "."). For example, the match in "a.b.c" is "c", the
// match in "abc" is "abc", and the match in a string ending in "." is empty.
// The single capture group spans the entire match.
var metricPostfixRegex = regexp.MustCompile(`([^.]*$)`)
// opKey is the comparable key type of the agentMetricsProcessor.prevOp map.
// It pairs a device with a direction; both are plain strings.
type opKey struct {
	// device is the first component of the key.
	device string
	// direction is the second component of the key.
	direction string
}
// opData is the value type stored per opKey in agentMetricsProcessor.prevOp.
// NOTE(review): the code that reads and writes these fields is not in this
// file; the field descriptions below are inferred from the names — confirm.
type opData struct {
	// presumably the operation-count data point for the key — TODO confirm
	operations pmetric.NumberDataPoint
	// presumably the operation-time data point for the key — TODO confirm
	time pmetric.NumberDataPoint
	// presumably a cumulative average operation time — TODO confirm units
	cumAvgTime float64
}
// agentMetricsProcessor is the receiver for ProcessMetrics. It carries the
// logger and configuration handed to newAgentMetricsProcessor plus the maps
// below. Because it contains a sync.Mutex it must be used through a pointer
// and never copied.
type agentMetricsProcessor struct {
	// logger and cfg are stored as supplied to newAgentMetricsProcessor.
	logger *zap.Logger
	cfg *Config
	// NOTE(review): presumably guards the prev* maps below; the locking sites
	// are not in this file — confirm.
	mutex sync.Mutex
	// prevCPUTimeValues is NOT initialized by newAgentMetricsProcessor, so it
	// starts out nil.
	prevCPUTimeValues map[string]float64
	// prevOp is initialized to an empty map by newAgentMetricsProcessor.
	prevOp map[opKey]opData
}
// newAgentMetricsProcessor builds a processor that uses the given logger and
// configuration and starts with an empty prevOp map.
//
// prevCPUTimeValues is intentionally left untouched here and therefore starts
// out nil.
// NOTE(review): writing to a nil map panics — confirm that the code using
// prevCPUTimeValues allocates it before the first write.
func newAgentMetricsProcessor(logger *zap.Logger, cfg *Config) *agentMetricsProcessor {
	proc := new(agentMetricsProcessor)
	proc.logger = logger
	proc.cfg = cfg
	proc.prevOp = map[opKey]opData{}
	return proc
}
// ProcessMetrics implements the MProcessor interface.
//
// It runs every agent metric transformation against the resource metrics in a
// fixed order. A failing step does not stop the later ones: all steps always
// run, and the errors they report are merged into the single returned error
// (nil when no step failed). The incoming metrics value is returned in both
// the success and the failure case.
func (mtp *agentMetricsProcessor) ProcessMetrics(_ context.Context, metrics pmetric.Metrics) (pmetric.Metrics, error) {
	// These two steps cannot fail.
	convertNonMonotonicSumsToGauges(metrics.ResourceMetrics())
	removeVersionAttribute(metrics.ResourceMetrics())

	// The calls in this literal are evaluated top to bottom (Go evaluates
	// function calls in lexical order), so the steps run in the listed order.
	// Successful steps contribute nil entries, which multierr.Combine skips.
	stepErrs := []error{
		combineProcessMetrics(metrics.ResourceMetrics()),
		splitReadWriteBytesMetrics(metrics.ResourceMetrics()),
		mtp.appendUtilizationMetrics(metrics.ResourceMetrics()),
		cleanCPUNumber(metrics.ResourceMetrics()),
		mtp.appendAverageDiskMetrics(metrics.ResourceMetrics()),
		// Add blank labels last so they can also be applied to metrics added by agentmetricsprocessor.
		mtp.addBlankLabel(metrics.ResourceMetrics()),
	}

	return metrics, multierr.Combine(stepErrs...)
}
// newMetric creates a new metric with no data points, using the descriptor
// info and the name of the provided metric.
func newMetric(metric pmetric.Metric) pmetric.Metric {
	// An empty name tells newMetricWithName to keep the source metric's name.
	const keepSourceName = ""
	return newMetricWithName(metric, keepSourceName)
}
// newMetricWithName creates a new metric with no data points using the
// descriptor info (description and unit) of the provided metric. The new
// metric is named name, or keeps metric's own name when name is empty.
//
// Only Sum and Gauge metrics get a data type on the result: a Sum also has
// its monotonicity and aggregation temporality copied. For any other metric
// type the returned metric is left without a data type.
func newMetricWithName(metric pmetric.Metric, name string) pmetric.Metric {
	if name == "" {
		name = metric.Name()
	}

	// Named "created" so the local does not shadow the newMetric function.
	created := pmetric.NewMetric()
	created.SetName(name)
	created.SetDescription(metric.Description())
	created.SetUnit(metric.Unit())

	switch metric.Type() {
	case pmetric.MetricTypeSum:
		created.SetEmptySum()
		src, dst := metric.Sum(), created.Sum()
		dst.SetIsMonotonic(src.IsMonotonic())
		dst.SetAggregationTemporality(src.AggregationTemporality())
	case pmetric.MetricTypeGauge:
		// An empty gauge has no further descriptor fields to copy.
		created.SetEmptyGauge()
	}
	return created
}