in pkg/jobmanager/start.go [18:72]
func (jm *JobManager) start(ev *api.Event) *api.EventResponse {
msg := ev.Msg.(api.EventStartMsg)
var jd job.Descriptor
if err := json.Unmarshal([]byte(msg.JobDescriptor), &jd); err != nil {
return &api.EventResponse{Err: err}
}
if err := job.CheckTags(jd.Tags, false /* allowInternal */); err != nil {
return &api.EventResponse{Err: err}
}
// Add instance tag, if specified.
if jm.config.instanceTag != "" {
jd.Tags = job.AddTags(jd.Tags, jm.config.instanceTag)
}
j, err := NewJobFromDescriptor(ev.Context, jm.pluginRegistry, &jd)
if err != nil {
return &api.EventResponse{Err: err}
}
jdJSON, err := json.MarshalIndent(&jd, "", " ")
if err != nil {
return &api.EventResponse{Err: err}
}
// The job descriptor has been validated correctly, now use the JobRequestEmitter
// interface to obtain a JobRequest object with a valid id
request := job.Request{
JobName: j.Name,
JobDescriptor: string(jdJSON),
ExtendedDescriptor: j.ExtendedDescriptor,
Requestor: string(ev.Msg.Requestor()),
ServerID: ev.ServerID,
RequestTime: time.Now(),
}
jobID, err := jm.jsm.StoreJobRequest(ev.Context, &request)
if err != nil {
return &api.EventResponse{
Requestor: ev.Msg.Requestor(),
Err: fmt.Errorf("could not create job request: %v", err)}
}
j.ID = jobID
jm.startJob(ev.Context, j, nil)
return &api.EventResponse{
JobID: j.ID,
Requestor: ev.Msg.Requestor(),
Err: nil,
Status: &job.Status{
Name: j.Name,
State: string(job.EventJobStarted),
StartTime: time.Now(),
},
}
}