modules/python/csi/csi.py (212 lines of code) (raw):

import time import argparse import os import json from datetime import datetime, timezone from concurrent.futures import ThreadPoolExecutor, as_completed from clients.kubernetes_client import KubernetesClient, client KUBERNETERS_CLIENT=KubernetesClient() # TODO: Move to utils folder later to be shared with other modules def validate_node_count(node_label, node_count, operation_timeout_in_minutes): kube_client = KubernetesClient() ready_node_count = 0 timeout = time.time() + (operation_timeout_in_minutes * 60) print(f"Validating {node_count} nodes with label {node_label} are ready.") while time.time() < timeout: ready_nodes = kube_client.get_ready_nodes(label_selector=node_label) ready_node_count = len(ready_nodes) print(f"Currently {ready_node_count} nodes are ready.") if ready_node_count == node_count: break print(f"Waiting for {node_count} nodes to be ready.") time.sleep(10) if ready_node_count != node_count: raise Exception(f"Only {ready_node_count} nodes are ready, expected {node_count} nodes!") def calculate_percentiles(disk_number): """Calculate percentile values for pods.""" p50 = disk_number // 2 p90 = disk_number * 9 // 10 p99 = disk_number * 99 // 100 return p50, p90, p99, disk_number def create_statefulset(namespace, replicas, storage_class): """Create a StatefulSet dynamically.""" statefulset = client.V1StatefulSet( api_version="apps/v1", kind="StatefulSet", metadata=client.V1ObjectMeta(name="statefulset-local"), spec=client.V1StatefulSetSpec( pod_management_policy="Parallel", # Default is OrderedReady replicas=replicas, selector=client.V1LabelSelector(match_labels={"app": "nginx"}), service_name="statefulset-local", template=client.V1PodTemplateSpec( metadata=client.V1ObjectMeta(labels={"app": "nginx"}), spec=client.V1PodSpec( node_selector={"kubernetes.io/os": "linux"}, containers=[ client.V1Container( name="statefulset-local", image="mcr.microsoft.com/oss/nginx/nginx:1.19.5", command=[ "/bin/bash", "-c", "set -euo pipefail; while true; do echo $(date) >> /mnt/local/outfile; sleep 1; done", ], volume_mounts=[ client.V1VolumeMount(name="persistent-storage", mount_path="/mnt/local") ], ) ], ), ), volume_claim_templates=[ client.V1PersistentVolumeClaimTemplate( metadata=client.V1ObjectMeta( name="persistent-storage", annotations={"volume.beta.kubernetes.io/storage-class": storage_class}, ), spec=client.V1PersistentVolumeClaimSpec( access_modes=["ReadWriteOnce"], resources=client.V1ResourceRequirements(requests={"storage": "1Gi"}), ), ) ], ), ) app_client = KUBERNETERS_CLIENT.get_app_client() statefulset_obj = app_client.create_namespaced_stateful_set(namespace, statefulset) return statefulset_obj def log_duration(description, start_time, log_file): """Log the time duration of an operation.""" end_time = datetime.now() duration = int((end_time - start_time).total_seconds()) if ":" in description: raise Exception("Description cannot contain a colon ':' character!") with open(log_file, 'a', encoding='utf-8') as file: file.write(f"{description}: {duration}\n") print(f"{description}: {duration}s") def wait_for_condition(check_function, target, comparison="gte", interval=1): """ Wait for a condition using a given check function. The check function should return a list of items. The condition is satisfied when the length of the list meets the target. """ while True: current_list = check_function() current = len(current_list) print(f"Current: {current}, Target: {target}") if (comparison == "gte" and current >= target) or (comparison == "lte" and current <= target): return current time.sleep(interval) def monitor_thresholds(description, monitor_function, thresholds, comparison, start_time, log_file): """Monitor thresholds and log their completion.""" for target, threshold_desc in thresholds: wait_for_condition(monitor_function, target, comparison) log_duration(f"{description} {threshold_desc}", start_time, log_file) def execute_attach_detach(disk_number, storage_class, wait_time, result_dir): """Execute the attach detach test.""" print(f"Starting running test with {disk_number} disks and {storage_class} storage class") # Create the result directory and log file if not os.path.exists(result_dir): os.mkdir(result_dir) log_file = os.path.join(result_dir, f"attachdetach-{disk_number}.txt") namespace = f"test-{time.time_ns()}" p50, p90, p99, p100 = calculate_percentiles(disk_number) print(f"Percentiles: p50={p50}, p90={p90}, p99={p99}, p100={p100}") attach_thresholds = [(p50, "p50"), (p90, "p90"), (p99, "p99"), (p100, "p100")] detach_thresholds = [(p100 - p50, "p50"), (p100 - p90, "p90"), (p100 - p99, "p99"), (0, "p100")] # Create a namespace namespace_obj = KUBERNETERS_CLIENT.create_namespace(namespace) print(f"Created namespace {namespace_obj.metadata.name}") # Start the timer creation_start_time = datetime.now() # Create StatefulSet statefulset = create_statefulset(namespace, disk_number, storage_class) print(f"Created StatefulSet {statefulset.metadata.name}") # Measure PVC creation and attachment with ThreadPoolExecutor(max_workers=2) as executor: futures = [] futures.append( executor.submit( monitor_thresholds, "PV creation", lambda: KUBERNETERS_CLIENT.get_bound_persistent_volume_claims_by_namespace(namespace), attach_thresholds, "gte", creation_start_time, log_file ) ) futures.append( executor.submit( monitor_thresholds, "PV attachment", lambda: KUBERNETERS_CLIENT.get_ready_pods_by_namespace(namespace), attach_thresholds, "gte", creation_start_time, log_file ) ) # Wait for all threads to complete for future in as_completed(futures): future.result() # Blocks until the thread finishes execution print(f"Measuring creation and attachment of PVCs completed! Waiting for {wait_time} seconds before starting deletion.") time.sleep(wait_time) # Start the timer deletion_start_time = datetime.now() # Delete StatefulSet KUBERNETERS_CLIENT.app.delete_namespaced_stateful_set(statefulset.metadata.name, namespace) KUBERNETERS_CLIENT.delete_persistent_volume_claim_by_namespace(namespace) # Measure PVC detachment with ThreadPoolExecutor(max_workers=2) as executor: future = executor.submit( monitor_thresholds, "PV detachment", lambda: KUBERNETERS_CLIENT.get_attached_volume_attachments(), detach_thresholds, "lte", deletion_start_time, log_file ) future.result() KUBERNETERS_CLIENT.delete_namespace(namespace) print("Measuring detachment of PVCs completed.") def collect_attach_detach(case_name, node_number, disk_number, storage_class, cloud_info, run_id, run_url, result_dir): raw_result_file = os.path.join(result_dir, f"attachdetach-{disk_number}.txt") result_file = os.path.join(result_dir, "results.json") print(f"Collecting attach detach test results from {raw_result_file} into {result_file}") with open(raw_result_file, 'r', encoding='utf-8') as file: content = file.read() print(content) # Parse metrics from the result file metrics = {} for line in content.splitlines(): if ':' in line: # Only process lines with key-value pairs key, value = map(str.strip, line.split(':', 1)) metrics[key.replace(' ', '_')] = value print(f"Parsed metrics: {metrics}") content = { "timestamp": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'), "case_name": case_name, "node_number": node_number, "disk_number": disk_number, "storage_class": storage_class, "result": metrics, "cloud_info": cloud_info, "run_id": run_id, "run_url": run_url } os.makedirs(os.path.dirname(result_file), exist_ok=True) with open(result_file, 'w', encoding='utf-8') as file: file.write(json.dumps(content)) def main(): parser = argparse.ArgumentParser(description="CSI Benchmark.") subparsers = parser.add_subparsers(dest="command") # Sub-command for validate parser_validate = subparsers.add_parser("validate", help="Validate node count") parser_validate.add_argument("node_label", type=str, help="Node label selector") parser_validate.add_argument("node_count", type=int, help="Number of nodes") parser_validate.add_argument("operation_timeout", type=int, help="Timeout for the operation in seconds") # Sub-command for execute_attach_detach parser_execute = subparsers.add_parser("execute", help="Execute attach detach test") parser_execute.add_argument("disk_number", type=int, help="Disk number") parser_execute.add_argument("storage_class", type=str, help="Storage class") parser_execute.add_argument("wait_time", type=int, help="Wait time before deletion") parser_execute.add_argument("result_dir", type=str, help="Result directory") # Sub-command for collect_attach_detach parser_collect = subparsers.add_parser("collect", help="Collect attach detach test results") parser_collect.add_argument("case_name", type=str, help="Case name") parser_collect.add_argument("node_number", type=int, help="Node number") parser_collect.add_argument("disk_number", type=int, help="Disk number") parser_collect.add_argument("storage_class", type=str, help="Storage class") parser_collect.add_argument("cloud_info", type=str, help="Cloud info") parser_collect.add_argument("run_id", type=str, help="Run ID") parser_collect.add_argument("run_url", type=str, help="Run URL") parser_collect.add_argument("result_dir", type=str, help="Result directory") args = parser.parse_args() if args.command == "validate": validate_node_count(args.node_label, args.node_count, args.operation_timeout) elif args.command == "execute": execute_attach_detach(args.disk_number, args.storage_class, args.wait_time, args.result_dir) elif args.command == "collect": collect_attach_detach(args.case_name, args.node_number, args.disk_number, args.storage_class, args.cloud_info, args.run_id, args.run_url, args.result_dir) if __name__ == "__main__": main()