def execute_attach_detach()

in modules/python/csi/csi.py [0:0]


def execute_attach_detach(disk_number, storage_class, wait_time, result_dir):
    """Execute the attach detach test."""
    print(f"Starting running test with {disk_number} disks and {storage_class} storage class")

    # Create the result directory and log file
    if not os.path.exists(result_dir):
        os.mkdir(result_dir)
    log_file = os.path.join(result_dir, f"attachdetach-{disk_number}.txt")

    namespace = f"test-{time.time_ns()}"

    p50, p90, p99, p100 = calculate_percentiles(disk_number)
    print(f"Percentiles: p50={p50}, p90={p90}, p99={p99}, p100={p100}")
    attach_thresholds = [(p50, "p50"), (p90, "p90"), (p99, "p99"), (p100, "p100")]
    detach_thresholds = [(p100 - p50, "p50"), (p100 - p90, "p90"), (p100 - p99, "p99"), (0, "p100")]

    # Create a namespace
    namespace_obj = KUBERNETERS_CLIENT.create_namespace(namespace)
    print(f"Created namespace {namespace_obj.metadata.name}")

    # Start the timer
    creation_start_time = datetime.now()

    # Create StatefulSet
    statefulset = create_statefulset(namespace, disk_number, storage_class)
    print(f"Created StatefulSet {statefulset.metadata.name}")

    # Measure PVC creation and attachment
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = []
        futures.append(
            executor.submit(
                monitor_thresholds,
                "PV creation",
                lambda: KUBERNETERS_CLIENT.get_bound_persistent_volume_claims_by_namespace(namespace),
                attach_thresholds,
                "gte",
                creation_start_time,
                log_file
            )
        )
        futures.append(
            executor.submit(
                monitor_thresholds,
                "PV attachment",
                lambda: KUBERNETERS_CLIENT.get_ready_pods_by_namespace(namespace),
                attach_thresholds,
                "gte",
                creation_start_time,
                log_file
            )
        )

        # Wait for all threads to complete
        for future in as_completed(futures):
            future.result() # Blocks until the thread finishes execution

    print(f"Measuring creation and attachment of PVCs completed! Waiting for {wait_time} seconds before starting deletion.")
    time.sleep(wait_time)

    # Start the timer
    deletion_start_time = datetime.now()

    # Delete StatefulSet
    KUBERNETERS_CLIENT.app.delete_namespaced_stateful_set(statefulset.metadata.name, namespace)
    KUBERNETERS_CLIENT.delete_persistent_volume_claim_by_namespace(namespace)

    # Measure PVC detachment
    with ThreadPoolExecutor(max_workers=2) as executor:
        future = executor.submit(
            monitor_thresholds,
            "PV detachment",
            lambda: KUBERNETERS_CLIENT.get_attached_volume_attachments(),
            detach_thresholds,
            "lte",
            deletion_start_time,
            log_file
        )
        future.result()

    KUBERNETERS_CLIENT.delete_namespace(namespace)
    print("Measuring detachment of PVCs completed.")