in azext_edge/edge/providers/orchestration/deletion.py [0:0]
def _process(self, force: bool = False):
    """Delete everything tracked by ``self.resource_map``, one batch at a time.

    Gathers the extensions, custom locations, resource sync rules and
    resources to remove, passes them to ``self._batch_resources`` and then
    deletes each returned batch in order via ``self._delete_batch``, calling
    ``result()`` on every operation it hands back. ``self._stop_display`` is
    always called once the delete loop has been entered, even if a delete
    raises.

    Args:
        force: When False (default), the method logs a warning and returns
            without deleting anything if the connected cluster reports that
            it is not connected. When True that connectivity check is skipped.

    Returns:
        None. Also returns early, after logging a warning, when
        ``self._batch_resources`` yields nothing to delete.
    """
    todo_extensions = []
    if self.include_dependencies:
        todo_extensions.extend(self.resource_map.extensions)
    else:
        # instance delete should delete AIO extension too
        # TODO: @c-ryan-k hacky
        aio_ext_obj = self.resource_map.connected_cluster.get_extensions_by_type(EXTENSION_TYPE_OPS).get(
            EXTENSION_TYPE_OPS, {}
        )
        if aio_ext_obj:
            # Normalise the target id once rather than once per candidate;
            # resource ids are compared case-insensitively.
            aio_ext_id: str = aio_ext_obj.get("id", "").lower()
            aio_ext = next(
                (ext for ext in self.resource_map.extensions if ext.resource_id.lower() == aio_ext_id),
                None,
            )
            if aio_ext:
                todo_extensions.append(aio_ext)

    todo_custom_locations = self.resource_map.custom_locations
    todo_resource_sync_rules = []
    todo_resources = []
    for cl in todo_custom_locations:
        todo_resource_sync_rules.extend(self.resource_map.get_resource_sync_rules(cl.resource_id))
        todo_resources.extend(self.resource_map.get_resources(cl.resource_id))

    batched_work = self._batch_resources(
        resources=todo_resources,
        resource_sync_rules=todo_resource_sync_rules,
        custom_locations=todo_custom_locations,
        extensions=todo_extensions,
    )
    if not batched_work:
        logger.warning("Nothing to delete :)")
        return

    # Without --force, refuse to start when the cluster is not connected.
    if not force and not self.resource_map.connected_cluster.connected:
        logger.warning(
            "Deletion cancelled. The cluster is not connected to Azure. "
            "Use --force to continue anyway, which may lead to errors."
        )
        return

    try:
        for batches_key in batched_work:
            self._render_display(f"[red]Deleting {batches_key}...")
            for batch in batched_work[batches_key]:
                # TODO: @digimaun - Show summary as result
                lros = self._delete_batch(batch)
                # Finish this batch before starting the next one (presumably
                # result() blocks until the delete completes - confirm).
                # Tolerate a batch for which nothing (None/empty) is returned.
                for lro in lros or ():
                    lro.result()
    finally:
        # Always tear the display down, including when a delete raises.
        self._stop_display()