modules/python/clusterloader2/cri/cri.py (213 lines of code) (raw):

import json import os import argparse import math from datetime import datetime, timezone from clusterloader2.utils import parse_xml_to_json, run_cl2_command, get_measurement, str2bool from clients.kubernetes_client import KubernetesClient, client as k8s_client from utils.logger_config import get_logger, setup_logging setup_logging() logger = get_logger(__name__) DAEMONSETS_PER_NODE_MAP = { "aws": 2, "aks": 6 } MEMORY_SCALE_FACTOR = 0.95 # 95% of the total allocatable memory to account for error margin def override_config_clusterloader2( node_count, node_per_step, max_pods, repeats, operation_timeout, load_type, scale_enabled, pod_startup_latency_threshold, provider, scrape_kubelets, override_file): client = KubernetesClient(os.path.expanduser("~/.kube/config")) nodes = client.get_nodes(label_selector="cri-resource-consume=true") if len(nodes) == 0: raise Exception("No nodes found with the label cri-resource-consume=true") node = nodes[0] allocatable_cpu = node.status.allocatable["cpu"] allocatable_memory = node.status.allocatable["memory"] logger.info(f"Node {node.metadata.name} has allocatable cpu of {allocatable_cpu} and allocatable memory of {allocatable_memory}") cpu_value = int(allocatable_cpu.replace("m", "")) # Bottlerocket OS SKU on EKS has allocatable_memory property in Mi. AKS and Amazon Linux (default SKUs) # user Ki. Handling the Mi case here and converting Mi to Ki, if needed. if "Mi" in allocatable_memory: memory_value = int(allocatable_memory.replace("Mi", "")) * 1024 elif "Ki" in allocatable_memory: memory_value = int(allocatable_memory.replace("Ki", "")) else: raise Exception("Unexpected format of allocatable memory node property") logger.info(f"Node {node.metadata.name} has cpu value of {cpu_value} and memory value of {memory_value}") allocated_cpu, allocated_memory = client.get_daemonsets_pods_allocated_resources("kube-system", node.metadata.name) logger.info(f"Node {node.metadata.name} has allocated cpu of {allocated_cpu} and allocated memory of {allocated_memory}") cpu_value -= allocated_cpu memory_value -= allocated_memory # Calculate request cpu and memory for each pod pod_count = max_pods - DAEMONSETS_PER_NODE_MAP[provider] cpu_request = cpu_value // pod_count memory_request_in_ki = math.ceil(memory_value * MEMORY_SCALE_FACTOR // pod_count) memory_request_in_k = int(memory_request_in_ki // 1.024) logger.info(f"CPU request for each pod: {cpu_request}m, memory request for each pod: {memory_request_in_k}K, total pod per node: {pod_count}") # Calculate the number of steps to scale up steps = node_count // node_per_step logger.info(f"Scaled enabled: {scale_enabled}, node per step: {node_per_step}, steps: {steps}, scrape kubelets: {scrape_kubelets}") with open(override_file, 'w', encoding='utf-8') as file: file.write(f"CL2_DEPLOYMENT_SIZE: {pod_count}\n") file.write(f"CL2_RESOURCE_CONSUME_MEMORY: {memory_request_in_k}\n") file.write(f"CL2_RESOURCE_CONSUME_MEMORY_KI: {memory_request_in_ki}Ki\n") file.write(f"CL2_RESOURCE_CONSUME_CPU: {cpu_request}\n") file.write(f"CL2_REPEATS: {repeats}\n") file.write(f"CL2_NODE_COUNT: {node_count}\n") file.write(f"CL2_NODE_PER_STEP: {node_per_step}\n") file.write(f"CL2_STEPS: {steps}\n") file.write(f"CL2_OPERATION_TIMEOUT: {operation_timeout}\n") file.write(f"CL2_LOAD_TYPE: {load_type}\n") file.write(f"CL2_SCALE_ENABLED: {str(scale_enabled).lower()}\n") file.write("CL2_PROMETHEUS_TOLERATE_MASTER: true\n") file.write("CL2_PROMETHEUS_CPU_SCALE_FACTOR: 30.0\n") file.write("CL2_PROMETHEUS_MEMORY_LIMIT_FACTOR: 30.0\n") file.write("CL2_PROMETHEUS_MEMORY_SCALE_FACTOR: 30.0\n") file.write("CL2_PROMETHEUS_NODE_SELECTOR: \"prometheus: \\\"true\\\"\"\n") file.write(f"CL2_POD_STARTUP_LATENCY_THRESHOLD: {pod_startup_latency_threshold}\n") file.write(f"CL2_PROVIDER: {provider}\n") file.write(f"CL2_SCRAPE_KUBELETS: {str(scrape_kubelets).lower()}\n") file.close() def execute_clusterloader2(cl2_image, cl2_config_dir, cl2_report_dir, kubeconfig, provider, scrape_kubelets): run_cl2_command(kubeconfig, cl2_image, cl2_config_dir, cl2_report_dir, provider, overrides=True, enable_prometheus=True, tear_down_prometheus=False, scrape_kubelets=scrape_kubelets) def verify_measurement(): client = KubernetesClient(os.path.expanduser("~/.kube/config")) nodes = client.get_nodes(label_selector="cri-resource-consume=true") user_pool = [node.metadata.name for node in nodes] logger.info(f"User pool: {user_pool}") # Create an API client api_client = k8s_client.ApiClient() for node_name in user_pool: url = f"/api/v1/nodes/{node_name}/proxy/metrics" try: response = api_client.call_api( resource_path=url, method="GET", auth_settings=['BearerToken'], response_type="str", _preload_content=True ) metrics = response[0] # The first item contains the response data filtered_metrics = "\n".join( line for line in metrics.splitlines() if line.startswith("kubelet_pod_start") or line.startswith("kubelet_runtime_operations") ) logger.info("##[section]Metrics for node:", node_name) # pylint: disable=logging-too-many-args logger.info(filtered_metrics) # pylint: disable=logging-too-many-args except k8s_client.ApiException as e: logger.error(f"Error fetching metrics: {e}") def collect_clusterloader2( node_count, max_pods, repeats, load_type, cl2_report_dir, cloud_info, run_id, run_url, result_file, scrape_kubelets ): if scrape_kubelets: verify_measurement() details = parse_xml_to_json(os.path.join(cl2_report_dir, "junit.xml"), indent = 2) json_data = json.loads(details) testsuites = json_data["testsuites"] if testsuites: status = "success" if testsuites[0]["failures"] == 0 else "failure" else: raise Exception(f"No testsuites found in the report! Raw data: {details}") template = { "timestamp": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'), "node_count": node_count, "max_pods": max_pods, "churn_rate": repeats, "load_type": load_type, "status": status, "group": None, "measurement": None, "percentile": None, "data": None, "cloud_info": cloud_info, "run_id": run_id, "run_url": run_url } content = "" for f in os.listdir(cl2_report_dir): file_path = os.path.join(cl2_report_dir, f) with open(file_path, 'r', encoding='utf-8') as file: measurement, group_name = get_measurement(file_path) if not measurement: continue logger.info(measurement, group_name) data = json.loads(file.read()) if measurement == "ResourceUsageSummary": for percentile, items in data.items(): template["measurement"] = measurement template["group"] = group_name template["percentile"] = percentile for item in items: template["data"] = item content += json.dumps(template) + "\n" elif "dataItems" in data: items = data["dataItems"] if not items: logger.info(f"No data items found in {file_path}") logger.info(f"Data:\n{data}") continue for item in items: template["measurement"] = measurement template["group"] = group_name template["percentile"] = "dataItems" template["data"] = item content += json.dumps(template) + "\n" os.makedirs(os.path.dirname(result_file), exist_ok=True) with open(result_file, 'w', encoding='utf-8') as file: file.write(content) def main(): parser = argparse.ArgumentParser(description="CRI Kubernetes resources.") subparsers = parser.add_subparsers(dest="command") # Sub-command for override_config_clusterloader2 parser_override = subparsers.add_parser("override", help="Override CL2 config file") parser_override.add_argument("node_count", type=int, help="Number of nodes") parser_override.add_argument("node_per_step", type=int, help="Number of nodes to scale per step") parser_override.add_argument("max_pods", type=int, help="Number of maximum pods per node") parser_override.add_argument("repeats", type=int, help="Number of times to repeat the resource consumer deployment") parser_override.add_argument("operation_timeout", type=str, default="2m", help="Operation timeout") parser_override.add_argument("load_type", type=str, choices=["memory", "cpu"], default="memory", help="Type of load to generate") parser_override.add_argument("scale_enabled", type=str2bool, choices=[True, False], default=False, help="Whether scale operation is enabled. Must be either True or False") parser_override.add_argument("pod_startup_latency_threshold", type=str, default="15s", help="Pod startup latency threshold") parser_override.add_argument("provider", type=str, help="Cloud provider name") parser_override.add_argument("scrape_kubelets", type=str2bool, choices=[True, False], default=False, help="Whether to scrape kubelets") parser_override.add_argument("cl2_override_file", type=str, help="Path to the overrides of CL2 config file") # Sub-command for execute_clusterloader2 parser_execute = subparsers.add_parser("execute", help="Execute resource consume operation") parser_execute.add_argument("cl2_image", type=str, help="Name of the CL2 image") parser_execute.add_argument("cl2_config_dir", type=str, help="Path to the CL2 config directory") parser_execute.add_argument("cl2_report_dir", type=str, help="Path to the CL2 report directory") parser_execute.add_argument("kubeconfig", type=str, help="Path to the kubeconfig file") parser_execute.add_argument("provider", type=str, help="Cloud provider name") parser_execute.add_argument("scrape_kubelets", type=str2bool, choices=[True, False], default=False, help="Whether to scrape kubelets") # Sub-command for collect_clusterloader2 parser_collect = subparsers.add_parser("collect", help="Collect resource consume data") parser_collect.add_argument("node_count", type=int, help="Number of nodes") parser_collect.add_argument("max_pods", type=int, help="Number of maximum pods per node") parser_collect.add_argument("repeats", type=int, help="Number of times to repeat the resource consumer deployment") parser_collect.add_argument("load_type", type=str, choices=["memory", "cpu"], default="memory", help="Type of load to generate") parser_collect.add_argument("cl2_report_dir", type=str, help="Path to the CL2 report directory") parser_collect.add_argument("cloud_info", type=str, help="Cloud information") parser_collect.add_argument("run_id", type=str, help="Run ID") parser_collect.add_argument("run_url", type=str, help="Run URL") parser_collect.add_argument("result_file", type=str, help="Path to the result file") parser_collect.add_argument("scrape_kubelets", type=str2bool, choices=[True, False], default=False, help="Whether to scrape kubelets") args = parser.parse_args() if args.command == "override": override_config_clusterloader2(args.node_count, args.node_per_step, args.max_pods, args.repeats, args.operation_timeout, args.load_type, args.scale_enabled, args.pod_startup_latency_threshold, args.provider, args.scrape_kubelets, args.cl2_override_file) elif args.command == "execute": execute_clusterloader2(args.cl2_image, args.cl2_config_dir, args.cl2_report_dir, args.kubeconfig, args.provider, args.scrape_kubelets) elif args.command == "collect": collect_clusterloader2(args.node_count, args.max_pods, args.repeats, args.load_type, args.cl2_report_dir, args.cloud_info, args.run_id, args.run_url, args.result_file, args.scrape_kubelets) if __name__ == "__main__": main()