# azext_edge/edge/providers/orchestration/resource_map.py
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
from typing import Dict, List, Optional
from knack.log import get_logger
from rich.tree import Tree
from .common import EXTENSION_TYPE_OPS
from .connected_cluster import ConnectedCluster
# Module-level logger, keyed by this module's import name.
logger = get_logger(__name__)
class IoTOperationsResource:
    """Minimal record for a single resource: its ID, display name and API version.

    The number of "/"-separated parts of the resource ID is exposed through
    the ``segments`` property and is computed only once per instance.
    """

    def __init__(self, resource_id: str, display_name: str, api_version: str):
        """Store the identifying values; the segment count starts out uncomputed."""
        self.resource_id = resource_id
        self.display_name = display_name
        self.api_version = api_version
        # Filled in lazily by the `segments` property.
        self._segments: Optional[int] = None

    @property
    def segments(self) -> int:
        """Return how many parts ``resource_id`` has when split on "/".

        The value is cached after the first access, so later changes to
        ``resource_id`` are not reflected.
        """
        cached = self._segments
        if cached:
            return cached

        cached = len(self.resource_id.split("/"))
        self._segments = cached
        return cached
class CustomLocationsContainer:
    """Bundle a custom location resource with the resources recorded against it."""

    def __init__(self, resource: IoTOperationsResource):
        """Wrap ``resource``; both associated collections start out empty."""
        # The custom location itself.
        self.resource: IoTOperationsResource = resource
        # Resource sync rules collected for this custom location.
        self.resource_sync_rules: List[IoTOperationsResource] = list()
        # Other resources collected for this custom location.
        self.related_resources: List[IoTOperationsResource] = list()
class ClusterContainer:
    """Hold the extensions and custom locations recorded for one cluster."""

    def __init__(self):
        """Create an empty container; callers populate both collections."""
        # Extension resources, in insertion order.
        self.extensions: List[IoTOperationsResource] = list()
        # Custom location containers, looked up by key (the resource map uses the custom location ID).
        self.custom_locations: Dict[str, CustomLocationsContainer] = dict()
class IoTOperationsResourceMap:
    """In-memory view of the IoT Operations resources attached to a connected cluster.

    The map keeps a `ClusterContainer` holding the cluster's extensions and,
    per custom location, its resource sync rules and related resources. The
    state is rebuilt from the `ConnectedCluster` helper on every call to
    `refresh_resource_state`.
    """

    def __init__(
        self,
        cmd,
        cluster_name: str,
        resource_group_name: str,
        subscription_id: Optional[str] = None,
        defer_refresh: bool = False,
    ):
        """Bind the map to a cluster and, unless deferred, load its resources.

        Args:
            cmd: CLI command object; its ``cli_ctx`` is used to resolve the
                subscription when ``subscription_id`` is not supplied.
            cluster_name: Name of the connected cluster.
            resource_group_name: Resource group containing the cluster.
            subscription_id: Subscription to use; falls back to the one
                returned by ``get_subscription_id`` when falsy.
            defer_refresh: When True, skip the initial
                `refresh_resource_state` call and start with an empty map.
        """
        from azure.cli.core.commands.client_factory import get_subscription_id

        self.subscription_id = subscription_id or get_subscription_id(cli_ctx=cmd.cli_ctx)
        self.connected_cluster = ConnectedCluster(
            cmd=cmd,
            subscription_id=self.subscription_id,
            cluster_name=cluster_name,
            resource_group_name=resource_group_name,
        )
        self._cluster_container = ClusterContainer()
        if not defer_refresh:
            self.refresh_resource_state()

    @staticmethod
    def _to_resource(record: dict) -> IoTOperationsResource:
        """Convert a raw record with "id", "name" and "apiVersion" keys into a resource."""
        return IoTOperationsResource(
            resource_id=record["id"],
            display_name=record["name"],
            api_version=record["apiVersion"],
        )

    @property
    def extensions(self) -> List[IoTOperationsResource]:
        """Extensions recorded by the most recent refresh."""
        return self._cluster_container.extensions

    @property
    def custom_locations(self) -> List[IoTOperationsResource]:
        """Custom location resources recorded by the most recent refresh (a new list)."""
        return [record.resource for record in self._cluster_container.custom_locations.values()]

    def get_resource_sync_rules(self, custom_location_id: str) -> List[IoTOperationsResource]:
        """Return the sync rules stored for ``custom_location_id``, or ``[]`` if it is unknown."""
        record = self._cluster_container.custom_locations.get(custom_location_id)
        if record is None:
            return []
        return record.resource_sync_rules

    def get_resources(self, custom_location_id: str) -> List[IoTOperationsResource]:
        """Return the related resources stored for ``custom_location_id``, sorted.

        Resources are ordered descending by (number of resource ID segments,
        lower-cased display name), so IDs with more segments come first.
        An unknown custom location yields ``[]``.
        """
        record = self._cluster_container.custom_locations.get(custom_location_id)
        if record is None:
            return []
        return sorted(
            record.related_resources,
            key=lambda r: (r.segments, r.display_name.lower()),
            reverse=True,
        )

    def refresh_resource_state(self) -> None:
        """Rebuild the cluster container from what the connected cluster reports.

        A fresh container is populated and only swapped in at the end, so the
        previous state stays in place if any lookup raises part-way through.
        """
        refreshed = ClusterContainer()
        cluster = self.connected_cluster

        # Each lookup may return a falsy value (e.g. None or an empty list); treat that as "nothing".
        for cl in cluster.get_aio_custom_locations() or []:
            cl_container = CustomLocationsContainer(resource=self._to_resource(cl))
            cl_container.resource_sync_rules.extend(
                self._to_resource(rule) for rule in cluster.get_resource_sync_rules(cl["id"]) or []
            )
            cl_container.related_resources.extend(
                self._to_resource(res) for res in cluster.get_aio_resources(cl["id"]) or []
            )
            refreshed.custom_locations[cl["id"]] = cl_container

        refreshed.extensions.extend(self._to_resource(ext) for ext in cluster.get_aio_extensions() or [])

        self._cluster_container = refreshed

    def build_tree(self, include_dependencies: bool = True, category_color: str = "cyan") -> Tree:
        """Render the current map as a rich ``Tree`` rooted at the cluster name.

        Args:
            include_dependencies: When True, list every recorded extension.
                When False, list only the recorded extension whose ID matches
                the one looked up via ``EXTENSION_TYPE_OPS``.
            category_color: Rich markup colour applied to the category labels
                ("extensions", "customLocations", "resourceSyncRules",
                "resources").

        Returns:
            The populated tree. The "customLocations" branch is omitted when
            no custom locations are recorded.
        """
        tree = Tree(f"[green]{self.connected_cluster.cluster_name}")
        extensions_node = tree.add(label=f"[{category_color}]extensions")

        if include_dependencies:
            for ext in self.extensions:
                extensions_node.add(ext.display_name)
        else:
            # Only show the AIO extension.
            # TODO: @c-ryan-k hacky
            aio_ext_obj = self.connected_cluster.get_extensions_by_type(EXTENSION_TYPE_OPS).get(
                EXTENSION_TYPE_OPS, {}
            )
            if aio_ext_obj:
                # Resource IDs are compared case-insensitively.
                aio_ext_id: str = aio_ext_obj.get("id", "").lower()
                aio_ext = next((ext for ext in self.extensions if ext.resource_id.lower() == aio_ext_id), None)
                if aio_ext:
                    extensions_node.add(aio_ext.display_name)

        custom_locations = self.custom_locations
        if custom_locations:
            root_cl_node = tree.add(label=f"[{category_color}]customLocations")
            for cl in custom_locations:
                cl_node = root_cl_node.add(cl.display_name)

                # Category nodes are always added, even when they end up with no children.
                rsr_node = cl_node.add(f"[{category_color}]resourceSyncRules")
                for rsr in self.get_resource_sync_rules(cl.resource_id):
                    rsr_node.add(rsr.display_name)

                resource_node = cl_node.add(f"[{category_color}]resources")
                for resource in self.get_resources(cl.resource_id):
                    resource_node.add(resource.display_name)

        return tree