plugins/teststeps/sleep/sleep.go (83 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. package sleep import ( "encoding/json" "errors" "fmt" "time" "github.com/facebookincubator/contest/pkg/event" "github.com/facebookincubator/contest/pkg/event/testevent" "github.com/facebookincubator/contest/pkg/test" "github.com/facebookincubator/contest/pkg/xcontext" "github.com/facebookincubator/contest/plugins/teststeps" ) // Name is the name used to look this plugin up. var Name = "Sleep" // Events defines the events that a TestStep is allow to emit var Events = []event.Name{} // sleepStep implements an echo-style printing plugin. type sleepStep struct { } // New initializes and returns a new EchoStep. It implements the TestStepFactory // interface. func New() test.TestStep { return &sleepStep{} } // Load returns the name, factory and events which are needed to register the step. func Load() (string, test.TestStepFactory, []event.Name) { return Name, New, Events } func getDuration(params test.TestStepParameters) (time.Duration, error) { durP := params.GetOne("duration") if durP.IsEmpty() { return 0, errors.New("Missing 'duration' field in sleep parameters") } dur, err := time.ParseDuration(durP.String()) if err != nil { return 0, fmt.Errorf("invalid duration %q: %w", durP.String(), err) } return dur, nil } // ValidateParameters validates the parameters that will be passed to the Run // and Resume methods of the test step. func (ss *sleepStep) ValidateParameters(_ xcontext.Context, params test.TestStepParameters) error { _, err := getDuration(params) return err } // Name returns the name of the Step func (ss *sleepStep) Name() string { return Name } type sleepStepData struct { DeadlineMS int64 `json:"D"` } // Run executes the step func (ss *sleepStep) Run(ctx xcontext.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error) { dur, err := getDuration(params) if err != nil { return nil, err } fn := func(ctx xcontext.Context, target *teststeps.TargetWithData) error { var deadline time.Time // copy, can be different per target var sleepTime = dur // handle resume if target.Data != nil { ssd := sleepStepData{} if err := json.Unmarshal(target.Data, &ssd); err != nil { return fmt.Errorf("invalid resume state: %w", err) } deadline = time.Unix(ssd.DeadlineMS/1000, (ssd.DeadlineMS%1000)*1000000) sleepTime = time.Until(deadline) ctx.Debugf("restored with %v unix, in %s", ssd.DeadlineMS, time.Until(deadline)) } else { deadline = time.Now().Add(dur) } // now sleep select { case <-time.After(sleepTime): return nil case <-ctx.Until(xcontext.ErrPaused): ctx.Debugf("%s: Paused with %s left", target.Target, time.Until(deadline)) ssd := &sleepStepData{ DeadlineMS: deadline.UnixNano() / 1000000, } var err error target.Data, err = json.Marshal(ssd) if err != nil { return err } return xcontext.ErrPaused case <-ctx.Done(): ctx.Debugf("%s: Cancelled with %s left", target.Target, time.Until(deadline)) } return nil } return teststeps.ForEachTargetWithResume(ctx, ch, resumeState, 1, fn) }