func ForEachTargetWithResume()

in plugins/teststeps/teststeps.go [124:214]


func ForEachTargetWithResume(ctx xcontext.Context, ch test.TestStepChannels, resumeState json.RawMessage, currentStepStateVersion int, f PerTargetWithResumeFunc) (json.RawMessage, error) {
	var ss parallelTargetsState

	// Parse resume state, if any.
	if len(resumeState) > 0 {
		if err := json.Unmarshal(resumeState, &ss); err != nil {
			return nil, fmt.Errorf("invalid resume state: %w", err)
		}
		if ss.Version != currentStepStateVersion {
			return nil, fmt.Errorf("incompatible resume state: want %d, got %d", currentStepStateVersion, ss.Version)
		}
	}

	var wg sync.WaitGroup
	pauseStates := make(chan *TargetWithData)

	handleTarget := func(tgt2 *TargetWithData) {
		defer wg.Done()

		err := f(ctx, tgt2)
		switch err {
		case xcontext.ErrCanceled:
			// nothing to do for failed
		case xcontext.ErrPaused:
			select {
			case pauseStates <- tgt2:
			case <-ctx.Done():
				ctx.Debugf("ForEachTargetWithResume: received cancellation signal while pausing")
			}
		default:
			// nil or error
			if err != nil {
				ctx.Errorf("ForEachTargetWithResume: failed to apply test step function on target %s: %v", tgt2.Target.ID, err)
			} else {
				ctx.Debugf("ForEachTargetWithResume: target %s completed successfully", tgt2.Target.ID)
			}
			select {
			case ch.Out <- test.TestStepResult{Target: tgt2.Target, Err: err}:
			case <-ctx.Done():
				ctx.Debugf("ForEachTargetWithResume: received cancellation signal while reporting result")
			}
		}
	}

	// restart paused targets
	for _, state := range ss.Targets {
		ctx.Debugf("ForEachTargetWithResume: resuming target %s", state.Target.ID)
		wg.Add(1)
		go handleTarget(state)
	}
	// delete info about running targets
	ss.Targets = nil

	var err error
mainloop:
	for {
		select {
		// no need to check for pause here, pausing closes the channel
		case tgt, ok := <-ch.In:
			if !ok {
				break mainloop
			}
			ctx.Debugf("ForEachTargetWithResume: received target %s", tgt)
			wg.Add(1)
			go handleTarget(&TargetWithData{Target: tgt})
		case <-ctx.Done():
			ctx.Debugf("ForEachTargetWithResume: canceled, terminating")
			err = xcontext.ErrCanceled
			break mainloop
		}
	}

	// close pauseStates to signal all handlers are done
	go func() {
		wg.Wait()
		close(pauseStates)
	}()

	for ps := range pauseStates {
		ss.Targets = append(ss.Targets, ps)
	}

	// wrap up
	if !ctx.IsSignaledWith(xcontext.ErrPaused) && len(ss.Targets) > 0 {
		return nil, fmt.Errorf("ForEachTargetWithResume: some target functions paused, but no pause signal received: %v ", ss.Targets)
	}
	if ctx.IsSignaledWith(xcontext.ErrPaused) {
		return MarshalState(&ss, currentStepStateVersion)
	}
	return nil, err
}