in pkg/jobmanager/resume.go [36:74]
// resumeJob resumes the job identified by jobID from its most recently
// recorded pause state.
//
// It fetches the job's EventJobPaused framework events, picks the one with the
// latest EmitTime, decodes its payload into a job.PauseEventPayload and checks
// that the payload version equals job.CurrentPauseEventPayloadVersion. It then
// retrieves the job request through jm.jsm, rebuilds the job from the request's
// extended descriptor, assigns jobID to it and passes it, together with the
// decoded pause state, to jm.startJob.
//
// A non-nil error is returned when the events cannot be fetched, no pause
// event exists, the payload is missing or cannot be decoded, the payload
// version does not match, the job request cannot be retrieved, or the job
// cannot be rebuilt. In all of those cases jm.startJob is not called.
func (jm *JobManager) resumeJob(ctx xcontext.Context, jobID types.JobID) error {
	ctx.Debugf("attempting to resume job %d", jobID)
	results, err := jm.frameworkEvManager.Fetch(
		ctx,
		frameworkevent.QueryJobID(jobID),
		frameworkevent.QueryEventName(job.EventJobPaused),
	)
	if err != nil {
		return fmt.Errorf("failed to query resume state for job %d: %w", jobID, err)
	}
	if len(results) == 0 {
		return fmt.Errorf("no resume state found for job %d", jobID)
	}

	// Sort by EmitTime in descending order so that results[0] is the most
	// recent pause event; only that one is used for resuming.
	sort.Slice(results, func(i, j int) bool { return results[i].EmitTime.After(results[j].EmitTime) })

	var resumeState job.PauseEventPayload
	// Guard the dereference below: an event without payload cannot be decoded.
	if results[0].Payload == nil {
		return fmt.Errorf("invalid resume state for job %d: %+v", jobID, results[0])
	}
	if err := json.Unmarshal(*results[0].Payload, &resumeState); err != nil {
		return fmt.Errorf("invalid resume state for job %d: %w", jobID, err)
	}
	// Refuse pause states whose version differs from the current one.
	if resumeState.Version != job.CurrentPauseEventPayloadVersion {
		return fmt.Errorf("incompatible resume state version (want %d, got %d)",
			job.CurrentPauseEventPayloadVersion, resumeState.Version)
	}

	req, err := jm.jsm.GetJobRequest(ctx, jobID)
	if err != nil {
		return fmt.Errorf("failed to retrieve job descriptor for %d: %w", jobID, err)
	}
	j, err := NewJobFromExtendedDescriptor(ctx, jm.pluginRegistry, req.ExtendedDescriptor)
	if err != nil {
		return fmt.Errorf("failed to create job %d: %w", jobID, err)
	}
	// The rebuilt job must carry the ID of the job being resumed.
	j.ID = jobID

	ctx.Debugf("running resumed job %d", j.ID)
	jm.startJob(ctx, j, &resumeState)
	return nil
}