modules/python/clusterloader2/slo/network_policy_scale.py (272 lines of code) (raw):

import json import os import argparse from datetime import datetime, timezone from clusterloader2.utils import parse_xml_to_json, get_measurement,run_cl2_command, str2bool def configure_clusterloader2( number_of_groups, clients_per_group, servers_per_group, workers_per_client, netpol_type, test_duration_secs, cilium_enabled, cilium_envoy_enabled, override_file, ): # Ensure the directory for override_file exists override_dir = os.path.dirname(override_file) if not os.path.exists(override_dir): os.makedirs(override_dir, exist_ok=True) with open(override_file, "w", encoding="utf-8") as file: # prometheus server config file.write("# Prometheus server config\n") file.write("CL2_PROMETHEUS_TOLERATE_MASTER: true\n") file.write("CL2_PROMETHEUS_MEMORY_LIMIT_FACTOR: 100.0\n") file.write("CL2_PROMETHEUS_MEMORY_SCALE_FACTOR: 100.0\n") file.write("CL2_PROMETHEUS_CPU_SCALE_FACTOR: 30.0\n") file.write('CL2_PROMETHEUS_NODE_SELECTOR: "prometheus: \\"true\\""\n') file.write("CL2_ENABLE_IN_CLUSTER_NETWORK_LATENCY: false\n") file.write("PROMETHEUS_SCRAPE_KUBE_PROXY: false\n") if cilium_enabled: file.write("# Cilium config\n") file.write("CL2_CILIUM_ENABLED: true\n") file.write("CL2_PROMETHEUS_SCRAPE_CILIUM_OPERATOR: true\n") file.write("CL2_PROMETHEUS_SCRAPE_CILIUM_AGENT: true\n") file.write("CL2_PROMETHEUS_SCRAPE_CILIUM_AGENT_INTERVAL: 30s\n") if cilium_envoy_enabled: file.write("# Cilium Envoy config\n") file.write("CL2_CILIUM_ENVOY_ENABLED: true\n") file.write("CL2_PROMETHEUS_SCRAPE_CILIUM_ENVOY: true\n") # test config # add "s" at the end of test_duration_secs file.write("# Test config\n") test_duration = str(test_duration_secs) + "s" # Test config # add "s" at the end of test_duration_secs file.write("# Test config\n") test_duration = f"{test_duration_secs}s" file.write(f"CL2_DURATION: {test_duration}\n") file.write(f"CL2_NUMBER_OF_CLIENTS_PER_GROUP: {clients_per_group}\n") file.write(f"CL2_NUMBER_OF_SERVERS_PER_GROUP: {servers_per_group}\n") file.write(f"CL2_WORKERS_PER_CLIENT: {workers_per_client}\n") file.write(f"CL2_NUMBER_OF_GROUPS: {number_of_groups}\n") file.write(f"CL2_NETWORK_POLICY_TYPE: {netpol_type}\n") file.write("CL2_CLIENT_METRICS_GATHERING: true\n") # Disable non related tests in measurements.yaml file.write("# Disable non related tests in measurements.yaml\n") file.write("CL2_ENABLE_IN_CLUSTER_NETWORK_LATENCY: false\n") with open(override_file, "r", encoding="utf-8") as file: print(f"Content of file {override_file}:\n{file.read()}") file.close() def execute_clusterloader2( cl2_image, cl2_config_dir, cl2_report_dir, cl2_config_file, kubeconfig, provider, scrape_containerd ): run_cl2_command( kubeconfig, cl2_image, cl2_config_dir, cl2_report_dir, provider, cl2_config_file=cl2_config_file, overrides=True, enable_prometheus=True, scrape_containerd=scrape_containerd ) def collect_clusterloader2( node_count, pod_count, cl2_report_dir, cloud_info, run_id, run_url, result_file, test_type, ): details = parse_xml_to_json(os.path.join(cl2_report_dir, "junit.xml"), indent=2) json_data = json.loads(details) testsuites = json_data["testsuites"] provider = json.loads(cloud_info)["cloud"] if testsuites: status = "success" if testsuites[0]["failures"] == 0 else "failure" else: raise Exception(f"No testsuites found in the report! Raw data: {details}") # TODO: Expose optional parameter to include test details template = { "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "node_count": node_count, "pod_count": pod_count, "status": status, "group": None, "measurement": None, "result": None, "cloud_info": provider, "run_id": run_id, "run_url": run_url, "test_type": test_type, } content = "" for f in os.listdir(cl2_report_dir): file_path = os.path.join(cl2_report_dir, f) with open(file_path, "r", encoding="utf-8") as file: print(f"Processing {file_path}") measurement, group_name = get_measurement(file_path) if not measurement: continue print(measurement, group_name) data = json.loads(file.read()) if "dataItems" in data: items = data["dataItems"] if not items: print(f"No data items found in {file_path}") print(f"Data:\n{data}") continue for item in items: result = template.copy() result["group"] = group_name result["measurement"] = measurement result["result"] = item content += json.dumps(result) + "\n" else: result = template.copy() result["group"] = group_name result["measurement"] = measurement result["result"] = data content += json.dumps(result) + "\n" os.makedirs(os.path.dirname(result_file), exist_ok=True) # os.chmod(os.path.dirname(result_file), 0o755) # Ensure the directory is writable with open(result_file, "w", encoding="utf-8") as file: file.write(content) def main(): parser = argparse.ArgumentParser(description="Network Policy Scale Test") subparsers = parser.add_subparsers(dest="command") # Sub-command for configure_clusterloader2 parser_configure = subparsers.add_parser( "configure", help="Configure ClusterLoader2 overrides file" ) parser_configure.add_argument( "--number_of_groups", type=int, required=True, help="Number of network policy groups to create", ) parser_configure.add_argument( "--clients_per_group", type=int, required=True, help="Number of client pods per group", ) parser_configure.add_argument( "--servers_per_group", type=int, required=True, help="Number of server pods per group", ) parser_configure.add_argument( "--workers_per_client", type=int, required=True, help="Number of workers per client pod", ) parser_configure.add_argument( "--netpol_type", type=str, required=True, choices=["k8s", "cnp", "ccnp"], help="Type of network policy", ) parser_configure.add_argument( "--test_duration_secs", type=int, required=True, help="Test duration in seconds" ) parser_configure.add_argument( "--provider", type=str, required=True, help="Cloud provider name" ) parser_configure.add_argument( "--cilium_enabled", type=str2bool, choices=[True, False], default=False, help="Whether cilium is enabled. Must be either True or False", ) parser_configure.add_argument( "--cilium_envoy_enabled", type=str2bool, choices=[True, False], default=False, help="Whether cilium envoy is enabled. Must be either True or False", ) parser_configure.add_argument( "--cl2_override_file", type=str, required=True, help="Path to the overrides of CL2 config file", ) # Sub-command for execute_clusterloader2 parser_execute = subparsers.add_parser("execute", help="Execute scale up operation") parser_execute.add_argument("--cl2_image", type=str, help="Name of the CL2 image") parser_execute.add_argument( "--cl2_config_dir", type=str, help="Path to the CL2 config directory" ) parser_execute.add_argument( "--cl2_report_dir", type=str, help="Path to the CL2 report directory" ) parser_execute.add_argument( "--cl2_config_file", type=str, help="Path to the CL2 config file" ) parser_execute.add_argument( "--kubeconfig", type=str, help="Path to the kubeconfig file" ) parser_execute.add_argument("--provider", type=str, help="Cloud provider name") # Sub-command for collect_clusterloader2 parser_collect = subparsers.add_parser("collect", help="Collect scale up data") parser_collect.add_argument("--node_count", type=int, help="Number of nodes") parser_collect.add_argument( "--pod_count", type=int, nargs="?", default=0, help="Maximum number of pods per node", ) parser_collect.add_argument( "--cl2_report_dir", type=str, help="Path to the CL2 report directory" ) parser_collect.add_argument("--cloud_info", type=str, help="Cloud information") parser_collect.add_argument("--run_id", type=str, help="Run ID") parser_collect.add_argument("--run_url", type=str, help="Run URL") parser_collect.add_argument( "--result_file", type=str, help="Path to the result file" ) parser_collect.add_argument( "--test_type", type=str, nargs="?", default="default-config", help="Description of test type", ) args = parser.parse_args() if args.command is None: parser.print_help() return if args.command == "configure": configure_clusterloader2( args.number_of_groups, args.clients_per_group, args.servers_per_group, args.workers_per_client, args.netpol_type, args.test_duration_secs, args.cilium_enabled, args.cilium_envoy_enabled, args.cl2_override_file, ) elif args.command == "execute": execute_clusterloader2( args.cl2_image, args.cl2_config_dir, args.cl2_report_dir, args.cl2_config_file, args.kubeconfig, args.provider, scrape_containerd=False, # for network policy scale test, we don't need to scrape containerd for now ) elif args.command == "collect": collect_clusterloader2( args.node_count, args.pod_count, args.cl2_report_dir, args.cloud_info, args.run_id, args.run_url, args.result_file, args.test_type, ) if __name__ == "__main__": main()