func()

in pkg/jobmanager/resume.go [36:74]


// resumeJob restarts a previously paused job from its most recent pause state.
//
// It works in these steps:
//   - Fetch the job's framework events named job.EventJobPaused.
//   - Pick the one with the latest EmitTime and decode its payload into a
//     job.PauseEventPayload.
//   - Reject the state unless its Version equals
//     job.CurrentPauseEventPayloadVersion.
//   - Rebuild the job from its stored extended descriptor and hand it to
//     jm.startJob together with the decoded resume state.
//
// An error is returned if any of these steps fail. A nil return only means
// startJob was invoked. NOTE(review): whether startJob runs the job
// synchronously or asynchronously is not visible here; confirm before relying
// on completion semantics.
func (jm *JobManager) resumeJob(ctx xcontext.Context, jobID types.JobID) error {
	ctx.Debugf("attempting to resume job %d", jobID)
	results, err := jm.frameworkEvManager.Fetch(
		ctx,
		frameworkevent.QueryJobID(jobID),
		frameworkevent.QueryEventName(job.EventJobPaused),
	)
	if err != nil {
		return fmt.Errorf("failed to query resume state for job %d: %w", jobID, err)
	}
	if len(results) == 0 {
		return fmt.Errorf("no resume state found for job %d", jobID)
	}
	// A job may have been paused more than once; only the newest pause state
	// is meaningful. Sort by EmitTime in descending order so results[0] is the
	// latest. sort.Slice is not stable, so events with identical EmitTime are
	// ordered arbitrarily.
	sort.Slice(results, func(i, j int) bool { return results[i].EmitTime.After(results[j].EmitTime) })
	latest := results[0]
	if latest.Payload == nil {
		return fmt.Errorf("invalid resume state for job %d: %+v", jobID, latest)
	}
	var resumeState job.PauseEventPayload
	if err := json.Unmarshal(*latest.Payload, &resumeState); err != nil {
		return fmt.Errorf("invalid resume state for job %d: %w", jobID, err)
	}
	// The payload format is versioned; refuse to resume from state written by
	// an incompatible version rather than misinterpreting it.
	if resumeState.Version != job.CurrentPauseEventPayloadVersion {
		return fmt.Errorf("incompatible resume state version for job %d (want %d, got %d)",
			jobID, job.CurrentPauseEventPayloadVersion, resumeState.Version)
	}
	req, err := jm.jsm.GetJobRequest(ctx, jobID)
	if err != nil {
		return fmt.Errorf("failed to retrieve job descriptor for %d: %w", jobID, err)
	}
	j, err := NewJobFromExtendedDescriptor(ctx, jm.pluginRegistry, req.ExtendedDescriptor)
	if err != nil {
		return fmt.Errorf("failed to create job %d: %w", jobID, err)
	}
	// The job is being resumed, not newly submitted, so it keeps its original ID.
	j.ID = jobID
	ctx.Debugf("running resumed job %d", j.ID)
	jm.startJob(ctx, j, &resumeState)
	return nil
}