in plugins/teststeps/sshcmd/sshcmd.go [72:275]
// Run executes the configured command over SSH once per target.
//
// Parameters are validated and populated from params first; any failure there
// is returned immediately. For each target the user, host, port, timeout,
// private key file, password, executable and arguments are expanded, an SSH
// session is opened and the command is started. The per-target function
// finishes when one of the following happens:
//   - the command terminates (its error, if any, is returned; on success the
//     "expect" regular expression, when set, must match the captured stdout),
//   - the "expect" regular expression matches the output captured so far,
//   - the timeout elapses,
//   - the context is done.
//
// The per-target function is handed to teststeps.ForEachTarget, whose result
// is returned unchanged.
func (ts *SSHCmd) Run(ctx xcontext.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error) {
	log := ctx.Logger()

	// XXX: Dragons ahead! The target (%t) substitution, and function
	// expression evaluations are done at run-time, so they may still fail
	// despite passing at early validation time.
	// If the function evaluations called in validateAndPopulate are not idempotent,
	// the output of the function expressions may be different (e.g. with a call to a
	// backend or a random pool of results)
	// Function evaluation could be done at validation time, but target
	// substitution cannot, because the targets are not known at that time.
	if err := ts.validateAndPopulate(params); err != nil {
		return nil, err
	}

	// The expectation is not expanded per target, so compile it once here.
	// A compiled *regexp.Regexp is safe for concurrent use. The compile error
	// is reported per target (below) so every target still gets a result.
	expect := ts.Expect.String()
	re, reErr := regexp.Compile(expect)

	f := func(ctx xcontext.Context, target *target.Target) error {
		// apply filters and substitutions to user, host, private key, and command args
		user, err := ts.User.Expand(target)
		if err != nil {
			return fmt.Errorf("cannot expand user parameter: %w", err)
		}

		host, err := ts.Host.Expand(target)
		if err != nil {
			return fmt.Errorf("cannot expand host parameter: %w", err)
		}
		if len(host) == 0 {
			// An empty host is an error unless skip_if_empty_host parses to true.
			shouldSkip := false
			if !ts.SkipIfEmptyHost.IsEmpty() {
				shouldSkip, err = strconv.ParseBool(ts.SkipIfEmptyHost.String())
				if err != nil {
					return fmt.Errorf("cannot expand 'skip_if_empty_host' parameter value '%s': %w", ts.SkipIfEmptyHost, err)
				}
			}
			if shouldSkip {
				return nil
			}
			return fmt.Errorf("host value is empty")
		}

		portStr, err := ts.Port.Expand(target)
		if err != nil {
			return fmt.Errorf("cannot expand port parameter: %w", err)
		}
		port, err := strconv.Atoi(portStr)
		if err != nil {
			return fmt.Errorf("failed to convert port parameter to integer: %w", err)
		}

		timeoutStr, err := ts.Timeout.Expand(target)
		if err != nil {
			return fmt.Errorf("cannot expand timeout parameter: %w", err)
		}
		timeout, err := time.ParseDuration(timeoutStr)
		if err != nil {
			return fmt.Errorf("cannot parse timeout parameter: %w", err)
		}
		// The deadline starts now, i.e. it also covers connection setup.
		timeTimeout := time.Now().Add(timeout)

		// apply functions to the private key, if any
		var signer ssh.Signer
		privKeyFile, err := ts.PrivateKeyFile.Expand(target)
		if err != nil {
			return fmt.Errorf("cannot expand private key file parameter: %w", err)
		}
		if privKeyFile != "" {
			key, err := ioutil.ReadFile(privKeyFile)
			if err != nil {
				// Report the expanded path, which is the file actually read.
				return fmt.Errorf("cannot read private key at %s: %w", privKeyFile, err)
			}
			signer, err = ssh.ParsePrivateKey(key)
			if err != nil {
				return fmt.Errorf("cannot parse private key: %w", err)
			}
		}

		password, err := ts.Password.Expand(target)
		if err != nil {
			return fmt.Errorf("cannot expand password parameter: %w", err)
		}

		// Offer public-key authentication first, then password, when configured.
		var auth []ssh.AuthMethod
		if signer != nil {
			auth = append(auth, ssh.PublicKeys(signer))
		}
		if password != "" {
			auth = append(auth, ssh.Password(password))
		}

		config := ssh.ClientConfig{
			User: user,
			Auth: auth,
			// TODO expose this in the plugin arguments
			//HostKeyCallback: ssh.FixedHostKey(hostKey),
			HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		}

		executable, err := ts.Executable.Expand(target)
		if err != nil {
			return fmt.Errorf("cannot expand executable parameter: %w", err)
		}

		// apply functions to the command args, if any
		var args []string
		for _, arg := range ts.Args {
			earg, err := arg.Expand(target)
			if err != nil {
				return fmt.Errorf("cannot expand command argument '%s': %w", arg, err)
			}
			args = append(args, earg)
		}

		// Reject a malformed expectation before touching the network or
		// starting the remote command.
		if reErr != nil {
			return fmt.Errorf("malformed expect parameter: Can not compile %s with %w", expect, reErr)
		}

		// connect to the host
		addr := net.JoinHostPort(host, strconv.Itoa(port))
		client, err := ssh.Dial("tcp", addr, &config)
		if err != nil {
			return fmt.Errorf("cannot connect to SSH server %s: %w", addr, err)
		}
		defer func() {
			if err := client.Close(); err != nil {
				ctx.Warnf("Failed to close SSH connection to %s: %v", addr, err)
			}
		}()

		session, err := client.NewSession()
		if err != nil {
			return fmt.Errorf("cannot create SSH session to server %s: %w", addr, err)
		}
		defer func() {
			// io.EOF is not reported as a close failure.
			if err := session.Close(); err != nil && err != io.EOF {
				ctx.Warnf("Failed to close SSH session to %s: %v", addr, err)
			}
		}()

		// run the remote command and catch stdout/stderr
		// stdout is polled while the command is still running, so it must be
		// a writer that tolerates concurrent reads; stderr is only read after
		// the command has finished.
		stdout := newSSHOutputBuffer()
		var stderr bytes.Buffer
		session.Stdout, session.Stderr = stdout, &stderr

		cmd := shellquote.Join(append([]string{executable}, args...)...)
		log.Debugf("Running remote SSH command on %s: '%v'", addr, cmd)

		// Buffered so the goroutine can deliver its result and exit even if
		// this function has already returned.
		errCh := make(chan error, 1)
		go func() {
			errCh <- session.Run(cmd)
		}()

		// A single ticker drives the periodic checks instead of allocating a
		// new timer on every loop iteration.
		ticker := time.NewTicker(250 * time.Millisecond)
		defer ticker.Stop()

		keepAliveCnt := 0
		for {
			select {
			case err := <-errCh:
				log.Infof("Stdout of command '%s' is '%s'", cmd, stdout.Bytes())
				if err != nil {
					ctx.Warnf("Stderr of command '%s' is '%s'", cmd, stderr.Bytes())
					return err
				}
				// Execute expectations
				if expect == "" {
					ctx.Warnf("no expectations specified")
					return nil
				}
				if !re.Match(stdout.Bytes()) {
					return fmt.Errorf("match for %s not found for target %v", expect, target)
				}
				log.Infof("match for regex '%s' found", expect)
				return nil

			case <-ctx.Done():
				// NOTE(review): the result of delivering SIGKILL is what gets
				// returned, so a cancelled run yields nil when the signal was
				// sent successfully — confirm this is what callers expect.
				return session.Signal(ssh.SIGKILL)

			case <-ticker.C:
				keepAliveCnt++
				// Succeed early as soon as the output seen so far matches.
				if expect != "" && re.Match(stdout.Bytes()) {
					log.Infof("match for regex '%s' found", expect)
					return nil
				}
				if time.Now().After(timeTimeout) {
					return fmt.Errorf("timed out after %s", timeout)
				}
				// This is needed to keep the connection to the server alive
				// (every 20th tick, i.e. roughly every 5 seconds).
				if keepAliveCnt%20 == 0 {
					if err := session.Signal(ssh.Signal("CONT")); err != nil {
						log.Warnf("Unable to send CONT to ssh server: %v", err)
					}
				}
			}
		}
	}

	return teststeps.ForEachTarget(Name, ctx, ch, f)
}

// sshOutputBuffer is an io.Writer that accumulates command output and can be
// snapshotted safely while another goroutine is still writing to it.
type sshOutputBuffer struct {
	// lock is a one-slot semaphore guarding buf. A channel is used so the
	// helper needs nothing beyond what this file already relies on.
	lock chan struct{}
	buf  bytes.Buffer
}

// newSSHOutputBuffer returns an empty, ready-to-use sshOutputBuffer.
func newSSHOutputBuffer() *sshOutputBuffer {
	return &sshOutputBuffer{lock: make(chan struct{}, 1)}
}

// Write appends p to the buffer while holding the lock.
func (b *sshOutputBuffer) Write(p []byte) (int, error) {
	b.lock <- struct{}{}
	defer func() { <-b.lock }()
	return b.buf.Write(p)
}

// Bytes returns a copy of everything written so far. The copy does not alias
// the internal storage, so it stays valid across later writes.
func (b *sshOutputBuffer) Bytes() []byte {
	b.lock <- struct{}{}
	defer func() { <-b.lock }()
	return append([]byte(nil), b.buf.Bytes()...)
}