in pkg/jobmanager/start.go [83:135]
// runJob runs job j through jm.jobRunner and emits the event that describes
// how the run ended.
//
// resumeState is handed to the runner as-is; the value the runner returns
// replaces it and is attached to the EventJobPaused event when the run ends
// with xcontext.ErrPaused.
//
// Outcomes, in the order they are checked:
//   - EventJobStarted cannot be emitted: the runner is never invoked.
//   - runner returned xcontext.ErrCanceled: EventJobCancelled.
//   - runner returned xcontext.ErrPaused: EventJobPaused with the resume
//     state, or EventJobPauseFailed if that emission fails.
//   - ctx reports a pause but the runner returned anything else:
//     EventJobPauseFailed.
//   - any other non-nil error: EventJobFailed.
//   - nil error: EventJobCompleted.
//
// The job is removed from jm.jobs when runJob returns, whatever the outcome.
// Failures to emit an outcome event are logged as warnings; there is no
// caller to report them to.
func (jm *JobManager) runJob(ctx xcontext.Context, j *job.Job, resumeState *job.PauseEventPayload) {
	defer func() {
		jm.jobsMu.Lock()
		delete(jm.jobs, j.ID)
		jm.jobsMu.Unlock()
	}()

	ctx = ctx.WithField("job_id", j.ID)

	// Outcome events are emitted on a best-effort basis, but a failed emission
	// must at least be visible in the logs instead of being silently dropped.
	logEmitErr := func(emitErr error) {
		if emitErr != nil {
			ctx.Warnf("event emission failed: %v", emitErr)
		}
	}

	if err := jm.emitEvent(ctx, j.ID, job.EventJobStarted); err != nil {
		ctx.Errorf("failed to emit event: %v", err)
		return
	}

	start := time.Now()
	// resumeState is the function parameter, so this assigns to it; only err
	// is newly declared here.
	resumeState, err := jm.jobRunner.Run(ctx, j, resumeState)
	duration := time.Since(start)
	ctx.Debugf("Job %d: runner finished, err %v", j.ID, err)

	switch err {
	case xcontext.ErrCanceled:
		logEmitErr(jm.emitEvent(ctx, j.ID, job.EventJobCancelled))
		return
	case xcontext.ErrPaused:
		if err := jm.emitEventPayload(ctx, j.ID, job.EventJobPaused, resumeState); err != nil {
			logEmitErr(jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, fmt.Errorf("Job %+v failed pausing: %v", j, err)))
			return
		}
		if resumeState == nil {
			// NOTE(review): the runner is presumably expected to return a
			// payload together with ErrPaused; guard anyway so a missing one
			// cannot turn a log statement into a nil-pointer panic.
			ctx.Infof("Successfully paused job %d (no resume state returned)", j.ID)
			return
		}
		ctx.Infof("Successfully paused job %d (run %d, %d targets)", j.ID, resumeState.RunID, len(resumeState.Targets))
		ctx.Debugf("Job %d pause state: %+v", j.ID, resumeState)
		return
	}

	// Non-blocking check: a pause was requested, yet the runner did not come
	// back with ErrPaused.
	select {
	case <-ctx.Until(xcontext.ErrPaused):
		// We were asked to pause but failed to do so. err may be nil here
		// (the runner finished normally), hence %v rather than %w.
		pauseErr := fmt.Errorf("Job %+v failed pausing: %v", j, err)
		ctx.Errorf("%v", pauseErr)
		logEmitErr(jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, pauseErr))
		return
	default:
	}

	ctx.Infof("Job %d finished", j.ID)
	// At this point it is safe to emit the job status event. Note: err is
	// still the result of the jm.jobRunner.Run() call above.
	if err != nil {
		logEmitErr(jm.emitErrEvent(ctx, j.ID, job.EventJobFailed, fmt.Errorf("Job %d failed after %s: %w", j.ID, duration, err)))
		return
	}
	ctx.Infof("Job %+v completed after %s", j, duration)
	logEmitErr(jm.emitEvent(ctx, j.ID, job.EventJobCompleted))
}