modules/python/clusterloader2/utils.py (107 lines of code) (raw):
from xml.dom import minidom
import json
import os
import argparse
import docker
from clients.docker_client import DockerClient
POD_STARTUP_LATENCY_FILE_PREFIX_MEASUREMENT_MAP = {
"PodStartupLatency_PodStartupLatency_": "PodStartupLatency_PodStartupLatency",
"StatefulPodStartupLatency_PodStartupLatency_": "StatefulPodStartupLatency_PodStartupLatency",
"StatelessPodStartupLatency_PodStartupLatency_": "StatelessPodStartupLatency_PodStartupLatency",
}
NETWORK_METRIC_PREFIXES = ["APIResponsivenessPrometheus", "InClusterNetworkLatency", "NetworkProgrammingLatency"]
PROM_QUERY_PREFIX = "GenericPrometheusQuery"
RESOURCE_USAGE_SUMMARY_PREFIX = "ResourceUsageSummary"
NETWORK_POLICY_SOAK_MEASUREMENT_PREFIX = "NetworkPolicySoakMeasurement"
def run_cl2_command(kubeconfig, cl2_image, cl2_config_dir, cl2_report_dir, provider, cl2_config_file="config.yaml", overrides=False, enable_prometheus=False, tear_down_prometheus=True,
enable_exec_service=False, scrape_kubelets=False, scrape_containerd=False):
docker_client = DockerClient()
command=f"""--provider={provider} --v=2
--enable-exec-service={enable_exec_service}
--enable-prometheus-server={enable_prometheus}
--prometheus-scrape-kubelets={scrape_kubelets}
--kubeconfig /root/.kube/config
--testconfig /root/perf-tests/clusterloader2/config/{cl2_config_file}
--report-dir /root/perf-tests/clusterloader2/results
--tear-down-prometheus-server={tear_down_prometheus}"""
if scrape_containerd:
command += f" --prometheus-scrape-containerd={scrape_containerd}"
if overrides:
command += " --testoverrides=/root/perf-tests/clusterloader2/config/overrides.yaml"
volumes = {
kubeconfig: {'bind': '/root/.kube/config', 'mode': 'rw'},
cl2_config_dir: {'bind': '/root/perf-tests/clusterloader2/config', 'mode': 'rw'},
cl2_report_dir: {'bind': '/root/perf-tests/clusterloader2/results', 'mode': 'rw'}
}
if provider == "aws":
aws_path = os.path.expanduser("~/.aws/credentials")
volumes[aws_path] = {'bind': '/root/.aws/credentials', 'mode': 'rw'}
print(f"Running clusterloader2 with command: {command} and volumes: {volumes}")
try:
container = docker_client.run_container(cl2_image, command, volumes, detach=True)
for log in container.logs(stream=True):
print(log.decode('utf-8'), end='')
container.wait()
except docker.errors.ContainerError as e:
print(f"Container exited with a non-zero status code: {e.exit_status}\n{e.stderr.decode('utf-8')}")
def get_measurement(file_path):
file_name = os.path.basename(file_path)
for file_prefix, measurement in POD_STARTUP_LATENCY_FILE_PREFIX_MEASUREMENT_MAP.items():
if file_name.startswith(file_prefix):
group_name = file_name.split("_")[2]
return measurement, group_name
for file_prefix in NETWORK_METRIC_PREFIXES:
if file_name.startswith(file_prefix):
group_name = file_name.split("_")[1]
return file_prefix, group_name
if file_name.startswith(PROM_QUERY_PREFIX):
group_name = file_name.split("_")[1]
measurement_name = file_name.split("_")[0][len(PROM_QUERY_PREFIX)+1:]
return measurement_name, group_name
if file_name.startswith(RESOURCE_USAGE_SUMMARY_PREFIX):
group_name = file_name.split("_")[1]
return RESOURCE_USAGE_SUMMARY_PREFIX, group_name
if file_name.startswith(NETWORK_POLICY_SOAK_MEASUREMENT_PREFIX):
group_name = file_name.split("_")[1]
return NETWORK_POLICY_SOAK_MEASUREMENT_PREFIX, group_name
return None, None
def parse_xml_to_json(file_path, indent = 0):
with open(file_path, 'r', encoding='utf-8') as file:
xml_content = file.read()
dom = minidom.parseString(xml_content)
result = {
"testsuites": []
}
# Extract test suites
testsuites = dom.getElementsByTagName("testsuite")
for testsuite in testsuites:
suite_name = testsuite.getAttribute("name")
suite_tests = int(testsuite.getAttribute("tests"))
suite_failures = int(testsuite.getAttribute("failures"))
suite_errors = int(testsuite.getAttribute("errors"))
suite_result = {
"name": suite_name,
"tests": suite_tests,
"failures": suite_failures,
"errors": suite_errors,
"testcases": []
}
# Extract test cases
testcases = testsuite.getElementsByTagName("testcase")
for testcase in testcases:
case_name = testcase.getAttribute("name")
case_classname = testcase.getAttribute("classname")
case_time = testcase.getAttribute("time")
case_result = {
"name": case_name,
"classname": case_classname,
"time": case_time,
"failure": None
}
# Check for failure
failure = testcase.getElementsByTagName("failure")
if failure:
failure_message = failure[0].firstChild.nodeValue
case_result["failure"] = failure_message
suite_result["testcases"].append(case_result)
result["testsuites"].append(suite_result)
# Convert the result dictionary to JSON
json_result = json.dumps(result, indent = indent)
return json_result
def str2bool(val):
if isinstance(val, bool):
return val
if val.lower() in ("true", "yes", "1"):
return True
if val.lower() in ("false", "no", "0"):
return False
raise argparse.ArgumentTypeError("Boolean value expected.")