in internal/workloadmanager/remote.go [346:432]
// collectRemoteSSH collects workload metrics from one remote instance by
// shelling out to ssh and scp through opts.execute.
//
// a must hold a collectOptions value. Any other dynamic type is logged as an
// error and the function returns without sending anything on opts.wm.
//
// The sequence of external commands is:
//  1. ssh: "rm -f" of remoteAgentBinary on the instance. A failure here is
//     logged and otherwise ignored.
//  2. scp: copy opts.configPath to remoteValidationConfig on the instance.
//  3. scp: invoked with the arguments built by appendSSHArgs(..., true); the
//     associated log message indicates this copies the agent binary.
//  4. ssh: run remoteAgentBinary with the "remote" subcommand, followed by
//     "; rm" of both remote files on the same command line.
//
// If step 2, 3 or 4 reports an error, or the stdout of step 4 starts with
// "ERROR", the problem is logged and a WorkloadMetrics with nil Metrics is
// sent on opts.wm. Otherwise stdout is handed to parseRemoteJSON and whatever
// metrics it produced are sent on opts.wm, even if parsing reported an error.
func collectRemoteSSH(ctx context.Context, a any) {
	opts, ok := a.(collectOptions)
	if !ok {
		log.CtxLogger(ctx).Errorw("Cannot collect remote metrics using ssh", "reason", fmt.Sprintf("args of type %T does not match collectOptions", a))
		return
	}

	// Capture the identifying fields up front; they become flags of the
	// remote invocation further down.
	project, zoneName := opts.i.ProjectId, opts.i.Zone
	id, name := opts.i.InstanceId, opts.i.InstanceName
	log.CtxLogger(ctx).Infow("Collecting remote metrics using ssh", "instance", opts.i)

	// params builds the executor request for a single external command.
	params := func(executable string, args []string) commandlineexecutor.Params {
		return commandlineexecutor.Params{Executable: executable, Args: args}
	}
	// emit publishes the outcome of this collection on the results channel.
	emit := func(collected []*mrpb.TimeSeries) {
		opts.wm <- WorkloadMetrics{Metrics: collected}
	}

	// Best effort: clear out a binary left behind by an earlier run.
	cleanupArgs := append(appendSSHArgs([]string{}, opts.rc, opts.i, false), "rm -f "+remoteAgentBinary)
	if res := opts.execute(ctx, params("ssh", cleanupArgs)); res.Error != nil {
		log.CtxLogger(ctx).Errorw("Could not ssh to remote instance to remove existing tmp binary", "instance", opts.i, "error", res.Error, "stderr", res.StdErr, "stdout", res.StdOut)
	}

	// Ship the workload validation config to the instance.
	configCopyArgs := []string{
		"-i", opts.rc.RemoteCollectionSsh.GetSshPrivateKeyPath(),
		opts.configPath,
		fmt.Sprintf("%s@%s:%s", opts.rc.RemoteCollectionSsh.GetSshUsername(), opts.i.SshHostAddress, remoteValidationConfig),
	}
	if res := opts.execute(ctx, params("scp", configCopyArgs)); res.Error != nil {
		log.CtxLogger(ctx).Errorw("Could not copy workload validation config to remote instance", "instance", opts.i, "error", res.Error, "stderr", res.StdErr, "stdout", res.StdOut)
		emit(nil)
		return
	}

	// Second scp; its full argument list comes from appendSSHArgs.
	binaryCopyArgs := appendSSHArgs([]string{}, opts.rc, opts.i, true)
	if res := opts.execute(ctx, params("scp", binaryCopyArgs)); res.Error != nil {
		log.CtxLogger(ctx).Errorw("Could not copy binary to remote instance", "instance", opts.i, "error", res.Error, "stderr", res.StdErr, "stdout", res.StdOut)
		emit(nil)
		return
	}

	// Remote command line:
	//   remoteAgentBinary remote -c=<config> -p=<project> -i=<instance id> -n=<instance name> -z=<zone> ; rm <binary> ; rm <config>
	collectArgs := append(
		appendSSHArgs([]string{}, opts.rc, opts.i, false),
		remoteAgentBinary,
		"remote",
		fmt.Sprintf("-c=%s -p=%s -i=%s -n=%s -z=%s", remoteValidationConfig, project, id, name, zoneName),
		"; rm "+remoteAgentBinary,
		"; rm "+remoteValidationConfig,
	)
	run := opts.execute(ctx, params("ssh", collectArgs))
	if run.Error != nil {
		log.CtxLogger(ctx).Errorw("Could not execute remote collection on instance", "instance", opts.i, "error", run.Error, "stderr", run.StdErr, "stdout", run.StdOut)
		emit(nil)
		return
	}
	// Output beginning with "ERROR" is treated as a failed collection even
	// though the ssh command itself reported no error.
	if strings.HasPrefix(run.StdOut, "ERROR") {
		log.CtxLogger(ctx).Errorw("Error encountered on remote instance", "instance", opts.i, "error", run.StdOut)
		emit(nil)
		return
	}

	var metrics []*mrpb.TimeSeries
	if err := parseRemoteJSON(run.StdOut, &metrics); err != nil {
		log.CtxLogger(ctx).Errorw("Error parsing metrics collected from remote instance", "instance", opts.i, "error", err)
	}
	if len(metrics) == 0 {
		log.CtxLogger(ctx).Warnw("No data collected from remote instance", "instance", opts.i)
	}
	emit(metrics)
}