internal/run/run.go (202 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package run is a package with utilities for running command and handling
// results.
package run
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"time"
"github.com/GoogleCloudPlatform/galog"
)
var (
// Client is the Runner running commands.
Client RunnerInterface
)
// RunnerInterface defines the runner running commands.
type RunnerInterface interface {
WithContext(ctx context.Context, opts Options) (*Result, error)
}
// StreamOutput represents the output channels streaming command result.
// Executor takes care of closing all channels after all writes are completed.
// Caller must not try to close channel and should only read from these receive
// only channels.
type StreamOutput struct {
// StdOut is the channel for stdout of a command.
StdOut <-chan string
// StdErr is the channel for stderr of a command.
StdErr <-chan string
// Result is the final output of a command. It is same as what cmd.Wait()
// finally returns.
Result <-chan error
}
// Result represents the result of running commands.
type Result struct {
// OutputType is the output type requested/configured with [Options].
OutputType OutputType
// Output is the output of the command, depending on the OutputType it could
// be either, stdout, stderr, combined or none.
Output string
// OutputScanners is the scanner for the output of the command. This is set
// only if the [Options] OutputType is OutputStream.
OutputScanners *StreamOutput
// Pid is the PID of the process that started the command. It's only set if
// the [Options]` ExecMode is either ExecModeAsync or ExecModeDetack.
Pid int
}
// Options represents the command options.
type Options struct {
// OutputType is the output type requested/configured, it could be either:
// stdout, stderr, combined or none.
OutputType OutputType
// Name is the command name.
Name string
// Args is the command arguments.
Args []string
// Input is written to the process stdin.
Input string
// Timeout is the timeout of the command. If it's not set (or set to 0) no
// timeout will be set/assumed.
Timeout time.Duration
// ExecMode defines the process/command execution mode, i.e. blocking,
// non-blocking, detaching etc.
ExecMode ExecMode
// Dir specifies the working directory of the command/process. If not
// specified the exec.Command's Dir behavior is honored.
Dir string
}
// ExecMode represents the command execution mode: i.e. blocking, non-blocking,
// detaching etc.
type ExecMode int
// OutputType represents the output type of the command.
type OutputType int
// Runner implements the RunnerInterface and represents the runner running
// commands.
type Runner struct{}
const (
// OutputStdout is the output enum for stdout output. The process' stderr is
// still piped and buffered and is used in case of error (reported in the
// returned error).
OutputStdout OutputType = iota
// OutputStderr is the output enum for stderr output. The process' stdout is
// never piped and buffered.
OutputStderr
// OutputCombined is the output enum for stdout+stderr combined output.
OutputCombined
// OutputNone is the output enum for no output/quiet. The process' stderr is
// still piped and buffered and is used in case of error (reported in the
// returned error).
OutputNone
// OutputStream is the output enum for streaming output.
OutputStream
// ExecModeSync is the execution mode enum for sync processes(blocking).
ExecModeSync ExecMode = iota
// ExecModeAsync is the execution mode enum for async processes(non blocking).
ExecModeAsync
// ExecModeDetach is the execution mode enum for detached processes. The
// operation is async as [ExecModeAsync] but it has the process group re-set
// - the process will survive the callers process exit.
ExecModeDetach
)
// init initializes the RunClient.
func init() {
Client = Runner{}
}
// WithContext runs the command with the given [Options].
func WithContext(ctx context.Context, opts Options) (*Result, error) {
return Client.WithContext(ctx, opts)
}
// WithContext runs the command with the given [Options].
func (rr Runner) WithContext(ctx context.Context, opts Options) (*Result, error) {
var cancel context.CancelFunc
mainContext := ctx
if opts.Timeout != 0 {
ctx, cancel = context.WithTimeout(ctx, opts.Timeout)
defer cancel()
}
timeoutResult := func(res *Result, err error) (*Result, error) {
if err != nil && mainContext.Err() == nil && ctx.Err() != nil {
return res, &TimeoutError{err: err}
}
return res, err
}
if opts.OutputType == OutputStream {
return timeoutResult(streamOutput(ctx, opts))
}
if opts.ExecMode == ExecModeAsync || opts.ExecMode == ExecModeDetach {
return timeoutResult(start(ctx, opts))
}
if opts.OutputType == OutputCombined {
return timeoutResult(combinedOutput(ctx, opts))
}
return timeoutResult(splitOutput(ctx, opts))
}
// streamOutput starts the command and streams the output and any errors on the
// channel.
func streamOutput(ctx context.Context, opts Options) (*Result, error) {
galog.Debugf("Running command: %+v", opts)
cmd := exec.CommandContext(ctx, opts.Name, opts.Args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("unable to obtain pipe to stdout: %w", err)
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, fmt.Errorf("unable to obtain pipe to stderr: %w", err)
}
outChan := make(chan string)
errChan := make(chan string)
doneChan := make(chan error, 1)
go scanPipe(stdout, outChan)
go scanPipe(stderr, errChan)
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("unable to start command: %w", err)
}
go func() {
defer close(doneChan)
doneChan <- cmd.Wait()
}()
output := &StreamOutput{StdOut: outChan, StdErr: errChan, Result: doneChan}
res := &Result{OutputType: OutputStream, OutputScanners: output, Pid: cmd.Process.Pid}
return res, nil
}
// scanPipe scans the pipe and streams the output on the channel.
func scanPipe(pipe io.ReadCloser, streamOut chan string) {
defer func() {
// Error is not really actionable, just log it.
if err := pipe.Close(); err != nil && !errors.Is(err, os.ErrClosed) && !errors.Is(errors.Unwrap(err), os.ErrClosed) {
galog.Errorf("Failed to close pipe: %v", err)
}
close(streamOut)
}()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
streamOut <- scanner.Text()
}
}
// splitOutput runs the requested command but only reads either stdout or stderr
// output. In case of error the output is merged with the error, in case of
// success the output is set to [Result]'s Output field. The requested
// OutputType is set to [Result]'s OutputType field.
func splitOutput(ctx context.Context, opts Options) (*Result, error) {
galog.Debugf("Running command: %+v", opts)
cmd := exec.CommandContext(ctx, opts.Name, opts.Args...)
var stdout, stderr bytes.Buffer
var resOutput *bytes.Buffer
cmd.Stderr = &stderr
cmd.Dir = opts.Dir
if err := writeToStdin(cmd, opts.Input); err != nil {
return nil, fmt.Errorf("failed to write input in splitOutput: %v", err)
}
// stderr is always piped and buffered as we always want to report it in case
// of error - it's added to the returned error.
if opts.OutputType == OutputStderr {
resOutput = &stderr
} else if opts.OutputType == OutputStdout {
cmd.Stdout = &stdout
resOutput = &stdout
}
if err := cmd.Run(); err != nil {
return nil, errorWithOutput(err, stderr.String())
}
return &Result{OutputType: opts.OutputType, Output: resOutput.String()}, nil
}
// combinedOutput runs the requested command and reads the combined output (both
// stdout and stderr). In case of error the combined output is merged with the
// error, in case of success the output is set to [Result]'s Output field. The
// requested OutputType is set to [Result]'s OutputType field.
func combinedOutput(ctx context.Context, opts Options) (*Result, error) {
cmd := exec.CommandContext(ctx, opts.Name, opts.Args...)
cmd.Dir = opts.Dir
if err := writeToStdin(cmd, opts.Input); err != nil {
return nil, fmt.Errorf("failed to write input in combinedOutput: %v", err)
}
output, err := cmd.CombinedOutput()
if err != nil {
return nil, errorWithOutput(err, string(output))
}
return &Result{OutputType: opts.OutputType, Output: string(output)}, nil
}
func writeToStdin(cmd *exec.Cmd, input string) error {
if input == "" { // NOMUTANTS -- Writing "" is a noop, checking for it saves us a syscall in some cases.
return nil
}
stdinpipe, err := cmd.StdinPipe()
if err != nil {
return fmt.Errorf("failed to obtain pipe to stdin: %v", err)
}
defer stdinpipe.Close()
if b, err := fmt.Fprint(stdinpipe, input); err != nil {
return fmt.Errorf("failed to write to stdin pipe: %v", err)
} else if b != len(input) {
return fmt.Errorf("attempted to write %d bytes but wrote %d", len(input), b)
}
return nil
}
// TimeoutError is the error type returned when a command execution times out.
type TimeoutError struct {
err error
}
// Error returns the error message.
func (e *TimeoutError) Error() string {
return e.err.Error()
}
// AsTimeoutError returns a TimeoutError if the error is a TimeoutError.
func AsTimeoutError(err error) (*TimeoutError, bool) {
var ee *TimeoutError
if err == nil {
return nil, false
}
if errors.As(err, &ee) {
return ee, true
}
return nil, false
}
// errorWithOutput merges an error with a command's output.
func errorWithOutput(err error, output string) error {
if output == "" {
return err
}
return fmt.Errorf("%w; %s", err, output)
}
// AsExitError returns an ExitError if the error is an ExitError.
func AsExitError(err error) (*exec.ExitError, bool) {
var ee *exec.ExitError
if err == nil {
return nil, false
}
if errors.As(err, &ee) {
return ee, true
}
return nil, false
}