func()

in pkg/jobmanager/start.go [83:135]


// runJob executes job j through jm.jobRunner and reports the outcome as job
// events.
//
// The sequence is:
//  1. emit EventJobStarted; if that fails the job is not run at all;
//  2. call jm.jobRunner.Run, passing resumeState through unchanged;
//  3. depending on the runner's error, emit EventJobCancelled, EventJobPaused
//     (with the returned resume state as payload), EventJobPauseFailed,
//     EventJobFailed or EventJobCompleted.
//
// Whatever the outcome, the job is removed from jm.jobs (under jm.jobsMu)
// before runJob returns. Failures to emit an outcome event are logged as
// warnings; they never change the control flow.
func (jm *JobManager) runJob(ctx xcontext.Context, j *job.Job, resumeState *job.PauseEventPayload) {
	defer func() {
		jm.jobsMu.Lock()
		delete(jm.jobs, j.ID)
		jm.jobsMu.Unlock()
	}()

	ctx = ctx.WithField("job_id", j.ID)

	// warnOnEmitFailure makes a failed event emission visible in the log
	// instead of silently dropping it. Emission of outcome events stays
	// best-effort: there is nothing more to do for the job at that point.
	warnOnEmitFailure := func(emitErr error) {
		if emitErr != nil {
			ctx.Warnf("event emission failed: %v", emitErr)
		}
	}

	if err := jm.emitEvent(ctx, j.ID, job.EventJobStarted); err != nil {
		ctx.Errorf("failed to emit event: %v", err)
		return
	}

	start := time.Now()
	// resumeState is the function parameter, so this assigns to it and only
	// declares err.
	resumeState, err := jm.jobRunner.Run(ctx, j, resumeState)
	duration := time.Since(start)
	ctx.Debugf("Job %d: runner finished, err %v", j.ID, err)

	switch err {
	case xcontext.ErrCanceled:
		warnOnEmitFailure(jm.emitEvent(ctx, j.ID, job.EventJobCancelled))
		return
	case xcontext.ErrPaused:
		if emitErr := jm.emitEventPayload(ctx, j.ID, job.EventJobPaused, resumeState); emitErr != nil {
			// The pause state could not be recorded, so report the pause as
			// failed. %w keeps the emission error in the chain.
			pauseErr := fmt.Errorf("Job %+v failed pausing: %w", j, emitErr)
			warnOnEmitFailure(jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, pauseErr))
			return
		}
		ctx.Infof("Successfully paused job %d (run %d, %d targets)", j.ID, resumeState.RunID, len(resumeState.Targets))
		ctx.Debugf("Job %d pause state: %+v", j.ID, resumeState)
		return
	}

	// The runner returned something other than ErrCanceled/ErrPaused. If a
	// pause had nevertheless been requested on ctx, the pause did not happen.
	select {
	case <-ctx.Until(xcontext.ErrPaused):
		// We were asked to pause but failed to do so.
		// err may be nil here, so it is formatted with %v rather than %w.
		pauseErr := fmt.Errorf("Job %+v failed pausing: %v", j, err)
		ctx.Errorf("%v", pauseErr)
		warnOnEmitFailure(jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, pauseErr))
		return
	default:
	}

	ctx.Infof("Job %d finished", j.ID)
	// At this point it is safe to emit the job status event. Note: this is
	// checking `err` from the `jm.jobRunner.Run()` call above.
	if err != nil {
		warnOnEmitFailure(jm.emitErrEvent(ctx, j.ID, job.EventJobFailed, fmt.Errorf("Job %d failed after %s: %w", j.ID, duration, err)))
		return
	}
	ctx.Infof("Job %+v completed after %s", j, duration)
	warnOnEmitFailure(jm.emitEvent(ctx, j.ID, job.EventJobCompleted))
}