def update_azure_container_storage()

in src/aks-preview/azext_aks_preview/managed_cluster_decorator.py [0:0]


    def update_azure_container_storage(self, mc: ManagedCluster) -> ManagedCluster:
        """Update azure container storage for the Managed Cluster object

        Reads the azure container storage related raw parameters, validates them
        against the extension/cluster state reported by
        get_extension_installed_and_cluster_configs, adds or removes the
        io-engine node label on the agent pool profiles of ``mc`` when enabling
        on a cluster without the extension, and records the outcome as
        intermediates on the context.

        If neither --enable-azure-container-storage nor
        --disable-azure-container-storage is passed, ``mc`` is returned untouched.
        If fetching the extension/cluster configuration fails, the error is
        logged and ``mc`` is returned without any modification.

        :raises MutuallyExclusiveArgumentError: both the enable and the disable flag are set.
        :raises RequiredArgumentMissingError: an ephemeral disk option is set without the enable flag.
        :raises UnknownError: the cluster has no agent pool profiles.
        :raises DecoratorEarlyExitException: the user declines the NVMe perf tier change prompt.
        :return: ManagedCluster
        """
        self._ensure_mc(mc)
        # read the azure container storage values passed
        enable_pool_type = self.context.raw_param.get("enable_azure_container_storage")
        disable_pool_type = self.context.raw_param.get("disable_azure_container_storage")
        enable_azure_container_storage = enable_pool_type is not None
        disable_azure_container_storage = disable_pool_type is not None
        nodepool_list = self.context.raw_param.get("azure_container_storage_nodepools")
        ephemeral_disk_volume_type = self.context.raw_param.get("ephemeral_disk_volume_type")
        ephemeral_disk_nvme_perf_tier = self.context.raw_param.get("ephemeral_disk_nvme_perf_tier")
        if enable_azure_container_storage and disable_azure_container_storage:
            raise MutuallyExclusiveArgumentError(
                'Conflicting flags. Cannot set --enable-azure-container-storage '
                'and --disable-azure-container-storage together.'
            )

        # The ephemeral disk options are only meaningful together with the enable flag.
        if (ephemeral_disk_volume_type is not None or ephemeral_disk_nvme_perf_tier is not None) and \
           not enable_azure_container_storage:
            params_defined_arr = []
            if ephemeral_disk_volume_type is not None:
                params_defined_arr.append('--ephemeral-disk-volume-type')
            if ephemeral_disk_nvme_perf_tier is not None:
                params_defined_arr.append('--ephemeral-disk-nvme-perf-tier')

            # Join with a space on both sides of "and" so that two flag names
            # do not run into each other in the error message.
            params_defined = ' and '.join(params_defined_arr)
            raise RequiredArgumentMissingError(
                f'Cannot set {params_defined} without the parameter --enable-azure-container-storage.'
            )

        # pylint: disable=too-many-nested-blocks
        if enable_azure_container_storage or disable_azure_container_storage:
            # Require the agent pool profiles for azure container storage
            # operations. Raise exception if not found.
            if not mc.agent_pool_profiles:
                raise UnknownError(
                    "Encounter an unexpected error while getting agent pool profiles from the cluster "
                    "in the process of updating agentpool profile."
                )
            storagepool_name = self.context.raw_param.get("storage_pool_name")
            pool_option = self.context.raw_param.get("storage_pool_option")
            pool_sku = self.context.raw_param.get("storage_pool_sku")
            pool_size = self.context.raw_param.get("storage_pool_size")
            agentpool_details = {}
            from azext_aks_preview.azurecontainerstorage._helpers import get_extension_installed_and_cluster_configs

            # Any failure while fetching the current state is logged and the
            # cluster object is returned as-is; nothing has been modified yet.
            try:
                (
                    is_extension_installed,
                    is_azureDisk_enabled,
                    is_elasticSan_enabled,
                    is_ephemeralDisk_localssd_enabled,
                    is_ephemeralDisk_nvme_enabled,
                    current_core_value,
                    existing_ephemeral_disk_volume_type,
                    existing_perf_tier,
                ) = get_extension_installed_and_cluster_configs(
                    self.cmd,
                    self.context.get_resource_group_name(),
                    self.context.get_name(),
                    mc.agent_pool_profiles,
                )
            except UnknownError as e:
                logger.error("\nError fetching installed extension and cluster config: %s", e)
                return mc
            except Exception as ex:  # pylint: disable=broad-except
                logger.error("Exception fetching installed extension and cluster config: %s", ex)
                return mc

            # The intermediate flag ensures the VM SKU cache is generated only
            # once even if this method runs more than once on the same context.
            vm_cache_generated = self.context.get_intermediate(
                "vm_cache_generated",
                default_value=False,
            )

            if not vm_cache_generated:
                from azext_aks_preview.azurecontainerstorage._helpers import generate_vm_sku_cache_for_region
                generate_vm_sku_cache_for_region(self.cmd.cli_ctx, self.context.get_location())
                self.context.set_intermediate("vm_cache_generated", True, overwrite_exists=True)

            if enable_azure_container_storage:
                from azext_aks_preview.azurecontainerstorage._consts import (
                    CONST_ACSTOR_IO_ENGINE_LABEL_KEY,
                    CONST_ACSTOR_IO_ENGINE_LABEL_VAL
                )
                # Collect per-nodepool details for validation and remember which
                # nodepools already carry the io-engine label.
                labelled_nodepool_arr = []
                for agentpool in mc.agent_pool_profiles:
                    pool_details = {}
                    nodepool_name = agentpool.name
                    pool_details["vm_size"] = agentpool.vm_size
                    pool_details["count"] = agentpool.count
                    pool_details["os_type"] = agentpool.os_type
                    pool_details["mode"] = agentpool.mode
                    pool_details["node_taints"] = agentpool.node_taints
                    pool_details["zoned"] = agentpool.availability_zones is not None
                    node_labels = agentpool.node_labels
                    if node_labels is not None:
                        if node_labels.get(CONST_ACSTOR_IO_ENGINE_LABEL_KEY) is not None and \
                           nodepool_name is not None:
                            labelled_nodepool_arr.append(nodepool_name)
                        pool_details["node_labels"] = node_labels
                    agentpool_details[nodepool_name] = pool_details

                # Incase of a new installation, if the nodepool list is not defined
                # then check for all the nodepools which are marked with acstor io-engine
                # labels and include them for installation. If none of the nodepools are
                # labelled, either pick nodepool1 as default, or if only
                # one nodepool exists, choose the only nodepool by default.
                if not is_extension_installed and nodepool_list is None:
                    if labelled_nodepool_arr:
                        nodepool_list = ','.join(labelled_nodepool_arr)
                    elif len(agentpool_details) == 1:
                        nodepool_list = ','.join(agentpool_details.keys())
                    else:
                        # An empty string here would select no nodepool at all and
                        # strip the label from every pool in the labelling step below.
                        nodepool_list = "nodepool1"

                from azext_aks_preview.azurecontainerstorage._validators import (
                    validate_enable_azure_container_storage_params
                )
                # NOTE(review): the validator presumably raises on invalid input; its
                # exception types are not visible from this method.
                validate_enable_azure_container_storage_params(
                    enable_pool_type,
                    storagepool_name,
                    pool_sku,
                    pool_option,
                    pool_size,
                    nodepool_list,
                    agentpool_details,
                    is_extension_installed,
                    is_azureDisk_enabled,
                    is_elasticSan_enabled,
                    is_ephemeralDisk_localssd_enabled,
                    is_ephemeralDisk_nvme_enabled,
                    ephemeral_disk_volume_type,
                    ephemeral_disk_nvme_perf_tier,
                    existing_ephemeral_disk_volume_type,
                    existing_perf_tier,
                )

                if is_ephemeralDisk_nvme_enabled and ephemeral_disk_nvme_perf_tier is not None:
                    # Adding this intermediate and check to ensure that the below
                    # message prompt doesn't appear twice when aks-preview extension
                    # is called from both update_mc_profile_preview and update_mc_profile_default.
                    is_azure_container_storage_perf_tier_op_set = self.context.get_intermediate(
                        "azure_container_storage_perf_tier_op_set",
                        default_value="default",
                    )

                    if is_azure_container_storage_perf_tier_op_set == "default":
                        msg = (
                            "Changing ephemeralDisk NVMe performance tier may result in a temporary "
                            "interruption to the applications using Azure Container Storage. Do you "
                            "want to continue with this operation?"
                        )

                        # --yes skips the prompt; declining the prompt aborts the update.
                        if not (self.context.get_yes() or prompt_y_n(msg, default="n")):
                            raise DecoratorEarlyExitException()

                        self.context.set_intermediate(
                            "azure_container_storage_perf_tier_op_set",
                            True,
                            overwrite_exists=True
                        )

                # If the extension is already installed,
                # we expect that the Azure Container Storage
                # nodes are already labelled. Use those label
                # to generate the nodepool_list.
                if is_extension_installed:
                    nodepool_list = ','.join(labelled_nodepool_arr)
                else:
                    # Set Azure Container Storage labels on the required nodepools.
                    nodepool_list_arr = nodepool_list.split(',')
                    for agentpool in mc.agent_pool_profiles:
                        labels = agentpool.node_labels
                        if agentpool.name in nodepool_list_arr:
                            if labels is None:
                                labels = {}
                            labels[CONST_ACSTOR_IO_ENGINE_LABEL_KEY] = CONST_ACSTOR_IO_ENGINE_LABEL_VAL
                        else:
                            # Remove residual Azure Container Storage labels
                            # from any other nodepools where its not intended
                            if labels is not None:
                                labels.pop(CONST_ACSTOR_IO_ENGINE_LABEL_KEY, None)
                        agentpool.node_labels = labels

                # set intermediates
                self.context.set_intermediate("azure_container_storage_nodepools", nodepool_list, overwrite_exists=True)
                self.context.set_intermediate("enable_azure_container_storage", True, overwrite_exists=True)

            if disable_azure_container_storage:
                from azext_aks_preview.azurecontainerstorage._validators import (
                    validate_disable_azure_container_storage_params
                )
                # NOTE(review): the validator presumably raises on invalid input; its
                # exception types are not visible from this method.
                validate_disable_azure_container_storage_params(
                    disable_pool_type,
                    storagepool_name,
                    pool_sku,
                    pool_option,
                    pool_size,
                    nodepool_list,
                    is_extension_installed,
                    is_azureDisk_enabled,
                    is_elasticSan_enabled,
                    is_ephemeralDisk_localssd_enabled,
                    is_ephemeralDisk_nvme_enabled,
                    ephemeral_disk_volume_type,
                    ephemeral_disk_nvme_perf_tier,
                )

                is_pre_disable_validate_set = self.context.get_intermediate(
                    "pre_disable_validate_azure_container_storage",
                    default_value="default",
                )

                # pre_disable_validate_azure_container_storage will be set to
                # True or False if the updated version az aks cli is being used.
                # If that is the case, we will skip showing the prompt.
                if is_pre_disable_validate_set == "default":
                    pre_disable_validate = False

                    msg = (
                        "Disabling Azure Container Storage will forcefully delete all the storage pools in the cluster "
                        "and affect the applications using these storage pools. Forceful deletion of storage pools can "
                        "also lead to leaking of storage resources which are being consumed. Do you want to validate "
                        "whether any of the storage pools are being used before disabling Azure Container Storage?"
                    )

                    from azext_aks_preview.azurecontainerstorage._consts import (
                        CONST_ACSTOR_ALL,
                    )
                    # Use a pool-type specific message when only one storage pool
                    # type is being disabled.
                    if disable_pool_type != CONST_ACSTOR_ALL:
                        msg = (
                            f"Disabling Azure Container Storage for storage pool type {disable_pool_type} "
                            "will forcefully delete all the storage pools of the same type and affect the "
                            "applications using these storage pools. Forceful deletion of storage pools can "
                            "also lead to leaking of storage resources which are being consumed. Do you want to "
                            f"validate whether any of the storage pools of type {disable_pool_type} are being used "
                            "before disabling Azure Container Storage?"
                        )
                    # --yes opts in to the pre-disable validation without prompting.
                    if self.context.get_yes() or prompt_y_n(msg, default="y"):
                        pre_disable_validate = True

                    # set intermediate
                    self.context.set_intermediate("disable_azure_container_storage", True, overwrite_exists=True)
                    self.context.set_intermediate(
                        "pre_disable_validate_azure_container_storage",
                        pre_disable_validate,
                        overwrite_exists=True
                    )

            # Set intermediates
            self.context.set_intermediate("is_extension_installed", is_extension_installed, overwrite_exists=True)
            self.context.set_intermediate("is_azureDisk_enabled", is_azureDisk_enabled, overwrite_exists=True)
            self.context.set_intermediate("is_elasticSan_enabled", is_elasticSan_enabled, overwrite_exists=True)
            self.context.set_intermediate("current_core_value", current_core_value, overwrite_exists=True)
            self.context.set_intermediate(
                "current_ephemeral_nvme_perf_tier",
                existing_perf_tier,
                overwrite_exists=True
            )
            self.context.set_intermediate(
                "existing_ephemeral_disk_volume_type",
                existing_ephemeral_disk_volume_type,
                overwrite_exists=True
            )
            self.context.set_intermediate(
                "is_ephemeralDisk_nvme_enabled",
                is_ephemeralDisk_nvme_enabled,
                overwrite_exists=True
            )
            self.context.set_intermediate(
                "is_ephemeralDisk_localssd_enabled",
                is_ephemeralDisk_localssd_enabled,
                overwrite_exists=True
            )

        return mc