agent/stats/snapshotter.go (195 lines of code) (raw):
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
package stats
import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/aws/aws-app-mesh-agent/agent/client"
"github.com/aws/aws-app-mesh-agent/agent/config"
"github.com/hashicorp/go-retryablehttp"
"github.com/prometheus/client_model/go"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)
// Snapshotter periodically queries Envoy's stats endpoint and keeps the most recently captured snapshot together
// with the delta between it and the snapshot captured before it.
type Snapshotter struct {
	// Snapshot holds the latest captured metric families. Delta holds the change relative to the previously
	// captured snapshot; on the very first capture Delta is the snapshot itself.
	Snapshot, Delta map[string]*io_prometheus_client.MetricFamily
	// HttpClient and HttpRequest are populated by StartSnapshot and reused for every stats request.
	HttpClient  *retryablehttp.Client
	HttpRequest *retryablehttp.Request
}
// StartSnapshot builds the HTTP client and request used to query Envoy's stats endpoint and then captures a new
// snapshot (computing the delta from the previous one) every snapshotInterval.
//
// It only returns early, after logging the error, when the client or the request cannot be created; otherwise it
// loops forever.
func (snapshotter *Snapshotter) StartSnapshot(agentConfig config.AgentConfig) {
	httpClient, err := client.CreateRetryableHttpClientForEnvoyServer(agentConfig)
	if err != nil {
		log.Errorf("unable to create Retryable Http Client: %v", err)
		return
	}
	// Configure the client only after creation succeeded: on error it may be nil, and dereferencing it would panic.
	httpClient.HTTPClient.Timeout = EnvoyStatsClientHttpTimeout
	snapshotter.HttpClient = httpClient

	queryParams := url.Values{}
	queryParams.Add(usedOnlyQueryKey, "")
	queryParams.Add(filterQueryKey, QuerySet[filterQueryKey].(string))
	queryString := constructQueryString(queryParams)
	requestUrl := getEnvoyStatsUrl(&agentConfig, queryString)
	statsRequest, err := client.CreateRetryableAgentRequest(http.MethodGet, requestUrl, nil)
	if err != nil {
		log.Errorf("failed to create Retryable Http request, requestUrl: %s, error: %v", requestUrl, err)
		return
	}
	snapshotter.HttpRequest = statsRequest

	ticker := time.NewTicker(snapshotInterval)
	defer ticker.Stop()
	// Capture a snapshot on every tick, forever.
	for range ticker.C {
		snapshotter.makeSnapshot()
	}
}
// makeSnapshot fetches the current stats from Envoy, parses them and, when parsing yields a snapshot, updates the
// delta against the previous snapshot before storing the new one on the snapshotter.
// Fetch and parse failures are logged and leave the existing snapshot and delta untouched.
func (snapshotter *Snapshotter) makeSnapshot() {
	body, fetchErr := getStatsFromEnvoy(snapshotter.HttpClient, snapshotter.HttpRequest)
	if fetchErr != nil {
		log.Errorf("failed to get stats from Envoy, error: %v", fetchErr)
		return
	}

	captured, parseErr := processPrometheusStats(body)
	if parseErr != nil {
		log.Errorf("error processing Prometheus stats: %v", parseErr)
		return
	}
	if captured == nil {
		return
	}

	// The delta must be computed against the previous snapshot before it is overwritten.
	snapshotter.computeDelta(captured)
	snapshotter.Snapshot = captured
}
// computeDelta replaces snapshotter.Delta with the difference between newSnapshot and the previously captured
// snapshotter.Snapshot. When no snapshot has been captured yet, newSnapshot itself becomes the delta.
// A nil newSnapshot is logged and ignored.
//
// The delta is built in a fresh map and only assigned once it is complete.
//
// The dependency library does not support protobuf v2 APIs. See their README: https://github.com/prometheus/client_model
// Possibly switching to OpenMetrics(https://openmetrics.io/) when it is ready in the future.
func (snapshotter *Snapshotter) computeDelta(newSnapshot map[string]*io_prometheus_client.MetricFamily) {
	switch {
	case newSnapshot == nil:
		log.Errorf("the newSnapshot should exist to compute the snapshot")
	case snapshotter.Snapshot == nil:
		log.Debugf("Using the new snapshot as delta since there is no previously captured snapshot yet.")
		snapshotter.Delta = newSnapshot
	default:
		previous := snapshotter.Snapshot
		computed := make(map[string]*io_prometheus_client.MetricFamily, len(newSnapshot))
		for familyKey, family := range newSnapshot {
			// Families that are new in this snapshot have no previous entry; the lookup then yields nil,
			// which computeDeltaForMetricFamily treats as "no previous value".
			snapshotter.computeDeltaForMetricFamily(previous[familyKey], family, computed)
		}
		snapshotter.Delta = computed
	}
}
// computeDeltaForMetricFamily computes the delta between oldMetricFamily and newMetricFamily and stores it in
// delta under the new family's name.
//
// oldMetricFamily may be nil (the family did not exist in the previous snapshot); the new values are then used as
// the delta. newMetricFamily must be non-nil, otherwise nothing is written.
//
// Metrics are paired by their label set. Counters and histograms are cumulative, so their delta is new minus old.
// If a cumulative value decreased, the source was reset in between (for example because Envoy restarted). The new
// value is then the amount accumulated since the reset and is used as the delta. This avoids a negative counter
// delta or an unsigned underflow. Gauges are reported as-is. Untyped and summary metrics are not supported; for
// those only the labels are carried into the delta.
func (snapshotter *Snapshotter) computeDeltaForMetricFamily(oldMetricFamily, newMetricFamily *io_prometheus_client.MetricFamily, delta map[string]*io_prometheus_client.MetricFamily) {
	if oldMetricFamily == nil && newMetricFamily == nil {
		log.Error("both metric families are empty, cannot compute delta")
		return
	}
	// The metric families from the old snapshot are expected to be a subset of those in the new snapshot, so the
	// family from the new snapshot must exist to compute a delta.
	if newMetricFamily == nil {
		log.Debugf("metricFamily from new snapshot must exist to compute the delta")
		return
	}

	metricName := newMetricFamily.GetName()
	metricType := newMetricFamily.GetType()
	if _, ok := delta[metricName]; !ok {
		// Create the delta entry if it does not exist yet.
		delta[metricName] = &io_prometheus_client.MetricFamily{
			Name:   &metricName,
			Type:   &metricType,
			Metric: make([]*io_prometheus_client.Metric, len(newMetricFamily.Metric)),
		}
	}

	// Index the previous metrics by label set so each new metric can be paired with its previous value.
	// GetMetric is nil-safe, so a missing old family simply yields an empty lookup.
	metricLookup := make(map[string]*io_prometheus_client.Metric, len(oldMetricFamily.GetMetric()))
	for _, oldMetric := range oldMetricFamily.GetMetric() {
		metricLookup[generateMetricKey(oldMetric.GetLabel())] = oldMetric
	}

	for metricIndex, newMetric := range newMetricFamily.Metric {
		deltaMetric := &io_prometheus_client.Metric{Label: newMetric.GetLabel()}
		// nil when no metric with the same labels existed in the old snapshot.
		oldMetric := metricLookup[generateMetricKey(newMetric.GetLabel())]

		switch metricType {
		case io_prometheus_client.MetricType_COUNTER:
			newValue := newMetric.GetCounter().GetValue()
			if oldMetric != nil {
				deltaMetric.Counter = &io_prometheus_client.Counter{
					Value: proto.Float64(deltaOfCounterValue(newValue, oldMetric.GetCounter().GetValue())),
				}
			} else {
				deltaMetric.Counter = &io_prometheus_client.Counter{
					Value:    proto.Float64(newValue),
					Exemplar: newMetric.GetCounter().GetExemplar(),
				}
			}
		case io_prometheus_client.MetricType_GAUGE:
			// A gauge is a single value that can go up and down arbitrarily, so it is reported as-is.
			deltaMetric.Gauge = &io_prometheus_client.Gauge{Value: proto.Float64(newMetric.GetGauge().GetValue())}
		case io_prometheus_client.MetricType_UNTYPED, io_prometheus_client.MetricType_SUMMARY:
			// Do nothing, we are not expecting Untyped and Summary metric type.
			log.Errorf("unsupported metric type for delta computation: %s", metricType)
		case io_prometheus_client.MetricType_HISTOGRAM:
			// The getters are nil-safe: a missing old metric, or an old/new metric without a histogram, yields nil
			// instead of panicking.
			deltaMetric.Histogram = deltaOfHistogram(oldMetric.GetHistogram(), newMetric.GetHistogram())
		}
		delta[metricName].Metric[metricIndex] = deltaMetric
	}
}

// deltaOfCounterValue returns how much a counter grew from oldValue to newValue. A smaller newValue means the counter
// was reset in between, so newValue itself is the growth since the reset.
func deltaOfCounterValue(newValue, oldValue float64) float64 {
	if newValue < oldValue {
		return newValue
	}
	return newValue - oldValue
}

// deltaOfCumulativeCount returns how much a cumulative count grew from oldCount to newCount. A smaller newCount means
// the count was reset in between, so newCount itself is the growth since the reset. This also prevents the unsigned
// underflow a plain subtraction would produce.
func deltaOfCumulativeCount(newCount, oldCount uint64) uint64 {
	if newCount < oldCount {
		return newCount
	}
	return newCount - oldCount
}

// deltaOfHistogram returns the change from oldHistogram to newHistogram. A nil oldHistogram means there is no
// previous value, so newHistogram's values are used as the delta. Only the bucket, sample-count and sample-sum
// fields present in newHistogram are populated; a nil newHistogram yields an empty histogram.
func deltaOfHistogram(oldHistogram, newHistogram *io_prometheus_client.Histogram) *io_prometheus_client.Histogram {
	deltaHistogram := &io_prometheus_client.Histogram{}
	if newHistogram == nil {
		return deltaHistogram
	}
	// The sample count only decreases when the histogram was reset. After a reset every observation in
	// newHistogram is new, so it is reported in full instead of being diffed against stale values.
	if oldHistogram != nil && newHistogram.SampleCount != nil && newHistogram.GetSampleCount() < oldHistogram.GetSampleCount() {
		oldHistogram = nil
	}

	if newBuckets := newHistogram.GetBucket(); newBuckets != nil {
		oldBuckets := oldHistogram.GetBucket()
		// The bucket layout should not change between snapshots; this is a safety check. When it did change, the
		// new bucket counts are used as the delta.
		sameLayout := len(oldBuckets) == len(newBuckets)
		deltaHistogram.Bucket = make([]*io_prometheus_client.Bucket, len(newBuckets))
		for bucketIndex, newBucket := range newBuckets {
			deltaBucket := &io_prometheus_client.Bucket{UpperBound: newBucket.UpperBound}
			if sameLayout && oldBuckets[bucketIndex].GetUpperBound() == newBucket.GetUpperBound() {
				deltaBucket.CumulativeCount = proto.Uint64(deltaOfCumulativeCount(
					newBucket.GetCumulativeCount(), oldBuckets[bucketIndex].GetCumulativeCount()))
			} else {
				deltaBucket.CumulativeCount = proto.Uint64(newBucket.GetCumulativeCount())
			}
			deltaHistogram.Bucket[bucketIndex] = deltaBucket
		}
	}

	// GetSampleCount/GetSampleSum on a nil oldHistogram return zero, so the new value is used as-is.
	if newHistogram.SampleCount != nil {
		deltaHistogram.SampleCount = proto.Uint64(deltaOfCumulativeCount(newHistogram.GetSampleCount(), oldHistogram.GetSampleCount()))
	}
	if newHistogram.SampleSum != nil {
		deltaHistogram.SampleSum = proto.Float64(newHistogram.GetSampleSum() - oldHistogram.GetSampleSum())
	}
	return deltaHistogram
}
// generateMetricKey builds a lookup key for a metric from its labels, in the order given, e.g.
// LabelPair{Name: "Mesh", Value: "howto-k8s-http"} yields "Mesh=howto-k8s-http;". No labels yield "".
//
// Label pairs with a nil name or value are treated as empty strings instead of causing a nil dereference.
// NOTE(review): the key is not escaped, so label values containing '=' or ';' could in principle collide.
func generateMetricKey(labels []*io_prometheus_client.LabelPair) string {
	var metricKey []byte
	for _, label := range labels {
		metricKey = append(metricKey, label.GetName()...)
		metricKey = append(metricKey, '=')
		metricKey = append(metricKey, label.GetValue()...)
		metricKey = append(metricKey, ';')
	}
	return string(metricKey)
}
// getStatsFromEnvoy sends request to Envoy's stats endpoint using httpClient and returns the response body.
//
// It returns an error when the request fails, when the response status is not 200 OK, or when the body cannot be
// read. Underlying causes are wrapped so callers can inspect them with errors.Is/errors.As. The returned body is
// nil whenever the error is non-nil.
func getStatsFromEnvoy(httpClient *retryablehttp.Client, request *retryablehttp.Request) ([]byte, error) {
	start := time.Now()
	statsResponse, err := httpClient.Do(request)
	log.Debugf("Stats request took: %vms", time.Since(start).Milliseconds())
	if err != nil {
		return nil, fmt.Errorf("call to fetch stats from Envoy admin failed: %w", err)
	}
	defer statsResponse.Body.Close()

	if statsResponse.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("envoy stats response status code not OK, error code: %v", statsResponse.StatusCode)
	}

	resBody, err := ioutil.ReadAll(statsResponse.Body)
	if err != nil {
		// Do not hand back a partially read body alongside the error.
		return nil, fmt.Errorf("failed to read stats response retrieved from Envoy admin: %w", err)
	}
	return resBody, nil
}