azext_edge/edge/providers/orchestration/deletion.py (193 lines of code) (raw):

# coding=utf-8 # ---------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License file in the project root for license information. # ---------------------------------------------------------------------------------------------- from sys import maxsize from time import sleep from typing import TYPE_CHECKING, Dict, List, Optional, Tuple from azure.cli.core.azclierror import ArgumentUsageError from knack.log import get_logger from rich import print from rich.console import NewLine from rich.live import Live from rich.progress import Progress, SpinnerColumn, TimeElapsedColumn from rich.table import Table from ...util.az_client import get_resource_client, wait_for_terminal_states from ...util.common import should_continue_prompt from .common import EXTENSION_TYPE_OPS from .resource_map import IoTOperationsResource, IoTOperationsResourceMap from .resources import Instances logger = get_logger(__name__) if TYPE_CHECKING: from azure.core.polling import LROPoller def delete_ops_resources( cmd, resource_group_name: str, instance_name: Optional[str] = None, cluster_name: Optional[str] = None, confirm_yes: Optional[bool] = None, no_progress: Optional[bool] = None, force: Optional[bool] = None, include_dependencies: Optional[bool] = None, ): manager = DeletionManager( cmd=cmd, instance_name=instance_name, cluster_name=cluster_name, resource_group_name=resource_group_name, no_progress=no_progress, include_dependencies=include_dependencies, ) manager.do_work(confirm_yes=confirm_yes, force=force) class DeletionManager: def __init__( self, cmd, resource_group_name: str, instance_name: Optional[str] = None, cluster_name: Optional[str] = None, include_dependencies: Optional[bool] = None, no_progress: Optional[bool] = None, ): from azure.cli.core.commands.client_factory import get_subscription_id self.cmd = cmd self.instance_name = instance_name self.cluster_name = cluster_name self.resource_group_name = resource_group_name self.instances = Instances(self.cmd) self.include_dependencies = include_dependencies self.subscription_id = get_subscription_id(cli_ctx=cmd.cli_ctx) self.resource_client = get_resource_client(self.subscription_id) self._render_progress = not no_progress self._live = Live(None, transient=False, refresh_per_second=8, auto_refresh=self._render_progress) self._progress_bar = Progress( SpinnerColumn(), *Progress.get_default_columns(), "Elapsed:", TimeElapsedColumn(), transient=False, ) self._progress_shown = False def do_work(self, confirm_yes: Optional[bool] = None, force: Optional[bool] = None): self.resource_map = self._get_resource_map() # Ensure cluster exists with existing resource_map pattern. self.resource_map.connected_cluster.resource self.resource_map.refresh_resource_state() self._display_resource_tree() should_bail = not should_continue_prompt(confirm_yes=confirm_yes) if should_bail: return self._process(force=force) def _get_resource_map(self) -> IoTOperationsResourceMap: if not any([self.cluster_name, self.instance_name]): raise ArgumentUsageError("Please provide either an instance name or cluster name.") if self.instance_name: self.instance = self.instances.show(name=self.instance_name, resource_group_name=self.resource_group_name) return self.instances.get_resource_map(self.instance) return IoTOperationsResourceMap( cmd=self.cmd, cluster_name=self.cluster_name, resource_group_name=self.resource_group_name, defer_refresh=True, ) def _display_resource_tree(self): if self._render_progress: print(self.resource_map.build_tree(include_dependencies=self.include_dependencies, category_color="red")) def _render_display(self, description: str): if self._render_progress: grid = Table.grid(expand=False) grid.add_column() grid.add_row(NewLine(1)) grid.add_row(description) grid.add_row(NewLine(1)) grid.add_row(self._progress_bar) if not self._progress_shown: self._task_id = self._progress_bar.add_task(description="Work.", total=None) self._progress_shown = True self._live.update(grid, refresh=True) if not self._live.is_started: self._live.start(True) def _stop_display(self): if self._render_progress and self._live.is_started: if self._progress_shown: self._progress_bar.update(self._task_id, description="Done.") sleep(0.5) self._live.stop() def _process(self, force: bool = False): todo_extensions = [] if self.include_dependencies: todo_extensions.extend(self.resource_map.extensions) else: # instance delete should delete AIO extension too # TODO: @c-ryan-k hacky aio_ext_obj = self.resource_map.connected_cluster.get_extensions_by_type(EXTENSION_TYPE_OPS).get( EXTENSION_TYPE_OPS, {} ) if aio_ext_obj: aio_ext_id: str = aio_ext_obj.get("id", "") aio_ext = next( (ext for ext in self.resource_map.extensions if ext.resource_id.lower() == aio_ext_id.lower()), None ) if aio_ext: todo_extensions.append(aio_ext) todo_custom_locations = self.resource_map.custom_locations todo_resource_sync_rules = [] todo_resources = [] for cl in todo_custom_locations: todo_resource_sync_rules.extend(self.resource_map.get_resource_sync_rules(cl.resource_id)) todo_resources.extend(self.resource_map.get_resources(cl.resource_id)) batched_work = self._batch_resources( resources=todo_resources, resource_sync_rules=todo_resource_sync_rules, custom_locations=todo_custom_locations, extensions=todo_extensions, ) if not batched_work: logger.warning("Nothing to delete :)") return if not force: if not self.resource_map.connected_cluster.connected: logger.warning( "Deletion cancelled. The cluster is not connected to Azure. " "Use --force to continue anyway, which may lead to errors." ) return try: for batches_key in batched_work: self._render_display(f"[red]Deleting {batches_key}...") for batch in batched_work[batches_key]: # TODO: @digimaun - Show summary as result lros = self._delete_batch(batch) [lro.result() for lro in lros] finally: self._stop_display() def _batch_resources( self, resources: Optional[List[IoTOperationsResource]] = None, resource_sync_rules: Optional[List[IoTOperationsResource]] = None, custom_locations: Optional[List[IoTOperationsResource]] = None, extensions: Optional[List[IoTOperationsResource]] = None, ) -> Dict[str, List[List[IoTOperationsResource]]]: batched_work: Dict[str, List[List[IoTOperationsResource]]] = {} if resources: # resource_map.get_resources will sort resources in descending order by segment then display name resource_batches: List[List[IoTOperationsResource]] = [] last_segments = maxsize current_batch = [] for resource in resources: current_segments = resource.segments if current_segments < last_segments and current_batch: resource_batches.append(current_batch) current_batch = [] current_batch.append(resource) last_segments = current_segments if current_batch: resource_batches.append(current_batch) batched_work["resources"] = resource_batches if resource_sync_rules: batched_work["resource sync rules"] = [resource_sync_rules] if custom_locations: batched_work["custom locations"] = [custom_locations] if extensions: batched_work["extensions"] = [extensions] return batched_work def _delete_batch(self, resource_batch: List[IoTOperationsResource]) -> Tuple["LROPoller"]: return wait_for_terminal_states( *[ self.resource_client.resources.begin_delete_by_id( resource_id=resource.resource_id, api_version=resource.api_version ) for resource in resource_batch ] )