plugins/teststeps/teststeps.go (150 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. package teststeps import ( "encoding/json" "fmt" "reflect" "sync" "github.com/facebookincubator/contest/pkg/target" "github.com/facebookincubator/contest/pkg/test" "github.com/facebookincubator/contest/pkg/xcontext" ) // PerTargetFunc is a function type that is called on each target by the // ForEachTarget function below. type PerTargetFunc func(ctx xcontext.Context, target *target.Target) error // ForEachTarget is a simple helper to write plugins that apply a given PerTargetFunc // independenly to each target. This helper handles routing through the in/out channels, // and forwards cancel/pause signals to the PerTargetFunc. // Note this helper does NOT saving state and pausing a test step, so it is only suited for // short-running tests or for environments that don't use job resumption. // Use ForEachTargetWithResume below for a similar helper with full resumption support. // This function wraps the logic that handles target routing through the in/out // The implementation of the per-target function is responsible for // reacting to cancel/pause signals and return quickly. func ForEachTarget(pluginName string, ctx xcontext.Context, ch test.TestStepChannels, f PerTargetFunc) (json.RawMessage, error) { reportTarget := func(t *target.Target, err error) { if err != nil { ctx.Errorf("%s: ForEachTarget: failed to apply test step function on target %s: %v", pluginName, t, err) } else { ctx.Debugf("%s: ForEachTarget: target %s completed successfully", pluginName, t) } select { case ch.Out <- test.TestStepResult{Target: t, Err: err}: case <-ctx.Done(): ctx.Debugf("%s: ForEachTarget: received cancellation signal while reporting result", pluginName) } } var wg sync.WaitGroup func() { for { select { case tgt, ok := <-ch.In: if !ok { ctx.Debugf("%s: ForEachTarget: all targets have been received", pluginName) return } ctx.Debugf("%s: ForEachTarget: received target %s", pluginName, tgt) wg.Add(1) go func() { defer wg.Done() err := f(ctx, tgt) reportTarget(tgt, err) }() case <-ctx.Done(): ctx.Debugf("%s: ForEachTarget: incoming loop canceled", pluginName) return } } }() wg.Wait() return nil, nil } // MarshalState serializes the provided state struct as JSON. // It sets the Version field to the specified value. func MarshalState(state interface{}, version int) (json.RawMessage, error) { { // Set the version. vs := reflect.Indirect(reflect.ValueOf(state)) if vs.Kind() != reflect.Struct { return nil, fmt.Errorf("state must be a struct") } vf := vs.FieldByName("Version") if vf.Kind() == 0 { return nil, fmt.Errorf("no Version field in struct") } vf.SetInt(int64(version)) } data, err := json.Marshal(state) if err != nil { return nil, err } return data, xcontext.ErrPaused } // TargetWithData holds a step target and the pause/resumption data for it // Each per-target function gets this passed in and can store any data // required to resume in data. type TargetWithData struct { Target *target.Target Data json.RawMessage } // PerTargetWithResumeFunc is the function that is called per target by ForEachTargetWithResume // It must obey the context and quickly return on cancellation and pause signals. // Functions can modify target and store any data required for resumption in target.data. type PerTargetWithResumeFunc func(ctx xcontext.Context, target *TargetWithData) error // parallelTargetsState is the internal state of ForEachTargetWithResume. type parallelTargetsState struct { Version int `json:"V"` Targets []*TargetWithData `json:"TWD,omitempty"` } // ForEachTargetWithResume is a helper to write plugins that support job resumption. // This helper is for plugins that want to apply a single function to all targets, independently. // This helper calls the supplied PerTargetWithResumeFunc immediately for each target received, // in a separate goroutine. When the function returns, the target is sent to the output channels so it can // run through the next test step. // This helper directly accepts the resumeState from the Run method of the TestStep interface, and the // return value can directly be passed back to the framework. // The helper automatically manages data returned on pause and makes sure the function is called again // with the same data on job resumption. The helper will not call functions again that succeeded or failed // before the pause signal was received. // The supplied PerTargetWithResumeFunc must react to pause and cancellation signals as normal. func ForEachTargetWithResume(ctx xcontext.Context, ch test.TestStepChannels, resumeState json.RawMessage, currentStepStateVersion int, f PerTargetWithResumeFunc) (json.RawMessage, error) { var ss parallelTargetsState // Parse resume state, if any. if len(resumeState) > 0 { if err := json.Unmarshal(resumeState, &ss); err != nil { return nil, fmt.Errorf("invalid resume state: %w", err) } if ss.Version != currentStepStateVersion { return nil, fmt.Errorf("incompatible resume state: want %d, got %d", currentStepStateVersion, ss.Version) } } var wg sync.WaitGroup pauseStates := make(chan *TargetWithData) handleTarget := func(tgt2 *TargetWithData) { defer wg.Done() err := f(ctx, tgt2) switch err { case xcontext.ErrCanceled: // nothing to do for failed case xcontext.ErrPaused: select { case pauseStates <- tgt2: case <-ctx.Done(): ctx.Debugf("ForEachTargetWithResume: received cancellation signal while pausing") } default: // nil or error if err != nil { ctx.Errorf("ForEachTargetWithResume: failed to apply test step function on target %s: %v", tgt2.Target.ID, err) } else { ctx.Debugf("ForEachTargetWithResume: target %s completed successfully", tgt2.Target.ID) } select { case ch.Out <- test.TestStepResult{Target: tgt2.Target, Err: err}: case <-ctx.Done(): ctx.Debugf("ForEachTargetWithResume: received cancellation signal while reporting result") } } } // restart paused targets for _, state := range ss.Targets { ctx.Debugf("ForEachTargetWithResume: resuming target %s", state.Target.ID) wg.Add(1) go handleTarget(state) } // delete info about running targets ss.Targets = nil var err error mainloop: for { select { // no need to check for pause here, pausing closes the channel case tgt, ok := <-ch.In: if !ok { break mainloop } ctx.Debugf("ForEachTargetWithResume: received target %s", tgt) wg.Add(1) go handleTarget(&TargetWithData{Target: tgt}) case <-ctx.Done(): ctx.Debugf("ForEachTargetWithResume: canceled, terminating") err = xcontext.ErrCanceled break mainloop } } // close pauseStates to signal all handlers are done go func() { wg.Wait() close(pauseStates) }() for ps := range pauseStates { ss.Targets = append(ss.Targets, ps) } // wrap up if !ctx.IsSignaledWith(xcontext.ErrPaused) && len(ss.Targets) > 0 { return nil, fmt.Errorf("ForEachTargetWithResume: some target functions paused, but no pause signal received: %v ", ss.Targets) } if ctx.IsSignaledWith(xcontext.ErrPaused) { return MarshalState(&ss, currentStepStateVersion) } return nil, err }