util-images/request-benchmark/main.go (180 lines of code) (raw):

/* Copyright 2022 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package main import ( "context" "flag" "fmt" "io" "log" "net/http" "net/url" "os" "path/filepath" "strings" "time" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/flowcontrol" ) const ( HealthCheckRequests = 10 NamespaceTmpl = "%namespace%" ) type ContentType string const ( JSONContentType ContentType = "json" ProtoContentType ContentType = "proto" CBORContentType ContentType = "cbor" YAMLContentType ContentType = "yaml" ) var ( inflight = flag.Int("inflight", 1, "Benchmark inflight (number of parallel requests being made to the apiserver") namespace = flag.String("namespace", "", "Replace %namespace% in URI with provided namespace") URI = flag.String("uri", "", "Request URI") verb = flag.String("verb", "GET", "A verb to be used in requests.") qps = flag.Float64("qps", -1, "The qps limit for all requests") contentType = ContentType("json") ) func (c ContentType) String() string { return string(c) } func (c *ContentType) Set(value string) error { switch ContentType(value) { case JSONContentType, ProtoContentType, CBORContentType, YAMLContentType: *c = ContentType(value) return nil default: return fmt.Errorf("invalid content type: %s. Must be one of: [json, proto, cbor, yaml]", value) } } func init() { flag.Var(&contentType, "content-type", "Content type for requests (required). Valid values: [json, proto, cbor, yaml]") flag.Parse() } func getContentType(ct ContentType) (string, error) { switch ct { case JSONContentType: return "application/json", nil case ProtoContentType: return "application/vnd.kubernetes.protobuf", nil case CBORContentType: return "application/cbor", nil case YAMLContentType: return "application/yaml", nil default: return "", fmt.Errorf("unsupported content type: %s", ct) } } func main() { config, err := getConfig() if err != nil { panic(err) } config.QPS = float32(*qps) client, err := rest.HTTPClientFor(config) if err != nil { panic(err) } ctx := context.Background() serverURL, _, err := rest.DefaultServerUrlFor(config) if err != nil { panic(err) } url, err := url.Parse(strings.ReplaceAll(*URI, NamespaceTmpl, *namespace)) if err != nil { panic(err) } url.Host = serverURL.Host url.Scheme = serverURL.Scheme var rateLimiter flowcontrol.RateLimiter if *qps != -1 { rateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(*qps), 10) } if err := healthCheck(ctx, client, *url, rateLimiter); err != nil { panic(err) } log.Printf("Sending requests to '%s' with inflight %d. Press Ctrl+C to stop...", url, *inflight) for i := 0; i < *inflight; i++ { go func() { for { sendRequest(ctx, client, *url, rateLimiter) } }() } select {} // block main thread from ending } func healthCheck(ctx context.Context, client *http.Client, url url.URL, rateLimiter flowcontrol.RateLimiter) error { for i := 0; i < HealthCheckRequests; i++ { if sendRequest(ctx, client, url, rateLimiter) { return nil } } return fmt.Errorf("could not successfully send a request to %s", url.String()) } func sendRequest(ctx context.Context, client *http.Client, url url.URL, rateLimiter flowcontrol.RateLimiter) bool { req, err := http.NewRequestWithContext(ctx, *verb, url.String(), nil) if err != nil { log.Printf("Got error creating a request: %v\n", err) return false } contentType, err := getContentType(contentType) if err != nil { log.Printf("Invalid content type: %v", err) return false } req.Header.Set("Accept", contentType) err = tryThrottle(ctx, rateLimiter) if err != nil { log.Printf("Got error throttling a request: %v\n", err) return false } start := time.Now() resp, err := client.Do(req) if err != nil { log.Printf("Got error when sending a request: %v\n", err) return false } defer resp.Body.Close() if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent { log.Printf("Got bad status code: %v\n", resp.Status) return false } if resp.Header.Get("Content-Type") != contentType { log.Printf("Got bad content type: %q, expected %q\n", resp.Header.Get("Content-Type"), contentType) return false } written, err := io.Copy(io.Discard, resp.Body) if err != nil { log.Printf("Got error when reading response: %v\n", err) return false } log.Printf("Got response of %d bytes in %v", written, time.Since(start)) return true } func getConfig() (*rest.Config, error) { if _, ok := os.LookupEnv("KUBERNETES_PORT"); ok { return rest.InClusterConfig() } if kubeconfig, ok := os.LookupEnv("KUBECONFIG"); ok { return clientcmd.BuildConfigFromFlags("", kubeconfig) } if home, ok := os.LookupEnv("HOME"); ok { kubeconfig := filepath.Join(home, ".kube", "config") return clientcmd.BuildConfigFromFlags("", kubeconfig) } return nil, fmt.Errorf("could not create client-go config") } func tryThrottle(ctx context.Context, rateLimiter flowcontrol.RateLimiter) error { if rateLimiter == nil { return nil } now := time.Now() err := rateLimiter.Wait(ctx) if err != nil { err = fmt.Errorf("client rate limiter Wait returned an error: %w", err) } latency := time.Since(now) if latency > time.Second { log.Printf("Waited for %v due to client-side throttling, not priority and fairness", latency) } return err }