in plugins/teststeps/teststeps.go [124:214]
// ForEachTargetWithResume applies f to targets concurrently, starting one
// goroutine per target, and supports resuming from a previously saved state.
//
// If resumeState is non-empty it is unmarshaled and its version must equal
// currentStepStateVersion; otherwise an error is returned before any target is
// started. Targets recorded in the resume state are restarted first, then new
// targets are consumed from ch.In until that channel is closed or ctx is done.
//
// The value returned by f decides what happens to its target:
//   - xcontext.ErrCanceled: nothing is reported for the target;
//   - xcontext.ErrPaused: the target is collected for the next resume state
//     (unless ctx is done before it can be handed over);
//   - any other value, nil included: a test.TestStepResult carrying the target
//     and that value is sent on ch.Out (unless ctx is done first).
//
// Once every goroutine has finished, the function returns:
//   - an error, if targets paused while ctx.IsSignaledWith(xcontext.ErrPaused)
//     reports false;
//   - the result of MarshalState, if ctx.IsSignaledWith(xcontext.ErrPaused)
//     reports true;
//   - otherwise a nil state with xcontext.ErrCanceled when the receive loop
//     was interrupted by ctx, or with a nil error.
func ForEachTargetWithResume(ctx xcontext.Context, ch test.TestStepChannels, resumeState json.RawMessage, currentStepStateVersion int, f PerTargetWithResumeFunc) (json.RawMessage, error) {
	var saved parallelTargetsState

	// Restore and validate the previous state when one was supplied.
	if len(resumeState) > 0 {
		err := json.Unmarshal(resumeState, &saved)
		if err != nil {
			return nil, fmt.Errorf("invalid resume state: %w", err)
		}
		if got := saved.Version; got != currentStepStateVersion {
			return nil, fmt.Errorf("incompatible resume state: want %d, got %d", currentStepStateVersion, got)
		}
	}

	var (
		running sync.WaitGroup
		// parked carries targets whose function returned xcontext.ErrPaused.
		parked = make(chan *TargetWithData)
	)

	// worker runs f on a single target and routes the outcome. The caller
	// must have called running.Add(1) before starting it.
	worker := func(td *TargetWithData) {
		defer running.Done()

		outcome := f(ctx, td)
		if outcome == xcontext.ErrCanceled {
			// Canceled targets are neither reported nor saved.
			return
		}
		if outcome == xcontext.ErrPaused {
			// Hand the target over for the resume state; give up if ctx
			// is done before the collector receives it.
			select {
			case parked <- td:
			case <-ctx.Done():
				ctx.Debugf("ForEachTargetWithResume: received cancellation signal while pausing")
			}
			return
		}

		// Success or an ordinary failure: log it and report the result.
		if outcome == nil {
			ctx.Debugf("ForEachTargetWithResume: target %s completed successfully", td.Target.ID)
		} else {
			ctx.Errorf("ForEachTargetWithResume: failed to apply test step function on target %s: %v", td.Target.ID, outcome)
		}
		select {
		case ch.Out <- test.TestStepResult{Target: td.Target, Err: outcome}:
		case <-ctx.Done():
			ctx.Debugf("ForEachTargetWithResume: received cancellation signal while reporting result")
		}
	}

	// Restart every target recorded in the resume state.
	for i := range saved.Targets {
		td := saved.Targets[i]
		ctx.Debugf("ForEachTargetWithResume: resuming target %s", td.Target.ID)
		running.Add(1)
		go worker(td)
	}
	// The restarted targets are running again; the list is rebuilt below
	// from whatever pauses this time.
	saved.Targets = nil

	// Accept new targets until the input channel is closed or ctx is done.
	// Pause is not checked explicitly here: per the original author's note,
	// pausing closes the input channel.
	var loopErr error
	for receiving := true; receiving; {
		select {
		case tgt, open := <-ch.In:
			if open {
				ctx.Debugf("ForEachTargetWithResume: received target %s", tgt)
				running.Add(1)
				go worker(&TargetWithData{Target: tgt})
			} else {
				receiving = false
			}
		case <-ctx.Done():
			ctx.Debugf("ForEachTargetWithResume: canceled, terminating")
			loopErr = xcontext.ErrCanceled
			receiving = false
		}
	}

	// Close parked once all workers are done so the collection loop below
	// terminates.
	go func() {
		running.Wait()
		close(parked)
	}()
	for {
		td, more := <-parked
		if !more {
			break
		}
		saved.Targets = append(saved.Targets, td)
	}

	// Decide what to hand back to the caller.
	switch {
	case !ctx.IsSignaledWith(xcontext.ErrPaused) && len(saved.Targets) > 0:
		return nil, fmt.Errorf("ForEachTargetWithResume: some target functions paused, but no pause signal received: %v ", saved.Targets)
	case ctx.IsSignaledWith(xcontext.ErrPaused):
		return MarshalState(&saved, currentStepStateVersion)
	default:
		return nil, loopErr
	}
}