modules/python/clusterloader2/slo/slo.py (267 lines of code) (raw):

import json import os import argparse import time from datetime import datetime, timezone from clusterloader2.utils import parse_xml_to_json, run_cl2_command, get_measurement, str2bool from clients.kubernetes_client import KubernetesClient DEFAULT_PODS_PER_NODE = 40 DEFAULT_NODES_PER_NAMESPACE = 100 CPU_REQUEST_LIMIT_MILLI = 1 DAEMONSETS_PER_NODE = { "aws": 2, "azure": 6, "aks": 6 } CPU_CAPACITY = { "aws": 0.94, "azure": 0.87, "aks": 0.87 } # TODO: Remove aks once CL2 update provider name to be azure def calculate_config(cpu_per_node, node_count, max_pods, provider, service_test, cnp_test, ccnp_test): throughput = 100 nodes_per_namespace = min(node_count, DEFAULT_NODES_PER_NAMESPACE) pods_per_node = DEFAULT_PODS_PER_NODE if service_test: pods_per_node = max_pods if cnp_test or ccnp_test: pods_per_node = max_pods # Different cloud has different reserved values and number of daemonsets # Using the same percentage will lead to incorrect nodes number as the number of nodes grow # For AWS, see: https://github.com/awslabs/amazon-eks-ami/blob/main/templates/al2/runtime/bootstrap.sh#L290 # For Azure, see: https://learn.microsoft.com/en-us/azure/aks/node-resource-reservations#cpu-reservations capacity = CPU_CAPACITY[provider] cpu_request = (cpu_per_node * 1000 * capacity) // pods_per_node cpu_request = max(cpu_request, CPU_REQUEST_LIMIT_MILLI) return throughput, nodes_per_namespace, pods_per_node, cpu_request def configure_clusterloader2( cpu_per_node, node_count, node_per_step, max_pods, repeats, operation_timeout, provider, cilium_enabled, scrape_containerd, service_test, cnp_test, ccnp_test, num_cnps, num_ccnps, dualstack, override_file): steps = node_count // node_per_step throughput, nodes_per_namespace, pods_per_node, cpu_request = calculate_config(cpu_per_node, node_per_step, max_pods, provider, service_test, cnp_test, ccnp_test) with open(override_file, 'w', encoding='utf-8') as file: file.write(f"CL2_NODES: {node_count}\n") file.write(f"CL2_LOAD_TEST_THROUGHPUT: {throughput}\n") file.write(f"CL2_NODES_PER_NAMESPACE: {nodes_per_namespace}\n") file.write(f"CL2_NODES_PER_STEP: {node_per_step}\n") file.write(f"CL2_PODS_PER_NODE: {pods_per_node}\n") file.write(f"CL2_DEPLOYMENT_SIZE: {pods_per_node}\n") file.write(f"CL2_LATENCY_POD_CPU: {cpu_request}\n") file.write(f"CL2_REPEATS: {repeats}\n") file.write(f"CL2_STEPS: {steps}\n") file.write(f"CL2_OPERATION_TIMEOUT: {operation_timeout}\n") file.write("CL2_PROMETHEUS_TOLERATE_MASTER: true\n") file.write("CL2_PROMETHEUS_MEMORY_LIMIT_FACTOR: 100.0\n") file.write("CL2_PROMETHEUS_MEMORY_SCALE_FACTOR: 100.0\n") file.write("CL2_PROMETHEUS_CPU_SCALE_FACTOR: 30.0\n") file.write("CL2_PROMETHEUS_NODE_SELECTOR: \"prometheus: \\\"true\\\"\"\n") file.write("CL2_POD_STARTUP_LATENCY_THRESHOLD: 3m\n") if scrape_containerd: file.write(f"CL2_SCRAPE_CONTAINERD: {str(scrape_containerd).lower()}\n") file.write("CONTAINERD_SCRAPE_INTERVAL: 5m\n") if cilium_enabled: file.write("CL2_CILIUM_METRICS_ENABLED: true\n") file.write("CL2_PROMETHEUS_SCRAPE_CILIUM_OPERATOR: true\n") file.write("CL2_PROMETHEUS_SCRAPE_CILIUM_AGENT: true\n") file.write("CL2_PROMETHEUS_SCRAPE_CILIUM_AGENT_INTERVAL: 30s\n") if service_test: file.write("CL2_SERVICE_TEST: true\n") else: file.write("CL2_SERVICE_TEST: false\n") if cnp_test: file.write("CL2_CNP_TEST: true\n") file.write(f"CL2_CNPS_PER_NAMESPACE: {num_cnps}\n") file.write(f"CL2_DUALSTACK: {dualstack}\n") file.write("CL2_GROUP_NAME: cnp-ccnp\n") if ccnp_test: file.write("CL2_CCNP_TEST: true\n") file.write(f"CL2_CCNPS: {num_ccnps}\n") file.write(f"CL2_DUALSTACK: {dualstack}\n") file.write("CL2_GROUP_NAME: cnp-ccnp\n") with open(override_file, 'r', encoding='utf-8') as file: print(f"Content of file {override_file}:\n{file.read()}") file.close() def validate_clusterloader2(node_count, operation_timeout_in_minutes=10): kube_client = KubernetesClient() ready_node_count = 0 timeout = time.time() + (operation_timeout_in_minutes * 60) while time.time() < timeout: ready_nodes = kube_client.get_ready_nodes() ready_node_count = len(ready_nodes) print(f"Currently {ready_node_count} nodes are ready.") if ready_node_count == node_count: break print(f"Waiting for {node_count} nodes to be ready.") time.sleep(10) if ready_node_count != node_count: raise Exception(f"Only {ready_node_count} nodes are ready, expected {node_count} nodes!") def execute_clusterloader2( cl2_image, cl2_config_dir, cl2_report_dir, cl2_config_file, kubeconfig, provider, scrape_containerd ): run_cl2_command(kubeconfig, cl2_image, cl2_config_dir, cl2_report_dir, provider, cl2_config_file=cl2_config_file, overrides=True, enable_prometheus=True, scrape_containerd=scrape_containerd) def collect_clusterloader2( cpu_per_node, node_count, max_pods, repeats, cl2_report_dir, cloud_info, run_id, run_url, service_test, cnp_test, ccnp_test, result_file, test_type, start_timestamp, ): details = parse_xml_to_json(os.path.join(cl2_report_dir, "junit.xml"), indent = 2) json_data = json.loads(details) testsuites = json_data["testsuites"] provider = json.loads(cloud_info)["cloud"] if testsuites: status = "success" if testsuites[0]["failures"] == 0 else "failure" else: raise Exception(f"No testsuites found in the report! Raw data: {details}") _, _, pods_per_node, _ = calculate_config(cpu_per_node, node_count, max_pods, provider, service_test, cnp_test, ccnp_test) pod_count = node_count * pods_per_node # TODO: Expose optional parameter to include test details template = { "timestamp": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'), "cpu_per_node": cpu_per_node, "node_count": node_count, "pod_count": pod_count, "churn_rate": repeats, "status": status, "group": None, "measurement": None, "result": None, # "test_details": details, "cloud_info": cloud_info, "run_id": run_id, "run_url": run_url, "test_type": test_type, "start_timestamp": start_timestamp, } content = "" for f in os.listdir(cl2_report_dir): file_path = os.path.join(cl2_report_dir, f) with open(file_path, 'r', encoding='utf-8') as file: print(f"Processing {file_path}") measurement, group_name = get_measurement(file_path) if not measurement: continue print(measurement, group_name) data = json.loads(file.read()) if "dataItems" in data: items = data["dataItems"] if not items: print(f"No data items found in {file_path}") print(f"Data:\n{data}") continue for item in items: result = template.copy() result["group"] = group_name result["measurement"] = measurement result["result"] = item content += json.dumps(result) + "\n" else: result = template.copy() result["group"] = group_name result["measurement"] = measurement result["result"] = data content += json.dumps(result) + "\n" os.makedirs(os.path.dirname(result_file), exist_ok=True) with open(result_file, 'w', encoding='utf-8') as file: file.write(content) def main(): parser = argparse.ArgumentParser(description="SLO Kubernetes resources.") subparsers = parser.add_subparsers(dest="command") # Sub-command for configure_clusterloader2 parser_configure = subparsers.add_parser("configure", help="Override CL2 config file") parser_configure.add_argument("cpu_per_node", type=int, help="CPU per node") parser_configure.add_argument("node_count", type=int, help="Number of nodes") parser_configure.add_argument("node_per_step", type=int, help="Number of nodes per scaling step") parser_configure.add_argument("max_pods", type=int, nargs='?', default=0, help="Maximum number of pods per node") parser_configure.add_argument("repeats", type=int, help="Number of times to repeat the deployment churn") parser_configure.add_argument("operation_timeout", type=str, help="Timeout before failing the scale up test") parser_configure.add_argument("provider", type=str, help="Cloud provider name") parser_configure.add_argument("cilium_enabled", type=str2bool, choices=[True, False], default=False, help="Whether cilium is enabled. Must be either True or False") parser_configure.add_argument("scrape_containerd", type=str2bool, choices=[True, False], default=False, help="Whether to scrape containerd metrics. Must be either True or False") parser_configure.add_argument("service_test", type=str2bool, choices=[True, False], default=False, help="Whether service test is running. Must be either True or False") parser_configure.add_argument("cnp_test", type=str2bool, choices=[True, False], nargs='?', default=False, help="Whether cnp test is running. Must be either True or False") parser_configure.add_argument("ccnp_test", type=str2bool, choices=[True, False], nargs='?', default=False, help="Whether ccnp test is running. Must be either True or False") parser_configure.add_argument("num_cnps", type=int, nargs='?', default=0, help="Number of cnps") parser_configure.add_argument("num_ccnps", type=int, nargs='?', default=0, help="Number of ccnps") parser_configure.add_argument("dualstack", type=str2bool, choices=[True, False], nargs='?', default=False, help="Whether cluster is dualstack. Must be either True or False") parser_configure.add_argument("cl2_override_file", type=str, help="Path to the overrides of CL2 config file") # Sub-command for validate_clusterloader2 parser_validate = subparsers.add_parser("validate", help="Validate cluster setup") parser_validate.add_argument("node_count", type=int, help="Number of desired nodes") parser_validate.add_argument("operation_timeout", type=int, default=600, help="Operation timeout to wait for nodes to be ready") # Sub-command for execute_clusterloader2 parser_execute = subparsers.add_parser("execute", help="Execute scale up operation") parser_execute.add_argument("cl2_image", type=str, help="Name of the CL2 image") parser_execute.add_argument("cl2_config_dir", type=str, help="Path to the CL2 config directory") parser_execute.add_argument("cl2_report_dir", type=str, help="Path to the CL2 report directory") parser_execute.add_argument("cl2_config_file", type=str, help="Path to the CL2 config file") parser_execute.add_argument("kubeconfig", type=str, help="Path to the kubeconfig file") parser_execute.add_argument("provider", type=str, help="Cloud provider name") parser_execute.add_argument("scrape_containerd", type=str2bool, choices=[True, False], default=False, help="Whether to scrape containerd metrics. Must be either True or False") # Sub-command for collect_clusterloader2 parser_collect = subparsers.add_parser("collect", help="Collect scale up data") parser_collect.add_argument("cpu_per_node", type=int, help="CPU per node") parser_collect.add_argument("node_count", type=int, help="Number of nodes") parser_collect.add_argument("max_pods", type=int, nargs='?', default=0, help="Maximum number of pods per node") parser_collect.add_argument("repeats", type=int, help="Number of times to repeat the deployment churn") parser_collect.add_argument("cl2_report_dir", type=str, help="Path to the CL2 report directory") parser_collect.add_argument("cloud_info", type=str, help="Cloud information") parser_collect.add_argument("run_id", type=str, help="Run ID") parser_collect.add_argument("run_url", type=str, help="Run URL") parser_collect.add_argument("service_test", type=str2bool, choices=[True, False], default=False, help="Whether service test is running. Must be either True or False") parser_collect.add_argument("cnp_test", type=str2bool, choices=[True, False], nargs='?', default=False, help="Whether cnp test is running. Must be either True or False") parser_collect.add_argument("ccnp_test", type=str2bool, choices=[True, False], nargs='?', default=False, help="Whether ccnp test is running. Must be either True or False") parser_collect.add_argument("result_file", type=str, help="Path to the result file") parser_collect.add_argument("test_type", type=str, nargs='?', default="default-config", help="Description of test type") parser_collect.add_argument("start_timestamp", type=str, help="Test start timestamp") args = parser.parse_args() if args.command == "configure": configure_clusterloader2(args.cpu_per_node, args.node_count, args.node_per_step, args.max_pods, args.repeats, args.operation_timeout, args.provider, args.cilium_enabled, args.scrape_containerd, args.service_test, args.cnp_test, args.ccnp_test, args.num_cnps, args.num_ccnps, args.dualstack, args.cl2_override_file) elif args.command == "validate": validate_clusterloader2(args.node_count, args.operation_timeout) elif args.command == "execute": execute_clusterloader2(args.cl2_image, args.cl2_config_dir, args.cl2_report_dir, args.cl2_config_file, args.kubeconfig, args.provider, args.scrape_containerd) elif args.command == "collect": collect_clusterloader2(args.cpu_per_node, args.node_count, args.max_pods, args.repeats, args.cl2_report_dir, args.cloud_info, args.run_id, args.run_url, args.service_test, args.cnp_test, args.ccnp_test, args.result_file, args.test_type, args.start_timestamp) if __name__ == "__main__": main()