internal/onetime/reliability/reliability.go
/*
Copyright 2023 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package reliability implements the OTE mode that reads reliability data from
// Cloud Monitoring metrics, parses it into CSV, and sends the results to a GCS bucket.
package reliability
import (
"context"
"encoding/csv"
"flag"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"time"

"cloud.google.com/go/monitoring/apiv3/v2"
"google.golang.org/api/option"
"github.com/google/subcommands"
"github.com/GoogleCloudPlatform/sapagent/internal/configuration"
"github.com/GoogleCloudPlatform/sapagent/internal/hostmetrics/cloudmetricreader"
"github.com/GoogleCloudPlatform/sapagent/internal/onetime"
"github.com/GoogleCloudPlatform/sapagent/internal/usagemetrics"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/cloudmonitoring"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/log"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/storage"
mpb "google.golang.org/genproto/googleapis/monitoring/v3"
mrpb "google.golang.org/genproto/googleapis/monitoring/v3"
s "cloud.google.com/go/storage"
ipb "github.com/GoogleCloudPlatform/sapagent/protos/instanceinfo"
)
const (
userAgent = "Reliability for GCS"
)
var (
// Force the numeric project id to be returned rather than the display name.
// See the following for more information: https://g3doc.corp.google.com/monitoring/monarch/mql/g3doc/user/how_to.md?cl=head#the-project-id-column.
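// Each query reads the last two hours of data at one-second resolution ("every 1s | within 2h").
// identifier names the output CSV file, headers defines its columns, and wantLabels lists the
// labels each returned time series is expected to carry.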
defaultQueries = []queryInfo{
{
query: "fetch gce_instance | metric 'workload.googleapis.com/sap/hana/ha/availability' | map rename [resource.project_number: resource.project_id] | group_by drop [resource.zone, metric.instance_nr, metric.instance_name] | every 1s | within 2h",
identifier: "hana_ha_availability",
headers: []string{"project_id", "instance_id", "sid", "value", "start_time", "end_time"},
wantLabels: []string{"project_id", "instance_id", "sid"},
},
{
query: "fetch gce_instance | metric 'workload.googleapis.com/sap/hana/availability' | map rename [resource.project_number: resource.project_id] | group_by drop [resource.zone, metric.instance_nr, metric.instance_name] | every 1s | within 2h",
identifier: "hana_availability",
headers: []string{"project_id", "instance_id", "sid", "value", "start_time", "end_time"},
wantLabels: []string{"project_id", "instance_id", "sid"},
},
}
)
// queryInfo stores information for queries and their results.
type queryInfo struct {
query, identifier string
headers, wantLabels []string
}
// Reliability has args for reliability subcommands.
type Reliability struct {
projectID, outputFolder string
bucketName, serviceAccount string
help bool
logLevel, logPath string
queries []queryInfo
cmr *cloudmetricreader.CloudMetricReader
bucket *s.BucketHandle
cloudProps *ipb.CloudProperties
oteLogger *onetime.OTELogger
}
// Name implements the subcommand interface for reliability.
func (*Reliability) Name() string { return "reliability" }
// Synopsis implements the subcommand interface for reliability.
func (*Reliability) Synopsis() string { return "read reliability data from Cloud Monitoring" }
// Usage implements the subcommand interface for reliability.
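// An example invocation (project and bucket names are hypothetical) looks like:
//   reliability -project=my-project -bucket=my-bucket -loglevel=debug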
func (*Reliability) Usage() string {
return `Usage: reliability [-project=<project-id>] [-bucket=<bucket-name>]
[-o=<output-folder>] [-service-account=<service-account>]
[-h] [-loglevel=<debug|info|warn|error>] [-log-path=<log-path>]` + "\n"
}
// SetFlags implements the subcommand interface for reliability.
func (r *Reliability) SetFlags(fs *flag.FlagSet) {
fs.StringVar(&r.projectID, "project", "", "Project ID, defaults to the value from the metadata server")
fs.StringVar(&r.bucketName, "bucket", "", "GCS bucket name to send packaged results to")
fs.StringVar(&r.outputFolder, "o", "/tmp/google-cloud-sap-agent", "Output folder")
fs.StringVar(&r.serviceAccount, "service-account", "", "Service account to authenticate with")
fs.StringVar(&r.logPath, "log-path", "", "The log path to write the log file (optional), default value is /var/log/google-cloud-sap-agent/reliability.log")
fs.BoolVar(&r.help, "h", false, "Displays help")
fs.StringVar(&r.logLevel, "loglevel", "info", "Sets the logging level")
}
// Execute implements the subcommand interface for reliability.
func (r *Reliability) Execute(ctx context.Context, f *flag.FlagSet, args ...any) subcommands.ExitStatus {
_, cloudProps, exitStatus, completed := onetime.Init(ctx, onetime.InitOptions{
Name: r.Name(),
Help: r.help,
LogLevel: r.logLevel,
LogPath: r.logPath,
Fs: f,
}, args...)
if !completed {
return exitStatus
}
return r.Run(ctx, onetime.CreateRunOptions(cloudProps, false))
}
// Run performs the functionality specified by the reliability subcommand.
func (r *Reliability) Run(ctx context.Context, runOpts *onetime.RunOptions) subcommands.ExitStatus {
r.oteLogger = onetime.CreateOTELogger(runOpts.DaemonMode)
r.cloudProps = runOpts.CloudProperties
log.CtxLogger(ctx).Info("Reliability starting")
r.queries = defaultQueries
if r.projectID == "" {
r.projectID = r.cloudProps.GetProjectId()
log.CtxLogger(ctx).Warnf("Project ID defaulted to: %s", r.projectID)
}
if r.bucketName != "" {
connectParams := &storage.ConnectParameters{
StorageClient: s.NewClient,
ServiceAccount: r.serviceAccount,
BucketName: r.bucketName,
UserAgentSuffix: userAgent,
UserAgent: configuration.StorageAgentName(),
}
var ok bool
if r.bucket, ok = storage.ConnectToBucket(ctx, connectParams); !ok {
log.CtxLogger(ctx).Errorw("Failed to connect to bucket", "bucketName", r.bucketName)
return subcommands.ExitFailure
}
}
var opts []option.ClientOption
if r.serviceAccount != "" {
opts = append(opts, option.WithCredentialsFile(r.serviceAccount))
}
mqc, err := monitoring.NewQueryClient(ctx, opts...)
if err != nil {
log.CtxLogger(ctx).Errorw("Failed to create Cloud Monitoring query client", "error", err)
r.oteLogger.LogUsageError(usagemetrics.QueryClientCreateFailure)
return subcommands.ExitFailure
}
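// CloudMetricReader bundles the query client with the default backoff intervals that
// QueryTimeSeriesWithRetry uses when the queries are executed below.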
r.cmr = &cloudmetricreader.CloudMetricReader{
QueryClient: &cloudmetricreader.QueryClient{Client: mqc},
BackOffs: cloudmonitoring.NewDefaultBackOffIntervals(),
}
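// io.Copy serves as the storage.IOFileCopier; taking the copier as a parameter allows a
// different implementation (for example, in tests) to be substituted.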
return r.reliabilityHandler(ctx, io.Copy)
}
// reliabilityHandler executes all queries, parses the time series results,
// writes them as CSV files, and uploads the files to the GCS bucket if one was specified.
func (r *Reliability) reliabilityHandler(ctx context.Context, copier storage.IOFileCopier) subcommands.ExitStatus {
r.oteLogger.LogUsageAction(usagemetrics.ReliabilityStarted)
if err := os.MkdirAll(r.outputFolder, os.ModePerm); err != nil {
log.CtxLogger(ctx).Errorw("Failed to create output folder", "outputFolder", r.outputFolder, "err", err)
return subcommands.ExitFailure
}
for _, q := range r.queries {
data, err := r.executeQuery(ctx, q.identifier, q.query)
if err != nil {
log.CtxLogger(ctx).Errorw("Failed to execute query", "query", q.query, "err", err)
r.oteLogger.LogUsageError(usagemetrics.ReliabilityQueryFailure)
return subcommands.ExitFailure
}
results, err := r.parseData(ctx, q.headers, q.wantLabels, data)
if err != nil {
log.CtxLogger(ctx).Errorw("Failed to parse data", "query", q.query, "err", err)
r.oteLogger.LogUsageError(usagemetrics.ReliabilityQueryFailure)
return subcommands.ExitFailure
}
fileName, err := r.writeResults(results, q.identifier)
if err != nil {
log.CtxLogger(ctx).Errorw("Failed to write results", "identifier", q.identifier, "query", q.query, "err", err)
r.oteLogger.LogUsageError(usagemetrics.ReliabilityWriteFileFailure)
return subcommands.ExitFailure
}
if r.bucket != nil {
if err := r.uploadFile(ctx, fileName, copier); err != nil {
log.CtxLogger(ctx).Errorw("Failed to upload file", "fileName", fileName, "err", err)
r.oteLogger.LogUsageError(usagemetrics.ReliabilityBucketUploadFailure)
return subcommands.ExitFailure
}
}
}
log.CtxLogger(ctx).Info("Reliability finished")
r.oteLogger.LogUsageAction(usagemetrics.ReliabilityFinished)
return subcommands.ExitSuccess
}
// executeQuery queries Cloud Monitoring and returns the results.
func (r *Reliability) executeQuery(ctx context.Context, identifier, query string) ([]*mrpb.TimeSeriesData, error) {
req := &mpb.QueryTimeSeriesRequest{
Name: fmt.Sprintf("projects/%s", r.projectID),
Query: query,
}
log.CtxLogger(ctx).Infow("Executing query", "identifier", identifier, "query", query)
data, err := cloudmonitoring.QueryTimeSeriesWithRetry(ctx, r.cmr.QueryClient, req, r.cmr.BackOffs)
if err != nil {
return nil, err
}
log.CtxLogger(ctx).Infow("Query succeeded", "identifier", identifier)
log.CtxLogger(ctx).Debugw("Query response", "identifier", identifier, "response", data)
return data, nil
}
// parseData converts the raw time series data into CSV rows, collapsing consecutive
// points that share the same value into a single row spanning that value's start and end times.
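// A hypothetical row for the headers above (all values are illustrative only) would be:
//   123456789012,my-hana-instance,HDB,1,1700000000,1700007200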
func (r *Reliability) parseData(ctx context.Context, headers []string, wantLabels []string, data []*mrpb.TimeSeriesData) ([][]string, error) {
results := [][]string{}
results = append(results, headers)
for _, instance := range data {
labels := instance.GetLabelValues()
if len(labels) != len(wantLabels) {
return nil, fmt.Errorf("query results contained an unexpected number of labels, want %d (%s), got %d (%s)", len(wantLabels), wantLabels, len(labels), labels)
}
stringLabels := []string{}
for _, label := range labels {
stringLabels = append(stringLabels, label.GetStringValue())
}
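// Collapse consecutive points that report the same value into one CSV row: a row is emitted
// whenever the value changes, and the last open interval is flushed after the loop. endTime is
// initialized from the first point in the slice and startTime is pushed back as later points
// are scanned, which assumes points arrive ordered newest first.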
points := instance.GetPointData()
// Skip a series with no points to avoid indexing into an empty slice.
if len(points) == 0 {
continue
}
lastValue := points[0].GetValues()[0].GetInt64Value()
endTime := points[0].GetTimeInterval().GetEndTime()
startTime := endTime
for _, point := range points {
value := point.GetValues()[0].GetInt64Value()
if value != lastValue {
results = append(results, append(stringLabels, []string{strconv.FormatInt(lastValue, 10), strconv.FormatInt(startTime.GetSeconds(), 10), strconv.FormatInt(endTime.GetSeconds(), 10)}...))
endTime = point.GetTimeInterval().GetEndTime()
}
startTime = point.GetTimeInterval().GetStartTime()
lastValue = value
}
results = append(results, append(stringLabels, []string{strconv.FormatInt(lastValue, 10), strconv.FormatInt(startTime.GetSeconds(), 10), strconv.FormatInt(endTime.GetSeconds(), 10)}...))
}
return results, nil
}
// writeResults writes the string results to the output in CSV format.
func (r *Reliability) writeResults(results [][]string, identifier string) (string, error) {
// Use ISO 8601 standard for printing the current time.
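// With the default output folder, a resulting file name looks like (hypothetical timestamp,
// UTC shown): /tmp/google-cloud-sap-agent/hana_ha_availability_20240102T150405Z.csv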
outFile := fmt.Sprintf("%s/%s_%s.csv", r.outputFolder, identifier, time.Now().Format("20060102T150405Z0700"))
log.Logger.Infow("Writing results", "outFile", outFile)
f, err := os.Create(outFile)
if err != nil {
return "", err
}
defer f.Close()
if err := csv.NewWriter(f).WriteAll(results); err != nil {
return "", err
}
log.Logger.Infow("Results written", "outFile", outFile)
return outFile, nil
}
// uploadFile uploads the file to the GCS bucket.
func (r *Reliability) uploadFile(ctx context.Context, fileName string, copier storage.IOFileCopier) error {
if r.bucket == nil {
return fmt.Errorf("bucket is nil")
}
log.CtxLogger(ctx).Infow("Uploading file", "bucket", r.bucketName, "fileName", fileName)
f, err := os.Open(fileName)
if err != nil {
return err
}
defer f.Close()
fileInfo, err := f.Stat()
if err != nil {
return err
}
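// Objects are namespaced in the bucket under the numeric project ID, so the final object name
// is "<numeric-project-id>/<csv-file-name>". ChunkSizeMb, MaxRetries, and LogDelay are fixed
// defaults for this subcommand rather than user-configurable flags.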
rw := storage.ReadWriter{
Reader: f,
Copier: copier,
BucketHandle: r.bucket,
BucketName: r.bucketName,
ChunkSizeMb: 100,
ObjectName: r.cloudProps.GetNumericProjectId() + "/" + filepath.Base(fileName),
TotalBytes: fileInfo.Size(),
MaxRetries: 5,
LogDelay: 30 * time.Second,
VerifyUpload: false,
}
bytesWritten, err := rw.Upload(ctx)
if err != nil {
return err
}
log.CtxLogger(ctx).Infow("File uploaded", "bucket", r.bucketName, "fileName", fileName, "objectName", rw.ObjectName, "bytesWritten", bytesWritten, "fileSize", fileInfo.Size())
return nil
}