plugins/teststeps/exec/transport/ssh_process.go (95 lines of code) (raw):
// Copyright (c) Facebook, Inc. and its affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
package transport
import (
"bytes"
"errors"
"fmt"
"io"
"time"
"github.com/facebookincubator/contest/pkg/xcontext"
"github.com/kballard/go-shellquote"
"golang.org/x/crypto/ssh"
)
type sshProcess struct {
session *ssh.Session
cmd string
keepAliveDone chan struct{}
stack *deferedStack
}
func newSSHProcess(ctx xcontext.Context, client *ssh.Client, bin string, args []string, stack *deferedStack) (Process, error) {
var stdin bytes.Buffer
return newSSHProcessWithStdin(ctx, client, bin, args, &stdin, stack)
}
func newSSHProcessWithStdin(
ctx xcontext.Context, client *ssh.Client,
bin string, args []string,
stdin io.Reader,
stack *deferedStack,
) (Process, error) {
session, err := client.NewSession()
if err != nil {
return nil, fmt.Errorf("cannot create SSH session to server: %v", err)
}
// set fds for the remote process
session.Stdin = stdin
cmd := shellquote.Join(append([]string{bin}, args...)...)
keepAliveDone := make(chan struct{})
return &sshProcess{session, cmd, keepAliveDone, stack}, nil
}
func (sp *sshProcess) Start(ctx xcontext.Context) error {
// important note: turns out that with some sshd configs/implementations
// sending signals thru the ssh channel doesn't work (either not implemented or
// refused due to privilege separation).
// So to work around that, allocate a pty for this session and rely on SIGHUP
// to kill the process remotely if the ctx gets cancelled.
// This obviously has the limitation that the spawned process can just ignore
// SIGHUP and control its own lifetime, but there's no other way to have this.
if err := sp.session.RequestPty("xterm", 80, 120, ssh.TerminalModes{}); err != nil {
return err
}
ctx.Debugf("starting remote binary: %s", sp.cmd)
if err := sp.session.Start(sp.cmd); err != nil {
return fmt.Errorf("failed to start process: %w", err)
}
go func() {
for {
select {
case <-sp.keepAliveDone:
return
case <-time.After(5 * time.Second):
ctx.Debugf("sending sigcont to ssh server...")
if err := sp.session.Signal(ssh.Signal("CONT")); err != nil {
ctx.Warnf("failed to send CONT to ssh server: %w", err)
}
case <-ctx.Done():
ctx.Debugf("killing ssh session because of cancellation...")
// Part 2 of the cancellation. Normally this would send a SIGKILL here
// but this is sometimes ignored by sshd based on config/impl, so instead
// rely on SIGHUP to kill the process by just closing the session.
// if err := sp.session.Signal(ssh.SIGINT); err != nil {
// ctx.Warnf("failed to send KILL on context cancel: %w", err)
// }
sp.session.Close()
return
}
}
}()
return nil
}
func (sp *sshProcess) Wait(c xcontext.Context) error {
// close these no matter what error we get from the wait
defer func() {
sp.stack.Done()
close(sp.keepAliveDone)
}()
defer sp.session.Close()
if err := sp.session.Wait(); err != nil {
var e *ssh.ExitError
if errors.As(err, &e) {
return fmt.Errorf("process exited with error: %w", e)
}
return fmt.Errorf("failed to wait on process: %w", err)
}
return nil
}
func (sp *sshProcess) StdoutPipe() (io.Reader, error) {
stdout, err := sp.session.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("failed to get stdout pipe")
}
return stdout, nil
}
func (sp *sshProcess) StderrPipe() (io.Reader, error) {
stderr, err := sp.session.StderrPipe()
if err != nil {
return nil, fmt.Errorf("failed to get stderr pipe")
}
return stderr, nil
}
func (sp *sshProcess) String() string {
return sp.cmd
}