verify-evaluate-cloud-metrics/main.go (187 lines of code) (raw):
// Copyright 2023 Google LLC
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package main contains the logic for using Cloud Monitoring to determine whether requests have been receiving 5xx errors.
package main
import (
"context"
"flag"
"fmt"
"os"
"strings"
"time"
monitoring "cloud.google.com/go/monitoring/apiv3/v2"
"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
"google.golang.org/api/iterator"
)
var (
// Variable to hold the flag's values.
project string
tableName string
metricType string
predicates string
responseCodeClass string
maxErrorPercentage float64
triggerDuration time.Duration
timeToMonitor time.Duration
slidingWindow time.Duration
refreshPeriod time.Duration
// Custom Query. If this is specified, then the query will not be crafted by the program.
customQuery string
)
func getQueryText(timeOfStart time.Time) string {
if len(customQuery) != 0 {
return customQuery
}
var sb strings.Builder
// Fetch from the table name and the metric type specified by arguments.
sb.WriteString(fmt.Sprintf("fetch %s::%s", tableName, metricType))
// Include the predicates to filter on.
parts := strings.Split(predicates, ",")
if len(parts) > 0 {
holder := ""
for i, p := range parts {
holder += p
if i != len(parts)-1 {
holder += " && "
}
}
sb.WriteString(" | ")
sb.WriteString(fmt.Sprintf("(%s)", holder))
}
// Specify the start time.
sb.WriteString(" | ")
dateTime := strings.ReplaceAll(timeOfStart.UTC().Format(time.DateTime), "-", "/")
sb.WriteString(fmt.Sprintf("within d'%s'", dateTime))
// Group by the specified sliding window
sb.WriteString(" | ")
sb.WriteString(fmt.Sprintf("group_by sliding(%v)", slidingWindow))
// Filter the error ratio.
sb.WriteString(" | ")
sb.WriteString(fmt.Sprintf("filter_ratio response_code_class == '%s'", responseCodeClass))
return sb.String()
}
func formatMsg(in string) string {
if len(customQuery) > 0 {
return fmt.Sprintf("(ignore due to custom query) %s", in)
}
return in
}
// replaceEnvVars replaces env var refs in the string with their value (if set). Env var refs are made
// with the format $envVarName
func replaceEnvVars(input string) string {
for _, e := range os.Environ() {
pair := strings.SplitN(e, "=", 2)
input = strings.ReplaceAll(input, "$"+pair[0], pair[1])
}
return input
}
func init() {
// Initializing of the flag and print out the values for visibility.
flag.StringVar(&project, "project", os.Getenv("CLOUD_DEPLOY_PROJECT"), "The ID of the project that has the metrics defined, defaulted to the CLOUD_DEPLOY_PROJECT environmental variable")
flag.StringVar(&tableName, "table-name", "", "The [tablename](https://cloud.google.com/monitoring/mql/reference#fetch-tabop) to fetch from")
flag.StringVar(&metricType, "metric-type", "", "The [metric type](https://cloud.google.com/monitoring/mql/reference#metric-tabop) to get from the table-name")
flag.StringVar(&predicates, "predicates", "", "Commma delimited list of [predicates](https://cloud.google.com/monitoring/mql/reference#filter-tabop)")
flag.StringVar(&responseCodeClass, "response-code-class", "5xx", "The response_code_class that is being monitored for the error condition")
flag.Float64Var(&maxErrorPercentage, "max-error-percentage", 10, "The maximum allowable percentage of the specified response_code_class per sliding window")
flag.DurationVar(&slidingWindow, "sliding-window", time.Minute, "The duration of the sliding window")
flag.DurationVar(&triggerDuration, "trigger-duration", 5*time.Minute, "The time required to observe the error condition for verify to fail")
flag.DurationVar(&timeToMonitor, "time-to-monitor", 20*time.Minute, "The time to monitor for response failures before the verification is marked successful")
flag.DurationVar(&refreshPeriod, "refresh-period", 5*time.Minute, "The time to wait before refreshing the data set with new data")
flag.StringVar(&customQuery, "custom-query", "", "Customized query following [MQL](https://cloud.google.com/monitoring/mql/reference) to use for query instead. By specifying this, the query will not be crafted by the program")
flag.Parse()
project = replaceEnvVars(project)
tableName = replaceEnvVars(tableName)
metricType = replaceEnvVars(metricType)
predicates = replaceEnvVars(predicates)
responseCodeClass = replaceEnvVars(responseCodeClass)
fmt.Println("---")
fmt.Println("Verification configured as follows:")
fmt.Printf("Project: %q\n", project)
fmt.Println(formatMsg(fmt.Sprintf("Table Name: %q", tableName)))
fmt.Println(formatMsg(fmt.Sprintf("Metric Type: %q", metricType)))
fmt.Println(formatMsg(fmt.Sprintf("Predicates: %q", predicates)))
fmt.Println(formatMsg(fmt.Sprintf("Response Code Class: %q", responseCodeClass)))
fmt.Printf("Max Error Percentage: %v\n", maxErrorPercentage)
fmt.Println(formatMsg(fmt.Sprintf("Sliding Window: %v", slidingWindow)))
fmt.Printf("Trigger Duration: %v\n", triggerDuration)
fmt.Printf("Time To Monitor: %v\n", timeToMonitor)
fmt.Printf("Refresh Period: %v\n", refreshPeriod)
fmt.Println("---")
}
func main() {
if err := do(); err != nil {
fmt.Printf("err: %v", err)
os.Exit(1)
}
fmt.Println("Done")
}
func do() error {
ctx := context.Background()
client, err := monitoring.NewQueryClient(ctx)
if err != nil {
return fmt.Errorf("unable to create NewQueryClient: %w", err)
}
defer client.Close()
timeToStart := time.Now()
timeToEnd := timeToStart.Add(timeToMonitor)
queryToUse := getQueryText(timeToStart)
fmt.Printf("The query is %q\n", queryToUse)
refreshCount := 1
for time.Now().Before(timeToEnd) {
triggered, err := errorConditionTriggered(ctx, client, refreshCount, queryToUse)
if err != nil {
return fmt.Errorf("failed to determine whether error condition triggered: %w", err)
}
if triggered {
return fmt.Errorf("verify failed, error condition triggered for more than duration")
}
time.Sleep(refreshPeriod)
refreshCount++
}
return nil
}
// Validates that the error condition was not exceeded for trigger_duration on the sliding window.
func errorConditionTriggered(ctx context.Context, client *monitoring.QueryClient, refreshCount int, query string) (bool, error) {
req := &monitoringpb.QueryTimeSeriesRequest{
Name: fmt.Sprintf("projects/%s", project),
Query: query,
}
it := client.QueryTimeSeries(ctx, req)
fmt.Printf("querying the time series, refresh count: %d\n", refreshCount)
for {
resp, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return false, fmt.Errorf("could not read time series value: %w", err)
}
// The sliding window calculation are based on the points of a singular time series.
startTimeOfErrorCondition := time.Time{}
endTimeOfErrorCondition := time.Time{}
var dataPoints []*monitoringpb.TimeSeriesData_PointData
for _, p := range resp.GetPointData() {
errorRatio := p.GetValues()[0].GetDoubleValue() * 100
fmt.Printf("error ratio: %f\n", errorRatio)
fmt.Printf("Start time: %v\n", p.GetTimeInterval().StartTime.AsTime())
fmt.Printf("End time: %v\n", p.GetTimeInterval().EndTime.AsTime())
if calculateDuration(startTimeOfErrorCondition, endTimeOfErrorCondition) >= triggerDuration {
// We check to see if the sliding windows that we have set from previous iterations exceed the trigger duration.
// If it has, then we stop reading point data.
break
}
// Time series list data points from newest data to oldest data.
if len(p.GetValues()) != 1 {
// Assuming that the point data is a ratio.
return false, fmt.Errorf("expected 1 rate value for the total interval, instead got: %d", len(p.GetValues()))
}
if errorRatio := p.GetValues()[0].GetDoubleValue() * 100; errorRatio >= maxErrorPercentage {
if endTimeOfErrorCondition.IsZero() {
// initialization
endTimeOfErrorCondition = p.GetTimeInterval().EndTime.AsTime()
}
// Always replace the start as we iterate; it gets earlier and earlier.
dataPoints = append([]*monitoringpb.TimeSeriesData_PointData{p}, dataPoints...)
startTimeOfErrorCondition = p.GetTimeInterval().StartTime.AsTime()
} else {
// We found a sliding window which does not violate percentage.
startTimeOfErrorCondition = time.Time{}
endTimeOfErrorCondition = time.Time{}
dataPoints = nil // reset the points
}
}
// We check to see if the sliding windows that we have set from previous iterations exceed the trigger duration.
if errorDuration := calculateDuration(startTimeOfErrorCondition, endTimeOfErrorCondition); errorDuration >= triggerDuration {
fmt.Printf("found duration in which max error percentage %f exceeded trigger duration, duration condition triggered for: %v\n", maxErrorPercentage, errorDuration)
fmt.Printf("data: %v\n", dataPoints)
return true, nil
}
}
return false, nil
}
func calculateDuration(start time.Time, end time.Time) time.Duration {
if start.IsZero() {
return 0
}
if end.IsZero() {
return 0
}
return end.Sub(start)
}