azext_edge/edge/providers/orchestration/connected_cluster.py (86 lines of code) (raw):

# coding=utf-8 # ---------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License file in the project root for license information. # ---------------------------------------------------------------------------------------------- from typing import List, Optional, Union, Dict from ...util.resource_graph import ResourceGraph QUERIES = { "get_custom_location_for_namespace": """ resources | where type =~ 'microsoft.extendedlocation/customlocations' | where properties.hostResourceId =~ '{resource_id}' | where properties.namespace =~ '{namespace}' | project id, name, location, properties, apiVersion """, "get_aio_extensions": """ kubernetesconfigurationresources | where type =~ 'microsoft.kubernetesconfiguration/extensions' | where id startswith '{resource_id}' | where properties.ExtensionType startswith 'microsoft.iotoperations' or properties.ExtensionType =~ 'microsoft.deviceregistry.assets' or properties.ExtensionType =~ 'microsoft.azure.secretstore' or properties.ExtensionType =~ 'microsoft.arc.containerstorage' | project id, name, apiVersion """, "get_aio_custom_locations": """ resources | where type =~ 'microsoft.extendedlocation/customlocations' | where properties.hostResourceId =~ '{resource_id}' | extend clusterExtensionIds=properties.clusterExtensionIds | mv-expand clusterExtensionIds | extend clusterExtensionId = tolower(clusterExtensionIds) | join kind=inner( extendedlocationresources | where type =~ 'microsoft.extendedlocation/customLocations/enabledResourcetypes' | project clusterExtensionId = tolower(properties.clusterExtensionId), extensionType = tolower(properties.extensionType) | where extensionType startswith 'microsoft.iotoperations' or extensionType startswith 'microsoft.deviceregistry' ) on clusterExtensionId | distinct id, name, apiVersion """, "get_aio_resources": """ resources | where extendedLocation.name =~ '{custom_location_id}' | where type startswith 'microsoft.iotoperations' or type startswith 'microsoft.deviceregistry' or type startswith 'microsoft.secretsync' | project id, name, apiVersion, type """, "get_resource_sync_rules": """ resources | where type =~ "microsoft.extendedlocation/customlocations/resourcesyncrules" | where id startswith '{custom_location_id}' | project id, name, apiVersion """, } class ConnectedCluster: def __init__(self, cmd, subscription_id: str, cluster_name: str, resource_group_name: str): self.subscription_id = subscription_id self.cluster_name = cluster_name self.resource_group_name = resource_group_name self.resource_graph = ResourceGraph(cmd=cmd, subscriptions=[self.subscription_id]) self._resource_state = None # TODO - @digimaun - temp necessary due to circular import from ..orchestration.resources import ConnectedClusters self.clusters = ConnectedClusters(cmd, subscription_id) @property def resource_id(self) -> str: return self.resource["id"] @property def resource(self) -> dict: if not self._resource_state: self._resource_state = self.clusters.show( resource_group_name=self.resource_group_name, cluster_name=self.cluster_name ) return self._resource_state @property def location(self) -> str: return self.resource["location"] @property def connected(self) -> bool: properties = self.resource.get("properties", {}) return "connectivityStatus" in properties and properties["connectivityStatus"].lower() == "connected" @property def extensions(self) -> List[dict]: return list( self.clusters.extensions.list(resource_group_name=self.resource_group_name, cluster_name=self.cluster_name) ) def get_extensions_by_type(self, *type_names: str) -> Optional[Dict[str, dict]]: extensions = self.extensions desired_extension_map = {name.lower(): None for name in type_names} for extension in extensions: extension_type = extension["properties"].get("extensionType", "").lower() if extension_type in desired_extension_map: desired_extension_map[extension_type] = extension return desired_extension_map def get_custom_location_for_namespace(self, namespace: str) -> Optional[dict]: query = QUERIES["get_custom_location_for_namespace"].format(resource_id=self.resource_id, namespace=namespace) result = self.resource_graph.query_resources(query=query) return self._process_query_result(result, first=True) def get_aio_extensions(self) -> Optional[List[dict]]: query = QUERIES["get_aio_extensions"].format(resource_id=self.resource_id) # TODO - @digimaun microsoft.azurekeyvaultsecretsprovider optionality result = self.resource_graph.query_resources(query=query) return self._process_query_result(result) def get_aio_custom_locations(self) -> Optional[List[dict]]: query = QUERIES["get_aio_custom_locations"].format(resource_id=self.resource_id) result = self.resource_graph.query_resources(query=query) return self._process_query_result(result) def get_aio_resources(self, custom_location_id: str) -> Optional[List[dict]]: query = QUERIES["get_aio_resources"].format(custom_location_id=custom_location_id) result = self.resource_graph.query_resources(query=query) return self._process_query_result(result) def get_resource_sync_rules(self, custom_location_id: str) -> Optional[List[dict]]: query = QUERIES["get_resource_sync_rules"].format(custom_location_id=custom_location_id) result = self.resource_graph.query_resources(query=query) return self._process_query_result(result) def update_aio_extension(self, extension_name: str, properties: dict) -> dict: update_payload = {"properties": properties} return self.clusters.extensions.update_cluster_extension( resource_group_name=self.resource_group_name, cluster_name=self.cluster_name, extension_name=extension_name, update_payload=update_payload, ) def _process_query_result(self, result: dict, first: bool = False) -> Optional[Union[dict, List[dict]]]: if "data" in result and result["data"]: if first: return result["data"][0] return result["data"]