func()

in pkg/jobmanager/jobmanager.go [138:216]


// Run creates the API, starts the API listener in a background goroutine and
// dispatches incoming API events to jm.handleEvent until one of the following
// happens: the listener returns, ctx is paused, or ctx is done.
//
// Before the listener is started, failZombieJobs is invoked (a failure there
// is only logged) and, when resumeJobs is true, jm.resumeJobs is invoked (a
// failure there aborts Run).
//
// On the way out Run stops the API, waits for the listener goroutine and for
// all in-flight event handlers, and then polls jm.checkIdle every 50ms until
// it reports true. If ctx is done while polling, CancelAll is called once and
// polling continues until checkIdle reports true.
//
// Run returns a non-nil error only if the API cannot be created or resuming
// jobs fails. Listener failures are logged, not returned.
func (jm *JobManager) Run(ctx xcontext.Context, resumeJobs bool) error {
	jm.jobRunner.StartLockRefresh()
	defer jm.jobRunner.StopLockRefresh()

	a, err := api.New(jm.config.apiOptions...)
	if err != nil {
		return fmt.Errorf("Cannot start API: %w", err)
	}

	// Deal with zombie jobs (fail them). This is best-effort: an error is
	// logged and startup continues.
	if err := jm.failZombieJobs(ctx, a.ServerID()); err != nil {
		ctx.Errorf("failed to fail jobs: %v", err)
	}

	// First, resume paused jobs.
	if resumeJobs {
		if err := jm.resumeJobs(ctx, a.ServerID()); err != nil {
			return fmt.Errorf("failed to resume jobs: %w", err)
		}
	}

	apiCtx, apiCancel := xcontext.WithCancel(ctx)
	jm.apiCancel = apiCancel

	// errCh carries the listener's result exactly once and is then closed, so
	// a second receive (see below) never blocks forever and yields nil.
	errCh := make(chan error, 1)
	go func() {
		lErr := jm.apiListener.Serve(apiCtx, a)
		// Only claim success when the listener actually returned no error;
		// failures are reported by whoever receives from errCh.
		if lErr == nil {
			ctx.Infof("Listener shut down successfully.")
		}
		errCh <- lErr
		close(errCh)
	}()

	var handlerWg sync.WaitGroup
loop:
	for {
		select {
		// handle events from the API
		case ev := <-a.Events:
			ev.Context.Debugf("Handling event %+v", ev)
			handlerWg.Add(1)
			go func() {
				defer handlerWg.Done()
				jm.handleEvent(ev)
			}()
		// check for errors or premature termination from the listener.
		case err := <-errCh:
			if err != nil {
				ctx.Infof("JobManager: API listener failed (%v)", err)
			}
			break loop
		case <-ctx.Until(xcontext.ErrPaused):
			ctx.Infof("Paused")
			jm.PauseAll(ctx)
			break loop
		case <-ctx.Done():
			break loop
		}
	}
	// Stop the API (if not already)
	jm.StopAPI()
	// Wait for the listener goroutine to finish. If the loop above already
	// consumed the result, this receives nil from the closed channel;
	// otherwise this is the only place the listener's error can be observed,
	// so report it instead of dropping it.
	if err := <-errCh; err != nil {
		ctx.Infof("JobManager: API listener failed (%v)", err)
	}
	// Wait for event handler completion
	handlerWg.Wait()
	// Wait for jobs to complete or for cancellation signal.
	doneCh := ctx.Done()
	// A single ticker replaces a fresh time.After timer per iteration.
	pollTicker := time.NewTicker(50 * time.Millisecond)
	defer pollTicker.Stop()
	for !jm.checkIdle(ctx) {
		select {
		case <-doneCh:
			ctx.Infof("Canceled")
			jm.CancelAll(ctx)
			// Note that we do not break out of the loop here, we expect runner to wind down and exit.
			// Setting doneCh to nil disables this case so CancelAll runs only once.
			doneCh = nil
		case <-pollTicker.C:
		}
	}
	// Refresh locks one last time for jobs that were paused.
	jm.jobRunner.RefreshLocks()
	return nil
}