func()

in plugins/teststeps/exec/transport/ssh_process_async.go [197:276]


func (m *asyncMonitor) Start(
	ctx xcontext.Context,
	outWriter io.WriteCloser, errWriter io.WriteCloser,
	exitChan chan<- error,
) {
	defer outWriter.Close()
	defer errWriter.Close()

	for {
		select {
		case <-time.After(time.Second):
			ctx.Debugf("polling remote process: %s", m.sid)

			stdout, stderr, err, runerr := m.runAgent(ctx, "poll")
			if err != nil {
				ctx.Warnf("failed to run agent: %w", err)
				continue
			}

			// append stdout, stderr; blocking until read
			if _, err := outWriter.Write(stdout); err != nil {
				ctx.Warnf("failed to write to stdout pipe: %w", err)
				continue
			}

			if _, err := errWriter.Write(stderr); err != nil {
				ctx.Warnf("failed to write to stderr pipe: %w", err)
				continue
			}

			if runerr != nil {
				var em *ssh.ExitMissingError
				if errors.As(runerr, &em) {
					if err := m.reap(ctx); err != nil {
						ctx.Warnf("monitor error: %w", err)
					}

					// process exited without an error or signal; this is a ssh server error
					exitChan <- fmt.Errorf("internal ssh server error: %w", em)
					return
				}

				var ee *ssh.ExitError
				if errors.As(runerr, &ee) {
					if err := m.reap(ctx); err != nil {
						ctx.Warnf("monitor error: %w", err)
					}

					switch ee.ExitStatus() {
					case ProcessFinishedExitCode:
						// agent controlled process exited by itself
						exitChan <- nil

					case DeadAgentExitCode:
						// agent killed itself due to time quota or other error
						exitChan <- fmt.Errorf("agent exceeded time quota or just crashed")

					default:
						exitChan <- ee
					}
					return
				}

				// process is done, but there's some other internal error
				exitChan <- runerr
			}

		case <-ctx.Done():
			ctx.Debugf("killing remote process, reason: cancellation")

			err := m.kill(ctx)
			if err := m.reap(ctx); err != nil {
				ctx.Warnf("monitor error: %w", err)
			}

			exitChan <- err
			return
		}
	}
}