in request/schedule.go [34:139]
func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.Interface) (*Result, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
rndReqs, err := NewWeightedRandomRequests(spec)
if err != nil {
return nil, err
}
qps := spec.Rate
if qps == 0 {
qps = float64(math.MaxInt32)
}
limiter := rate.NewLimiter(rate.Limit(qps), 1)
clients := spec.Client
if clients == 0 {
clients = spec.Conns
}
reqBuilderCh := rndReqs.Chan()
var wg sync.WaitGroup
respMetric := metrics.NewResponseMetric()
for i := 0; i < clients; i++ {
// reuse connection if clients > conns
cli := restCli[i%len(restCli)]
wg.Add(1)
go func(cli rest.Interface) {
defer wg.Done()
for builder := range reqBuilderCh {
req := builder.Build(cli)
if err := limiter.Wait(ctx); err != nil {
klog.V(5).Infof("Rate limiter wait failed: %v", err)
cancel()
return
}
klog.V(5).Infof("Request URL: %s", req.URL())
req.Timeout(defaultTimeout)
func() {
start := time.Now()
var bytes int64
bytes, err := req.Do(context.Background())
// Based on HTTP2 Spec Section 8.1 [1],
//
// A server can send a complete response prior to the client
// sending an entire request if the response does not depend
// on any portion of the request that has not been sent and
// received. When this is true, a server MAY request that the
// client abort transmission of a request without error by
// sending a RST_STREAM with an error code of NO_ERROR after
// sending a complete response (i.e., a frame with the END_STREAM
// flag). Clients MUST NOT discard responses as a result of receiving
// such a RST_STREAM, though clients can always discard responses
// at their discretion for other reasons.
//
// We should mark NO_ERROR as nil here.
//
// [1]: https://httpwg.org/specs/rfc7540.html#HttpSequence
if err != nil && isHTTP2StreamNoError(err) {
err = nil
}
end := time.Now()
latency := end.Sub(start).Seconds()
respMetric.ObserveReceivedBytes(bytes)
if err != nil {
respMetric.ObserveFailure(req.URL().String(), end, latency, err)
klog.V(5).Infof("Request stream failed: %v", err)
return
}
respMetric.ObserveLatency(req.URL().String(), latency)
}()
}
}(cli)
}
klog.V(2).InfoS("Setting",
"clients", clients,
"connections", len(restCli),
"rate", qps,
"total", spec.Total,
"http2", !spec.DisableHTTP2,
"content-type", spec.ContentType,
)
start := time.Now()
rndReqs.Run(ctx, spec.Total)
rndReqs.Stop()
wg.Wait()
totalDuration := time.Since(start)
responseStats := respMetric.Gather()
return &Result{
ResponseStats: responseStats,
Duration: totalDuration,
Total: spec.Total,
}, nil
}