func Schedule()

in request/schedule.go [34:139]


// scheduleError is a string-backed error type so that Schedule's input
// validation errors can be declared as constants.
type scheduleError string

// Error implements the error interface.
func (e scheduleError) Error() string { return string(e) }

// errNoRESTClients is returned by Schedule when at least one worker is
// requested but no REST client was supplied to serve it.
const errNoRESTClients = scheduleError("no rest client available to schedule requests")

// Schedule sends the requests described by spec through restCli and returns
// the gathered response statistics.
//
// The number of workers is spec.Client, falling back to spec.Conns when
// spec.Client is zero. Workers are assigned to the entries of restCli in
// round-robin order, so a connection is shared when there are more workers
// than connections. All workers share one rate limiter with a burst of 1;
// a spec.Rate of zero is treated as math.MaxInt32 requests per second.
//
// Each request is given defaultTimeout and is executed with
// context.Background(), so cancelling ctx stops new requests from being
// started but does not abort a request that is already in flight.
//
// An error is returned when NewWeightedRandomRequests rejects spec, or when
// workers are requested but restCli is empty. Per-request failures are not
// returned; they are recorded in the result's ResponseStats. The Total field
// of the result is copied from spec.Total.
func Schedule(ctx context.Context, spec *types.LoadProfileSpec, restCli []rest.Interface) (*Result, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	rndReqs, err := NewWeightedRandomRequests(spec)
	if err != nil {
		return nil, err
	}

	qps := spec.Rate
	if qps == 0 {
		qps = float64(math.MaxInt32)
	}
	limiter := rate.NewLimiter(rate.Limit(qps), 1)

	clients := spec.Client
	if clients == 0 {
		clients = spec.Conns
	}

	// Without this guard the round-robin index below (i % len(restCli))
	// panics with an integer divide-by-zero. Nothing has been started yet at
	// this point: rndReqs.Run has not been called and no worker exists.
	if clients > 0 && len(restCli) == 0 {
		return nil, errNoRESTClients
	}

	reqBuilderCh := rndReqs.Chan()
	var wg sync.WaitGroup

	respMetric := metrics.NewResponseMetric()
	for i := 0; i < clients; i++ {
		// reuse connection if clients > conns
		cli := restCli[i%len(restCli)]
		wg.Add(1)
		go func(cli rest.Interface) {
			defer wg.Done()

			for builder := range reqBuilderCh {
				req := builder.Build(cli)

				if err := limiter.Wait(ctx); err != nil {
					klog.V(5).Infof("Rate limiter wait failed: %v", err)
					// Cancel the shared context so the other workers and
					// the request generator observe the failure as well.
					cancel()
					return
				}

				klog.V(5).Infof("Request URL: %s", req.URL())

				req.Timeout(defaultTimeout)

				start := time.Now()
				received, err := req.Do(context.Background())
				// Based on HTTP2 Spec Section 8.1 [1],
				//
				// A server can send a complete response prior to the client
				// sending an entire request if the response does not depend
				// on any portion of the request that has not been sent and
				// received. When this is true, a server MAY request that the
				// client abort transmission of a request without error by
				// sending a RST_STREAM with an error code of NO_ERROR after
				// sending a complete response (i.e., a frame with the END_STREAM
				// flag). Clients MUST NOT discard responses as a result of receiving
				// such a RST_STREAM, though clients can always discard responses
				// at their discretion for other reasons.
				//
				// We should mark NO_ERROR as nil here.
				//
				// [1]: https://httpwg.org/specs/rfc7540.html#HttpSequence
				if err != nil && isHTTP2StreamNoError(err) {
					err = nil
				}

				end := time.Now()
				latency := end.Sub(start).Seconds()

				// Received bytes are recorded for failed requests too.
				respMetric.ObserveReceivedBytes(received)
				if err != nil {
					respMetric.ObserveFailure(req.URL().String(), end, latency, err)
					klog.V(5).Infof("Request stream failed: %v", err)
					continue
				}
				respMetric.ObserveLatency(req.URL().String(), latency)
			}
		}(cli)
	}

	klog.V(2).InfoS("Setting",
		"clients", clients,
		"connections", len(restCli),
		"rate", qps,
		"total", spec.Total,
		"http2", !spec.DisableHTTP2,
		"content-type", spec.ContentType,
	)

	start := time.Now()

	// Feed the workers, then wait for every worker to exit before reading
	// the metrics.
	rndReqs.Run(ctx, spec.Total)
	rndReqs.Stop()
	wg.Wait()

	totalDuration := time.Since(start)
	responseStats := respMetric.Gather()
	return &Result{
		ResponseStats: responseStats,
		Duration:      totalDuration,
		Total:         spec.Total,
	}, nil
}