internal/onetime/readmetrics/readmetrics.go (274 lines of code) (raw):

/* Copyright 2023 Google LLC Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ // Package readmetrics implements OTE mode for reading Cloud Monitoring metrics. package readmetrics import ( "context" "encoding/json" "fmt" "io" "os" "path/filepath" "time" "flag" "cloud.google.com/go/monitoring/apiv3/v2" "google.golang.org/api/option" "google.golang.org/protobuf/encoding/protojson" "github.com/google/subcommands" "github.com/GoogleCloudPlatform/sapagent/internal/configuration" "github.com/GoogleCloudPlatform/sapagent/internal/hostmetrics/cloudmetricreader" "github.com/GoogleCloudPlatform/sapagent/internal/onetime" "github.com/GoogleCloudPlatform/sapagent/internal/usagemetrics" "github.com/GoogleCloudPlatform/sapagent/internal/utils/protostruct" "github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/cloudmonitoring" "github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/log" "github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/storage" "github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/timeseries" mpb "google.golang.org/genproto/googleapis/monitoring/v3" mrpb "google.golang.org/genproto/googleapis/monitoring/v3" tspb "google.golang.org/protobuf/types/known/timestamppb" s "cloud.google.com/go/storage" ipb "github.com/GoogleCloudPlatform/sapagent/protos/instanceinfo" ) const ( userAgent = "ReadMetrics for GCS" defaultHanaAvailability = "fetch gce_instance | metric 'workload.googleapis.com/sap/hana/availability' | group_by 1m, [value_availability_mean: mean(value.availability)] | every 1m | group_by [metric.sid, resource.instance_id], [value_availability_mean_mean: mean(value_availability_mean)] | within 24h" defaultHanaHAAvailability = "fetch gce_instance | metric 'workload.googleapis.com/sap/hana/ha/availability' | group_by 1m, [value_availability_mean: mean(value.availability)] | every 1m | group_by [metric.sid, resource.instance_id], [value_availability_mean_mean: mean(value_availability_mean)] | within 24h" ) // ReadMetrics has args for readmetrics subcommands. type ReadMetrics struct { projectID string inputFile, outputFolder string bucketName, serviceAccount string sendToMonitoring bool help bool logLevel, logPath string queries map[string]string cmr *cloudmetricreader.CloudMetricReader bucket *s.BucketHandle status bool timeSeriesCreator cloudmonitoring.TimeSeriesCreator cloudProps *ipb.CloudProperties oteLogger *onetime.OTELogger } // Name implements the subcommand interface for readmetrics. func (*ReadMetrics) Name() string { return "readmetrics" } // Synopsis implements the subcommand interface for readmetrics. func (*ReadMetrics) Synopsis() string { return "read metrics from Cloud Monitoring" } // Usage implements the subcommand interface for readmetrics. func (*ReadMetrics) Usage() string { return `Usage: readmetrics -project=<project-id> [-i=<input-file>] [-o=output-folder] [-bucket=<bucket-name>] [-service-account=<service-account>] [-h] [-loglevel=<debug|info|warn|error>] [-log-path=<log-path>]` + "\n" } // SetFlags implements the subcommand interface for readmetrics. func (r *ReadMetrics) SetFlags(fs *flag.FlagSet) { fs.StringVar(&r.projectID, "project", "", "Project ID, defaults to the value from the metadata server") fs.StringVar(&r.inputFile, "i", "", "Input file") fs.StringVar(&r.outputFolder, "o", "/tmp/google-cloud-sap-agent", "Output folder") fs.StringVar(&r.bucketName, "bucket", "", "GCS bucket name to send packaged results to") fs.StringVar(&r.serviceAccount, "service-account", "", "Service account to authenticate with") fs.BoolVar(&r.sendToMonitoring, "send-status-to-monitoring", true, "Send the execution status to cloud monitoring as a metric") fs.StringVar(&r.logPath, "log-path", "", "The log path to write the log file (optional), default value is /var/log/google-cloud-sap-agent/readmetrics.log") fs.BoolVar(&r.help, "h", false, "Displays help") fs.StringVar(&r.logLevel, "loglevel", "info", "Sets the logging level") } // Execute implements the subcommand interface for readmetrics. func (r *ReadMetrics) Execute(ctx context.Context, f *flag.FlagSet, args ...any) subcommands.ExitStatus { _, cloudProps, exitStatus, completed := onetime.Init(ctx, onetime.InitOptions{ Name: r.Name(), Help: r.help, LogLevel: r.logLevel, LogPath: r.logPath, Fs: f, }, args...) if !completed { return exitStatus } r.cloudProps = cloudProps return r.Run(ctx, onetime.CreateRunOptions(cloudProps, false), args...) } // Run executes the command and returns the status. func (r *ReadMetrics) Run(ctx context.Context, runOpts *onetime.RunOptions, args ...any) subcommands.ExitStatus { r.oteLogger = onetime.CreateOTELogger(runOpts.DaemonMode) log.CtxLogger(ctx).Info("ReadMetrics starting") if r.projectID == "" { r.projectID = r.cloudProps.GetProjectId() log.CtxLogger(ctx).Warnf("Project ID defaulted to: %s", r.projectID) } var err error if r.queries, err = r.createQueryMap(); err != nil { log.CtxLogger(ctx).Errorw("Failed to create queries", "inputFile", r.inputFile, "err", err) return subcommands.ExitFailure } if r.bucketName != "" { connectParams := &storage.ConnectParameters{ StorageClient: s.NewClient, ServiceAccount: r.serviceAccount, BucketName: r.bucketName, UserAgentSuffix: userAgent, UserAgent: configuration.StorageAgentName(), } var ok bool if r.bucket, ok = storage.ConnectToBucket(ctx, connectParams); !ok { log.CtxLogger(ctx).Errorw("Failed to connect to bucket", "bucketName", r.bucketName) return subcommands.ExitFailure } } var opts []option.ClientOption if r.serviceAccount != "" { opts = append(opts, option.WithCredentialsFile(r.serviceAccount)) } if r.timeSeriesCreator, err = monitoring.NewMetricClient(ctx, opts...); err != nil { log.CtxLogger(ctx).Errorw("Failed to create Cloud Monitoring metric client", "error", err) r.oteLogger.LogUsageError(usagemetrics.MetricClientCreateFailure) return subcommands.ExitFailure } mqc, err := monitoring.NewQueryClient(ctx, opts...) if err != nil { log.CtxLogger(ctx).Errorw("Failed to create Cloud Monitoring query client", "error", err) r.oteLogger.LogUsageError(usagemetrics.QueryClientCreateFailure) return subcommands.ExitFailure } r.cmr = &cloudmetricreader.CloudMetricReader{ QueryClient: &cloudmetricreader.QueryClient{Client: mqc}, BackOffs: cloudmonitoring.NewDefaultBackOffIntervals(), } return r.readMetricsHandler(ctx, io.Copy) } // readMetricsHandler executes all queries, saves results to the local // filesystem, and optionally uploads results to a GCS bucket. func (r *ReadMetrics) readMetricsHandler(ctx context.Context, copier storage.IOFileCopier) subcommands.ExitStatus { r.oteLogger.LogUsageAction(usagemetrics.ReadMetricsStarted) if err := os.MkdirAll(r.outputFolder, os.ModePerm); err != nil { log.CtxLogger(ctx).Errorw("Failed to create output folder", "outputFolder", r.outputFolder, "err", err) return subcommands.ExitFailure } defer r.sendStatusToMonitoring(ctx, cloudmonitoring.NewDefaultBackOffIntervals()) for identifier, query := range r.queries { if query == "" { log.CtxLogger(ctx).Infow("Skipping empty query", "identifier", identifier) continue } data, err := r.executeQuery(ctx, identifier, query) if err != nil { log.CtxLogger(ctx).Errorw("Failed to execute query", "identifier", identifier, "query", query, "err", err) r.oteLogger.LogUsageError(usagemetrics.ReadMetricsQueryFailure) return subcommands.ExitFailure } fileName, err := r.writeResults(data, identifier) if err != nil { log.CtxLogger(ctx).Errorw("Failed to write results", "identifier", identifier, "query", query, "err", err) r.oteLogger.LogUsageError(usagemetrics.ReadMetricsWriteFileFailure) return subcommands.ExitFailure } if r.bucket != nil { if err := r.uploadFile(ctx, fileName, copier); err != nil { log.CtxLogger(ctx).Errorw("Failed to upload file", "fileName", fileName, "err", err) r.oteLogger.LogUsageError(usagemetrics.ReadMetricsBucketUploadFailure) return subcommands.ExitFailure } } } log.CtxLogger(ctx).Info("ReadMetrics finished") r.status = true r.oteLogger.LogUsageAction(usagemetrics.ReadMetricsFinished) return subcommands.ExitSuccess } // createQueryMap creates a map of identifiers to MQL queries from default // queries and an optional inputFile supplied by the user. func (r *ReadMetrics) createQueryMap() (map[string]string, error) { queries := map[string]string{ "default_hana_availability": defaultHanaAvailability, "default_hana_ha_availability": defaultHanaHAAvailability, } if r.inputFile == "" { return queries, nil } data, err := os.ReadFile(r.inputFile) if err != nil { return nil, err } if err := json.Unmarshal(data, &queries); err != nil { return nil, err } return queries, nil } // executeQuery queries Cloud Monitoring and returns the results. func (r *ReadMetrics) executeQuery(ctx context.Context, identifier, query string) ([]*mrpb.TimeSeriesData, error) { req := &mpb.QueryTimeSeriesRequest{ Name: fmt.Sprintf("projects/%s", r.projectID), Query: query, } log.CtxLogger(ctx).Infow("Executing query", "identifier", identifier, "query", query) data, err := cloudmonitoring.QueryTimeSeriesWithRetry(ctx, r.cmr.QueryClient, req, r.cmr.BackOffs) if err != nil { return nil, err } log.CtxLogger(ctx).Infow("Query succeeded", "identifier", identifier) log.CtxLogger(ctx).Debugw("Query response", "identifier", identifier, "response", data) return data, nil } // writeResults marshalls the query response data and writes it to the output. func (r *ReadMetrics) writeResults(data []*mrpb.TimeSeriesData, identifier string) (string, error) { // Use ISO 8601 standard for printing the current time. outFile := fmt.Sprintf("%s/%s_%s.json", r.outputFolder, identifier, time.Now().Format("20060102T150405Z0700")) log.Logger.Infow("Writing results", "outFile", outFile) var output []byte if len(data) == 0 { log.Logger.Warnw("No data, writing empty file", "outFile", outFile) } else { jsonData := make([]json.RawMessage, len(data)) var err error for i, d := range data { jsonData[i], err = protojson.Marshal(d) if err != nil { return "", err } } output, err = json.Marshal(jsonData) if err != nil { return "", err } } if err := os.WriteFile(outFile, output, os.ModePerm); err != nil { return "", err } log.Logger.Infow("Results written", "outFile", outFile) return outFile, nil } // uploadFile uploads the file to the GCS bucket. func (r *ReadMetrics) uploadFile(ctx context.Context, fileName string, copier storage.IOFileCopier) error { if r.bucket == nil { return fmt.Errorf("bucket is nil") } log.CtxLogger(ctx).Infow("Uploading file", "bucket", r.bucketName, "fileName", fileName) f, err := os.Open(fileName) if err != nil { return err } defer f.Close() fileInfo, err := f.Stat() if err != nil { return err } rw := storage.ReadWriter{ Reader: f, Copier: copier, BucketHandle: r.bucket, BucketName: r.bucketName, ChunkSizeMb: 100, ObjectName: r.cloudProps.GetNumericProjectId() + "/" + filepath.Base(fileName), TotalBytes: fileInfo.Size(), MaxRetries: 5, LogDelay: 30 * time.Second, VerifyUpload: false, } bytesWritten, err := rw.Upload(ctx) if err != nil { return err } log.CtxLogger(ctx).Infow("File uploaded", "bucket", r.bucketName, "fileName", fileName, "objectName", rw.ObjectName, "bytesWritten", bytesWritten, "fileSize", fileInfo.Size()) return nil } // sendStatusToMonitoring sends the status of ReadMetrics one time execution // to cloud monitoring as a GAUGE metric. func (r *ReadMetrics) sendStatusToMonitoring(ctx context.Context, bo *cloudmonitoring.BackOffIntervals) bool { if !r.sendToMonitoring { return false } log.CtxLogger(ctx).Infow("Sending ReadMetrics status to cloud monitoring", "status", r.status) ts := []*mrpb.TimeSeries{ timeseries.BuildBool(timeseries.Params{ CloudProp: protostruct.ConvertCloudPropertiesToStruct(r.cloudProps), MetricType: "workload.googleapis.com/sap/agent/" + r.Name(), Timestamp: tspb.Now(), BoolValue: r.status, }), } if _, _, err := cloudmonitoring.SendTimeSeries(ctx, ts, r.timeSeriesCreator, bo, r.projectID); err != nil { log.CtxLogger(ctx).Errorw("Error sending status metric to cloud monitoring", "error", err.Error()) return false } return true }