pkg/runner/job_status.go (187 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. package runner import ( "encoding/json" "fmt" "github.com/facebookincubator/contest/pkg/event" "github.com/facebookincubator/contest/pkg/event/frameworkevent" "github.com/facebookincubator/contest/pkg/event/testevent" "github.com/facebookincubator/contest/pkg/job" "github.com/facebookincubator/contest/pkg/target" "github.com/facebookincubator/contest/pkg/test" "github.com/facebookincubator/contest/pkg/types" "github.com/facebookincubator/contest/pkg/xcontext" ) // targetRoutingEvents gather all event names which track the flow of targets // between TestSteps var targetRoutingEvents = map[event.Name]struct{}{ target.EventTargetIn: struct{}{}, target.EventTargetErr: struct{}{}, target.EventTargetOut: struct{}{}, target.EventTargetInErr: struct{}{}, } // buildTargetStatuses builds a list of TargetStepStatus, which represent the status of Targets within a TestStep func (jr *JobRunner) buildTargetStatuses(coordinates job.TestStepCoordinates, targetEvents []testevent.Event) ([]job.TargetStatus, error) { var targetStatuses []job.TargetStatus for _, testEvent := range targetEvents { // Update the TargetStatus object associated to the Target. If there is no TargetStatus associated yet, append it var targetStatus *job.TargetStatus for index, candidateStatus := range targetStatuses { if candidateStatus.Target.ID == testEvent.Data.Target.ID { targetStatus = &targetStatuses[index] break } } if targetStatus == nil { // There is no TargetStatus associated with this Target, create one targetStatuses = append(targetStatuses, job.TargetStatus{TestStepCoordinates: coordinates, Target: testEvent.Data.Target}) targetStatus = &targetStatuses[len(targetStatuses)-1] } // append non-routing events if _, isRoutingEvent := targetRoutingEvents[testEvent.Data.EventName]; !isRoutingEvent { targetStatus.Events = append(targetStatus.Events, testEvent) } evName := testEvent.Data.EventName if evName == target.EventTargetIn { targetStatus.InTime = testEvent.EmitTime } else if evName == target.EventTargetOut { targetStatus.OutTime = testEvent.EmitTime } else if evName == target.EventTargetErr { targetStatus.OutTime = testEvent.EmitTime errorPayload := target.ErrPayload{} jsonPayload, err := testEvent.Data.Payload.MarshalJSON() if err != nil { targetStatus.Error = fmt.Sprintf("could not marshal payload error: %v", err) } else { if err := json.Unmarshal(jsonPayload, &errorPayload); err != nil { targetStatus.Error = fmt.Sprintf("could not unmarshal payload error: %v", err) } else { targetStatus.Error = errorPayload.Error } } } } return targetStatuses, nil } // buildTestStepStatus builds the status object of a test step belonging to a test func (jr *JobRunner) buildTestStepStatus(ctx xcontext.Context, coordinates job.TestStepCoordinates) (*job.TestStepStatus, error) { testStepStatus := job.TestStepStatus{TestStepCoordinates: coordinates} // Fetch all Events associated to this TestStep testEvents, err := jr.testEvManager.Fetch(ctx, testevent.QueryJobID(coordinates.JobID), testevent.QueryRunID(coordinates.RunID), testevent.QueryTestName(coordinates.TestName), testevent.QueryTestStepLabel(coordinates.TestStepLabel), ) if err != nil { return nil, fmt.Errorf("could not fetch events associated to test step %s: %v", coordinates.TestStepLabel, err) } var stepEvents, targetEvents []testevent.Event for _, event := range testEvents { if event.Data.Target == nil { // we don't want target routing events in step events, but we want // them in target events below if _, skip := targetRoutingEvents[event.Data.EventName]; skip { ctx.Warnf("Found routing event '%s' with no target associated, this could indicate a bug", event.Data.EventName) continue } // this goes into TestStepStatus.Events stepEvents = append(stepEvents, event) } else { // this goes into TargetStatus.Events targetEvents = append(targetEvents, event) } } testStepStatus.Events = stepEvents targetStatuses, err := jr.buildTargetStatuses(coordinates, targetEvents) if err != nil { return nil, fmt.Errorf("could not build target status for test step %s: %v", coordinates.TestStepLabel, err) } testStepStatus.TargetStatuses = targetStatuses return &testStepStatus, nil } // buildTestStatus builds the status of a test belonging to a specific to a test func (jr *JobRunner) buildTestStatus(ctx xcontext.Context, coordinates job.TestCoordinates, currentJob *job.Job) (*job.TestStatus, error) { var currentTest *test.Test // Identify the test within the Job for which we are asking to calculate the status for _, candidateTest := range currentJob.Tests { if candidateTest.Name == coordinates.TestName { currentTest = candidateTest break } } if currentTest == nil { return nil, fmt.Errorf("job with id %d does not include any test named %s", coordinates.JobID, coordinates.TestName) } testStatus := job.TestStatus{ TestCoordinates: coordinates, TestStepStatuses: make([]job.TestStepStatus, len(currentTest.TestStepsBundles)), } // Build a TestStepStatus object for each TestStep for index, bundle := range currentTest.TestStepsBundles { testStepCoordinates := job.TestStepCoordinates{ TestCoordinates: coordinates, TestStepName: bundle.TestStep.Name(), TestStepLabel: bundle.TestStepLabel, } testStepStatus, err := jr.buildTestStepStatus(ctx, testStepCoordinates) if err != nil { return nil, fmt.Errorf("could not build TestStatus for test %s: %v", bundle.TestStep.Name(), err) } testStatus.TestStepStatuses[index] = *testStepStatus } // Calculate the overall status of the Targets which corresponds to the last TargetStatus // object recorded for each Target. // Fetch all events signaling that a Target has been acquired. This is the source of truth // indicating which Targets belong to a Test. targetAcquiredEvents, err := jr.testEvManager.Fetch(ctx, testevent.QueryJobID(coordinates.JobID), testevent.QueryRunID(coordinates.RunID), testevent.QueryTestName(coordinates.TestName), testevent.QueryEventName(target.EventTargetAcquired), ) if err != nil { return nil, fmt.Errorf("could not fetch events associated to target acquisition") } var targetStatuses []job.TargetStatus // Keep track of the last TargetStatus seen for each Target targetMap := make(map[string]job.TargetStatus) for _, testStepStatus := range testStatus.TestStepStatuses { for _, targetStatus := range testStepStatus.TargetStatuses { targetMap[targetStatus.Target.ID] = targetStatus } } for _, targetEvent := range targetAcquiredEvents { t := *targetEvent.Data.Target if _, ok := targetMap[t.ID]; !ok { // This Target is not associated to any TargetStatus, we assume it has not // started the test targetMap[t.ID] = job.TargetStatus{} } targetStatuses = append(targetStatuses, targetMap[t.ID]) } testStatus.TargetStatuses = targetStatuses return &testStatus, nil } // BuildRunStatus builds the status of a run with a job func (jr *JobRunner) BuildRunStatus(ctx xcontext.Context, coordinates job.RunCoordinates, currentJob *job.Job) (*job.RunStatus, error) { runStatus := job.RunStatus{RunCoordinates: coordinates, TestStatuses: make([]job.TestStatus, len(currentJob.Tests))} for index, currentTest := range currentJob.Tests { testCoordinates := job.TestCoordinates{RunCoordinates: coordinates, TestName: currentTest.Name} testStatus, err := jr.buildTestStatus(ctx, testCoordinates, currentJob) if err != nil { return nil, fmt.Errorf("could not rebuild status for test %s: %v", currentTest.Name, err) } runStatus.TestStatuses[index] = *testStatus } return &runStatus, nil } // BuildRunStatuses builds the status of all runs belonging to the job func (jr *JobRunner) BuildRunStatuses(ctx xcontext.Context, currentJob *job.Job) ([]job.RunStatus, error) { // Calculate the status only for the runs which effectively were executed runStartEvents, err := jr.frameworkEventManager.Fetch(ctx, frameworkevent.QueryEventName(EventRunStarted), frameworkevent.QueryJobID(currentJob.ID)) if err != nil { return nil, fmt.Errorf("could not determine how many runs were executed: %v", err) } if len(runStartEvents) == 0 { return nil, nil } numRuns := types.RunID(0) for _, runStartEvent := range runStartEvents { payload, err := runStartEvent.Payload.MarshalJSON() if err != nil { return nil, fmt.Errorf("could not extract JSON payload from RunStart event: %v", err) } payloadUnmarshaled := RunStartedPayload{} if err := json.Unmarshal(payload, &payloadUnmarshaled); err != nil { return nil, fmt.Errorf("could not unmarshal RunStarted event payload") } if payloadUnmarshaled.RunID > numRuns { numRuns = payloadUnmarshaled.RunID } } var runStatuses []job.RunStatus for runID := types.RunID(1); runID <= numRuns; runID++ { runCoordinates := job.RunCoordinates{JobID: currentJob.ID, RunID: runID} runStatus, err := jr.BuildRunStatus(ctx, runCoordinates, currentJob) if err != nil { return nil, fmt.Errorf("could not rebuild run status for run %d: %v", runID, err) } runStatuses = append(runStatuses, *runStatus) } return runStatuses, nil }