def create_statefulset()

in modules/python/csi/csi.py [0:0]


def create_statefulset(namespace, replicas, storage_class):
    """Create a StatefulSet dynamically."""
    statefulset = client.V1StatefulSet(
        api_version="apps/v1",
        kind="StatefulSet",
        metadata=client.V1ObjectMeta(name="statefulset-local"),
        spec=client.V1StatefulSetSpec(
            pod_management_policy="Parallel", # Default is OrderedReady
            replicas=replicas,
            selector=client.V1LabelSelector(match_labels={"app": "nginx"}),
            service_name="statefulset-local",
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": "nginx"}),
                spec=client.V1PodSpec(
                    node_selector={"kubernetes.io/os": "linux"},
                    containers=[
                        client.V1Container(
                            name="statefulset-local",
                            image="mcr.microsoft.com/oss/nginx/nginx:1.19.5",
                            command=[
                                "/bin/bash",
                                "-c",
                                "set -euo pipefail; while true; do echo $(date) >> /mnt/local/outfile; sleep 1; done",
                            ],
                            volume_mounts=[
                                client.V1VolumeMount(name="persistent-storage", mount_path="/mnt/local")
                            ],
                        )
                    ],
                ),
            ),
            volume_claim_templates=[
                client.V1PersistentVolumeClaimTemplate(
                    metadata=client.V1ObjectMeta(
                        name="persistent-storage",
                        annotations={"volume.beta.kubernetes.io/storage-class": storage_class},
                    ),
                    spec=client.V1PersistentVolumeClaimSpec(
                        access_modes=["ReadWriteOnce"],
                        resources=client.V1ResourceRequirements(requests={"storage": "1Gi"}),
                    ),
                )
            ],
        ),
    )
    app_client = KUBERNETERS_CLIENT.get_app_client()
    statefulset_obj = app_client.create_namespaced_stateful_set(namespace, statefulset)
    return statefulset_obj