network/benchmarks/netperf/lib/utilslib.go (341 lines of code) (raw):

package lib import ( "context" "fmt" "time" api "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" ) var everythingSelector metav1.ListOptions = metav1.ListOptions{} func setupClient(kubeConfig string) (*kubernetes.Clientset, error) { config, err := clientcmd.BuildConfigFromFlags("", kubeConfig) if err != nil { return nil, fmt.Errorf("failed to create config: %v", err) } clientset, err := kubernetes.NewForConfig(config) if err != nil { return nil, fmt.Errorf("failed to create clientset: %v", err) } return clientset, nil } func getMinionNodes(c *kubernetes.Clientset) (*api.NodeList, error) { nodes, err := c.CoreV1().Nodes().List( context.Background(), metav1.ListOptions{ FieldSelector: "spec.unschedulable=false", // for now the tests can only run on linux/amd64 nodes LabelSelector: "kubernetes.io/os=linux,kubernetes.io/arch=amd64", }) if err != nil { return nil, fmt.Errorf("failed to get nodes: %v", err) } return nodes, nil } func createServices(c *kubernetes.Clientset, testNamespace string) error { if _, err := c.CoreV1().Namespaces().Get(context.Background(), testNamespace, metav1.GetOptions{}); err != nil { _, err := c.CoreV1().Namespaces().Create(context.Background(), &api.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("failed to create namespace %s: %v", testNamespace, err) } } // Create the orchestrator service that points to the coordinator pod orchLabels := map[string]string{"app": "netperf-orch"} orchService := &api.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "netperf-orch", }, Spec: api.ServiceSpec{ Selector: orchLabels, Ports: []api.ServicePort{{ Name: "netperf-orch", Protocol: api.ProtocolTCP, Port: orchestratorPort, TargetPort: intstr.FromInt(orchestratorPort), }}, Type: api.ServiceTypeClusterIP, }, } if _, err := c.CoreV1().Services(testNamespace).Create(context.Background(), orchService, metav1.CreateOptions{}); err != nil { return fmt.Errorf("failed to create orchestrator service: %v", err) } fmt.Println("Created orchestrator service") // Create the netperf-w2 service that points a clusterIP at the worker 2 pod netperfW2Labels := map[string]string{"app": "netperf-w2"} netperfW2Service := &api.Service{ ObjectMeta: metav1.ObjectMeta{ Name: "netperf-w2", }, Spec: api.ServiceSpec{ Selector: netperfW2Labels, Ports: []api.ServicePort{ { Name: "netperf-w2", Protocol: api.ProtocolTCP, Port: iperf3Port, TargetPort: intstr.FromInt(iperf3Port), }, { Name: "netperf-w2-qperf19766", Protocol: api.ProtocolTCP, Port: qperf19766, TargetPort: intstr.FromInt(qperf19766), }, { Name: "netperf-w2-qperf19765", Protocol: api.ProtocolTCP, Port: qperf19765, TargetPort: intstr.FromInt(qperf19765), }, { Name: "netperf-w2-sctp", Protocol: api.ProtocolSCTP, Port: iperf3Port, TargetPort: intstr.FromInt(iperf3Port), }, { Name: "netperf-w2-udp", Protocol: api.ProtocolUDP, Port: iperf3Port, TargetPort: intstr.FromInt(iperf3Port), }, { Name: "netperf-w2-netperf", Protocol: api.ProtocolTCP, Port: netperfPort, TargetPort: intstr.FromInt(netperfPort), }, }, Type: api.ServiceTypeClusterIP, }, } if _, err := c.CoreV1().Services(testNamespace).Create(context.Background(), netperfW2Service, metav1.CreateOptions{}); err != nil { return fmt.Errorf("failed to create netperf-w2 service: %v", err) } fmt.Println("Created netperf-w2 service") return nil } func createRCs(c *kubernetes.Clientset, testParams TestParams, primaryNode, secondaryNode api.Node) error { // Create the orchestrator RC name := "netperf-orch" fmt.Println("Creating replication controller", name) replicas := int32(1) _, err := c.CoreV1().ReplicationControllers(testParams.TestNamespace).Create(context.Background(), &api.ReplicationController{ ObjectMeta: metav1.ObjectMeta{Name: name}, Spec: api.ReplicationControllerSpec{ Replicas: &replicas, Selector: map[string]string{"app": name}, Template: &api.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{"app": name}, }, Spec: api.PodSpec{ NodeSelector: map[string]string{"kubernetes.io/os": "linux", "kubernetes.io/arch": "amd64"}, Containers: []api.Container{ { Name: name, Image: testParams.Image, Ports: []api.ContainerPort{{ContainerPort: orchestratorPort}}, Args: []string{ "--mode=orchestrator", fmt.Sprintf("--testFrom=%d", testParams.TestFrom), fmt.Sprintf("--testTo=%d", testParams.TestTo), }, ImagePullPolicy: "Always", }, }, TerminationGracePeriodSeconds: new(int64), }, }, }, }, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("error creating orchestrator replication controller %s: %v", name, err) } fmt.Println("Created orchestrator replication controller") for i := 1; i <= 3; i++ { // Bring up pods slowly time.Sleep(3 * time.Second) kubeNode := primaryNode.GetName() if i == 3 { kubeNode = secondaryNode.GetName() } name = fmt.Sprintf("netperf-w%d", i) fmt.Println("Creating replication controller", name) portSpec := []api.ContainerPort{} if i > 1 { // Worker W1 is a client-only pod - no ports are exposed portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolTCP}) portSpec = append(portSpec, api.ContainerPort{ContainerPort: iperf3Port, Protocol: api.ProtocolSCTP}) } workerEnv := []api.EnvVar{ {Name: "worker", Value: name}, {Name: "kubeNode", Value: kubeNode}, {Name: "podname", Value: name}, } replicas := int32(1) _, err := c.CoreV1().ReplicationControllers(testParams.TestNamespace).Create(context.Background(), &api.ReplicationController{ ObjectMeta: metav1.ObjectMeta{Name: name}, Spec: api.ReplicationControllerSpec{ Replicas: &replicas, Selector: map[string]string{"app": name}, Template: &api.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{"app": name}, }, Spec: api.PodSpec{ NodeName: kubeNode, Containers: []api.Container{ { Name: name, Image: testParams.Image, Ports: portSpec, Args: []string{ "--mode=worker", fmt.Sprintf("--testFrom=%d", testParams.TestFrom), fmt.Sprintf("--testTo=%d", testParams.TestTo), }, Env: workerEnv, ImagePullPolicy: "Always", }, }, TerminationGracePeriodSeconds: new(int64), }, }, }, }, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("error creating worker replication controller %s: %v", name, err) } } return nil } func executeTests(c *kubernetes.Clientset, testParams TestParams, primaryNode, secondaryNode api.Node) ([]Result, error) { results := make([]Result, testParams.Iterations) for i := 0; i < testParams.Iterations; i++ { cleanup(c, testParams.TestNamespace) if err := createServices(c, testParams.TestNamespace); err != nil { return nil, fmt.Errorf("failed to create services: %v", err) } time.Sleep(3 * time.Second) if err := createRCs(c, testParams, primaryNode, secondaryNode); err != nil { return nil, fmt.Errorf("failed to create replication controllers: %v", err) } fmt.Println("Waiting for netperf pods to start up") orchestratorPodName, err := getOrchestratorPodName(c, testParams.TestNamespace, 3*time.Minute) if err != nil { return nil, fmt.Errorf("failed to get orchestrator pod name: %v", err) } fmt.Println("Orchestrator Pod is", orchestratorPodName) var jsonFilePath string var csvFilePath string // The pods orchestrate themselves, we just wait for the results file to show up in the orchestrator container for { // Monitor the orchestrator pod for the CSV results file csvdata, err := getDataFromPod(c, orchestratorPodName, csvDataMarker, csvEndDataMarker, testParams.TestNamespace) if err != nil { return nil, fmt.Errorf("error getting CSV data from orchestrator pod: %v", err) } if csvdata == nil { fmt.Println("Scanned orchestrator pod filesystem - no results file found yet...") time.Sleep(60 * time.Second) continue } if testParams.JsonOutput { jsondata, err := getDataFromPod(c, orchestratorPodName, jsonDataMarker, jsonEndDataMarker, testParams.TestNamespace) if err != nil { return nil, fmt.Errorf("error getting JSON data from orchestrator pod: %v", err) } if jsondata == nil { fmt.Println("Scanned orchestrator pod filesystem - no json data found yet...") time.Sleep(60 * time.Second) continue } jsonFilePath, err = processRawData(jsondata, testParams.TestNamespace, testParams.Tag, "json") if err != nil { return nil, fmt.Errorf("error processing JSON data: %v", err) } } csvFilePath, err = processRawData(csvdata, testParams.TestNamespace, testParams.Tag, "csv") if err != nil { return nil, fmt.Errorf("error processing CSV data: %v", err) } break } fmt.Printf("TEST RUN (Iteration %d) FINISHED - cleaning up services and pods\n", i) results[i] = Result{JsonResultFile: jsonFilePath, CsvResultFile: csvFilePath} } return results, nil } func getOrchestratorPodName(c *kubernetes.Clientset, testNamespace string, timeout time.Duration) (string, error) { timeoutCh := time.After(timeout) ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: fmt.Println("Waiting for orchestrator pod creation") pods, err := c.CoreV1().Pods(testNamespace).List(context.Background(), metav1.ListOptions{ LabelSelector: "app=netperf-orch", }) if err != nil { fmt.Println("Failed to fetch pods - waiting for pod creation", err) continue } if len(pods.Items) == 0 { fmt.Println("No orchestrator pods found yet") continue } pod := pods.Items[0] podStatus := pod.Status if podStatus.Phase == api.PodRunning { return pod.GetName(), nil } for _, containerStatus := range podStatus.ContainerStatuses { if waiting := containerStatus.State.Waiting; waiting != nil { switch waiting.Reason { case "ErrImagePull", "CrashLoopBackOff", "ImagePullBackOff": return "", fmt.Errorf("orchestrator pod error: %s - %v", waiting.Reason, waiting.Message) } } } fmt.Println("Orchestrator pod is not running yet") case <-timeoutCh: return "", fmt.Errorf("timed out waiting for orchestrator pod to be created") } } } func cleanup(c *kubernetes.Clientset, testNamespace string) { syncCtx := context.Background() // Cleanup existing rcs, pods and services in our namespace rcs, err := c.CoreV1().ReplicationControllers(testNamespace).List(syncCtx, everythingSelector) if err != nil { fmt.Println("Failed to get replication controllers", err) return } for _, rc := range rcs.Items { fmt.Println("Deleting rc", rc.GetName()) if err := c.CoreV1().ReplicationControllers(testNamespace).Delete( context.Background(), rc.GetName(), metav1.DeleteOptions{}); err != nil { fmt.Println("Failed to delete rc", rc.GetName(), err) } } pods, err := c.CoreV1().Pods(testNamespace).List(syncCtx, everythingSelector) if err != nil { fmt.Println("Failed to get pods", err) return } for _, pod := range pods.Items { fmt.Println("Deleting pod", pod.GetName()) if err := c.CoreV1().Pods(testNamespace).Delete(context.Background(), pod.GetName(), metav1.DeleteOptions{GracePeriodSeconds: new(int64)}); err != nil { fmt.Println("Failed to delete pod", pod.GetName(), err) } } svcs, err := c.CoreV1().Services(testNamespace).List(syncCtx, everythingSelector) if err != nil { fmt.Println("Failed to get services", err) return } for _, svc := range svcs.Items { fmt.Println("Deleting svc", svc.GetName()) err := c.CoreV1().Services(testNamespace).Delete( context.Background(), svc.GetName(), metav1.DeleteOptions{}) if err != nil { fmt.Println("Failed to get service", err) } } }