sharedlibraries/cloudmonitoring/cloudmonitoring.go

/*
Copyright 2022 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

	https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package cloudmonitoring provides functionality to interact with the Cloud Monitoring API.
package cloudmonitoring

import (
	"context"
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/googleapis/gax-go/v2"
	"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/log"

	metricpb "google.golang.org/genproto/googleapis/api/metric"
	monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
	mpb "google.golang.org/genproto/googleapis/monitoring/v3"
	mrpb "google.golang.org/genproto/googleapis/monitoring/v3"
)

// timeSeriesKey is a struct which holds the information which can uniquely identify each time series
// and can be used as a Map key since every field is comparable.
type timeSeriesKey struct {
	MetricType        string
	MetricKind        string
	MetricLabels      string
	MonitoredResource string
	ResourceLabels    string
}

// BackOffIntervals holds the initial intervals for the different back off mechanisms.
type BackOffIntervals struct {
	LongExponential, ShortConstant time.Duration
	Retries                        uint64
}

// Defaults for the different back off intervals.
const (
	DefaultLongExponentialBackOffInterval = 2 * time.Second
	DefaultShortConstantBackOffInterval   = 5 * time.Second
)

const maxTSPerRequest = 200 // Reference: https://cloud.google.com/monitoring/quotas

// NewBackOffIntervals is a constructor for the back off intervals.
func NewBackOffIntervals(longExponential, shortConstant time.Duration) *BackOffIntervals {
	return &BackOffIntervals{
		LongExponential: longExponential,
		ShortConstant:   shortConstant,
		Retries:         2,
	}
}

// NewDefaultBackOffIntervals is a default constructor, utilizing the default back off intervals.
func NewDefaultBackOffIntervals() *BackOffIntervals {
	return &BackOffIntervals{
		LongExponential: DefaultLongExponentialBackOffInterval,
		ShortConstant:   DefaultShortConstantBackOffInterval,
		Retries:         2,
	}
}

// NoBackOff is a constructor for the back off intervals with no backoff.
func NoBackOff() *BackOffIntervals {
	return &BackOffIntervals{
		LongExponential: 0,
		ShortConstant:   0,
		Retries:         0,
	}
}

// TimeSeriesCreator provides an easily testable translation to the cloud monitoring API.
type TimeSeriesCreator interface {
	CreateTimeSeries(ctx context.Context, req *mpb.CreateTimeSeriesRequest, opts ...gax.CallOption) error
}

// TimeSeriesQuerier provides an easily testable translation to the cloud monitoring API.
type TimeSeriesQuerier interface {
	QueryTimeSeries(ctx context.Context, req *mpb.QueryTimeSeriesRequest, opts ...gax.CallOption) ([]*mrpb.TimeSeriesData, error)
}
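
// The TimeSeriesCreator and TimeSeriesQuerier interfaces exist so that callers can substitute
// test doubles for the real Cloud Monitoring client. A minimal sketch of a fake creator for
// unit tests is shown below; the type name fakeTimeSeriesCreator is illustrative and not part
// of this package's API.
//
//	type fakeTimeSeriesCreator struct {
//		calls []*mpb.CreateTimeSeriesRequest
//		err   error
//	}
//
//	func (f *fakeTimeSeriesCreator) CreateTimeSeries(ctx context.Context, req *mpb.CreateTimeSeriesRequest, _ ...gax.CallOption) error {
//		// Record each request and return the configured error, if any.
//		f.calls = append(f.calls, req)
//		return f.err
//	}
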
// TimeSeriesDescriptorQuerier provides an easily testable translation to the cloud monitoring API.
type TimeSeriesDescriptorQuerier interface {
	GetMetricDescriptor(ctx context.Context, req *mpb.GetMetricDescriptorRequest, opts ...gax.CallOption) (*metricpb.MetricDescriptor, error)
	GetMonitoredResourceDescriptor(ctx context.Context, req *mpb.GetMonitoredResourceDescriptorRequest, opts ...gax.CallOption) (*monitoredrespb.MonitoredResourceDescriptor, error)
}

// CreateTimeSeriesWithRetry decorates TimeSeriesCreator.CreateTimeSeries with a retry mechanism.
func CreateTimeSeriesWithRetry(ctx context.Context, client TimeSeriesCreator, req *mpb.CreateTimeSeriesRequest, bo *BackOffIntervals) error {
	attempt := 1
	if bo == nil {
		bo = NewDefaultBackOffIntervals()
	}
	err := backoff.Retry(func() error {
		if err := client.CreateTimeSeries(ctx, req); err != nil {
			if strings.Contains(err.Error(), "PermissionDenied") {
				log.CtxLogger(ctx).Warnw("Error in CreateTimeSeries, Permission denied - Enable the Monitoring Metrics Writer IAM role for the Service Account", "attempt", attempt, "error", err)
			} else {
				log.CtxLogger(ctx).Warnw("Error in CreateTimeSeries", "attempt", attempt, "error", err)
			}
			attempt++
			return err
		}
		return nil
	}, ShortConstantBackOffPolicy(ctx, bo.ShortConstant, bo.Retries))
	if err != nil {
		log.CtxLogger(ctx).Errorw("CreateTimeSeries retry limit exceeded", "request", req)
		return err
	}
	return nil
}

// QueryTimeSeriesWithRetry decorates TimeSeriesQuerier.QueryTimeSeries with a retry mechanism.
func QueryTimeSeriesWithRetry(ctx context.Context, client TimeSeriesQuerier, req *mpb.QueryTimeSeriesRequest, bo *BackOffIntervals) ([]*mrpb.TimeSeriesData, error) {
	var (
		attempt = 1
		res     []*mrpb.TimeSeriesData
	)
	if bo == nil {
		bo = NewDefaultBackOffIntervals()
	}
	err := backoff.Retry(func() error {
		var err error
		res, err = client.QueryTimeSeries(ctx, req)
		if err != nil {
			if strings.Contains(err.Error(), "PermissionDenied") {
				log.CtxLogger(ctx).Warnw("Error in QueryTimeSeries, Permission denied - Enable the Monitoring Viewer IAM role for the Service Account", "attempt", attempt, "error", err)
			} else {
				log.CtxLogger(ctx).Warnw("Error in QueryTimeSeries", "attempt", attempt, "error", err)
			}
			attempt++
		}
		return err
	}, LongExponentialBackOffPolicy(ctx, bo.LongExponential, 4, time.Minute, 15*time.Second))
	if err != nil {
		log.CtxLogger(ctx).Errorw("QueryTimeSeries retry limit exceeded", "request", req, "error", err, "attempt", attempt)
		return nil, err
	}
	return res, nil
}

// LongExponentialBackOffPolicy returns a backoff policy with exponential backoff.
func LongExponentialBackOffPolicy(ctx context.Context, initial time.Duration, retries uint64, maxElapsedTime time.Duration, maxInterval time.Duration) backoff.BackOffContext {
	exp := backoff.NewExponentialBackOff()
	exp.InitialInterval = initial
	exp.MaxInterval = maxInterval
	exp.MaxElapsedTime = maxElapsedTime
	log.CtxLogger(ctx).Debug("LongExponentialBackOffPolicy", "exp", exp)
	return backoff.WithContext(backoff.WithMaxRetries(exp, retries), ctx)
}
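
// A hedged usage sketch for CreateTimeSeriesWithRetry: given any client that satisfies
// TimeSeriesCreator (for example, the Cloud Monitoring MetricClient, provided its request
// types match the mpb aliases used here, or the fake sketched above), a single write with
// the default retry policy could look like the following. The variables client, projectID,
// and ts are assumed to exist in the caller.
//
//	req := &mpb.CreateTimeSeriesRequest{
//		Name:       fmt.Sprintf("projects/%s", projectID),
//		TimeSeries: []*mrpb.TimeSeries{ts},
//	}
//	if err := CreateTimeSeriesWithRetry(ctx, client, req, NewDefaultBackOffIntervals()); err != nil {
//		// The error is returned once the constant-interval retries are exhausted.
//		log.CtxLogger(ctx).Errorw("CreateTimeSeries failed after retries", "error", err)
//	}
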
// LongExponentialBackOffPolicyForProcessMetrics returns a backoff policy with exponential backoff.
func LongExponentialBackOffPolicyForProcessMetrics(ctx context.Context, initial time.Duration, retries uint64, maxElapsedTime time.Duration, maxInterval time.Duration) backoff.BackOffContext {
	exp := backoff.NewExponentialBackOff()
	exp.InitialInterval = initial
	exp.MaxInterval = maxInterval
	exp.MaxElapsedTime = maxElapsedTime
	exp.Multiplier = 2
	log.CtxLogger(ctx).Debug("LongExponentialBackOffPolicyForProcessMetrics", "exp", exp)
	return backoff.WithContext(backoff.WithMaxRetries(exp, retries), ctx)
}

// ShortConstantBackOffPolicy returns a constant-interval backoff policy capped at the given number of retries.
func ShortConstantBackOffPolicy(ctx context.Context, initial time.Duration, retries uint64) backoff.BackOffContext {
	constantBackoff := backoff.NewConstantBackOff(initial)
	return backoff.WithContext(backoff.WithMaxRetries(constantBackoff, retries), ctx)
}

// flattenLabels converts a label map into a sorted, comma-separated string so that it can be
// compared as part of a timeSeriesKey.
func flattenLabels(labels map[string]string) string {
	var metricLabels []string
	for k, v := range labels {
		metricLabels = append(metricLabels, k+"+"+v)
	}
	sort.Strings(metricLabels)
	return strings.Join(metricLabels, ",")
}

// prepareKey creates the key which can be used to group a time series
// based on MetricType, MetricKind, MetricLabels, MonitoredResource and ResourceLabels.
func prepareKey(t *mrpb.TimeSeries) timeSeriesKey {
	mtype := t.GetMetric().GetType()
	mkind := t.GetMetricKind().String()
	mresource := t.GetResource().GetType()
	tsk := timeSeriesKey{
		MetricType:        mtype,
		MetricKind:        mkind,
		MonitoredResource: mresource,
	}
	tsk.MetricLabels = flattenLabels(t.GetMetric().GetLabels())
	tsk.ResourceLabels = flattenLabels(t.GetResource().GetLabels())
	return tsk
}

// SendTimeSeries sends all the time series objects to cloud monitoring.
// maxTSPerRequest is used as an upper limit to batch send time series values per request.
// If a cloud monitoring API call fails even after retries, the remaining measurements are discarded.
func SendTimeSeries(ctx context.Context, timeSeries []*mrpb.TimeSeries, timeSeriesCreator TimeSeriesCreator, bo *BackOffIntervals, projectID string) (sent, batchCount int, err error) {
	var batchTimeSeries []*mrpb.TimeSeries

	for _, t := range timeSeries {
		batchTimeSeries = append(batchTimeSeries, t)

		if len(batchTimeSeries) == maxTSPerRequest {
			log.CtxLogger(ctx).Debug("Maximum batch size has been reached, sending the batch.")
			batchCount++
			if err := sendBatch(ctx, batchTimeSeries, timeSeriesCreator, bo, projectID); err != nil {
				return sent, batchCount, err
			}
			sent += len(batchTimeSeries)
			batchTimeSeries = nil
		}
	}
	if len(batchTimeSeries) == 0 {
		return sent, batchCount, nil
	}
	batchCount++
	if err := sendBatch(ctx, batchTimeSeries, timeSeriesCreator, bo, projectID); err != nil {
		return sent, batchCount, err
	}
	return sent + len(batchTimeSeries), batchCount, nil
}
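
// A hedged sketch of how a caller might drive SendTimeSeries with retries disabled, for
// example in a test against a fake creator such as the one sketched earlier. The variables
// client and series and the project ID "my-project" are assumptions for illustration.
//
//	sent, batches, err := SendTimeSeries(ctx, series, client, NoBackOff(), "my-project")
//	if err != nil {
//		// sent counts only the time series from batches that were written successfully;
//		// once a batch fails, the remainder of the slice is discarded.
//		log.CtxLogger(ctx).Errorw("partial send", "sent", sent, "batches", batches, "error", err)
//	}
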
// sendBatch sends one batch of metrics to cloud monitoring using an API call with retries.
// Returns an error in case of failures.
func sendBatch(ctx context.Context, batchTimeSeries []*mrpb.TimeSeries, timeSeriesCreator TimeSeriesCreator, bo *BackOffIntervals, projectID string) error {
	log.CtxLogger(ctx).Debugw("Sending a batch of metrics to cloud monitoring.", "numberofmetrics", len(batchTimeSeries), "metrics", batchTimeSeries)
	req := &mpb.CreateTimeSeriesRequest{
		Name:       fmt.Sprintf("projects/%s", projectID),
		TimeSeries: pruneBatch(batchTimeSeries),
	}
	if bo != nil && bo.Retries > 0 {
		return CreateTimeSeriesWithRetry(ctx, timeSeriesCreator, req, bo)
	}
	return timeSeriesCreator.CreateTimeSeries(ctx, req)
}

// pruneBatch drops duplicate time series from a batch, keeping the first occurrence of each
// unique timeSeriesKey.
func pruneBatch(batchTimeSeries []*mrpb.TimeSeries) []*mrpb.TimeSeries {
	ts := make(map[timeSeriesKey]bool)
	var finalBatch []*mrpb.TimeSeries
	for _, t := range batchTimeSeries {
		tsk := prepareKey(t)
		if _, ok := ts[tsk]; ok {
			log.Logger.Debug("Pruned a duplicate time series", "tsk:", tsk)
			continue
		}
		ts[tsk] = true
		finalBatch = append(finalBatch, t)
	}
	return finalBatch
}
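
// A hedged sketch of an in-package test (for example, in a hypothetical cloudmonitoring_test.go)
// that exercises the deduplication behavior: two series sharing the same metric type, kind,
// labels, and monitored resource collapse to a single entry, so a request built by sendBatch
// never carries duplicates. The metric and resource types used are illustrative.
//
//	func TestPruneBatchDropsDuplicates(t *testing.T) {
//		dup := &mrpb.TimeSeries{
//			Metric:   &metricpb.Metric{Type: "custom.googleapis.com/example", Labels: map[string]string{"k": "v"}},
//			Resource: &monitoredrespb.MonitoredResource{Type: "gce_instance"},
//		}
//		got := pruneBatch([]*mrpb.TimeSeries{dup, dup})
//		if len(got) != 1 {
//			t.Errorf("pruneBatch() returned %d time series, want 1", len(got))
//		}
//	}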