# modules/python/clusterloader2/autoscale/autoscale.py
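"""Autoscale benchmark driver for ClusterLoader2 (CL2).

Sub-commands:
  override -- compute the per-pod CPU request and write the CL2 override file
  execute  -- run the CL2 benchmark image against the cluster
  collect  -- parse the CL2 junit report into newline-delimited JSON results

Illustrative invocation (argument values below are hypothetical):
    python autoscale.py collect 4 on-demand 10 100 ./report "aks" run-1 http://example.com results/autoscale.json
"""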
import json
import os
import argparse
import re
import subprocess
from datetime import datetime, timezone
from clusterloader2.utils import parse_xml_to_json, run_cl2_command
from clients.kubernetes_client import KubernetesClient
from utils.logger_config import get_logger, setup_logging

setup_logging()
logger = get_logger(__name__)


def warmup_deployment_for_karpenter(cl2_config_dir):
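    """Apply the warm-up deployment so Karpenter brings up an initial node."""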
logger.info("WarmUp Deployment Started")
deployment_file = f"{cl2_config_dir}/warmup_deployment.yaml"
subprocess.run(["kubectl", "apply", "-f", deployment_file], check=True)


def cleanup_warmup_deployment_for_karpenter(cl2_config_dir):
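    """Delete the warm-up deployment and remove any leftover Karpenter node claims."""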
deployment_file = f"{cl2_config_dir}/warmup_deployment.yaml"
subprocess.run(["kubectl", "delete", "-f", deployment_file], check=True)
logger.info("WarmUp Deployment Deleted")
try:
subprocess.run(["kubectl", "delete", "nodeclaims", "--all"], check=True)
except Exception as e:
        logger.error(f"Error while deleting node claims: {e}")


def calculate_cpu_request_for_clusterloader2(node_label_selector, node_count, pod_count, warmup_deployment, cl2_config_dir):
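    """Compute the CPU request (in millicores) for each test pod.

    Waits for a node matching node_label_selector, subtracts the CPU already
    allocated to kube-system daemonsets (and the warm-up deployment, if used)
    from the node's allocatable CPU, and divides the remainder across the
    pods scheduled per node.
    """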
client = KubernetesClient(os.path.expanduser("~/.kube/config"))
timeout = 10 # 10 minutes
nodes = []
try:
nodes = client.wait_for_nodes_ready(1, timeout, label_selector=node_label_selector)
if len(nodes) == 0:
raise Exception(f"No nodes found with label selector: {node_label_selector}")
except Exception as e:
raise Exception(f"Error while getting nodes: {e}") from e
node = nodes[0]
allocatable_cpu = node.status.allocatable["cpu"]
logger.info(f"Node {node.metadata.name} has allocatable cpu of {allocatable_cpu}")
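    # Assumes allocatable CPU is reported in millicore form (e.g. "3860m")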
cpu_value = int(allocatable_cpu.replace("m", ""))
allocated_cpu, _ = client.get_daemonsets_pods_allocated_resources("kube-system", node.metadata.name)
logger.info(f"Node {node.metadata.name} has allocated cpu of {allocated_cpu}")
cpu_value -= allocated_cpu
# Remove warmup deployment cpu request from the total cpu value
    if warmup_deployment.lower() == "true":
        cpu_value -= 100
        cleanup_warmup_deployment_for_karpenter(cl2_config_dir)
# Calculate the cpu request for each pod
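    # e.g. 100 pods across 10 nodes -> 10 pods per node; 3760m free -> 3760 // 10 = 376m per pod (illustrative values)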
pods_per_node = pod_count // node_count
cpu_request = cpu_value // pods_per_node
return cpu_request


def override_config_clusterloader2(cpu_per_node, node_count, pod_count, scale_up_timeout, scale_down_timeout, loop_count, node_label_selector, node_selector, override_file, warmup_deployment, cl2_config_dir):
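    """Compute the per-pod CPU request and write the CL2 override file.

    When warmup_deployment is "true", the warm-up deployment is applied first
    and the desired node count is set to 0 instead of 1.
    """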
logger.info(f"CPU per node: {cpu_per_node}")
desired_node_count = 1
    if warmup_deployment.lower() == "true":
        warmup_deployment_for_karpenter(cl2_config_dir)
desired_node_count = 0
cpu_request = calculate_cpu_request_for_clusterloader2(node_label_selector, node_count, pod_count, warmup_deployment, cl2_config_dir)
logger.info(f"Total number of nodes: {node_count}, total number of pods: {pod_count}")
logger.info(f"CPU request for each pod: {cpu_request}m")
# assuming the number of surge nodes is no more than 10
with open(override_file, 'w', encoding='utf-8') as file:
file.write(f"CL2_DEPLOYMENT_CPU: {cpu_request}m\n")
file.write(f"CL2_MIN_NODE_COUNT: {node_count}\n")
file.write(f"CL2_MAX_NODE_COUNT: {node_count + 10}\n")
file.write(f"CL2_DESIRED_NODE_COUNT: {desired_node_count}\n")
file.write(f"CL2_DEPLOYMENT_SIZE: {pod_count}\n")
file.write(f"CL2_SCALE_UP_TIMEOUT: {scale_up_timeout}\n")
file.write(f"CL2_SCALE_DOWN_TIMEOUT: {scale_down_timeout}\n")
file.write(f"CL2_LOOP_COUNT: {loop_count}\n")
file.write(f"CL2_NODE_LABEL_SELECTOR: {node_label_selector}\n")
file.write(f"CL2_NODE_SELECTOR: \"{node_selector}\"\n")


def execute_clusterloader2(cl2_image, cl2_config_dir, cl2_report_dir, kubeconfig, provider):
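    """Run the CL2 benchmark with the generated overrides enabled."""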
run_cl2_command(kubeconfig, cl2_image, cl2_config_dir, cl2_report_dir, provider, overrides=True)


def collect_clusterloader2(
cpu_per_node,
capacity_type,
node_count,
pod_count,
cl2_report_dir,
cloud_info,
run_id,
run_url,
result_file
):
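    """Parse the CL2 junit report into one JSON result line per loop and scale direction."""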
index_pattern = re.compile(r'(\d+)$')
    raw_data = parse_xml_to_json(os.path.join(cl2_report_dir, "junit.xml"), indent=2)
json_data = json.loads(raw_data)
testsuites = json_data["testsuites"]
summary = {}
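    # Map testcase-name fragments to (scale direction, summary metric key)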
metric_mappings = {
"WaitForRunningPodsUp": ("up", "wait_for_pods_seconds"),
"WaitForNodesUpPerc50": ("up", "wait_for_50Perc_nodes_seconds"),
"WaitForNodesUpPerc70": ("up", "wait_for_70Perc_nodes_seconds"),
"WaitForNodesUpPerc90": ("up", "wait_for_90Perc_nodes_seconds"),
"WaitForNodesUpPerc99": ("up", "wait_for_99Perc_nodes_seconds"),
"WaitForNodesUpPerc100": ("up", "wait_for_nodes_seconds"),
"WaitForRunningPodsDown": ("down", "wait_for_pods_seconds"),
"WaitForNodesDownPerc50": ("down", "wait_for_50Perc_nodes_seconds"),
"WaitForNodesDownPerc70": ("down", "wait_for_70Perc_nodes_seconds"),
"WaitForNodesDownPerc90": ("down", "wait_for_90Perc_nodes_seconds"),
"WaitForNodesDownPerc99": ("down", "wait_for_99Perc_nodes_seconds"),
"WaitForNodesDownPerc100": ("down", "wait_for_nodes_seconds"),
}
if testsuites:
# Process each loop
for testcase in testsuites[0]["testcases"]:
name = testcase["name"]
            match = index_pattern.search(name)
            if not match:
                continue
            index = match.group()
            if index not in summary:
                summary[index] = {
                    "up": {"failures": 0},
                    "down": {"failures": 0}
                }
            failure = testcase.get("failure")  # treat a missing failure entry as a pass
for test_key, (category, summary_key) in metric_mappings.items():
if test_key in name:
summary[index][category][summary_key] = -1 if failure else testcase["time"]
summary[index][category]["failures"] += 1 if failure else 0
break # Exit loop once matched
content = ""
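        # Emit one JSON line per (loop index, scale direction) pair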
for index, inner_dict in summary.items():
for key, value in inner_dict.items():
data = {
"wait_for_nodes_seconds": value["wait_for_nodes_seconds"],
"wait_for_50Perc_nodes_seconds": value["wait_for_50Perc_nodes_seconds"],
"wait_for_70Perc_nodes_seconds": value["wait_for_70Perc_nodes_seconds"],
"wait_for_90Perc_nodes_seconds": value["wait_for_90Perc_nodes_seconds"],
"wait_for_99Perc_nodes_seconds": value["wait_for_99Perc_nodes_seconds"],
"wait_for_pods_seconds": value["wait_for_pods_seconds"],
"autoscale_result": "success" if value["failures"] == 0 else "failure"
}
# TODO: Expose optional parameter to include test details
result = {
"timestamp": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
"autoscale_type": key,
"cpu_per_node": cpu_per_node,
"capacity_type": capacity_type,
"node_count": node_count,
"pod_count": pod_count,
"data": data,
# "raw_data": raw_data,
"cloud_info": cloud_info,
"run_id": run_id,
"run_url": run_url
}
content += json.dumps(result) + "\n"
else:
raise Exception(f"No testsuites found in the report! Raw data: {raw_data}")
os.makedirs(os.path.dirname(result_file), exist_ok=True)
with open(result_file, 'w', encoding='utf-8') as file:
file.write(content)


def main():
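    """Parse CLI arguments and dispatch to the override/execute/collect sub-commands."""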
parser = argparse.ArgumentParser(description="Autoscale Kubernetes resources.")
subparsers = parser.add_subparsers(dest="command")
# Sub-command for override_config_clusterloader2
parser_override = subparsers.add_parser("override", help="Override CL2 config file")
    parser_override.add_argument("cpu_per_node", type=int, help="Number of CPU cores per node")
parser_override.add_argument("node_count", type=int, help="Number of nodes")
parser_override.add_argument("pod_count", type=int, help="Number of pods")
parser_override.add_argument("scale_up_timeout", type=str, help="Timeout before failing the scale up test")
parser_override.add_argument("scale_down_timeout", type=str, help="Timeout before failing the scale down test")
parser_override.add_argument("loop_count", type=int, help="Number of times to repeat the test")
parser_override.add_argument("node_label_selector", type=str, help="Node label selector")
parser_override.add_argument("node_selector", type=str, help="Node selector for the test pods")
parser_override.add_argument("cl2_override_file", type=str, help="Path to the overrides of CL2 config file")
parser_override.add_argument("warmup_deployment", type=str, help="Warmup deployment to get the cpu request")
parser_override.add_argument("cl2_config_dir", type=str, help="Path to the CL2 config directory")
# Sub-command for execute_clusterloader2
    parser_execute = subparsers.add_parser("execute", help="Execute the autoscale benchmark")
parser_execute.add_argument("cl2_image", type=str, help="Name of the CL2 image")
parser_execute.add_argument("cl2_config_dir", type=str, help="Path to the CL2 config directory")
parser_execute.add_argument("cl2_report_dir", type=str, help="Path to the CL2 report directory")
parser_execute.add_argument("kubeconfig", type=str, help="Path to the kubeconfig file")
parser_execute.add_argument("provider", type=str, help="Cloud provider name")
# Sub-command for collect_clusterloader2
    parser_collect = subparsers.add_parser("collect", help="Collect autoscale benchmark results")
    parser_collect.add_argument("cpu_per_node", type=int, help="Number of CPU cores per node")
    parser_collect.add_argument("capacity_type", type=str, help="Capacity type", choices=["on-demand", "spot"])
parser_collect.add_argument("node_count", type=int, help="Number of nodes")
parser_collect.add_argument("pod_count", type=int, help="Number of pods")
parser_collect.add_argument("cl2_report_dir", type=str, help="Path to the CL2 report directory")
parser_collect.add_argument("cloud_info", type=str, help="Cloud information")
parser_collect.add_argument("run_id", type=str, help="Run ID")
parser_collect.add_argument("run_url", type=str, help="Run URL")
parser_collect.add_argument("result_file", type=str, help="Path to the result file")
args = parser.parse_args()
if args.command == "override":
override_config_clusterloader2(args.cpu_per_node, args.node_count, args.pod_count, args.scale_up_timeout, args.scale_down_timeout, args.loop_count, args.node_label_selector, args.node_selector, args.cl2_override_file, args.warmup_deployment, args.cl2_config_dir)
elif args.command == "execute":
execute_clusterloader2(args.cl2_image, args.cl2_config_dir, args.cl2_report_dir, args.kubeconfig, args.provider)
    elif args.command == "collect":
        collect_clusterloader2(args.cpu_per_node, args.capacity_type, args.node_count, args.pod_count, args.cl2_report_dir, args.cloud_info, args.run_id, args.run_url, args.result_file)
    else:
        parser.print_help()


if __name__ == "__main__":
main()