pkg/jobmanager/status.go (115 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. package jobmanager import ( "encoding/json" "fmt" "time" log "github.com/sirupsen/logrus" "github.com/facebookincubator/contest/pkg/api" "github.com/facebookincubator/contest/pkg/event" "github.com/facebookincubator/contest/pkg/event/frameworkevent" "github.com/facebookincubator/contest/pkg/job" "github.com/facebookincubator/contest/pkg/storage" ) func (jm *JobManager) status(ev *api.Event) *api.EventResponse { ctx := storage.WithConsistencyModel(ev.Context, storage.ConsistentEventually) msg := ev.Msg.(api.EventStatusMsg) jobID := msg.JobID evResp := api.EventResponse{ JobID: jobID, Requestor: ev.Msg.Requestor(), Err: nil, } // Look up job request. req, err := jm.jsm.GetJobRequest(ctx, jobID) if err != nil { evResp.Err = fmt.Errorf("failed to fetch request for job ID %d: %w", jobID, err) return &evResp } currentJob, err := NewJobFromExtendedDescriptor(ctx, jm.pluginRegistry, req.ExtendedDescriptor) if err != nil { evResp.Err = fmt.Errorf("failed to build job object from job request: %w", err) return &evResp } // currentJob temporary object is just used as an interface to the job extended descriptor // so populate it with the other necessary fields such as id (currently 0) currentJob.ID = jobID // Is it for our instance? if jm.config.instanceTag != "" { found := false for _, tag := range currentJob.Tags { if tag == jm.config.instanceTag { found = true break } } if !found { evResp.Err = fmt.Errorf("job %d belongs to a different instance, this is %q", jobID, jm.config.instanceTag) return &evResp } } // Fetch all the events associated to changes of state of the Job jobEvents, err := jm.frameworkEvManager.Fetch(ctx, frameworkevent.QueryJobID(jobID), frameworkevent.QueryEventNames(job.JobStateEvents), ) if err != nil { evResp.Err = fmt.Errorf("could not fetch events associated to job state: %v", err) return &evResp } // Lookup job starting time and job termination time based on the events emitted var ( startTime time.Time endTime *time.Time ) completionEvents := make(map[event.Name]bool) for _, eventName := range job.JobCompletionEvents { completionEvents[eventName] = true } for _, ev := range jobEvents { if ev.EventName == job.EventJobStarted { startTime = ev.EmitTime } else if _, ok := completionEvents[ev.EventName]; ok { // A completion event has been seen for this Job. Only one completion event can be associated to the job if endTime != nil && !endTime.IsZero() { log.Warningf("Job %d is associated to multiple completion events", jobID) } endTime = &ev.EmitTime } } state := "Unknown" var stateErrMsg string if len(jobEvents) > 0 { je := jobEvents[len(jobEvents)-1] state = string(je.EventName) if je.EventName == job.EventJobFailed { // if there was a framework failure, retrieve the failure event and // the associated error message, so it can be exposed in the status. if je.Payload == nil { stateErrMsg = "internal error: EventJobFailed's payload is nil" } else { var ep ErrorEventPayload if err := json.Unmarshal(*je.Payload, &ep); err != nil { stateErrMsg = fmt.Sprintf("internal error: EventJobFailed's payload cannot be unmarshalled. Raw payload: %s, Error: %v", *je.Payload, err) } else { stateErrMsg = ep.Err.Error() } } } } report, err := jm.jsm.GetJobReport(ctx, jobID) if err != nil { evResp.Err = fmt.Errorf("could not fetch job report: %v", err) return &evResp } jobStatus := job.Status{ Name: currentJob.Name, StartTime: startTime, EndTime: endTime, State: state, StateErrMsg: stateErrMsg, JobReport: report, } jobStatus.RunStatuses, err = jm.jobRunner.BuildRunStatuses(ctx, currentJob) if err != nil { evResp.Err = fmt.Errorf("could not rebuild the statuses of the job: %v", err) return &evResp } if len(jobStatus.RunStatuses) > 0 { // NOTE: deprecated, keeping for backwards compat jobStatus.RunStatus = &jobStatus.RunStatuses[len(jobStatus.RunStatuses)-1] } evResp.Status = &jobStatus evResp.Err = nil return &evResp }