def _process()

in azext_edge/edge/providers/orchestration/deletion.py [0:0]


    def _process(self, force: bool = False) -> None:
        """
        Delete the resources tracked by `self.resource_map`, one batch at a time.

        Extension selection:
          * When `self.include_dependencies` is truthy, every extension in the
            resource map is queued for deletion.
          * Otherwise only the extension that the connected cluster reports for
            `EXTENSION_TYPE_OPS` is queued, and only if an extension with the same
            resource id (case-insensitive) is present in the resource map.

        Every custom location in the resource map is always queued, together with
        the resource sync rules and resources looked up by its resource id.

        The queued items are grouped by `self._batch_resources`. Each batch is
        handed to `self._delete_batch` and `result()` is called on every object it
        returns before the next batch is started. `self._stop_display` is always
        called once batch processing ends, including when an exception propagates.

        Args:
            force: When False, deletion is cancelled with a warning if
                `self.resource_map.connected_cluster.connected` is falsy.
                When True that check is skipped.
        """
        todo_extensions = []
        if self.include_dependencies:
            todo_extensions.extend(self.resource_map.extensions)
        else:
            # instance delete should delete AIO extension too
            # TODO: @c-ryan-k hacky
            aio_ext_obj = self.resource_map.connected_cluster.get_extensions_by_type(EXTENSION_TYPE_OPS).get(
                EXTENSION_TYPE_OPS, {}
            )
            if aio_ext_obj:
                # `or ""` also covers an "id" key that is present but null.
                # Lower-case once here instead of once per candidate extension.
                aio_ext_id: str = (aio_ext_obj.get("id") or "").lower()
                aio_ext = next(
                    (ext for ext in self.resource_map.extensions if ext.resource_id.lower() == aio_ext_id),
                    None,
                )
                if aio_ext:
                    todo_extensions.append(aio_ext)

        todo_custom_locations = self.resource_map.custom_locations
        todo_resource_sync_rules = []
        todo_resources = []

        for cl in todo_custom_locations:
            todo_resource_sync_rules.extend(self.resource_map.get_resource_sync_rules(cl.resource_id))
            todo_resources.extend(self.resource_map.get_resources(cl.resource_id))

        batched_work = self._batch_resources(
            resources=todo_resources,
            resource_sync_rules=todo_resource_sync_rules,
            custom_locations=todo_custom_locations,
            extensions=todo_extensions,
        )
        if not batched_work:
            logger.warning("Nothing to delete :)")
            return

        # The connectivity check is only evaluated when the caller did not force the deletion.
        if not force and not self.resource_map.connected_cluster.connected:
            logger.warning(
                "Deletion cancelled. The cluster is not connected to Azure. "
                "Use --force to continue anyway, which may lead to errors."
            )
            return

        try:
            for batches_key in batched_work:
                self._render_display(f"[red]Deleting {batches_key}...")
                for batch in batched_work[batches_key]:
                    # TODO: @digimaun - Show summary as result
                    # Wait on every operation of this batch before starting the next one.
                    for lro in self._delete_batch(batch):
                        lro.result()
        finally:
            # Always stop the display, whether the deletions succeeded or raised.
            self._stop_display()