azext_edge/edge/providers/orchestration/resources/clusters.py (54 lines of code) (raw):

# coding=utf-8 # ---------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License file in the project root for license information. # ---------------------------------------------------------------------------------------------- from typing import TYPE_CHECKING, Iterable, Optional from knack.log import get_logger from ....util.az_client import get_clusterconfig_mgmt_client, get_connectedk8s_mgmt_client, wait_for_terminal_state from ....util.queryable import Queryable logger = get_logger(__name__) if TYPE_CHECKING: from ....vendor.clients.clusterconfigmgmt.operations import ExtensionsOperations from ....vendor.clients.connectedclustermgmt.operations import ConnectedClusterOperations class ConnectedClusters(Queryable): def __init__(self, cmd, subscription_id: Optional[str] = None): super().__init__(cmd=cmd, subscriptions=[subscription_id] if subscription_id else None) self.connectedk8s_mgmt_client = get_connectedk8s_mgmt_client( subscription_id=self.subscriptions[0], ) self.ops: "ConnectedClusterOperations" = self.connectedk8s_mgmt_client.connected_cluster self.extensions: ClusterExtensions = ClusterExtensions(cmd) def show(self, resource_group_name: str, cluster_name: str) -> dict: return self.ops.get( resource_group_name=resource_group_name, cluster_name=cluster_name, ) class ClusterExtensions(Queryable): def __init__(self, cmd): super().__init__(cmd=cmd) self.clusterconfig_mgmt_client = get_clusterconfig_mgmt_client( subscription_id=self.default_subscription_id, ) self.ops: "ExtensionsOperations" = self.clusterconfig_mgmt_client.extensions def list(self, resource_group_name: str, cluster_name: str) -> Iterable[dict]: return self.ops.list( resource_group_name=resource_group_name, cluster_rp="Microsoft.Kubernetes", cluster_resource_name="connectedClusters", cluster_name=cluster_name, ) def update_cluster_extension( self, resource_group_name: str, cluster_name: str, extension_name: str, update_payload: dict, **operation_kwargs, ) -> Iterable[dict]: return wait_for_terminal_state( self.ops.begin_update( resource_group_name=resource_group_name, cluster_rp="Microsoft.Kubernetes", cluster_resource_name="connectedClusters", cluster_name=cluster_name, extension_name=extension_name, patch_extension=update_payload, **operation_kwargs, ) )