pkg/procmanager/procmanager.go (164 lines of code) (raw):

// Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. package procmanager import ( "bytes" "context" "errors" "io" "log" "os" "os/exec" "strings" "sync" "time" "github.com/Azure/acr-builder/pkg/util" ) // ProcManager is a wrapper for os.Process. type ProcManager struct { DryRun bool mu sync.Mutex processes map[int]*os.Process } // NewProcManager creates a new ProcManager. func NewProcManager(dryRun bool) *ProcManager { return &ProcManager{ DryRun: dryRun, processes: map[int]*os.Process{}, mu: sync.Mutex{}, } } // RunRepeatWithRetries performs a Run multiple times with retries. // If any error occurs during the repetition, all errors will be aggregated and returned. func (pm *ProcManager) RunRepeatWithRetries( ctx context.Context, args []string, stdIn io.Reader, stdOut io.Writer, stdErr io.Writer, cmdDir string, retries int, retryOnErrors []string, retryDelay int, containerName string, repeat int) error { var aggErrors util.Errors for i := 0; i <= repeat; i++ { innerErr := pm.RunWithRetries(ctx, args, stdIn, stdOut, stdErr, cmdDir, retries, retryOnErrors, retryDelay, containerName) if innerErr != nil { aggErrors = append(aggErrors, innerErr) } } if len(aggErrors) > 0 { return errors.New(aggErrors.String()) } return nil } // RunWithRetries performs Run with retries. func (pm *ProcManager) RunWithRetries( ctx context.Context, args []string, stdIn io.Reader, stdOut io.Writer, stdErr io.Writer, cmdDir string, retries int, retryOnErrors []string, retryDelay int, containerName string) error { attempt := 0 var err error for attempt <= retries { log.Printf("Launching container with name: %s\n", containerName) var stdOutBuf, stdErrBuf bytes.Buffer var stdOutWriter, stdErrWriter io.Writer needToCheckError := len(retryOnErrors) > 0 if needToCheckError { stdOutWriter = io.MultiWriter(&stdOutBuf, stdOut) stdErrWriter = io.MultiWriter(&stdErrBuf, stdErr) } else { stdOutWriter = stdOut stdErrWriter = stdErr } if err = pm.Run(ctx, args, stdIn, stdOutWriter, stdErrWriter, cmdDir); err == nil { log.Printf("Successfully executed container: %s\n", containerName) break } attempt++ if attempt <= retries { if !needToCheckError || containsAnyError(retryOnErrors, &stdOutBuf, &stdErrBuf) { log.Printf("Container failed during run: %s, waiting %d seconds before retrying...\n", containerName, retryDelay) time.Sleep(time.Duration(retryDelay) * time.Second) continue } } log.Printf("Container failed during run: %s. No retries remaining.\n", containerName) break } return err } // Run runs an exec.Command based on the specified args. // stdIn, stdOut, stdErr, and cmdDir can be attached to the created exec.Command. func (pm *ProcManager) Run( ctx context.Context, args []string, stdIn io.Reader, stdOut io.Writer, stdErr io.Writer, cmdDir string) error { if pm.DryRun { log.Printf("[DRY RUN] Args: %v\n", args) return nil } if args == nil { return nil } cmd := exec.Command(args[0], args[1:]...) //#nosec G204 if cmdDir != "" { cmd.Dir = cmdDir } cmd.Stdin = stdIn cmd.Stdout = stdOut cmd.Stderr = stdErr if err := cmd.Start(); err != nil { return err } pid := cmd.Process.Pid pm.mu.Lock() pm.processes[pid] = cmd.Process pm.mu.Unlock() defer pm.DeletePid(pid) errChan := make(chan error) go func() { errChan <- cmd.Wait() }() select { case err := <-errChan: return err case <-ctx.Done(): go func() { if err := cmd.Process.Kill(); err != nil { log.Printf("Failed to kill process. Path: %s, Args: %v, Err: %v\n", cmd.Path, cmd.Args, err) } }() return ctx.Err() } } // DeletePid deletes the specified pid from the internal map. func (pm *ProcManager) DeletePid(pid int) { pm.mu.Lock() delete(pm.processes, pid) pm.mu.Unlock() } // Stop stops the process manager and tries to kill any remaining processes // in its internal map. Any errors encountered during kill will be return as // a list of errors. func (pm *ProcManager) Stop() util.Errors { pm.mu.Lock() defer pm.mu.Unlock() var errs util.Errors for pid, process := range pm.processes { if err := process.Kill(); err != nil { errs = append(errs, err) } delete(pm.processes, pid) } return errs } func containsAnyError(errors []string, stdOutBuf, stdErrBuf *bytes.Buffer) bool { stdOut := stdOutBuf.String() stdErr := stdErrBuf.String() for _, error := range errors { if strings.LastIndex(stdOut, error) >= 0 || strings.LastIndex(stdErr, error) >= 0 { return true } } return false }