func()

in pkg/runner/job_runner.go [68:364]


func (jr *JobRunner) Run(ctx xcontext.Context, j *job.Job, resumeState *job.PauseEventPayload) (*job.PauseEventPayload, error) {
	var delay time.Duration
	runID := types.RunID(1)
	testID := 1
	keepJobEntry := false

	// Values are for plugins to read...
	ctx = xcontext.WithValue(ctx, types.KeyJobID, j.ID)
	// .. Fields are for structured logging
	ctx, jobCancel := xcontext.WithCancel(ctx.WithField("job_id", j.ID))

	jr.jobsMapLock.Lock()
	jr.jobsMap[j.ID] = &jobInfo{jobID: j.ID, jobCtx: ctx, jobCancel: jobCancel}
	jr.jobsMapLock.Unlock()
	defer func() {
		if !keepJobEntry {
			jr.jobsMapLock.Lock()
			delete(jr.jobsMap, j.ID)
			jr.jobsMapLock.Unlock()
		}
	}()

	if resumeState != nil {
		runID = resumeState.RunID
		if resumeState.TestID > 0 {
			testID = resumeState.TestID
		}
		if resumeState.StartAt != nil {
			// This may get negative. It's fine.
			delay = resumeState.StartAt.Sub(jr.clock.Now())
		}
	}

	if j.Runs == 0 {
		ctx.Infof("Running job '%s' (id %v) indefinitely, current run #%d test #%d", j.Name, j.ID, runID, testID)
	} else {
		ctx.Infof("Running job '%s' %d times, starting at #%d test #%d", j.Name, j.Runs, runID, testID)
	}
	tl := target.GetLocker()
	ev := storage.NewTestEventFetcher()

	var runErr error

	for ; runID <= types.RunID(j.Runs) || j.Runs == 0; runID++ {
		runCtx := xcontext.WithValue(ctx, types.KeyRunID, runID)
		runCtx = runCtx.WithField("run_id", runID)
		if delay > 0 {
			nextRun := jr.clock.Now().Add(delay)
			runCtx.Infof("Sleeping %s before the next run...", delay)
			select {
			case <-jr.clock.After(delay):
			case <-ctx.Until(xcontext.ErrPaused):
				resumeState = &job.PauseEventPayload{
					Version: job.CurrentPauseEventPayloadVersion,
					JobID:   j.ID,
					RunID:   runID,
					StartAt: &nextRun,
				}
				runCtx.Infof("Job paused with %s left until next run", nextRun.Sub(jr.clock.Now()))
				return resumeState, xcontext.ErrPaused
			case <-ctx.Done():
				return nil, xcontext.ErrCanceled
			}
		}

		// If we can't emit the run start event, we ignore the error. The framework will
		// try to rebuild the status if it detects that an event might have gone missing
		payload := RunStartedPayload{RunID: runID}
		err := jr.emitEvent(runCtx, j.ID, EventRunStarted, payload)
		if err != nil {
			runCtx.Warnf("Could not emit event run (run %d) start for job %d: %v", runID, j.ID, err)
		}

		for ; testID <= len(j.Tests); testID++ {
			t := j.Tests[testID-1]
			runCtx.Infof("Run #%d: fetching targets for test '%s'", runID, t.Name)
			bundle := t.TargetManagerBundle
			var (
				targets  []*target.Target
				errCh    = make(chan error, 1)
				acquired = false
			)
			// the Acquire semantic is synchronous, so that the implementation
			// is simpler on the user's side. We run it in a goroutine in
			// order to use a timeout for target acquisition.
			go func() {
				// If resuming with targets already acquired, just make sure we still own them.
				if resumeState != nil && resumeState.Targets != nil {
					targets = resumeState.Targets
					if err := tl.RefreshLocks(runCtx, j.ID, jr.targetLockDuration, targets); err != nil {
						errCh <- fmt.Errorf("Failed to refresh locks %v: %w", targets, err)
					}
					errCh <- nil
					return
				}
				if len(targets) == 0 {
					targets, err = bundle.TargetManager.Acquire(runCtx, j.ID, j.TargetManagerAcquireTimeout+jr.targetLockDuration, bundle.AcquireParameters, tl)
					if err != nil {
						errCh <- err
						return
					}
					acquired = true
				}
				// Lock all the targets returned by Acquire.
				// Targets can also be locked in the `Acquire` method, for
				// example to allow dynamic acquisition.
				// We lock them again to ensure that all the acquired
				// targets are locked before running the job.
				// Locking an already-locked target (by the same owner)
				// extends the locking deadline.
				if err := tl.Lock(runCtx, j.ID, jr.targetLockDuration, targets); err != nil {
					errCh <- fmt.Errorf("Target locking failed: %w", err)
				}
				errCh <- nil
			}()
			// wait for targets up to a certain amount of time
			select {
			case err := <-errCh:
				if err != nil {
					err = fmt.Errorf("run #%d: cannot fetch targets for test '%s': %v", runID, t.Name, err)
					runCtx.Errorf(err.Error())
					return nil, err
				}
				// Associate targets with the job. Background routine will refresh the locks periodically.
				jr.jobsMapLock.Lock()
				jr.jobsMap[j.ID].targets = targets
				jr.jobsMapLock.Unlock()
			case <-jr.clock.After(j.TargetManagerAcquireTimeout):
				return nil, fmt.Errorf("target manager acquire timed out after %s", j.TargetManagerAcquireTimeout)
				// Note: not handling cancellation here to allow TM plugins to wrap up correctly.
				// We have timeout to ensure it doesn't get stuck forever.
			}

			header := testevent.Header{JobID: j.ID, RunID: runID, TestName: t.Name}
			testEventEmitter := storage.NewTestEventEmitter(header)

			// Emit events tracking targets acquisition
			if acquired {
				runErr = jr.emitTargetEvents(runCtx, testEventEmitter, targets, target.EventTargetAcquired)
			}

			// Check for pause during target acquisition.
			select {
			case <-ctx.Until(xcontext.ErrPaused):
				runCtx.Infof("pause requested for job ID %v", j.ID)
				resumeState = &job.PauseEventPayload{
					Version: job.CurrentPauseEventPayloadVersion,
					JobID:   j.ID,
					RunID:   runID,
					TestID:  testID,
					Targets: targets,
				}
				return resumeState, xcontext.ErrPaused
			default:
			}

			if runErr == nil {
				runCtx.Infof("Run #%d: running test #%d for job '%s' (job ID: %d) on %d targets",
					runID, testID, j.Name, j.ID, len(targets))
				testRunner := NewTestRunner()
				var testRunnerState json.RawMessage
				if resumeState != nil {
					testRunnerState = resumeState.TestRunnerState
				}
				testRunnerState, err := testRunner.Run(ctx, t, targets, j.ID, runID, testRunnerState)
				if err == xcontext.ErrPaused {
					resumeState := &job.PauseEventPayload{
						Version:         job.CurrentPauseEventPayloadVersion,
						JobID:           j.ID,
						RunID:           runID,
						TestID:          testID,
						Targets:         targets,
						TestRunnerState: testRunnerState,
					}
					// Return without releasing targets and keep the job entry so locks continue to be refreshed
					// all the way to server exit.
					keepJobEntry = true
					return resumeState, err
				} else {
					runErr = err
				}
			}

			// Job is done, release all the targets
			go func() {
				// the Release semantic is synchronous, so that the implementation
				// is simpler on the user's side. We run it in a goroutine in
				// order to use a timeout for target acquisition. If Release fails, whether
				// due to an error or for a timeout, the whole Job is considered failed
				err := bundle.TargetManager.Release(runCtx, j.ID, targets, bundle.ReleaseParameters)
				// Announce that targets have been released
				_ = jr.emitTargetEvents(runCtx, testEventEmitter, targets, target.EventTargetReleased)
				// Stop refreshing the targets.
				// Here we rely on the fact that jobsMapLock is held continuously during refresh.
				jr.jobsMapLock.Lock()
				jr.jobsMap[j.ID].targets = nil
				jr.jobsMapLock.Unlock()
				if err := tl.Unlock(runCtx, j.ID, targets); err == nil {
					runCtx.Infof("Unlocked %d target(s) for job ID %d", len(targets), j.ID)
				} else {
					runCtx.Warnf("Failed to unlock %d target(s) (%v): %v", len(targets), targets, err)
				}
				errCh <- err
			}()
			select {
			case err := <-errCh:
				if err != nil {
					errRelease := fmt.Sprintf("Failed to release targets: %v", err)
					runCtx.Errorf(errRelease)
					return nil, fmt.Errorf(errRelease)
				}
			case <-jr.clock.After(j.TargetManagerReleaseTimeout):
				return nil, fmt.Errorf("target manager release timed out after %s", j.TargetManagerReleaseTimeout)
				// Ignore cancellation here, we want release and unlock to happen in that case.
			}
			// return the Run error only after releasing the targets, and only
			// if we are not running indefinitely. An error returned by the TestRunner
			// is considered a fatal condition and will cause the termination of the
			// whole job.
			if runErr != nil {
				return nil, runErr
			}

			resumeState = nil
		}

		// Calculate results for this run via the registered run reporters
		runCoordinates := job.RunCoordinates{JobID: j.ID, RunID: runID}
		for _, bundle := range j.RunReporterBundles {
			runStatus, err := jr.BuildRunStatus(ctx, runCoordinates, j)
			if err != nil {
				ctx.Warnf("could not build run status for job %d: %v. Run report will not execute", j.ID, err)
				continue
			}
			success, data, err := bundle.Reporter.RunReport(runCtx, bundle.Parameters, runStatus, ev)
			if err != nil {
				ctx.Warnf("Run reporter failed while calculating run results, proceeding anyway: %v", err)
			} else {
				if success {
					ctx.Infof("Run #%d of job %d considered successful according to %s", runID, j.ID, bundle.Reporter.Name())
				} else {
					ctx.Errorf("Run #%d of job %d considered failed according to %s", runID, j.ID, bundle.Reporter.Name())
				}
			}
			report := &job.Report{
				JobID:        j.ID,
				RunID:        runID,
				ReporterName: bundle.Reporter.Name(),
				ReportTime:   jr.clock.Now(),
				Success:      success,
				Data:         data,
			}
			if err := jr.jobStorage.StoreReport(ctx, report); err != nil {
				ctx.Warnf("Could not store job run report: %v", err)
			}
		}

		testID = 1
		delay = j.RunInterval
	}

	// Prepare final reports.

	for _, bundle := range j.FinalReporterBundles {
		// Build a RunStatus object for each run that we executed. We need to check if we interrupted
		// execution early and we did not perform all runs
		runStatuses, err := jr.BuildRunStatuses(ctx, j)
		if err != nil {
			ctx.Warnf("could not calculate run statuses: %v. Run report will not execute", err)
			continue
		}

		success, data, err := bundle.Reporter.FinalReport(ctx, bundle.Parameters, runStatuses, ev)
		if err != nil {
			ctx.Warnf("Final reporter failed while calculating test results, proceeding anyway: %v", err)
		} else {
			if success {
				ctx.Infof("Job %d (%d runs out of %d desired) considered successful", j.ID, runID-1, j.Runs)
			} else {
				ctx.Errorf("Job %d (%d runs out of %d desired) considered failed", j.ID, runID-1, j.Runs)
			}
		}
		report := &job.Report{
			JobID:        j.ID,
			RunID:        0,
			ReporterName: bundle.Reporter.Name(),
			ReportTime:   jr.clock.Now(),
			Success:      success,
			Data:         data,
		}
		if err := jr.jobStorage.StoreReport(ctx, report); err != nil {
			ctx.Warnf("Could not store job run report: %v", err)
		}
	}

	return nil, nil
}