pkg/runner/job_runner.go (353 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.

package runner

import (
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"github.com/benbjohnson/clock"

	"github.com/facebookincubator/contest/pkg/event"
	"github.com/facebookincubator/contest/pkg/event/frameworkevent"
	"github.com/facebookincubator/contest/pkg/event/testevent"
	"github.com/facebookincubator/contest/pkg/job"
	"github.com/facebookincubator/contest/pkg/storage"
	"github.com/facebookincubator/contest/pkg/target"
	"github.com/facebookincubator/contest/pkg/types"
	"github.com/facebookincubator/contest/pkg/xcontext"
)

// jobInfo describes a job currently being run.
type jobInfo struct {
	jobID     types.JobID
	targets   []*target.Target
	jobCtx    xcontext.Context
	jobCancel func()
}

// JobRunner implements logic to run, cancel and stop Jobs.
type JobRunner struct {
	jobsMapLock sync.Mutex
	jobsMap     map[types.JobID]*jobInfo

	// jobStorage is used to store job reports.
	jobStorage storage.JobStorage

	// frameworkEventManager is used by the JobRunner to emit framework events.
	frameworkEventManager frameworkevent.EmitterFetcher

	// testEvManager is used by the JobRunner to fetch test events.
	testEvManager testevent.Fetcher

	// targetLockDuration is the amount of time a target lock is extended by
	// while the job is running.
	targetLockDuration time.Duration

	// clock is the time measurement device, mocked out in tests.
	clock clock.Clock

	stopLockRefresh    chan struct{}
	lockRefreshStopped chan struct{}
}

// Run implements the main job running logic. It holds a registry of all running
// jobs that can be referenced when cancellation/pause/stop requests come in.
//
// It returns:
//
// * *job.PauseEventPayload: the state to resume from, if the job was paused
// * error: an error, if any
func (jr *JobRunner) Run(ctx xcontext.Context, j *job.Job, resumeState *job.PauseEventPayload) (*job.PauseEventPayload, error) {
	var delay time.Duration
	runID := types.RunID(1)
	testID := 1
	keepJobEntry := false

	// Values are for plugins to read...
	ctx = xcontext.WithValue(ctx, types.KeyJobID, j.ID)
	// ... Fields are for structured logging.
	ctx, jobCancel := xcontext.WithCancel(ctx.WithField("job_id", j.ID))

	jr.jobsMapLock.Lock()
	jr.jobsMap[j.ID] = &jobInfo{jobID: j.ID, jobCtx: ctx, jobCancel: jobCancel}
	jr.jobsMapLock.Unlock()
	defer func() {
		if !keepJobEntry {
			jr.jobsMapLock.Lock()
			delete(jr.jobsMap, j.ID)
			jr.jobsMapLock.Unlock()
		}
	}()

	if resumeState != nil {
		runID = resumeState.RunID
		if resumeState.TestID > 0 {
			testID = resumeState.TestID
		}
		if resumeState.StartAt != nil {
			// This may get negative. It's fine.
			delay = resumeState.StartAt.Sub(jr.clock.Now())
		}
	}

	if j.Runs == 0 {
		ctx.Infof("Running job '%s' (id %v) indefinitely, current run #%d test #%d", j.Name, j.ID, runID, testID)
	} else {
		ctx.Infof("Running job '%s' %d times, starting at #%d test #%d", j.Name, j.Runs, runID, testID)
	}
	tl := target.GetLocker()
	ev := storage.NewTestEventFetcher()

	var runErr error
	for ; runID <= types.RunID(j.Runs) || j.Runs == 0; runID++ {
		runCtx := xcontext.WithValue(ctx, types.KeyRunID, runID)
		runCtx = runCtx.WithField("run_id", runID)

		if delay > 0 {
			nextRun := jr.clock.Now().Add(delay)
			runCtx.Infof("Sleeping %s before the next run...", delay)
			select {
			case <-jr.clock.After(delay):
			case <-ctx.Until(xcontext.ErrPaused):
				resumeState = &job.PauseEventPayload{
					Version: job.CurrentPauseEventPayloadVersion,
					JobID:   j.ID,
					RunID:   runID,
					StartAt: &nextRun,
				}
				runCtx.Infof("Job paused with %s left until next run", nextRun.Sub(jr.clock.Now()))
				return resumeState, xcontext.ErrPaused
			case <-ctx.Done():
				return nil, xcontext.ErrCanceled
			}
		}

		// If we can't emit the run start event, we ignore the error. The framework will
		// try to rebuild the status if it detects that an event might have gone missing.
		payload := RunStartedPayload{RunID: runID}
		err := jr.emitEvent(runCtx, j.ID, EventRunStarted, payload)
		if err != nil {
			runCtx.Warnf("Could not emit run start event (run %d) for job %d: %v", runID, j.ID, err)
		}

		for ; testID <= len(j.Tests); testID++ {
			t := j.Tests[testID-1]
			runCtx.Infof("Run #%d: fetching targets for test '%s'", runID, t.Name)

			bundle := t.TargetManagerBundle
			var (
				targets  []*target.Target
				errCh    = make(chan error, 1)
				acquired = false
			)
			// The Acquire semantic is synchronous, so that the implementation
			// is simpler on the user's side. We run it in a goroutine in
			// order to use a timeout for target acquisition.
			go func() {
				// If resuming with targets already acquired, just make sure we still own them.
				if resumeState != nil && resumeState.Targets != nil {
					targets = resumeState.Targets
					if err := tl.RefreshLocks(runCtx, j.ID, jr.targetLockDuration, targets); err != nil {
						errCh <- fmt.Errorf("failed to refresh locks %v: %w", targets, err)
					}
					errCh <- nil
					return
				}
				if len(targets) == 0 {
					targets, err = bundle.TargetManager.Acquire(runCtx, j.ID, j.TargetManagerAcquireTimeout+jr.targetLockDuration, bundle.AcquireParameters, tl)
					if err != nil {
						errCh <- err
						return
					}
					acquired = true
				}
				// Lock all the targets returned by Acquire.
				// Targets can also be locked in the `Acquire` method, for
				// example to allow dynamic acquisition.
				// We lock them again to ensure that all the acquired
				// targets are locked before running the job.
				// Locking an already-locked target (by the same owner)
				// extends the locking deadline.
				if err := tl.Lock(runCtx, j.ID, jr.targetLockDuration, targets); err != nil {
					errCh <- fmt.Errorf("target locking failed: %w", err)
				}
				errCh <- nil
			}()

			// Wait for targets up to a certain amount of time.
			select {
			case err := <-errCh:
				if err != nil {
					err = fmt.Errorf("run #%d: cannot fetch targets for test '%s': %v", runID, t.Name, err)
					runCtx.Errorf(err.Error())
					return nil, err
				}
				// Associate targets with the job. A background routine will refresh the locks periodically.
				jr.jobsMapLock.Lock()
				jr.jobsMap[j.ID].targets = targets
				jr.jobsMapLock.Unlock()
			case <-jr.clock.After(j.TargetManagerAcquireTimeout):
				return nil, fmt.Errorf("target manager acquire timed out after %s", j.TargetManagerAcquireTimeout)
				// Note: not handling cancellation here to allow TM plugins to wrap up correctly.
				// We have a timeout to ensure it doesn't get stuck forever.
			}

			header := testevent.Header{JobID: j.ID, RunID: runID, TestName: t.Name}
			testEventEmitter := storage.NewTestEventEmitter(header)

			// Emit events tracking target acquisition.
			if acquired {
				runErr = jr.emitTargetEvents(runCtx, testEventEmitter, targets, target.EventTargetAcquired)
			}

			// Check for pause during target acquisition.
			select {
			case <-ctx.Until(xcontext.ErrPaused):
				runCtx.Infof("pause requested for job ID %v", j.ID)
				resumeState = &job.PauseEventPayload{
					Version: job.CurrentPauseEventPayloadVersion,
					JobID:   j.ID,
					RunID:   runID,
					TestID:  testID,
					Targets: targets,
				}
				return resumeState, xcontext.ErrPaused
			default:
			}

			if runErr == nil {
				runCtx.Infof("Run #%d: running test #%d for job '%s' (job ID: %d) on %d targets", runID, testID, j.Name, j.ID, len(targets))
				testRunner := NewTestRunner()
				var testRunnerState json.RawMessage
				if resumeState != nil {
					testRunnerState = resumeState.TestRunnerState
				}
				testRunnerState, err := testRunner.Run(ctx, t, targets, j.ID, runID, testRunnerState)
				if err == xcontext.ErrPaused {
					resumeState := &job.PauseEventPayload{
						Version:         job.CurrentPauseEventPayloadVersion,
						JobID:           j.ID,
						RunID:           runID,
						TestID:          testID,
						Targets:         targets,
						TestRunnerState: testRunnerState,
					}
					// Return without releasing targets and keep the job entry so locks continue to be refreshed
					// all the way to server exit.
					keepJobEntry = true
					return resumeState, err
				}
				runErr = err
			}

			// The test is done, release all the targets.
			go func() {
				// The Release semantic is synchronous, so that the implementation
				// is simpler on the user's side. We run it in a goroutine in
				// order to use a timeout for target release. If Release fails, whether
				// due to an error or to a timeout, the whole Job is considered failed.
				err := bundle.TargetManager.Release(runCtx, j.ID, targets, bundle.ReleaseParameters)
				// Announce that targets have been released.
				_ = jr.emitTargetEvents(runCtx, testEventEmitter, targets, target.EventTargetReleased)
				// Stop refreshing the targets.
				// Here we rely on the fact that jobsMapLock is held continuously during refresh.
				jr.jobsMapLock.Lock()
				jr.jobsMap[j.ID].targets = nil
				jr.jobsMapLock.Unlock()
				if err := tl.Unlock(runCtx, j.ID, targets); err == nil {
					runCtx.Infof("Unlocked %d target(s) for job ID %d", len(targets), j.ID)
				} else {
					runCtx.Warnf("Failed to unlock %d target(s) (%v): %v", len(targets), targets, err)
				}
				errCh <- err
			}()
			select {
			case err := <-errCh:
				if err != nil {
					errRelease := fmt.Sprintf("Failed to release targets: %v", err)
					runCtx.Errorf(errRelease)
					return nil, fmt.Errorf(errRelease)
				}
			case <-jr.clock.After(j.TargetManagerReleaseTimeout):
				return nil, fmt.Errorf("target manager release timed out after %s", j.TargetManagerReleaseTimeout)
				// Ignore cancellation here, we want release and unlock to happen in that case.
			}

			// Return the run error only after releasing the targets. An error returned by the
			// TestRunner is considered a fatal condition and will cause the termination of the
			// whole job.
			if runErr != nil {
				return nil, runErr
			}

			resumeState = nil
		}

		// Calculate results for this run via the registered run reporters.
		runCoordinates := job.RunCoordinates{JobID: j.ID, RunID: runID}
		for _, bundle := range j.RunReporterBundles {
			runStatus, err := jr.BuildRunStatus(ctx, runCoordinates, j)
			if err != nil {
				ctx.Warnf("could not build run status for job %d: %v. Run report will not execute", j.ID, err)
				continue
			}
			success, data, err := bundle.Reporter.RunReport(runCtx, bundle.Parameters, runStatus, ev)
			if err != nil {
				ctx.Warnf("Run reporter failed while calculating run results, proceeding anyway: %v", err)
			} else {
				if success {
					ctx.Infof("Run #%d of job %d considered successful according to %s", runID, j.ID, bundle.Reporter.Name())
				} else {
					ctx.Errorf("Run #%d of job %d considered failed according to %s", runID, j.ID, bundle.Reporter.Name())
				}
			}
			report := &job.Report{
				JobID:        j.ID,
				RunID:        runID,
				ReporterName: bundle.Reporter.Name(),
				ReportTime:   jr.clock.Now(),
				Success:      success,
				Data:         data,
			}
			if err := jr.jobStorage.StoreReport(ctx, report); err != nil {
				ctx.Warnf("Could not store job run report: %v", err)
			}
		}
		testID = 1
		delay = j.RunInterval
	}

	// Prepare final reports.
	for _, bundle := range j.FinalReporterBundles {
		// Build a RunStatus object for each run that we executed. We need to check whether we
		// interrupted execution early and did not perform all runs.
		runStatuses, err := jr.BuildRunStatuses(ctx, j)
		if err != nil {
			ctx.Warnf("could not calculate run statuses: %v. Final report will not execute", err)
			continue
		}
		success, data, err := bundle.Reporter.FinalReport(ctx, bundle.Parameters, runStatuses, ev)
		if err != nil {
			ctx.Warnf("Final reporter failed while calculating test results, proceeding anyway: %v", err)
		} else {
			if success {
				ctx.Infof("Job %d (%d runs out of %d desired) considered successful", j.ID, runID-1, j.Runs)
			} else {
				ctx.Errorf("Job %d (%d runs out of %d desired) considered failed", j.ID, runID-1, j.Runs)
			}
		}
		report := &job.Report{
			JobID:        j.ID,
			RunID:        0,
			ReporterName: bundle.Reporter.Name(),
			ReportTime:   jr.clock.Now(),
			Success:      success,
			Data:         data,
		}
		if err := jr.jobStorage.StoreReport(ctx, report); err != nil {
			ctx.Warnf("Could not store final job report: %v", err)
		}
	}

	return nil, nil
}

func (jr *JobRunner) lockRefresher() {
	// Refresh locks a bit faster than the locking timeout to avoid races.
	interval := jr.targetLockDuration / 10 * 9
loop:
	for {
		select {
		case <-jr.clock.After(interval):
			jr.RefreshLocks()
		case <-jr.stopLockRefresh:
			break loop
		}
	}
	close(jr.lockRefreshStopped)
}

// StartLockRefresh starts the background lock refresh routine.
func (jr *JobRunner) StartLockRefresh() {
	go jr.lockRefresher()
}

// StopLockRefresh stops the background lock refresh routine.
func (jr *JobRunner) StopLockRefresh() {
	close(jr.stopLockRefresh)
	<-jr.lockRefreshStopped
}

// RefreshLocks refreshes locks for running or paused jobs.
func (jr *JobRunner) RefreshLocks() {
	// Note: for simplicity we refresh all jobs while continuously holding the lock over the entire map.
	// Should this become a problem, this can be made more granular. When doing so, it is important to synchronize
	// refresh with release (avoid releasing targets while refresh is ongoing).
	jr.jobsMapLock.Lock()
	defer jr.jobsMapLock.Unlock()
	var wg sync.WaitGroup
	for jobID := range jr.jobsMap {
		ji := jr.jobsMap[jobID]
		if len(ji.targets) == 0 {
			continue
		}
		wg.Add(1)
		// Refresh locks for all the jobs in parallel.
		go func() {
			tl := target.GetLocker()
			select {
			case <-ji.jobCtx.Done():
				// Job has been canceled, nothing to do.
			default:
				ji.jobCtx.Debugf("Refreshing target locks...")
				if err := tl.RefreshLocks(ji.jobCtx, ji.jobID, jr.targetLockDuration, ji.targets); err != nil {
					ji.jobCtx.Errorf("Failed to refresh %d locks for job ID %d (%v), aborting job", len(ji.targets), ji.jobID, err)
					// We lost our grip on targets, fold the tent and leave ASAP.
					ji.jobCancel()
				}
			}
			wg.Done()
		}()
	}
	wg.Wait()
}

// emitTargetEvents emits test events to keep track of Target acquisition and release.
func (jr *JobRunner) emitTargetEvents(ctx xcontext.Context, emitter testevent.Emitter, targets []*target.Target, eventName event.Name) error {
	// The events hold a serialization of the Target in the payload.
	for _, t := range targets {
		data := testevent.Data{EventName: eventName, Target: t}
		if err := emitter.Emit(ctx, data); err != nil {
			ctx.Warnf("could not emit event %s: %v", eventName, err)
			return err
		}
	}
	return nil
}

func (jr *JobRunner) emitEvent(ctx xcontext.Context, jobID types.JobID, eventName event.Name, payload interface{}) error {
	payloadJSON, err := json.Marshal(payload)
	if err != nil {
		ctx.Warnf("could not encode payload for event %s: %v", eventName, err)
		return err
	}
	rawPayload := json.RawMessage(payloadJSON)
	ev := frameworkevent.Event{JobID: jobID, EventName: eventName, Payload: &rawPayload, EmitTime: jr.clock.Now()}
	if err := jr.frameworkEventManager.Emit(ctx, ev); err != nil {
		ctx.Warnf("could not emit event %s: %v", eventName, err)
		return err
	}
	return nil
}

// NewJobRunner returns a new JobRunner, which holds an empty registry of jobs.
func NewJobRunner(js storage.JobStorage, clk clock.Clock, lockDuration time.Duration) *JobRunner {
	jr := &JobRunner{
		jobsMap:               make(map[types.JobID]*jobInfo),
		jobStorage:            js,
		frameworkEventManager: storage.NewFrameworkEventEmitterFetcher(),
		testEvManager:         storage.NewTestEventFetcher(),
		targetLockDuration:    lockDuration,
		clock:                 clk,
		stopLockRefresh:       make(chan struct{}),
		lockRefreshStopped:    make(chan struct{}),
	}
	return jr
}
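
// Example usage (illustrative sketch only, not part of the original file): a caller that
// already has a storage.JobStorage implementation (the `js` variable below is assumed)
// would typically construct the runner with a real clock from github.com/benbjohnson/clock
// and wrap its lifetime with the lock-refresh goroutine defined above; the lock duration
// value is an arbitrary example.
//
//	jr := NewJobRunner(js, clock.New(), 30*time.Second)
//	jr.StartLockRefresh()      // begin periodic RefreshLocks calls in the background
//	defer jr.StopLockRefresh() // stop the refresher and wait for it to exit
//	// jr.Run(ctx, j, nil) then executes the job; on pause it returns a
//	// *job.PauseEventPayload that can later be passed back as resumeState.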