pkg/jobmanager/start.go (111 lines of code) (raw):
// Copyright (c) Facebook, Inc. and its affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
package jobmanager
import (
"encoding/json"
"fmt"
"time"
"github.com/facebookincubator/contest/pkg/api"
"github.com/facebookincubator/contest/pkg/job"
"github.com/facebookincubator/contest/pkg/xcontext"
)
func (jm *JobManager) start(ev *api.Event) *api.EventResponse {
msg := ev.Msg.(api.EventStartMsg)
var jd job.Descriptor
if err := json.Unmarshal([]byte(msg.JobDescriptor), &jd); err != nil {
return &api.EventResponse{Err: err}
}
if err := job.CheckTags(jd.Tags, false /* allowInternal */); err != nil {
return &api.EventResponse{Err: err}
}
// Add instance tag, if specified.
if jm.config.instanceTag != "" {
jd.Tags = job.AddTags(jd.Tags, jm.config.instanceTag)
}
j, err := NewJobFromDescriptor(ev.Context, jm.pluginRegistry, &jd)
if err != nil {
return &api.EventResponse{Err: err}
}
jdJSON, err := json.MarshalIndent(&jd, "", " ")
if err != nil {
return &api.EventResponse{Err: err}
}
// The job descriptor has been validated correctly, now use the JobRequestEmitter
// interface to obtain a JobRequest object with a valid id
request := job.Request{
JobName: j.Name,
JobDescriptor: string(jdJSON),
ExtendedDescriptor: j.ExtendedDescriptor,
Requestor: string(ev.Msg.Requestor()),
ServerID: ev.ServerID,
RequestTime: time.Now(),
}
jobID, err := jm.jsm.StoreJobRequest(ev.Context, &request)
if err != nil {
return &api.EventResponse{
Requestor: ev.Msg.Requestor(),
Err: fmt.Errorf("could not create job request: %v", err)}
}
j.ID = jobID
jm.startJob(ev.Context, j, nil)
return &api.EventResponse{
JobID: j.ID,
Requestor: ev.Msg.Requestor(),
Err: nil,
Status: &job.Status{
Name: j.Name,
State: string(job.EventJobStarted),
StartTime: time.Now(),
},
}
}
func (jm *JobManager) startJob(ctx xcontext.Context, j *job.Job, resumeState *job.PauseEventPayload) {
jm.jobsMu.Lock()
defer jm.jobsMu.Unlock()
jobCtx, jobCancel := xcontext.WithCancel(ctx)
jobCtx, jobPause := xcontext.WithNotify(jobCtx, xcontext.ErrPaused)
jm.jobs[j.ID] = &jobInfo{job: j, pause: jobPause, cancel: jobCancel}
go jm.runJob(jobCtx, j, resumeState)
}
func (jm *JobManager) runJob(ctx xcontext.Context, j *job.Job, resumeState *job.PauseEventPayload) {
defer func() {
jm.jobsMu.Lock()
delete(jm.jobs, j.ID)
jm.jobsMu.Unlock()
}()
ctx = ctx.WithField("job_id", j.ID)
if err := jm.emitEvent(ctx, j.ID, job.EventJobStarted); err != nil {
ctx.Errorf("failed to emit event: %v", err)
return
}
start := time.Now()
resumeState, err := jm.jobRunner.Run(ctx, j, resumeState)
duration := time.Since(start)
ctx.Debugf("Job %d: runner finished, err %v", j.ID, err)
switch err {
case xcontext.ErrCanceled:
_ = jm.emitEvent(ctx, j.ID, job.EventJobCancelled)
return
case xcontext.ErrPaused:
if err := jm.emitEventPayload(ctx, j.ID, job.EventJobPaused, resumeState); err != nil {
_ = jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, fmt.Errorf("Job %+v failed pausing: %v", j, err))
} else {
ctx.Infof("Successfully paused job %d (run %d, %d targets)", j.ID, resumeState.RunID, len(resumeState.Targets))
ctx.Debugf("Job %d pause state: %+v", j.ID, resumeState)
}
return
}
select {
case <-ctx.Until(xcontext.ErrPaused):
// We were asked to pause but failed to do so.
pauseErr := fmt.Errorf("Job %+v failed pausing: %v", j, err)
ctx.Errorf("%v", pauseErr)
_ = jm.emitErrEvent(ctx, j.ID, job.EventJobPauseFailed, pauseErr)
return
default:
}
ctx.Infof("Job %d finished", j.ID)
// at this point it is safe to emit the job status event. Note: this is
// checking `err` from the `jm.jobRunner.Run()` call above.
if err != nil {
_ = jm.emitErrEvent(ctx, j.ID, job.EventJobFailed, fmt.Errorf("Job %d failed after %s: %w", j.ID, duration, err))
} else {
ctx.Infof("Job %+v completed after %s", j, duration)
err = jm.emitEvent(ctx, j.ID, job.EventJobCompleted)
if err != nil {
ctx.Warnf("event emission failed: %v", err)
}
}
}