azext_edge/edge/providers/edge_api/base.py (67 lines of code) (raw):
# coding=utf-8
# ----------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License file in the project root for license information.
# ----------------------------------------------------------------------------------------------
from enum import Enum
from typing import Dict, FrozenSet, Iterable, List, Union, Optional
from kubernetes.client.models import V1APIResourceList
from ...providers.base import get_cluster_custom_api, get_custom_objects
from azure.cli.core.azclierror import ResourceNotFoundError
class EdgeResourceApi:
def __init__(self, group: str, version: str, moniker: str, label: Optional[str] = None):
self.group: str = group
self.version: str = version
self.moniker: str = moniker
self.label: Optional[str] = label
self._api: V1APIResourceList = None
self._kinds: Dict[str, str] = None
def as_str(self) -> str:
return f"{self.group}/{self.version}"
def is_deployed(self, raise_on_404: bool = False) -> bool:
return self._get_api(raise_on_404) is not None
def _get_api(self, raise_on_404: bool = False):
self._api = get_cluster_custom_api(group=self.group, version=self.version, raise_on_404=raise_on_404)
return self._api
@property
def kinds(self) -> Union[FrozenSet[str], None]:
if self._kinds:
return frozenset(self._kinds.keys())
if not self._api:
self._get_api()
if self._api:
self._kinds = {}
for resource in self._api.resources:
rn: str = resource.name
if "/" in rn:
rn = rn[: rn.index("/")]
self._kinds[resource.kind.lower()] = rn
return frozenset(self._kinds.keys())
def get_resources(self, kind: Union[str, Enum], namespace: Optional[str] = None):
if isinstance(kind, Enum):
kind = kind.value
if self.kinds and kind in self.kinds:
return get_custom_objects(
group=self.group, version=self.version, plural=self._kinds[kind], namespace=namespace
)
class EdgeApiManager:
def __init__(self, resource_apis: Iterable[EdgeResourceApi]):
self.resource_apis: FrozenSet[EdgeResourceApi] = frozenset(resource_apis)
self.api_group_map: Dict[str, List[str]] = {}
for api in self.resource_apis:
if api.group not in self.api_group_map:
self.api_group_map[api.group] = []
self.api_group_map[api.group].append(api.version)
def as_str(self):
apis_str = ""
sep = "\n" if len(self.api_group_map) > 1 else ""
for group in self.api_group_map:
# TODO: Fix the separator logic
apis_str += f"{group}/[{','.join(self.api_group_map[group])}]{sep}"
return apis_str
def get_deployed(self, raise_on_404: bool = False) -> Iterable[EdgeResourceApi]:
result = []
for api in self.resource_apis:
if api.is_deployed():
result.append(api)
if not result and raise_on_404:
error_msg = f"The following APIs are not detected on the cluster:\n{self.as_str()}"
raise ResourceNotFoundError(error_msg)
return result
@property
def apis(self) -> FrozenSet[EdgeResourceApi]:
return self.resource_apis