agent/stats/envoy_prometheus_stats.go (222 lines of code) (raw):
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
package stats
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"github.com/aws/aws-app-mesh-agent/agent/client"
"github.com/aws/aws-app-mesh-agent/agent/config"
// reference: https://github.com/prometheus/common/blob/main/expfmt/text_parse.go#L25
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)
// EnvoyPrometheusStatsHandler serves the agent's stats endpoint (see HandleStats) by querying the
// Envoy admin stats endpoint in Prometheus format.
type EnvoyPrometheusStatsHandler struct {
// AgentConfig supplies the Envoy scheme, host name, admin port and stats URL used to build the
// upstream request, and is passed to the Envoy HTTP client constructor.
AgentConfig config.AgentConfig
// Limiter gates every call to HandleStats; requests are answered with 429 when Allow() is false.
Limiter *rate.Limiter
// QueryParameters holds the validated query parameters of the request handled most recently.
// HandleStats overwrites it on every request.
QueryParameters url.Values
// Snapshotter provides the Delta metric families that are returned directly when the "delta"
// query parameter is present and a Delta is available.
Snapshotter *Snapshotter
}
const (
// usedOnlyQueryKey is the query key that, when present, is appended as-is to the query sent to Envoy.
usedOnlyQueryKey = "usedonly"
// filterQueryKey is the query key that requests filtering of the stats down to metrics carrying
// extendedMetricsPrefix.
filterQueryKey = "filter"
// deltaQueryKey is the query key that requests the Snapshotter's Delta instead of a fresh Envoy query.
deltaQueryKey = "delta"
// extendedMetricsPrefix is the metric-name prefix kept by the filter; it is trimmed from the names
// of the returned metric families.
extendedMetricsPrefix = "envoy_appmesh_"
// snapshotInterval is not referenced in this part of the file.
// NOTE(review): presumably the period between Snapshotter snapshots — confirm against its user.
snapshotInterval = 1 * time.Minute
// EnvoyStatsClientHttpTimeout is the timeout applied to the HTTP client used to call the Envoy
// stats endpoint.
EnvoyStatsClientHttpTimeout = 5 * time.Second
)
var (
// filterQueryUsage is the usage example included in the error returned for an unsupported query
// value. It is formatted as "<AGENT_STATS_ENDPOINT_URL>?filter=<allowed value from QuerySet>".
// It reads QuerySet, which Go's package initialization order initializes first even though it is
// declared later in the file.
filterQueryUsage = fmt.Sprintf("%s?%s=%s", config.AGENT_STATS_ENDPOINT_URL, filterQueryKey, QuerySet[filterQueryKey])
)
var (
// QuerySet records the set of the supported query parameters and their allowed value.
// A nil value means the parameter's value is not checked; a string value is the only value
// accepted for that parameter. Keys absent from this map are rejected by request validation.
QuerySet = map[string]interface{}{
usedOnlyQueryKey: nil,
filterQueryKey: "metrics_extension",
deltaQueryKey: nil,
}
)
// HandleStats serves the agent stats endpoint by relaying Prometheus-formatted stats obtained from
// the Envoy admin interface.
//
// Processing steps:
//  1. Respond 429 when the rate limiter has no token available.
//  2. Validate the method, body and query parameters; respond 400 on failure.
//  3. When the "delta" parameter is present and the Snapshotter already holds a Delta, write that
//     Delta and return without contacting Envoy.
//  4. Otherwise query the Envoy stats endpoint. Without the "filter" parameter the Envoy body is
//     relayed verbatim; with it, only metrics produced by the metrics extension are returned, with
//     their prefix trimmed.
//
// Failures while building the request, calling Envoy, reading its response or processing the stats
// are answered with 500. Failures while writing the response body are only logged because the
// status line has already been sent at that point.
func (envoyPrometheusStatsHandler *EnvoyPrometheusStatsHandler) HandleStats(resWriter http.ResponseWriter, request *http.Request) {
	if !envoyPrometheusStatsHandler.Limiter.Allow() {
		http.Error(resWriter, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
		return
	}

	// Validate the query parameters in the request before passing it to call Envoy stats endpoint.
	validatedParams, err := validateGetStatsRequest(request)
	if err != nil {
		http.Error(resWriter, err.Error(), http.StatusBadRequest)
		return
	}
	// All decisions below use this request-local value. The exported field is still updated for any
	// code that inspects it, but it is overwritten by every request and therefore must not be read
	// back here: a concurrent request could have replaced it in the meantime.
	queryParameters := *validatedParams
	envoyPrometheusStatsHandler.QueryParameters = queryParameters

	// If the Delta exists, we would just return Delta directly with no further operation needed.
	// A nil Snapshotter is treated the same as "no Delta yet" instead of panicking.
	snapshotter := envoyPrometheusStatsHandler.Snapshotter
	if queryParameters.Has(deltaQueryKey) && snapshotter != nil && snapshotter.Delta != nil {
		resWriter.WriteHeader(http.StatusOK)
		if err := writeMetricsToResponse(resWriter, snapshotter.Delta); err != nil {
			log.Errorf("error while writing response: %s", err)
		}
		return
	}

	// If Delta is not yet computed, most likely it is too early and we don't yet have two snapshots to
	// compute the delta. In this case we will just return the stats as it is because that is
	// essentially the delta (current stats - 0).
	//
	// Build the URL for the Envoy Admin Interface. Note that we always prepend the
	// ENVOY_PROMETHEUS_QUERY_STRING to enforce the prometheus format.
	// See https://www.envoyproxy.io/docs/envoy/latest/operations/admin#get--stats?format=prometheus
	queryStringToEnvoy := constructQueryString(queryParameters)
	envoyStatsUrl := getEnvoyStatsUrl(&envoyPrometheusStatsHandler.AgentConfig, queryStringToEnvoy)
	log.Debugf("Full URL to query Envoy Stats Endpoint: %s", envoyStatsUrl)

	// Build the client for the Envoy server. The error must be checked before the client is touched:
	// on failure the returned client may be nil.
	httpClient, err := client.CreateRetryableHttpClientForEnvoyServer(envoyPrometheusStatsHandler.AgentConfig)
	if err != nil {
		http.Error(resWriter, err.Error(), http.StatusInternalServerError)
		return
	}
	httpClient.HTTPClient.Timeout = EnvoyStatsClientHttpTimeout

	statsRequest, err := client.CreateRetryableAgentRequest(http.MethodGet, envoyStatsUrl, nil)
	if err != nil {
		http.Error(resWriter, err.Error(), http.StatusInternalServerError)
		return
	}

	// Record the response time observed from the client side.
	start := time.Now()
	statsResponse, err := httpClient.Do(statsRequest)
	duration := time.Since(start)
	log.Debugf("Stats request took: %vms", duration.Milliseconds())
	if err != nil {
		log.Errorf("Call to fetch stats from Envoy admin failed: %s", err)
		http.Error(resWriter, "Failed to fetch stats from Envoy", http.StatusInternalServerError)
		return
	}
	// Register the close before inspecting the status so the body is released on every path,
	// including the non-200 early return.
	defer statsResponse.Body.Close()

	if statsResponse.StatusCode != http.StatusOK {
		log.Errorf("Envoy stats response status code not OK: %v", statsResponse.Status)
		http.Error(resWriter, "Failed to fetch stats from Envoy", http.StatusInternalServerError)
		return
	}

	responseBody, err := ioutil.ReadAll(statsResponse.Body)
	if err != nil {
		log.Errorf("Failed to read stats response retreived from Envoy admin: %v", err)
		http.Error(resWriter, "Failed to fetch stats from Envoy", http.StatusInternalServerError)
		return
	}

	// Directly write the response if there is no filter query
	if !queryParameters.Has(filterQueryKey) {
		resWriter.WriteHeader(http.StatusOK)
		if _, err := resWriter.Write(responseBody); err != nil {
			log.Errorf("error while writing response: %s", err)
		}
		return
	}

	// Filter the stats
	filteredMetricFamilies, err := processPrometheusStats(responseBody)
	if err != nil {
		http.Error(resWriter, fmt.Sprintf("error processing prometheus stats: %s", err), http.StatusInternalServerError)
		return
	}

	resWriter.WriteHeader(http.StatusOK)
	if err := writeMetricsToResponse(resWriter, filteredMetricFamilies); err != nil {
		log.Errorf("error while writing response: %s", err)
	}
}
// processPrometheusStats turns a raw Prometheus text payload into metric families and keeps only the
// ones emitted by the metrics extension, i.e. those whose name starts with "envoy_appmesh_".
// Each retained family is renamed by dropping that prefix, and the returned map is keyed by the
// shortened name. A parsing failure yields a nil map and an error.
func processPrometheusStats(statsResponse []byte) (map[string]*dto.MetricFamily, error) {
	parsed, parseErr := parsePrometheusStats(statsResponse)
	if parseErr != nil {
		return nil, fmt.Errorf("error parsing prometheus stats: %s", parseErr)
	}

	extended := map[string]*dto.MetricFamily{}
	for fullName, family := range parsed {
		// Skip everything that does not originate from the metrics extension.
		if !strings.HasPrefix(fullName, extendedMetricsPrefix) {
			continue
		}
		// The prefix is known to be present, so slicing it off is equivalent to trimming it.
		// shortName is declared per iteration so every family points at its own string.
		shortName := fullName[len(extendedMetricsPrefix):]
		family.Name = &shortName
		extended[shortName] = family
	}
	return extended, nil
}
// parsePrometheusStats decodes a payload in the Prometheus text exposition format into metric
// families keyed by metric name. When decoding fails the map is discarded: the caller receives nil
// together with the parser's error.
func parsePrometheusStats(statsInput []byte) (map[string]*dto.MetricFamily, error) {
	textParser := new(expfmt.TextParser)
	families, parseErr := textParser.TextToMetricFamilies(bytes.NewReader(statsInput))
	if parseErr == nil {
		return families, nil
	}
	return nil, parseErr
}
// writeMetricsToResponse writes the given metric families to writer in the Prometheus text format,
// each family preceded by a newline.
//
// The families are first passed through prometheus.Gatherers.Gather() so that they are emitted in a
// sorted, normalized order rather than in map-iteration order. It returns an error when that step
// fails or when any write to writer fails; on a write failure the response may already contain the
// families written before the failure.
func writeMetricsToResponse(writer http.ResponseWriter, metricFamilies map[string]*dto.MetricFamily) error {
	metricFamiliesSlice := make([]*dto.MetricFamily, 0, len(metricFamilies))
	for _, metricFamily := range metricFamilies {
		metricFamiliesSlice = append(metricFamiliesSlice, metricFamily)
	}

	// We make use of the Gatherers Gather() function to sort the metrics.
	// See https://github.com/prometheus/client_golang/blob/5d584e2717ef525673736d72cd1d12e304f243d7/prometheus/registry.go#L773
	// and https://github.com/prometheus/client_golang/blob/5d584e2717ef525673736d72cd1d12e304f243d7/prometheus/internal/metric.go#L85
	// for more details to understand how it normalizes the metrics.
	gatherers := prometheus.Gatherers{
		prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
			return metricFamiliesSlice, nil
		}),
	}
	sortedMetricFamilies, err := gatherers.Gather()
	if err != nil {
		// Build the error with an explicit verb: the underlying message must never be used as a
		// format string, otherwise any '%' it contains would be misinterpreted.
		sortErr := fmt.Errorf("Error when sorting the metrics, stop writing metrics to response: %w", err)
		log.Error(sortErr)
		return sortErr
	}

	for _, metricFamily := range sortedMetricFamilies {
		// Add newline for each metric family
		if _, err := io.WriteString(writer, "\n"); err != nil {
			return fmt.Errorf("failed to write metric family separator to response: %w", err)
		}
		if _, err := expfmt.MetricFamilyToText(writer, metricFamily); err != nil {
			return fmt.Errorf("failed to write metric family to response, metric family: %s, error: %w", metricFamily, err)
		}
	}
	return nil
}
// validateGetStatsRequest validates the given http.Request and extracts its query parameters on
// success.
//
// A request is accepted only when:
//   - its method is GET;
//   - it declares no body (ContentLength is not positive);
//   - its raw query string parses, every key is present in QuerySet, and every key whose QuerySet
//     entry is a string carries exactly that value (only the first value of a key is examined);
//   - "delta", when present, is accompanied by both "usedonly" and "filter".
//
// On rejection it returns a nil *url.Values and an error describing the problem encountered; the
// same message is logged at debug level. Query keys are visited in map order, so when several keys
// are invalid the one reported is not fixed.
func validateGetStatsRequest(request *http.Request) (*url.Values, error) {
	// reject builds the error with explicit format verbs so that caller-controlled text (method,
	// query keys and values, parser messages quoting the raw query) is always passed as an argument
	// and never interpreted as a format string.
	reject := func(format string, args ...interface{}) (*url.Values, error) {
		err := fmt.Errorf(format, args...)
		log.Debug(err)
		return nil, err
	}

	// Verify that the request is a GET request
	if request.Method != http.MethodGet {
		return reject("invalid method [%s] in request", request.Method)
	}

	// Verify that no message body is present
	if request.ContentLength > 0 {
		return reject("unexpected content in request. Body size [%d]", request.ContentLength)
	}

	// Verify the parameters supplied
	queryParameters, err := url.ParseQuery(request.URL.RawQuery)
	if err != nil {
		return reject("unable to parse queries in URL: %s", err)
	}

	for queryParam := range queryParameters {
		allowedVal, supported := QuerySet[queryParam]
		if !supported {
			return reject("unsupported query parameter: %s", queryParam)
		}
		// Only string entries restrict the value; a nil entry accepts any value. The checked
		// assertion also avoids a panic should a non-string entry ever be added to QuerySet.
		allowed, restricted := allowedVal.(string)
		if !restricted {
			continue
		}
		if queryVal := queryParameters.Get(queryParam); queryVal != allowed {
			return reject("unsupported query value for parameter %s: %s, usage: %s", queryParam, queryVal, filterQueryUsage)
		}
	}

	// validate for delta
	if queryParameters.Has(deltaQueryKey) {
		if !queryParameters.Has(usedOnlyQueryKey) || !queryParameters.Has(filterQueryKey) {
			return reject("when delta is enabled, both %s and %s query parameters are required", usedOnlyQueryKey, filterQueryKey)
		}
	}

	return &queryParameters, nil
}
// constructQueryString builds the query string sent to the Envoy Admin Stats API.
// The result always begins with config.ENVOY_PROMETHEUS_QUERY_STRING. "&" followed by
// config.APPMESH_FILTER_STRING is appended when the filter parameter is present, and "&usedonly"
// is appended when the usedonly parameter is present, in that order.
func constructQueryString(queryParameters url.Values) string {
	var query strings.Builder
	query.WriteString(config.ENVOY_PROMETHEUS_QUERY_STRING)

	if queryParameters.Has(filterQueryKey) {
		fmt.Fprintf(&query, "&%s", config.APPMESH_FILTER_STRING)
	}
	if queryParameters.Has(usedOnlyQueryKey) {
		query.WriteString("&" + usedOnlyQueryKey)
	}

	return query.String()
}
// getEnvoyStatsUrl assembles the absolute URL of the Envoy Admin Stats API from the agent
// configuration: "<scheme>://<host>:<admin port><stats url>" immediately followed by queryString,
// which is appended verbatim.
func getEnvoyStatsUrl(agentConfig *config.AgentConfig, queryString string) string {
	origin := fmt.Sprintf("%s://%s:%d",
		agentConfig.EnvoyServerScheme,
		agentConfig.EnvoyServerHostName,
		agentConfig.EnvoyServerAdminPort)
	pathAndQuery := fmt.Sprintf("%s%s", agentConfig.EnvoyServerStatsUrl, queryString)
	return origin + pathAndQuery
}