pkg/jobmanager/resume.go (64 lines of code) (raw):
// Copyright (c) Facebook, Inc. and its affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
package jobmanager
import (
"encoding/json"
"fmt"
"sort"
"github.com/facebookincubator/contest/pkg/event/frameworkevent"
"github.com/facebookincubator/contest/pkg/job"
"github.com/facebookincubator/contest/pkg/types"
"github.com/facebookincubator/contest/pkg/xcontext"
)
// resumeJobs looks up the jobs that listMyJobs reports in job.JobStatePaused
// for serverID and tries to resume each of them in turn.
//
// A job that cannot be resumed does not abort the loop: the failure is logged
// and a job.EventJobFailed event carrying the wrapped error is emitted for it.
// If emitting that event fails too, only a warning is logged. The only error
// returned to the caller is a failure to list the paused jobs.
func (jm *JobManager) resumeJobs(ctx xcontext.Context, serverID string) error {
	paused, err := jm.listMyJobs(ctx, serverID, job.JobStatePaused)
	if err != nil {
		return fmt.Errorf("failed to list paused jobs: %w", err)
	}
	ctx.Infof("Found %d paused jobs for %s/%s", len(paused), jm.config.instanceTag, serverID)

	for _, id := range paused {
		resumeErr := jm.resumeJob(ctx, id)
		if resumeErr == nil {
			continue
		}
		ctx.Errorf("failed to resume job %d: %v, failing it", id, resumeErr)

		// Record the failure as a job event so the job does not stay paused.
		failure := fmt.Errorf("failed to resume job %d: %w", id, resumeErr)
		if emitErr := jm.emitErrEvent(ctx, id, job.EventJobFailed, failure); emitErr != nil {
			ctx.Warnf("Failed to emit event for %d: %v", id, emitErr)
		}
	}
	return nil
}
// resumeJob rebuilds a single paused job and hands it to startJob together
// with the state recovered from its most recent pause event.
//
// It fetches the job.EventJobPaused framework events recorded for jobID,
// takes the one with the latest EmitTime, decodes its JSON payload into a
// job.PauseEventPayload and checks that the payload version equals
// job.CurrentPauseEventPayloadVersion. The job object is then recreated from
// the extended descriptor of the stored job request and given the original
// jobID.
//
// An error is returned, and startJob is not called, when the events cannot be
// fetched, no pause event exists, the latest event has a nil or undecodable
// payload, the payload version does not match, the job request cannot be
// retrieved, or the job cannot be constructed from its descriptor.
func (jm *JobManager) resumeJob(ctx xcontext.Context, jobID types.JobID) error {
	ctx.Debugf("attempting to resume job %d", jobID)
	results, err := jm.frameworkEvManager.Fetch(
		ctx,
		frameworkevent.QueryJobID(jobID),
		frameworkevent.QueryEventName(job.EventJobPaused),
	)
	if err != nil {
		return fmt.Errorf("failed to query resume state for job %d: %w", jobID, err)
	}
	if len(results) == 0 {
		return fmt.Errorf("no resume state found for job %d", jobID)
	}

	// Sort by EmitTime in descending order so that results[0] is the most
	// recent pause event; earlier pause events are ignored.
	sort.Slice(results, func(i, j int) bool { return results[i].EmitTime.After(results[j].EmitTime) })

	// Payload is a pointer and is dereferenced below, so guard against nil.
	if results[0].Payload == nil {
		return fmt.Errorf("invalid resume state for job %d: %+v", jobID, results[0])
	}
	var resumeState job.PauseEventPayload
	if err := json.Unmarshal(*results[0].Payload, &resumeState); err != nil {
		return fmt.Errorf("invalid resume state for job %d: %w", jobID, err)
	}
	if resumeState.Version != job.CurrentPauseEventPayloadVersion {
		return fmt.Errorf("incompatible resume state version (want %d, got %d)",
			job.CurrentPauseEventPayloadVersion, resumeState.Version)
	}

	req, err := jm.jsm.GetJobRequest(ctx, jobID)
	if err != nil {
		return fmt.Errorf("failed to retrieve job descriptor for %d: %w", jobID, err)
	}
	j, err := NewJobFromExtendedDescriptor(ctx, jm.pluginRegistry, req.ExtendedDescriptor)
	if err != nil {
		return fmt.Errorf("failed to create job %d: %w", jobID, err)
	}
	// Assign the original ID to the rebuilt job before starting it.
	j.ID = jobID

	ctx.Debugf("running resumed job %d", j.ID)
	jm.startJob(ctx, j, &resumeState)
	return nil
}