in pkg/jobmanager/jobmanager.go [138:216]
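// Run is the JobManager's main loop. It fails any zombie jobs, optionally
// resumes paused jobs, then serves API events until the listener exits or the
// context is paused or canceled, and finally waits for running jobs to wind
// down before returning.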
func (jm *JobManager) Run(ctx xcontext.Context, resumeJobs bool) error {
	jm.jobRunner.StartLockRefresh()
	defer jm.jobRunner.StopLockRefresh()
	a, err := api.New(jm.config.apiOptions...)
	if err != nil {
		return fmt.Errorf("Cannot start API: %w", err)
	}
	// Deal with zombie jobs (fail them).
	if err := jm.failZombieJobs(ctx, a.ServerID()); err != nil {
		ctx.Errorf("failed to fail zombie jobs: %v", err)
	}
	// Resume paused jobs, if requested.
	if resumeJobs {
		if err := jm.resumeJobs(ctx, a.ServerID()); err != nil {
			return fmt.Errorf("failed to resume jobs: %w", err)
		}
	}
	apiCtx, apiCancel := xcontext.WithCancel(ctx)
	jm.apiCancel = apiCancel
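	// apiCtx is cancelable independently of the parent context so the API can
	// be shut down on its own. The listener runs in its own goroutine and
	// reports its exit error on errCh (buffered, so the send never blocks).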
	errCh := make(chan error, 1)
	go func() {
		lErr := jm.apiListener.Serve(apiCtx, a)
		ctx.Infof("Listener shut down successfully.")
		errCh <- lErr
		close(errCh)
	}()
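	// Main loop: dispatch incoming API events to handler goroutines until the
	// listener exits, or the context is paused or canceled. handlerWg tracks
	// in-flight handlers so they can be drained before shutting down.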
	var handlerWg sync.WaitGroup
loop:
	for {
		select {
		// handle events from the API
		case ev := <-a.Events:
			ev.Context.Debugf("Handling event %+v", ev)
			handlerWg.Add(1)
			go func() {
				defer handlerWg.Done()
				jm.handleEvent(ev)
			}()
		// check for errors or premature termination from the listener.
		case err := <-errCh:
			if err != nil {
				ctx.Infof("JobManager: API listener failed (%v)", err)
			}
			break loop
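		// The context signaled a pause: pause all running jobs and exit the loop.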
		case <-ctx.Until(xcontext.ErrPaused):
			ctx.Infof("Paused")
			jm.PauseAll(ctx)
			break loop
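		// The context was canceled: exit the loop; running jobs are canceled in
		// the wait loop below.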
		case <-ctx.Done():
			break loop
		}
	}
	// Stop the API (if not already)
	jm.StopAPI()
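	// Wait for the listener goroutine to finish (this receives immediately if
	// the listener already exited, since errCh is closed after the send).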
	<-errCh
	// Wait for event handler completion
	handlerWg.Wait()
	// Wait for jobs to complete or for cancellation signal.
	doneCh := ctx.Done()
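	// Poll until the runner reports idle. If the context is canceled while
	// waiting, cancel all jobs once and keep polling until they wind down.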
	for !jm.checkIdle(ctx) {
		select {
		case <-doneCh:
			ctx.Infof("Canceled")
			jm.CancelAll(ctx)
			// Note that we do not break out of the loop here; we expect the runner
			// to wind down and exit. Setting doneCh to nil prevents this case from
			// firing again.
			doneCh = nil
		case <-time.After(50 * time.Millisecond):
		}
	}
	// Refresh locks one last time for jobs that were paused.
	jm.jobRunner.RefreshLocks()
	return nil
}