internal/workloadmanager/remote.go
/*
Copyright 2022 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workloadmanager
import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"
"google.golang.org/protobuf/encoding/protojson"
"github.com/gammazero/workerpool"
"github.com/GoogleCloudPlatform/sapagent/internal/usagemetrics"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/commandlineexecutor"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/log"
"github.com/GoogleCloudPlatform/workloadagentplatform/sharedlibraries/recovery"
mrpb "google.golang.org/genproto/googleapis/monitoring/v3"
cpb "github.com/GoogleCloudPlatform/sapagent/protos/configuration"
ipb "github.com/GoogleCloudPlatform/sapagent/protos/instanceinfo"
wlmpb "github.com/GoogleCloudPlatform/sapagent/protos/wlmvalidation"
)
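// Paths used during remote collection: the locally installed agent binary, and the
// temporary locations the binary and the workload validation config are copied to
// on the remote host.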
const (
agentBinary = "/usr/bin/google_cloud_sap_agent"
remoteAgentBinary = "/tmp/google_cloud_sap_agent"
remoteValidationConfig = "/tmp/workload-validation.json"
)
// CollectMetricsToJSON collects all of the workload manager metrics and returns their
// JSON representation, one protojson-encoded metric per line. It is only called on
// remote instances, through the google_cloud_sap_agent binary running in remote mode.
func CollectMetricsToJSON(ctx context.Context, params Parameters) string {
wm := collectMetricsFromConfig(ctx, params, metricOverridePath)
var sb strings.Builder
for _, t := range wm.Metrics {
b, err := protojson.Marshal(t)
if err != nil {
return fmt.Sprintf("ERROR Could not create metrics JSON; %v", err)
}
sb.WriteString(fmt.Sprintf("%s\n", string(b)))
}
return sb.String()
}
/*
collectAndSendRemoteMetrics runs in the local binary and collects validation metrics
from remote hosts.
Returns the total number of metrics sent across all remotely collected hosts.
*/
func collectAndSendRemoteMetrics(ctx context.Context, params Parameters) int {
rc := params.Config.GetCollectionConfiguration().GetWorkloadValidationRemoteCollection()
// make sure collection via gcloud or ssh is defined
if rc.GetRemoteCollectionSsh() == nil && rc.GetRemoteCollectionGcloud() == nil {
log.CtxLogger(ctx).Error("One of remote_collection_gcloud or remote_collection_ssh must be defined for remote collection")
return 0
}
tempFile, err := createWorkloadValidationFile(ctx, params.WorkloadConfig)
if err != nil {
log.CtxLogger(ctx).Errorw("Could not create temporary file for workload validation", "error", err)
return 0
}
defer os.Remove(tempFile.Name())
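// Bound the number of instances collected from concurrently using a worker pool
// sized by the collection configuration.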
wp := workerpool.New(int(rc.GetConcurrentCollections()))
mu := &sync.Mutex{}
metricsSent := 0
var routines []*recovery.RecoverableRoutine
for _, inst := range rc.GetRemoteCollectionInstances() {
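// Capture the loop variable so each submitted closure operates on its own instance.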
inst := inst
ch := make(chan WorkloadMetrics)
wp.Submit(func() {
log.CtxLogger(ctx).Infow("Collecting metrics from", "instance", inst)
var r *recovery.RecoverableRoutine
if rc.GetRemoteCollectionSsh() != nil {
r = &recovery.RecoverableRoutine{
Routine: collectRemoteSSH,
RoutineArg: collectOptions{
exists: params.Exists,
execute: params.Execute,
configPath: tempFile.Name(),
rc: rc,
i: inst,
wm: ch,
},
ErrorCode: usagemetrics.RemoteCollectSSHFailure,
UsageLogger: *usagemetrics.Logger,
ExpectedMinDuration: time.Minute,
}
} else if rc.GetRemoteCollectionGcloud() != nil {
r = &recovery.RecoverableRoutine{
Routine: collectRemoteGcloud,
RoutineArg: collectOptions{
exists: params.Exists,
execute: params.Execute,
configPath: tempFile.Name(),
rc: rc,
i: inst,
wm: ch,
},
ErrorCode: usagemetrics.RemoteCollectGcloudFailure,
UsageLogger: *usagemetrics.Logger,
ExpectedMinDuration: time.Minute,
}
}
if r != nil {
routines = append(routines, r)
r.StartRoutine(ctx)
}
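// Wait for the collection routine to deliver its metrics on the channel.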
wm := <-ch
// lock so we can safely update the shared metricsSent counter
mu.Lock()
defer mu.Unlock()
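// Attribute the metrics to the remote instance rather than to the local host.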
remoteCp := &ipb.CloudProperties{
ProjectId: inst.GetProjectId(),
InstanceId: inst.GetInstanceId(),
Zone: inst.GetZone(),
InstanceName: inst.GetInstanceName(),
}
metricsSent += sendMetrics(ctx, sendMetricsParams{
wm: wm,
cp: remoteCp,
bareMetal: params.Config.GetBareMetal(),
sendToCloudMonitoring: params.Config.GetSupportConfiguration().GetSendWorkloadValidationMetricsToCloudMonitoring().GetValue(),
timeSeriesCreator: params.TimeSeriesCreator,
backOffIntervals: params.BackOffs,
wlmService: params.WLMService,
})
})
}
wp.StopWait()
return metricsSent
}
// createWorkloadValidationFile stores a workload validation definition in a
// temporary file. Callers should remove the file after use.
func createWorkloadValidationFile(ctx context.Context, workloadConfig *wlmpb.WorkloadValidation) (*os.File, error) {
log.CtxLogger(ctx).Info("Creating temporary file containing workload validation configuration")
tempFile, err := os.CreateTemp("", "workload-validation.*.json")
if err != nil {
return nil, err
}
defer tempFile.Close()
configJSON, err := protojson.Marshal(workloadConfig)
if err != nil {
os.Remove(tempFile.Name())
return nil, err
}
if _, err = tempFile.Write(configJSON); err != nil {
os.Remove(tempFile.Name())
return nil, err
}
return tempFile, nil
}
// parseRemoteJSON parses the newline-delimited JSON output of a remote host collection into TimeSeries metrics.
func parseRemoteJSON(output string, metrics *[]*mrpb.TimeSeries) error {
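// Each non-blank line is expected to hold a single protojson-encoded TimeSeries,
// e.g. {"metric":{"type":"..."},"points":[...]}, as emitted by CollectMetricsToJSON.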
for _, s := range strings.Split(output, "\n") {
if len(s) == 0 {
// blank line, continue
continue
}
metric := &mrpb.TimeSeries{}
if err := protojson.Unmarshal([]byte(s), metric); err != nil {
return err
}
*metrics = append(*metrics, metric)
}
return nil
}
// appendCommonGcloudArgs appends common gcloud args to the given args slice.
func appendCommonGcloudArgs(args []string, rc *cpb.WorkloadValidationRemoteCollection, i *cpb.RemoteCollectionInstance) []string {
args = append(args, "--project", i.ProjectId, "--zone", i.Zone)
if rc.GetRemoteCollectionGcloud().GetTunnelThroughIap() {
args = append(args, "--tunnel-through-iap")
}
if rc.GetRemoteCollectionGcloud().GetUseInternalIp() {
args = append(args, "--internal-ip")
}
if rc.GetRemoteCollectionGcloud().GetGcloudArgs() != "" {
args = append(args, strings.Split(rc.GetRemoteCollectionGcloud().GetGcloudArgs(), " ")...)
}
return args
}
// gcloudInstanceName returns the instance name to use for gcloud commands.
// If the SSH username is set, it will be prepended to the instance name.
func gcloudInstanceName(rc *cpb.WorkloadValidationRemoteCollection, i *cpb.RemoteCollectionInstance) string {
if rc.GetRemoteCollectionGcloud().GetSshUsername() != "" {
return fmt.Sprintf("%s@%s", rc.GetRemoteCollectionGcloud().GetSshUsername(), i.GetInstanceName())
}
return i.GetInstanceName()
}
// collectOptions contains the parameters needed to collect metrics from a remote host.
type collectOptions struct {
exists commandlineexecutor.Exists // reports whether an executable (e.g. gcloud) is available locally
execute commandlineexecutor.Execute // runs local commands such as gcloud, ssh, and scp
configPath string // path to the temporary workload validation config copied to the remote host
rc *cpb.WorkloadValidationRemoteCollection // remote collection configuration
i *cpb.RemoteCollectionInstance // the remote instance to collect from
wm chan<- WorkloadMetrics // channel on which the collected metrics are delivered
}
// The collectRemoteGcloud function will:
// - copy the workload validation configuration to the remote host
// - copy the google_cloud_sap_agent binary to the remote host
// - execute the binary remotely, collecting the metrics as JSON on stdout
// - read the stdout and parse errors or JSON into metrics
// - send the metrics from the host back to the caller over the wm channel
func collectRemoteGcloud(ctx context.Context, a any) {
var opts collectOptions
var ok bool
if opts, ok = a.(collectOptions); !ok {
log.CtxLogger(ctx).Errorw("Cannot collect remote metrics using gcloud", "reason", fmt.Sprintf("args of type %T does not match collectOptions", a))
return
}
var metrics []*mrpb.TimeSeries
if !opts.exists("gcloud") {
log.CtxLogger(ctx).Error("gcloud command not found. Ensure the google cloud SDK is installed and that the gcloud command is in systemd's PATH environment variable: `systemctl show-environment`, `systemctl set-environment PATH=</path:/another/path>")
opts.wm <- WorkloadMetrics{Metrics: metrics}
return
}
log.CtxLogger(ctx).Infow("Collecting remote metrics using gcloud", "instance", opts.i)
iName := gcloudInstanceName(opts.rc, opts.i)
// remove the binary just in case it still exists on the remote
sshArgs := []string{"compute", "ssh"}
sshArgs = appendCommonGcloudArgs(sshArgs, opts.rc, opts.i)
sshArgs = append(sshArgs, iName, "--command", "sudo rm -f "+remoteAgentBinary)
result := opts.execute(ctx, commandlineexecutor.Params{
Executable: "gcloud",
Args: sshArgs,
})
if result.Error != nil {
log.CtxLogger(ctx).Errorw("Could not ssh to remote instance to remove existing tmp binary", "instance", opts.i, "error", result.Error, "stderr", result.StdErr, "stdout", result.StdOut)
}
// gcloud compute scp --project someproject --zone somezone [--tunnel-through-iap] [--internal-ip] [otherargs] filetotransfer [user@]instancename:path
scpArgs := []string{"compute", "scp"}
scpArgs = appendCommonGcloudArgs(scpArgs, opts.rc, opts.i)
scpArgs = append(scpArgs, opts.configPath, fmt.Sprintf("%s:%s", iName, remoteValidationConfig))
log.CtxLogger(ctx).Debugw("Sending workload validation config to remote host", "instance", opts.i)
result = opts.execute(ctx, commandlineexecutor.Params{
Executable: "gcloud",
Args: scpArgs,
})
if result.Error != nil {
log.CtxLogger(ctx).Errorw("Could not copy workload validation config to remote instance", "instance", opts.i, "error", result.Error, "stderr", result.StdErr, "stdout", result.StdOut)
opts.wm <- WorkloadMetrics{Metrics: metrics}
return
}
// gcloud compute scp --project someproject --zone somezone [--tunnel-through-iap] [--internal-ip] [otherargs] filetotransfer [user@]instancename:path
scpArgs = []string{"compute", "scp"}
scpArgs = appendCommonGcloudArgs(scpArgs, opts.rc, opts.i)
scpArgs = append(scpArgs, agentBinary, fmt.Sprintf("%s:%s", iName, remoteAgentBinary))
log.CtxLogger(ctx).Debugw("Sending binary to remote host", "instance", opts.i)
result = opts.execute(ctx, commandlineexecutor.Params{
Executable: "gcloud",
Args: scpArgs,
})
if result.Error != nil {
log.CtxLogger(ctx).Errorw("Could not copy binary to remote instance", "instance", opts.i, "error", result.Error, "stderr", result.StdErr, "stdout", result.StdOut)
opts.wm <- WorkloadMetrics{Metrics: metrics}
return
}
// gcloud compute ssh --project someproject --zone somezone [--tunnel-through-iap] [--internal-ip] [otherargs] [user@]instancename --command="commandtoexec"
command := "sudo " + remoteAgentBinary + fmt.Sprintf(" remote -c=%s -p=%s -z=%s -i=%s -n=%s", remoteValidationConfig, opts.i.GetProjectId(), opts.i.GetZone(), opts.i.GetInstanceId(), opts.i.GetInstanceName()) + "; rm " + remoteAgentBinary + "; rm " + remoteValidationConfig
sshArgs = []string{"compute", "ssh"}
sshArgs = appendCommonGcloudArgs(sshArgs, opts.rc, opts.i)
sshArgs = append(sshArgs, iName, "--command", command)
result = opts.execute(ctx, commandlineexecutor.Params{
Executable: "gcloud",
Args: sshArgs,
})
if result.Error != nil {
log.CtxLogger(ctx).Errorw("Could not execute remote collection on instance", "instance", opts.i, "error", result.Error, "stderr", result.StdErr, "stdout", result.StdOut)
opts.wm <- WorkloadMetrics{Metrics: metrics}
return
}
if strings.HasPrefix(result.StdOut, "ERROR") {
log.CtxLogger(ctx).Errorw("Error encountered on remote instance", "instance", opts.i, "error", result.StdOut)
opts.wm <- WorkloadMetrics{Metrics: metrics}
return
}
err := parseRemoteJSON(result.StdOut, &metrics)
if err != nil {
log.CtxLogger(ctx).Errorw("Error parsing metrics collected from remote instance", "instance", opts.i, "error", err)
}
if len(metrics) == 0 {
log.CtxLogger(ctx).Warnw("No data collected from remote instance", "instance", opts.i)
}
opts.wm <- WorkloadMetrics{Metrics: metrics}
}
// appendSSHArgs appends SSH arguments to the given args slice.
func appendSSHArgs(args []string, rc *cpb.WorkloadValidationRemoteCollection, i *cpb.RemoteCollectionInstance, isScp bool) []string {
hostAddr := i.SshHostAddress
pkPath := rc.RemoteCollectionSsh.GetSshPrivateKeyPath()
userName := rc.RemoteCollectionSsh.GetSshUsername()
sshArg := userName + "@" + hostAddr
args = append(args, "-i", pkPath)
if isScp {
// append "-i /root/.ssh/sap-agent-key /usr/sap/google-cloud-sap-agent/google-cloud-sap-agent-remote username@10.128.0.36:remoteAgentBinary"
args = append(args, agentBinary, sshArg+":"+remoteAgentBinary)
} else {
// append "-i /root/.ssh/sap-agent-key username@10.128.0.36"
args = append(args, sshArg)
}
return args
}
// The collectRemoteSSH function will:
// - copy the workload validation configuration to the remote host
// - copy the google_cloud_sap_agent binary to the remote host
// - execute the binary remotely, collecting the metrics as JSON on stdout
// - read the stdout and parse errors or JSON into metrics
// - send the metrics from the host back to the caller over the wm channel
func collectRemoteSSH(ctx context.Context, a any) {
var opts collectOptions
var ok bool
if opts, ok = a.(collectOptions); !ok {
log.CtxLogger(ctx).Errorw("Cannot collect remote metrics using ssh", "reason", fmt.Sprintf("args of type %T does not match collectOptions", a))
return
}
projectID := opts.i.ProjectId
zone := opts.i.Zone
instanceID := opts.i.InstanceId
instanceName := opts.i.InstanceName
log.CtxLogger(ctx).Infow("Collecting remote metrics using ssh", "instance", opts.i)
rmArgs := []string{}
rmArgs = appendSSHArgs(rmArgs, opts.rc, opts.i, false)
// append "rm -f remoteAgentBinary"
rmArgs = append(rmArgs, "rm -f "+remoteAgentBinary)
result := opts.execute(ctx, commandlineexecutor.Params{
Executable: "ssh",
Args: rmArgs,
})
if result.Error != nil {
log.CtxLogger(ctx).Errorw("Could not ssh to remote instance to remove existing tmp binary", "instance", opts.i, "error", result.Error, "stderr", result.StdErr, "stdout", result.StdOut)
}
var metrics []*mrpb.TimeSeries
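// scp -i <ssh_private_key_path> <config_path> <user>@<host>:/tmp/workload-validation.json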
scpArgs := []string{"-i", opts.rc.RemoteCollectionSsh.GetSshPrivateKeyPath()}
scpArgs = append(scpArgs, opts.configPath)
scpArgs = append(scpArgs, fmt.Sprintf("%s@%s:%s", opts.rc.RemoteCollectionSsh.GetSshUsername(), opts.i.SshHostAddress, remoteValidationConfig))
result = opts.execute(ctx, commandlineexecutor.Params{
Executable: "scp",
Args: scpArgs,
})
if result.Error != nil {
log.CtxLogger(ctx).Errorw("Could not copy workload validation config to remote instance", "instance", opts.i, "error", result.Error, "stderr", result.StdErr, "stdout", result.StdOut)
opts.wm <- WorkloadMetrics{Metrics: metrics}
return
}
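// scp -i <ssh_private_key_path> /usr/bin/google_cloud_sap_agent <user>@<host>:/tmp/google_cloud_sap_agent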
scpArgs = []string{}
scpArgs = appendSSHArgs(scpArgs, opts.rc, opts.i, true)
result = opts.execute(ctx, commandlineexecutor.Params{
Executable: "scp",
Args: scpArgs,
})
if result.Error != nil {
log.CtxLogger(ctx).Errorw("Could not copy binary to remote instance", "instance", opts.i, "error", result.Error, "stderr", result.StdErr, "stdout", result.StdOut)
opts.wm <- WorkloadMetrics{Metrics: metrics}
return
}
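// ssh -i <ssh_private_key_path> <user>@<host> /tmp/google_cloud_sap_agent remote -c=... -p=... -i=... -n=... -z=... ; rm ...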
sshArgs := []string{}
sshArgs = appendSSHArgs(sshArgs, opts.rc, opts.i, false)
// append "remoteAgentBinary remote -h=false -p=projectID -i=instanceID -n=instanceName -z=zone"
sshArgs = append(sshArgs, remoteAgentBinary, "remote", fmt.Sprintf("-c=%s -p=%s -i=%s -n=%s -z=%s", remoteValidationConfig, projectID, instanceID, instanceName, zone))
sshArgs = append(sshArgs, "; rm "+remoteAgentBinary, "; rm "+remoteValidationConfig)
result = opts.execute(ctx, commandlineexecutor.Params{
Executable: "ssh",
Args: sshArgs,
})
if result.Error != nil {
log.CtxLogger(ctx).Errorw("Could not execute remote collection on instance", "instance", opts.i, "error", result.Error, "stderr", result.StdErr, "stdout", result.StdOut)
opts.wm <- WorkloadMetrics{Metrics: metrics}
return
}
if strings.HasPrefix(result.StdOut, "ERROR") {
log.CtxLogger(ctx).Errorw("Error encountered on remote instance", "instance", opts.i, "error", result.StdOut)
opts.wm <- WorkloadMetrics{Metrics: metrics}
return
}
err := parseRemoteJSON(result.StdOut, &metrics)
if err != nil {
log.CtxLogger(ctx).Errorw("Error parsing metrics collected from remote instance", "instance", opts.i, "error", err)
}
if len(metrics) == 0 {
log.CtxLogger(ctx).Warnw("No data collected from remote instance", "instance", opts.i)
}
opts.wm <- WorkloadMetrics{Metrics: metrics}
}