// cmd/cni-metrics-helper/metrics/metrics.go
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// Package metrics provides common data structures and routines for converting
// and aggregating Prometheus metrics into CloudWatch metrics.
package metrics
import (
	"bytes"
	"context"
	"errors"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	cloudwatchtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
	"k8s.io/client-go/kubernetes"

	"github.com/aws/amazon-vpc-cni-k8s/pkg/publisher"
	"github.com/aws/amazon-vpc-cni-k8s/pkg/utils/logger"
	"github.com/aws/amazon-vpc-cni-k8s/utils/prometheusmetrics"
)
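// metricMatcher decides whether a scraped metric sample should be fed into a
// given metricsAction.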
type metricMatcher func(metric *dto.Metric) bool
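// actionFuncType folds one sample value into the running aggregate
// (e.g. add or max).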
type actionFuncType func(aggregatedValue *float64, sampleValue float64)
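// metricsTarget abstracts a source of Prometheus metrics (e.g. a set of pods)
// together with the publishing options for the scraped values.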
type metricsTarget interface {
grabMetricsFromTarget(ctx context.Context, target string) ([]byte, error)
getInterestingMetrics() map[string]metricsConvert
getCWMetricsPublisher() publisher.Publisher
getTargetList(ctx context.Context) ([]string, error)
submitCloudWatch() bool
submitPrometheus() bool
getLogger() logger.Logger
}
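// metricsConvert describes how one Prometheus metric family is converted:
// each action aggregates matching samples into one CloudWatch metric.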
type metricsConvert struct {
actions []metricsAction
}
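// metricsAction aggregates samples matched by matchFunc using actionFunc and
// publishes the result under cwMetricName. Scalar metrics use data;
// histograms use bucket.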
type metricsAction struct {
cwMetricName string
matchFunc metricMatcher
actionFunc actionFuncType
data *dataPoints
bucket *bucketPoints
logToFile bool
}
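// dataPoints holds the aggregated scalar value for the current poll and the
// total observed at the previous poll (used to compute deltas for counters).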
type dataPoints struct {
lastSingleDataPoint float64
curSingleDataPoint float64
}
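// bucketPoint is one histogram bucket: its upper bound and cumulative count.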
type bucketPoint struct {
CumulativeCount *float64
UpperBound *float64
}
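// bucketPoints holds the histogram buckets of the current poll and of the
// previous poll (used to compute per-poll deltas).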
type bucketPoints struct {
lastBucket []*bucketPoint
curBucket []*bucketPoint
}
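// matchAny accepts every sample of a metric family.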
func matchAny(metric *dto.Metric) bool {
return true
}
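// metricsAdd sums sample values across targets.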
func metricsAdd(aggregatedValue *float64, sampleValue float64) {
*aggregatedValue += sampleValue
}
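// metricsMax keeps the maximum sample value seen across targets.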
func metricsMax(aggregatedValue *float64, sampleValue float64) {
if *aggregatedValue < sampleValue {
*aggregatedValue = sampleValue
}
}
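// getMetricsFromPod scrapes the /metrics endpoint of a pod through the
// API server's pod proxy subresource.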
func getMetricsFromPod(ctx context.Context, k8sClient kubernetes.Interface, podName string, namespace string, port int) ([]byte, error) {
rawOutput, err := k8sClient.CoreV1().RESTClient().Get().
Namespace(namespace).
Resource("pods").
SubResource("proxy").
Name(fmt.Sprintf("%v:%v", podName, port)).
Suffix("metrics").
Do(ctx).Raw()
if err != nil {
return nil, err
}
return rawOutput, nil
}
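// processGauge folds a gauge sample into the current data point.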
func processGauge(metric *dto.Metric, act *metricsAction) {
act.actionFunc(&act.data.curSingleDataPoint, metric.GetGauge().GetValue())
}
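// processCounter folds a counter sample into the current data point.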
func processCounter(metric *dto.Metric, act *metricsAction) {
act.actionFunc(&act.data.curSingleDataPoint, metric.GetCounter().GetValue())
}
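// processPercentile extracts the 0.99 quantile from a summary sample and
// folds it into the current data point.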
func processPercentile(metric *dto.Metric, act *metricsAction) {
var p99 float64
summary := metric.GetSummary()
quantiles := summary.GetQuantile()
for _, q := range quantiles {
if q.GetQuantile() == 0.99 {
p99 = q.GetValue()
}
}
act.actionFunc(&act.data.curSingleDataPoint, p99)
}
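// processHistogram merges a histogram sample into the per-action buckets,
// keyed by bucket upper bound; an unseen upper bound creates a new bucket.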
func processHistogram(metric *dto.Metric, act *metricsAction, log logger.Logger) {
histogram := metric.GetHistogram()
for _, bucket := range histogram.GetBucket() {
existingBucket := false
for _, bucketInAct := range act.bucket.curBucket {
if bucket.GetUpperBound() == *bucketInAct.UpperBound {
// found the matching bucket
act.actionFunc(bucketInAct.CumulativeCount, float64(bucket.GetCumulativeCount()))
existingBucket = true
break
}
}
		if !existingBucket {
			upperBound := bucket.GetUpperBound()
			cumulativeCount := float64(bucket.GetCumulativeCount())
			newBucket := &bucketPoint{UpperBound: &upperBound, CumulativeCount: &cumulativeCount}
			act.bucket.curBucket = append(act.bucket.curBucket, newBucket)
			log.Infof("Created a new bucket with upperBound: %f", upperBound)
		}
}
}
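// postProcessingCounter converts the aggregated counter totals into per-poll
// deltas and reports whether this poll should be skipped. For example, with
// lastSingleDataPoint=10 and curSingleDataPoint=14, the published delta is 4;
// if the current total drops below the previous one (e.g. 3 < 10), the target
// restarted, so a reset is reported instead of a negative delta. A reset is
// also reported on the very first poll, when there is current data but no
// previous data. In all cases lastSingleDataPoint advances to the current total.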
func postProcessingCounter(convert metricsConvert, log logger.Logger) bool {
resetDetected := false
noPreviousDataPoint := true
noCurrentDataPoint := true
for _, action := range convert.actions {
currentTotal := action.data.curSingleDataPoint
// Only do delta if metric target did NOT restart
if action.data.curSingleDataPoint < action.data.lastSingleDataPoint {
resetDetected = true
} else {
action.data.curSingleDataPoint -= action.data.lastSingleDataPoint
}
if action.data.lastSingleDataPoint != 0 {
noPreviousDataPoint = false
}
if action.data.curSingleDataPoint != 0 {
noCurrentDataPoint = false
}
action.data.lastSingleDataPoint = currentTotal
}
if resetDetected || (noPreviousDataPoint && !noCurrentDataPoint) {
log.Debugf("Reset detected resetDetected: %v, noPreviousDataPoint: %v, noCurrentDataPoint: %v",
resetDetected, noPreviousDataPoint, noCurrentDataPoint)
}
return resetDetected || (noPreviousDataPoint && !noCurrentDataPoint)
}
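// postProcessingHistogram converts the cumulative bucket counts into
// per-bucket counts (delta against the next-lower bucket) and then into
// per-poll deltas (delta against the previous poll). It reports a reset when
// a count went backwards or when there is no previous poll to delta against.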
func postProcessingHistogram(convert metricsConvert, log logger.Logger) bool {
resetDetected := false
noLastBucket := true
for _, action := range convert.actions {
numOfBuckets := len(action.bucket.curBucket)
if numOfBuckets == 0 {
log.Info("Post Histogram Processing: no bucket found")
continue
}
for i := 1; i < numOfBuckets; i++ {
log.Infof("Found numOfBuckets-i:=%d, *action.bucket.curBucket[numOfBuckets-i].CumulativeCount=%f",
numOfBuckets-i, *action.bucket.curBucket[numOfBuckets-i].CumulativeCount)
// Delta against the previous bucket value
// e.g. diff between bucket LE250000 and previous bucket LE125000
*action.bucket.curBucket[numOfBuckets-i].CumulativeCount -= *action.bucket.curBucket[numOfBuckets-i-1].CumulativeCount
log.Infof("Found numOfBuckets-i:=%d, *action.bucket.curBucket[numOfBuckets-i].CumulativeCount=%f, *action.bucket.curBucket[numOfBuckets-i-1].CumulativeCount=%f",
numOfBuckets-i, *action.bucket.curBucket[numOfBuckets-i].CumulativeCount, *action.bucket.curBucket[numOfBuckets-i-1].CumulativeCount)
// Delta against the previous value
if action.bucket.lastBucket != nil {
log.Infof("Found *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount=%f",
*action.bucket.lastBucket[numOfBuckets-i].CumulativeCount)
currentTotal := *action.bucket.curBucket[numOfBuckets-i].CumulativeCount
// Only do delta if there is no restart for metric target
if *action.bucket.curBucket[numOfBuckets-i].CumulativeCount >= *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount {
*action.bucket.curBucket[numOfBuckets-i].CumulativeCount -= *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount
log.Infof("Found *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount=%f, *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount=%f",
*action.bucket.curBucket[numOfBuckets-i].CumulativeCount, *action.bucket.lastBucket[numOfBuckets-i].CumulativeCount)
} else {
resetDetected = true
}
*action.bucket.lastBucket[numOfBuckets-i].CumulativeCount = currentTotal
}
}
if action.bucket.lastBucket != nil {
currentTotal := *action.bucket.curBucket[0].CumulativeCount
			// Only compute the delta if the metric target did NOT restart
if *action.bucket.curBucket[0].CumulativeCount >= *action.bucket.lastBucket[0].CumulativeCount {
*action.bucket.curBucket[0].CumulativeCount -= *action.bucket.lastBucket[0].CumulativeCount
} else {
resetDetected = true
}
*action.bucket.lastBucket[0].CumulativeCount = currentTotal
}
if action.bucket.lastBucket == nil {
action.bucket.lastBucket = action.bucket.curBucket
} else {
noLastBucket = false
}
}
return resetDetected || noLastBucket
}
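// processMetric aggregates every sample of one metric family according to its
// conversion actions and runs the type-specific post-processing. It returns
// true if a counter or histogram reset was detected.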
func processMetric(family *dto.MetricFamily, convert metricsConvert, log logger.Logger) (bool, error) {
resetDetected := false
metricType := family.GetType()
for _, metric := range family.GetMetric() {
for _, act := range convert.actions {
if act.matchFunc(metric) {
switch metricType {
case dto.MetricType_GAUGE:
processGauge(metric, &act)
case dto.MetricType_HISTOGRAM:
processHistogram(metric, &act, log)
case dto.MetricType_COUNTER:
processCounter(metric, &act)
case dto.MetricType_SUMMARY:
processPercentile(metric, &act)
}
}
}
}
switch metricType {
case dto.MetricType_COUNTER:
curResetDetected := postProcessingCounter(convert, log)
if curResetDetected {
resetDetected = true
}
case dto.MetricType_GAUGE:
		// no additional work needed for GAUGE
	case dto.MetricType_SUMMARY:
		// no additional work needed for SUMMARY (p99 percentile)
case dto.MetricType_HISTOGRAM:
curResetDetected := postProcessingHistogram(convert, log)
if curResetDetected {
resetDetected = true
}
}
return resetDetected, nil
}
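// produceHistogram publishes each non-empty bucket as a CloudWatch
// StatisticSet, approximating all samples in a bucket by the bucket midpoint.
// For example, buckets with upper bounds 0.5 and 1.0 are reported at
// (0.5-0)/2 = 0.25 and (1.0-0.5)/2 + 0.5 = 0.75 respectively; the +Inf bucket
// is extrapolated past the last finite upper bound.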
func produceHistogram(act metricsAction, cw publisher.Publisher) {
	prevUpperBound := float64(0)
	for _, bucket := range act.bucket.curBucket {
		// Approximate every sample in a bucket by the bucket midpoint.
		mid := (*bucket.UpperBound-prevUpperBound)/2 + prevUpperBound
		// For the +Inf bucket the midpoint is also +Inf, so extrapolate
		// past the previous (finite) upper bound instead.
		if mid == *bucket.UpperBound {
			mid = prevUpperBound + prevUpperBound/2
		}
		prevUpperBound = *bucket.UpperBound
if *bucket.CumulativeCount != 0 {
			dataPoint := cloudwatchtypes.MetricDatum{
				MetricName: aws.String(act.cwMetricName),
				StatisticValues: &cloudwatchtypes.StatisticSet{
					Maximum:     aws.Float64(mid),
					Minimum:     aws.Float64(mid),
					SampleCount: aws.Float64(*bucket.CumulativeCount),
					Sum:         aws.Float64(mid * *bucket.CumulativeCount),
				},
			}
cw.Publish(dataPoint)
}
}
}
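// filterMetrics keeps only the metric families listed in interestingMetrics.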
func filterMetrics(originalMetrics map[string]*dto.MetricFamily,
interestingMetrics map[string]metricsConvert,
) (map[string]*dto.MetricFamily, error) {
result := map[string]*dto.MetricFamily{}
for metric := range interestingMetrics {
if family, found := originalMetrics[metric]; found {
result[metric] = family
}
}
return result, nil
}
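// produceCloudWatchMetrics publishes the aggregated value of every metric
// family to CloudWatch via the given publisher.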
func produceCloudWatchMetrics(t metricsTarget, families map[string]*dto.MetricFamily, convertDef map[string]metricsConvert, cw publisher.Publisher) error {
for key, family := range families {
convertMetrics := convertDef[key]
metricType := family.GetType()
		for _, action := range convertMetrics.actions {
			switch metricType {
			case dto.MetricType_COUNTER, dto.MetricType_GAUGE, dto.MetricType_SUMMARY:
				// All three types publish a single aggregated value per poll.
				dataPoint := cloudwatchtypes.MetricDatum{
					MetricName: aws.String(action.cwMetricName),
					Unit:       cloudwatchtypes.StandardUnitCount,
					Value:      aws.Float64(action.data.curSingleDataPoint),
				}
				cw.Publish(dataPoint)
			case dto.MetricType_HISTOGRAM:
				produceHistogram(action, cw)
			}
		}
}
return nil
}
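// producePrometheusMetrics mirrors the aggregated values into this helper's
// own Prometheus registry.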
// Prometheus export supports only gauge metrics for now.
func producePrometheusMetrics(t metricsTarget, families map[string]*dto.MetricFamily, convertDef map[string]metricsConvert) error {
prometheusCNIMetrics := prometheusmetrics.GetSupportedPrometheusCNIMetricsMapping()
if len(prometheusCNIMetrics) == 0 {
errorMsg := "Skipping since prometheus mapping is missing"
t.getLogger().Infof(errorMsg)
return errors.New(errorMsg)
}
for key, family := range families {
convertMetrics := convertDef[key]
metricType := family.GetType()
for _, action := range convertMetrics.actions {
switch metricType {
case dto.MetricType_GAUGE:
metrics, ok := prometheusCNIMetrics[family.GetName()]
if ok {
if gauge, isGauge := metrics.(prometheus.Gauge); isGauge {
gauge.Set(action.data.curSingleDataPoint)
} else {
t.getLogger().Warnf("Metric %s is not a Gauge type, skipping", family.GetName())
}
}
}
}
}
return nil
}
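// resetMetrics clears the current-poll state of every interesting metric
// before a new scrape, keeping the last-poll state for delta computation.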
func resetMetrics(interestingMetrics map[string]metricsConvert) {
for _, convert := range interestingMetrics {
for _, act := range convert.actions {
if act.data != nil {
act.data.curSingleDataPoint = 0
}
if act.bucket != nil {
act.bucket.curBucket = make([]*bucketPoint, 0)
}
}
}
}
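// metricsListGrabAggregateConvert scrapes every target, parses and filters
// the exposition text, and aggregates the interesting metric families across
// all targets. It returns the (last parsed) families, the aggregation state,
// and whether a reset was detected.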
func metricsListGrabAggregateConvert(ctx context.Context, t metricsTarget) (map[string]*dto.MetricFamily, map[string]metricsConvert, bool, error) {
	resetDetected := false
	var families map[string]*dto.MetricFamily
	interestingMetrics := t.getInterestingMetrics()
	resetMetrics(interestingMetrics)
	targetList, err := t.getTargetList(ctx)
	if err != nil {
		return nil, nil, true, err
	}
	t.getLogger().Debugf("Total TargetList pod count: %d", len(targetList))
for _, target := range targetList {
rawOutput, err := t.grabMetricsFromTarget(ctx, target)
		if err != nil {
			// It can take a while for deleted metric targets to drop out of
			// the target list, so skip any target that fails to scrape.
			continue
		}
parser := &expfmt.TextParser{}
origFamilies, err := parser.TextToMetricFamilies(bytes.NewReader(rawOutput))
if err != nil {
return nil, nil, true, err
}
families, err = filterMetrics(origFamilies, interestingMetrics)
if err != nil {
return nil, nil, true, err
}
for _, family := range families {
convert := interestingMetrics[family.GetName()]
curReset, err := processMetric(family, convert, t.getLogger())
if err != nil {
return nil, nil, true, err
}
if curReset {
resetDetected = true
}
}
}
	// TODO: resetDetected is NOT right when aggregating across multiple
	// targets (cniMetrics), so force it to false for now.
	if len(targetList) > 1 {
		resetDetected = false
	}
return families, interestingMetrics, resetDetected, nil
}
// Handler grabs metrics from the target, aggregates them, and publishes the
// converted values to CloudWatch and/or Prometheus.
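//
// A minimal usage sketch (the polling loop and the concrete metricsTarget
// implementation are hypothetical; this file defines only the interface):
//
//	for range time.Tick(pollInterval) {
//		metrics.Handler(ctx, target) // target implements metricsTarget
//	}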
func Handler(ctx context.Context, t metricsTarget) {
families, interestingMetrics, resetDetected, err := metricsListGrabAggregateConvert(ctx, t)
	if err != nil || resetDetected {
		// Either scraping failed or a reset/first poll was detected, so the
		// aggregated deltas are not valid for publishing.
		t.getLogger().Infof("Skipping first poll after reset, error: %v", err)
		return
	}
	if t.submitCloudWatch() {
		cw := t.getCWMetricsPublisher()
		if err := produceCloudWatchMetrics(t, families, interestingMetrics, cw); err != nil {
			t.getLogger().Errorf("Failed to produce CloudWatch metrics: %v", err)
		}
	}
	if t.submitPrometheus() {
		if err := producePrometheusMetrics(t, families, interestingMetrics); err != nil {
			t.getLogger().Errorf("Failed to produce Prometheus metrics: %v", err)
		}
	}
}