helpers/runner_wrapper/wrapper.go (197 lines of code) (raw):

package runner_wrapper import ( "context" "errors" "fmt" "sync" "syscall" "time" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitlab-runner/helpers/runner_wrapper/api" ) const ( DefaultTerminationTimeout = 10 * time.Second ) var ( errFailedToStartProcess = fmt.Errorf("failed to start process") errFailedToTerminateProcess = fmt.Errorf("could not send SIGTERM") errProcessExitTimeout = fmt.Errorf("timed out waiting for process to exit") ) type commanderFactory func(path string, args []string) commander type Wrapper struct { log logrus.FieldLogger path string args []string errCh chan error lock sync.RWMutex process process terminationTimeout time.Duration commanderFactory commanderFactory status api.Status failureReason error shutdownCallback api.ShutdownCallback } func New(log logrus.FieldLogger, path string, args []string) *Wrapper { return &Wrapper{ log: log, path: path, args: args, errCh: make(chan error), terminationTimeout: DefaultTerminationTimeout, status: api.StatusUnknown, commanderFactory: newDefaultCommander, } } func (w *Wrapper) SetTerminationTimeout(timeout time.Duration) { w.terminationTimeout = timeout } func (w *Wrapper) Run(ctx context.Context) error { go w.start() return w.wait(ctx) } func (w *Wrapper) start() { cmd := w.commanderFactory(w.path, w.args) w.log. WithField("path", w.path). WithField("args", w.args). Debug("Starting process") err := cmd.Start() if err != nil { w.errCh <- fmt.Errorf("%w: %v", errFailedToStartProcess, err) return } w.setProcess(cmd.Process()) w.setStatus(api.StatusRunning) w.errCh <- cmd.Wait() } func (w *Wrapper) setProcess(process process) { w.lock.Lock() defer w.lock.Unlock() w.process = process } func (w *Wrapper) setStatus(status api.Status) { w.lock.Lock() defer w.lock.Unlock() w.status = status } func (w *Wrapper) wait(ctx context.Context) error { for { select { case err := <-w.errCh: w.handleWrappedProcessShutdown(ctx, err) case <-ctx.Done(): return w.terminateWrapper() } } } func (w *Wrapper) handleWrappedProcessShutdown(ctx context.Context, err error) { if err != nil { w.setFailureReason(err) } w.setProcess(nil) w.setStatus(api.StatusStopped) go w.sendShutdownCallback(ctx) } func (w *Wrapper) setFailureReason(err error) { w.lock.Lock() defer w.lock.Unlock() w.failureReason = err } func (w *Wrapper) sendShutdownCallback(ctx context.Context) { w.lock.Lock() c := w.shutdownCallback w.lock.Unlock() if c == nil { w.log.Info("No shutdown callback registered; skipping") return } c.Run(ctx) } func (w *Wrapper) terminateWrapper() error { w.log.Info("Shutting down wrapper process...") err := w.terminateWrappedProcess() if err != nil { if errors.Is(err, api.ErrProcessNotInitialized) { return nil } return err } select { case err := <-w.errCh: w.log.WithError(err).Info("Wrapped application exited") return nil case <-time.After(w.terminationTimeout): return errProcessExitTimeout } } func (w *Wrapper) terminateWrappedProcess() error { w.lock.RLock() p := w.process w.lock.RUnlock() if p == nil { w.log.Info("No process to shutdown; exiting") return api.ErrProcessNotInitialized } err := p.Signal(syscall.SIGTERM) if err != nil { return fmt.Errorf("%w: %v", errFailedToTerminateProcess, err) } return nil } func (w *Wrapper) Status() api.Status { w.lock.RLock() defer w.lock.RUnlock() w.log.WithField("status", w.status.String()).Debug("Checking process status") return w.status } func (w *Wrapper) FailureReason() string { w.lock.RLock() defer w.lock.RUnlock() w.log.WithError(w.failureReason).Debug("Checking process failure reason") if w.failureReason == nil { return "" } return w.failureReason.Error() } func (w *Wrapper) InitiateGracefulShutdown(req api.InitGracefulShutdownRequest) error { w.lock.RLock() p := w.process w.lock.RUnlock() if p == nil { return api.ErrProcessNotInitialized } w.log.Info("Initiating graceful shutdown of the process") err := p.Signal(gracefulShutdownSignal) if err != nil { return fmt.Errorf("could not send graceful shutdown signal: %w", err) } if req.ShutdownCallbackDef().URL() != "" { w.log. WithField("target", req.ShutdownCallbackDef().URL()). WithField("method", req.ShutdownCallbackDef().Method()). Debug("Registering shutdown callback") w.setShutdownCallback(api.NewShutdownCallback(w.log, req.ShutdownCallbackDef())) } w.setStatus(api.StatusInShutdown) return nil } func (w *Wrapper) InitiateForcefulShutdown() error { w.lock.RLock() p := w.process w.lock.RUnlock() if p == nil { return api.ErrProcessNotInitialized } w.log.Info("Initiating forceful shutdown of the process") err := w.forcefulShutdown(p) if err != nil { return fmt.Errorf("could not send forceful shutdown signal: %w", err) } w.setStatus(api.StatusInShutdown) return nil } func (w *Wrapper) setShutdownCallback(callback api.ShutdownCallback) { w.lock.Lock() defer w.lock.Unlock() w.shutdownCallback = callback }