in pkg/runner/job_runner.go [68:364]
func (jr *JobRunner) Run(ctx xcontext.Context, j *job.Job, resumeState *job.PauseEventPayload) (*job.PauseEventPayload, error) {
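// Fresh-job defaults: runID and testID are 1-based; they are overridden below
// when resuming from a paused state.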
var delay time.Duration
runID := types.RunID(1)
testID := 1
keepJobEntry := false
// Values are for plugins to read...
ctx = xcontext.WithValue(ctx, types.KeyJobID, j.ID)
// ... Fields are for structured logging
ctx, jobCancel := xcontext.WithCancel(ctx.WithField("job_id", j.ID))
jr.jobsMapLock.Lock()
jr.jobsMap[j.ID] = &jobInfo{jobID: j.ID, jobCtx: ctx, jobCancel: jobCancel}
jr.jobsMapLock.Unlock()
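// Remove the jobsMap entry when Run returns, unless the job was paused mid-run:
// in that case the entry is kept so target locks keep being refreshed until server exit.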
defer func() {
if !keepJobEntry {
jr.jobsMapLock.Lock()
delete(jr.jobsMap, j.ID)
jr.jobsMapLock.Unlock()
}
}()
if resumeState != nil {
runID = resumeState.RunID
if resumeState.TestID > 0 {
testID = resumeState.TestID
}
if resumeState.StartAt != nil {
// This may be negative; that's fine.
delay = resumeState.StartAt.Sub(jr.clock.Now())
}
}
if j.Runs == 0 {
ctx.Infof("Running job '%s' (id %v) indefinitely, current run #%d test #%d", j.Name, j.ID, runID, testID)
} else {
ctx.Infof("Running job '%s' %d times, starting at #%d test #%d", j.Name, j.Runs, runID, testID)
}
tl := target.GetLocker()
ev := storage.NewTestEventFetcher()
var runErr error
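// Run loop: one iteration per run; runs forever when j.Runs == 0.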
for ; runID <= types.RunID(j.Runs) || j.Runs == 0; runID++ {
runCtx := xcontext.WithValue(ctx, types.KeyRunID, runID)
runCtx = runCtx.WithField("run_id", runID)
if delay > 0 {
nextRun := jr.clock.Now().Add(delay)
runCtx.Infof("Sleeping %s before the next run...", delay)
select {
case <-jr.clock.After(delay):
case <-ctx.Until(xcontext.ErrPaused):
resumeState = &job.PauseEventPayload{
Version: job.CurrentPauseEventPayloadVersion,
JobID: j.ID,
RunID: runID,
StartAt: &nextRun,
}
runCtx.Infof("Job paused with %s left until next run", nextRun.Sub(jr.clock.Now()))
return resumeState, xcontext.ErrPaused
case <-ctx.Done():
return nil, xcontext.ErrCanceled
}
}
// If we can't emit the run start event, we ignore the error. The framework will
// try to rebuild the status if it detects that an event might have gone missing.
payload := RunStartedPayload{RunID: runID}
err := jr.emitEvent(runCtx, j.ID, EventRunStarted, payload)
if err != nil {
runCtx.Warnf("Could not emit event run (run %d) start for job %d: %v", runID, j.ID, err)
}
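// Test loop: execute the job's tests in order. testID is 1-based and may start
// past 1 when resuming a paused job.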
for ; testID <= len(j.Tests); testID++ {
t := j.Tests[testID-1]
runCtx.Infof("Run #%d: fetching targets for test '%s'", runID, t.Name)
bundle := t.TargetManagerBundle
var (
targets []*target.Target
errCh = make(chan error, 1)
acquired = false
)
// The Acquire semantics are synchronous, so that the implementation is simpler
// on the user's side. We run it in a goroutine so that we can apply a timeout
// to target acquisition.
go func() {
// If resuming with targets already acquired, just make sure we still own them.
if resumeState != nil && resumeState.Targets != nil {
targets = resumeState.Targets
if err := tl.RefreshLocks(runCtx, j.ID, jr.targetLockDuration, targets); err != nil {
errCh <- fmt.Errorf("Failed to refresh locks %v: %w", targets, err)
}
errCh <- nil
return
}
if len(targets) == 0 {
var acquireErr error
targets, acquireErr = bundle.TargetManager.Acquire(runCtx, j.ID, j.TargetManagerAcquireTimeout+jr.targetLockDuration, bundle.AcquireParameters, tl)
if acquireErr != nil {
errCh <- acquireErr
return
}
acquired = true
}
// Lock all the targets returned by Acquire.
// Targets can also be locked in the `Acquire` method, for
// example to allow dynamic acquisition.
// We lock them again to ensure that all the acquired
// targets are locked before running the job.
// Locking an already-locked target (by the same owner)
// extends the locking deadline.
if err := tl.Lock(runCtx, j.ID, jr.targetLockDuration, targets); err != nil {
errCh <- fmt.Errorf("target locking failed: %w", err)
return
}
errCh <- nil
}()
// Wait for target acquisition to complete, up to the acquire timeout.
select {
case err := <-errCh:
if err != nil {
err = fmt.Errorf("run #%d: cannot fetch targets for test '%s': %v", runID, t.Name, err)
runCtx.Errorf(err.Error())
return nil, err
}
// Associate the targets with the job. A background routine will refresh the locks periodically.
jr.jobsMapLock.Lock()
jr.jobsMap[j.ID].targets = targets
jr.jobsMapLock.Unlock()
case <-jr.clock.After(j.TargetManagerAcquireTimeout):
return nil, fmt.Errorf("target manager acquire timed out after %s", j.TargetManagerAcquireTimeout)
// Note: cancellation is not handled here, to allow target manager plugins to wrap up correctly.
// The timeout above ensures this doesn't get stuck forever.
}
header := testevent.Header{JobID: j.ID, RunID: runID, TestName: t.Name}
testEventEmitter := storage.NewTestEventEmitter(header)
// Emit events tracking target acquisition.
if acquired {
runErr = jr.emitTargetEvents(runCtx, testEventEmitter, targets, target.EventTargetAcquired)
}
// Check for pause during target acquisition.
select {
case <-ctx.Until(xcontext.ErrPaused):
runCtx.Infof("pause requested for job ID %v", j.ID)
resumeState = &job.PauseEventPayload{
Version: job.CurrentPauseEventPayloadVersion,
JobID: j.ID,
RunID: runID,
TestID: testID,
Targets: targets,
}
return resumeState, xcontext.ErrPaused
default:
}
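// Run the test only if emitting the target acquisition events succeeded.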
if runErr == nil {
runCtx.Infof("Run #%d: running test #%d for job '%s' (job ID: %d) on %d targets",
runID, testID, j.Name, j.ID, len(targets))
testRunner := NewTestRunner()
var testRunnerState json.RawMessage
if resumeState != nil {
testRunnerState = resumeState.TestRunnerState
}
testRunnerState, err := testRunner.Run(runCtx, t, targets, j.ID, runID, testRunnerState)
if err == xcontext.ErrPaused {
resumeState := &job.PauseEventPayload{
Version: job.CurrentPauseEventPayloadVersion,
JobID: j.ID,
RunID: runID,
TestID: testID,
Targets: targets,
TestRunnerState: testRunnerState,
}
// Return without releasing targets and keep the job entry so locks continue to be refreshed
// all the way to server exit.
keepJobEntry = true
return resumeState, err
} else {
runErr = err
}
}
// The test is done (or could not be started), release all the targets.
go func() {
// The Release semantics are synchronous, so that the implementation is simpler
// on the user's side. We run it in a goroutine so that we can apply a timeout
// to target release. If Release fails, whether due to an error or a timeout,
// the whole job is considered failed.
err := bundle.TargetManager.Release(runCtx, j.ID, targets, bundle.ReleaseParameters)
// Announce that targets have been released
_ = jr.emitTargetEvents(runCtx, testEventEmitter, targets, target.EventTargetReleased)
// Stop refreshing the targets.
// Here we rely on the fact that jobsMapLock is held continuously during refresh.
jr.jobsMapLock.Lock()
jr.jobsMap[j.ID].targets = nil
jr.jobsMapLock.Unlock()
if err := tl.Unlock(runCtx, j.ID, targets); err == nil {
runCtx.Infof("Unlocked %d target(s) for job ID %d", len(targets), j.ID)
} else {
runCtx.Warnf("Failed to unlock %d target(s) (%v): %v", len(targets), targets, err)
}
errCh <- err
}()
select {
case err := <-errCh:
if err != nil {
err = fmt.Errorf("failed to release targets: %w", err)
runCtx.Errorf(err.Error())
return nil, err
}
case <-jr.clock.After(j.TargetManagerReleaseTimeout):
return nil, fmt.Errorf("target manager release timed out after %s", j.TargetManagerReleaseTimeout)
// Ignore cancellation here, we want release and unlock to happen in that case.
}
// Return the Run error only after releasing the targets. An error returned by the
// TestRunner is considered a fatal condition and causes the termination of the
// whole job.
if runErr != nil {
return nil, runErr
}
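// The resume state has been fully consumed at this point; subsequent tests and runs start fresh.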
resumeState = nil
}
// Calculate results for this run via the registered run reporters
runCoordinates := job.RunCoordinates{JobID: j.ID, RunID: runID}
for _, bundle := range j.RunReporterBundles {
runStatus, err := jr.BuildRunStatus(ctx, runCoordinates, j)
if err != nil {
ctx.Warnf("could not build run status for job %d: %v. Run report will not execute", j.ID, err)
continue
}
success, data, err := bundle.Reporter.RunReport(runCtx, bundle.Parameters, runStatus, ev)
if err != nil {
ctx.Warnf("Run reporter failed while calculating run results, proceeding anyway: %v", err)
} else {
if success {
ctx.Infof("Run #%d of job %d considered successful according to %s", runID, j.ID, bundle.Reporter.Name())
} else {
ctx.Errorf("Run #%d of job %d considered failed according to %s", runID, j.ID, bundle.Reporter.Name())
}
}
report := &job.Report{
JobID: j.ID,
RunID: runID,
ReporterName: bundle.Reporter.Name(),
ReportTime: jr.clock.Now(),
Success: success,
Data: data,
}
if err := jr.jobStorage.StoreReport(ctx, report); err != nil {
ctx.Warnf("Could not store job run report: %v", err)
}
}
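// The next run starts from the first test, after waiting for the configured run interval.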
testID = 1
delay = j.RunInterval
}
// Prepare final reports.
for _, bundle := range j.FinalReporterBundles {
// Build a RunStatus object for each run that we executed. Execution might have been
// interrupted early, in which case not all the desired runs were performed.
runStatuses, err := jr.BuildRunStatuses(ctx, j)
if err != nil {
ctx.Warnf("could not calculate run statuses: %v. Run report will not execute", err)
continue
}
success, data, err := bundle.Reporter.FinalReport(ctx, bundle.Parameters, runStatuses, ev)
if err != nil {
ctx.Warnf("Final reporter failed while calculating test results, proceeding anyway: %v", err)
} else {
if success {
ctx.Infof("Job %d (%d runs out of %d desired) considered successful according to %s", j.ID, runID-1, j.Runs, bundle.Reporter.Name())
} else {
ctx.Errorf("Job %d (%d runs out of %d desired) considered failed according to %s", j.ID, runID-1, j.Runs, bundle.Reporter.Name())
}
}
report := &job.Report{
JobID: j.ID,
RunID: 0,
ReporterName: bundle.Reporter.Name(),
ReportTime: jr.clock.Now(),
Success: success,
Data: data,
}
if err := jr.jobStorage.StoreReport(ctx, report); err != nil {
ctx.Warnf("Could not store job run report: %v", err)
}
}
return nil, nil
}