in modules/python/csi/csi.py [0:0]
def execute_attach_detach(disk_number, storage_class, wait_time, result_dir):
"""Execute the attach detach test."""
print(f"Starting running test with {disk_number} disks and {storage_class} storage class")
# Create the result directory and log file
if not os.path.exists(result_dir):
os.mkdir(result_dir)
log_file = os.path.join(result_dir, f"attachdetach-{disk_number}.txt")
namespace = f"test-{time.time_ns()}"
p50, p90, p99, p100 = calculate_percentiles(disk_number)
print(f"Percentiles: p50={p50}, p90={p90}, p99={p99}, p100={p100}")
attach_thresholds = [(p50, "p50"), (p90, "p90"), (p99, "p99"), (p100, "p100")]
detach_thresholds = [(p100 - p50, "p50"), (p100 - p90, "p90"), (p100 - p99, "p99"), (0, "p100")]
# Create a namespace
namespace_obj = KUBERNETERS_CLIENT.create_namespace(namespace)
print(f"Created namespace {namespace_obj.metadata.name}")
# Start the timer
creation_start_time = datetime.now()
# Create StatefulSet
statefulset = create_statefulset(namespace, disk_number, storage_class)
print(f"Created StatefulSet {statefulset.metadata.name}")
# Measure PVC creation and attachment
with ThreadPoolExecutor(max_workers=2) as executor:
futures = []
futures.append(
executor.submit(
monitor_thresholds,
"PV creation",
lambda: KUBERNETERS_CLIENT.get_bound_persistent_volume_claims_by_namespace(namespace),
attach_thresholds,
"gte",
creation_start_time,
log_file
)
)
futures.append(
executor.submit(
monitor_thresholds,
"PV attachment",
lambda: KUBERNETERS_CLIENT.get_ready_pods_by_namespace(namespace),
attach_thresholds,
"gte",
creation_start_time,
log_file
)
)
# Wait for all threads to complete
for future in as_completed(futures):
future.result() # Blocks until the thread finishes execution
print(f"Measuring creation and attachment of PVCs completed! Waiting for {wait_time} seconds before starting deletion.")
time.sleep(wait_time)
# Start the timer
deletion_start_time = datetime.now()
# Delete StatefulSet
KUBERNETERS_CLIENT.app.delete_namespaced_stateful_set(statefulset.metadata.name, namespace)
KUBERNETERS_CLIENT.delete_persistent_volume_claim_by_namespace(namespace)
# Measure PVC detachment
with ThreadPoolExecutor(max_workers=2) as executor:
future = executor.submit(
monitor_thresholds,
"PV detachment",
lambda: KUBERNETERS_CLIENT.get_attached_volume_attachments(),
detach_thresholds,
"lte",
deletion_start_time,
log_file
)
future.result()
KUBERNETERS_CLIENT.delete_namespace(namespace)
print("Measuring detachment of PVCs completed.")