func()

in pkg/jobmanager/start.go [18:72]


// start handles a job start event. It decodes the JSON job descriptor carried
// by the event message, validates its tags, adds the configured instance tag
// (if any), builds the job from the descriptor, stores the corresponding job
// request to obtain a job ID, and hands the job to jm.startJob.
//
// On success the returned response carries the new job ID, the requestor and
// a status whose state is job.EventJobStarted. On failure the response has
// Err set and no job is started.
func (jm *JobManager) start(ev *api.Event) *api.EventResponse {
	// Use the checked form of the type assertion so that an unexpected
	// message type is reported as an error instead of panicking.
	msg, ok := ev.Msg.(api.EventStartMsg)
	if !ok {
		return &api.EventResponse{
			Err: fmt.Errorf("invalid message type for start event: %T", ev.Msg),
		}
	}

	var jd job.Descriptor
	if err := json.Unmarshal([]byte(msg.JobDescriptor), &jd); err != nil {
		return &api.EventResponse{Err: err}
	}
	// Validate the tags supplied in the descriptor before any tag is added
	// on the server side.
	if err := job.CheckTags(jd.Tags, false /* allowInternal */); err != nil {
		return &api.EventResponse{Err: err}
	}
	// Add instance tag, if specified.
	if jm.config.instanceTag != "" {
		jd.Tags = job.AddTags(jd.Tags, jm.config.instanceTag)
	}
	j, err := NewJobFromDescriptor(ev.Context, jm.pluginRegistry, &jd)
	if err != nil {
		return &api.EventResponse{Err: err}
	}
	// Re-serialize the descriptor so that the stored copy reflects the tags
	// as they are after the instance tag step above.
	jdJSON, err := json.MarshalIndent(&jd, "", "    ")
	if err != nil {
		return &api.EventResponse{Err: err}
	}

	// The job descriptor has been accepted; store the job request to obtain
	// the ID that identifies this job from now on.
	request := job.Request{
		JobName:            j.Name,
		JobDescriptor:      string(jdJSON),
		ExtendedDescriptor: j.ExtendedDescriptor,
		Requestor:          string(ev.Msg.Requestor()),
		ServerID:           ev.ServerID,
		RequestTime:        time.Now(),
	}
	jobID, err := jm.jsm.StoreJobRequest(ev.Context, &request)
	if err != nil {
		return &api.EventResponse{
			Requestor: ev.Msg.Requestor(),
			// Wrap with %w so callers can inspect the underlying storage
			// error with errors.Is / errors.As.
			Err: fmt.Errorf("could not create job request: %w", err),
		}
	}

	j.ID = jobID

	jm.startJob(ev.Context, j, nil)

	return &api.EventResponse{
		JobID:     j.ID,
		Requestor: ev.Msg.Requestor(),
		Err:       nil,
		Status: &job.Status{
			Name:      j.Name,
			State:     string(job.EventJobStarted),
			StartTime: time.Now(),
		},
	}
}