cmd/core/runner/concurrency.go (67 lines of code) (raw):

/* Copyright (c) Facebook, Inc. and its affiliates. All rights reserved. This source code is licensed under the BSD-style license found in the LICENSE file in the root directory of this source tree. */ package runner import ( "context" "time" "github.com/facebookincubator/fbender/cmd/core/options" "github.com/facebookincubator/fbender/recorders" "github.com/facebookincubator/fbender/tester" "github.com/facebookincubator/fbender/utils" "github.com/pinterest/bender" ) // ConcurrencyRunner is a test runner for load test concurrency commands. type ConcurrencyRunner struct { runner workerSem *bender.WorkerSemaphore spinnerCancel context.CancelFunc } // NewConcurrencyRunner returns new ConcurrencyRunner. func NewConcurrencyRunner(params *Params) *ConcurrencyRunner { return &ConcurrencyRunner{ runner: runner{ Params: params, }, } } // Before prepares requests, recorders and interval generator. func (r *ConcurrencyRunner) Before(workers tester.Workers, opts interface{}) error { if err := r.runner.Before(workers, opts); err != nil { return err } o, ok := opts.(*options.Options) if !ok { return tester.ErrInvalidOptions } r.workerSem = bender.NewWorkerSemaphore() go func() { r.workerSem.Signal(workers) }() r.requests = make(chan interface{}) ctx, cancel := context.WithCancel(context.Background()) go func() { for i := 0; ; i++ { select { case <-ctx.Done(): close(r.requests) return default: r.requests <- r.Params.RequestGenerator(i) } } }() // We want tne progressbar to measure the time passed. const scale = 10 count := int(o.Duration/time.Second) * scale r.progress, r.bar = recorders.NewLoadTestProgress(count) r.progress.Start() go func() { for i := 0; i < count; i++ { time.Sleep(time.Second / scale) r.bar.Incr() } cancel() r.progress.Stop() r.spinnerCancel = utils.NewBackgroundSpinner("Waiting for tests to finish", 0) }() return nil } // After cleans up after the test. func (r *ConcurrencyRunner) After(test int, options interface{}) { r.spinnerCancel() r.runner.After(test, options) } // WorkerSemaphore returns a worker semaphore for concurrency test. func (r *ConcurrencyRunner) WorkerSemaphore() *bender.WorkerSemaphore { return r.workerSem }