azext_edge/edge/providers/base.py (321 lines of code) (raw):

# coding=utf-8 # ---------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License file in the project root for license information. # ---------------------------------------------------------------------------------------------- import socket from contextlib import contextmanager from typing import Dict, Iterator, List, Optional, Union from urllib.request import urlopen from azure.cli.core.azclierror import ResourceNotFoundError from knack.log import get_logger from kubernetes import client, config from kubernetes.client.exceptions import ApiException from kubernetes.client.models import ( V1APIResourceList, V1ObjectMeta, V1Pod, V1PodList, V1Service, ) from ..common import K8sSecretType from ..util import is_enabled_str DEFAULT_NAMESPACE: str = "azure-iot-operations" logger = get_logger(__name__) generic = client.ApiClient() def load_config_context(context_name: Optional[str] = None): """ Load default config using a specific context or 'current-context' if not specified. """ from ..util import set_log_level # This will ensure --debug works with http(s) k8s interactions set_log_level("urllib3.connectionpool") config.load_kube_config(context=context_name) _, current_config = config.list_kube_config_contexts() global DEFAULT_NAMESPACE DEFAULT_NAMESPACE = current_config.get("namespace") or "azure-iot-operations" _namespaced_service_cache: dict = {} def get_namespaced_service(name: str, namespace: str, as_dict: bool = False) -> Union[V1Service, dict, None]: def retrieve_namespaced_service_from_cache(key: tuple): result = _namespaced_service_cache[key] if as_dict: return generic.sanitize_for_serialization(obj=result) return result target_service_key = (name, namespace) if target_service_key in _namespaced_service_cache: return retrieve_namespaced_service_from_cache(target_service_key) try: v1 = client.CoreV1Api() v1_service: V1Service = v1.read_namespaced_service(name=name, namespace=namespace) _namespaced_service_cache[target_service_key] = v1_service except ApiException as ae: logger.debug(str(ae)) else: return retrieve_namespaced_service_from_cache(target_service_key) _namespaced_pods_cache: dict = {} def get_namespaced_pods_by_prefix( prefix: str, namespace: str, label_selector: Optional[str] = None, as_dict: bool = False, ) -> Union[List[V1Pod], List[dict], None]: def filter_pods_by_prefix(pods: List[V1Pod], prefix: str) -> List[V1Pod]: return [pod for pod in pods if pod.metadata.name.startswith(prefix)] def filter_pods_from_cache(key: tuple): cached_pods = _namespaced_pods_cache[key] result = filter_pods_by_prefix(pods=cached_pods, prefix=prefix) if as_dict: return generic.sanitize_for_serialization(obj=result) return result target_pods_key = (namespace, label_selector) if target_pods_key in _namespaced_pods_cache: return filter_pods_from_cache(target_pods_key) try: v1 = client.CoreV1Api() if namespace: pods_list: V1PodList = v1.list_namespaced_pod(namespace, label_selector=label_selector) else: pods_list: V1PodList = v1.list_pod_for_all_namespaces(label_selector=label_selector) _namespaced_pods_cache[target_pods_key] = pods_list.items except ApiException as ae: logger.debug(str(ae)) else: return filter_pods_from_cache(target_pods_key) _custom_object_cache: dict = {} def get_custom_objects( group: str, version: str, plural: str, namespace: Optional[str] = None, use_cache: bool = True ) -> Union[dict, None]: target_resource_key = (group, version, plural, namespace) if use_cache: if target_resource_key in _custom_object_cache: return _custom_object_cache[target_resource_key] try: custom_client = client.CustomObjectsApi() kwargs = {"group": group, "version": version, "plural": plural} if namespace: kwargs["namespace"] = namespace f = custom_client.list_namespaced_custom_object else: f = custom_client.list_cluster_custom_object _custom_object_cache[target_resource_key] = f(**kwargs) except ApiException as ae: logger.debug(str(ae)) else: return _custom_object_cache[target_resource_key] _cluster_resource_api_cache: dict = {} def get_cluster_custom_api(group: str, version: str, raise_on_404: bool = False) -> Union[V1APIResourceList, None]: target_resource_api_key = (group, version) if target_resource_api_key in _cluster_resource_api_cache: return _cluster_resource_api_cache[target_resource_api_key] try: custom_client = client.CustomObjectsApi() _cluster_resource_api_cache[target_resource_api_key] = custom_client.get_api_resources( group=group, version=version ) except ApiException as ae: logger.debug(msg=str(ae)) if int(ae.status) == 404 and raise_on_404: raise ResourceNotFoundError(f"{group}/{version} resource API is not detected on the cluster.") else: return _cluster_resource_api_cache[target_resource_api_key] class PodRequest: def __init__(self, namespace: str, pod_name: str, pod_port: str): self.namespace = namespace self.pod_name = pod_name self.pod_port = pod_port def get(self, resource_path: str): with urlopen(self._build_url(resource_path=resource_path)) as response: return response.read().decode("utf-8") def _build_url(self, resource_path: str): return f"http://{self.pod_name}.{self.namespace}.kubernetes:{self.pod_port}{resource_path}" @contextmanager def portforward_http(namespace: str, pod_name: str, pod_port: str, **kwargs) -> Iterator[PodRequest]: from kubernetes.stream import portforward api = client.CoreV1Api() def kubernetes_create_connection(address, *args, **kwargs): dns_name = address[0] if isinstance(dns_name, bytes): dns_name = dns_name.decode() dns_name = dns_name.split(".") if len(dns_name) != 3 or dns_name[2] != "kubernetes": return socket_create_connection(address, *args, **kwargs) pf = portforward( api.connect_get_namespaced_pod_portforward, dns_name[0], dns_name[1], ports=str(address[1]), ) return pf.socket(address[1]) socket_create_connection = socket.create_connection try: socket.create_connection = kubernetes_create_connection pod_request = PodRequest(namespace=namespace, pod_name=pod_name, pod_port=pod_port) yield pod_request finally: socket.create_connection = socket_create_connection @contextmanager def portforward_socket(namespace: str, pod_name: str, pod_port: str) -> Iterator[socket.socket]: from kubernetes.stream import portforward from .edge_api import MqResourceKinds, MQ_ACTIVE_API api = client.CoreV1Api() pf = portforward( api.connect_get_namespaced_pod_portforward, pod_name, namespace, ports=str(pod_port), ) target_socket: socket.socket = pf.socket(int(pod_port))._socket internal_tls = False namespaced_brokers: dict = MQ_ACTIVE_API.get_resources(MqResourceKinds.BROKER, namespace=namespace) broker = None if namespaced_brokers and namespaced_brokers["items"]: broker: Dict[str, Union[str, dict]] = namespaced_brokers["items"][0] if broker and broker["spec"]: encrypt_internal_traffic = broker["spec"].get("advanced", {}).get("encryptInternalTraffic") if is_enabled_str(encrypt_internal_traffic): internal_tls = True if internal_tls: import ssl context = ssl.create_default_context() context.minimum_version = ssl.TLSVersion.TLSv1_2 context.check_hostname = False context.verify_mode = ssl.CERT_NONE target_socket = context.wrap_socket(sock=target_socket) target_socket.settimeout(10.0) yield target_socket target_socket.shutdown(socket.SHUT_RDWR) target_socket.close() def create_namespaced_secret( secret_name: str, namespace: str, data: Dict[str, str], labels: Optional[Dict[str, str]] = None, secret_type: K8sSecretType = K8sSecretType.opaque, delete_first: bool = False, ): if delete_first: delete_namespaced_secret(namespace=namespace, secret_name=secret_name) data_kw = {} if secret_type == K8sSecretType.opaque: data_kw = {"string_data": data} elif secret_type == K8sSecretType.tls: data_kw = {"data": data} else: raise RuntimeError(f"{secret_type} not supported.") v1_secret = client.V1Secret( metadata=V1ObjectMeta(name=secret_name, labels=labels), type=secret_type.value, **data_kw ) try: v1_api = client.CoreV1Api() return v1_api.create_namespaced_secret(namespace=namespace, body=v1_secret) except ApiException as ae: error_msg = str(ae) logger.debug(msg=error_msg) raise RuntimeError(error_msg) def get_namespaced_secret(namespace: str, secret_name: str) -> dict: result = None try: v1_api = client.CoreV1Api() result = v1_api.read_namespaced_secret(namespace=namespace, name=secret_name) except ApiException as ae: error_msg = str(ae) logger.debug(msg=error_msg) else: if result: return generic.sanitize_for_serialization(obj=result) def delete_namespaced_secret(namespace: str, secret_name: str, raise_on_404: bool = False): try: v1_api = client.CoreV1Api() v1_api.delete_namespaced_secret(namespace=namespace, name=secret_name) except ApiException as ae: error_msg = str(ae) logger.debug(msg=error_msg) if int(ae.status) == 404 and not raise_on_404: return raise RuntimeError(error_msg) def create_cluster_namespace(namespace: str) -> dict: result = None try: v1_api = client.CoreV1Api() result = v1_api.create_namespace(body=client.V1Namespace(metadata=V1ObjectMeta(name=namespace))) except ApiException as ae: error_msg = str(ae) logger.debug(msg=error_msg) raise RuntimeError(error_msg) else: if result: return generic.sanitize_for_serialization(obj=result) def get_cluster_namespace(namespace: str) -> dict: result = None try: v1_api = client.CoreV1Api() result = v1_api.read_namespace(name=namespace) except ApiException as ae: error_msg = str(ae) logger.debug(msg=error_msg) else: if result: return generic.sanitize_for_serialization(obj=result) def create_namespaced_custom_objects( group: str, version: str, namespace: str, plural: str, yaml_objects: List[dict], delete_first: bool = False ) -> List[dict]: if not yaml_objects: yaml_objects = [] result = [] try: custom_client = client.CustomObjectsApi() for data in yaml_objects: if delete_first: delete_namespaced_custom_object( group=group, version=version, namespace=namespace, plural=plural, name=data["metadata"]["name"] ) result.append( custom_client.create_namespaced_custom_object( group=group, version=version, namespace=namespace, plural=plural, body=data ) ) except ApiException as ae: error_msg = str(ae) logger.debug(msg=error_msg) raise RuntimeError(error_msg) else: return result def delete_namespaced_custom_object( group: str, version: str, namespace: str, plural: str, name: str, raise_on_404: bool = False ): try: custom_client = client.CustomObjectsApi() custom_client.delete_namespaced_custom_object( group=group, version=version, namespace=namespace, plural=plural, name=name, ) except ApiException as ae: error_msg = str(ae) logger.debug(msg=error_msg) if raise_on_404: if int(ae.status) == 404 and not raise_on_404: return raise RuntimeError(error_msg) def create_namespaced_configmap( namespace: str, cm_name: str, data: Dict[str, str], delete_first: bool = False ) -> dict: result = None try: v1_api = client.CoreV1Api() if delete_first: delete_namespaced_configmap(namespace=namespace, cm_name=cm_name) result = v1_api.create_namespaced_config_map( namespace=namespace, body=client.V1ConfigMap(data=data, metadata=V1ObjectMeta(name=cm_name)) ) except ApiException as ae: error_msg = str(ae) logger.debug(msg=error_msg) raise RuntimeError(error_msg) else: if result: return generic.sanitize_for_serialization(obj=result) def delete_namespaced_configmap(namespace: str, cm_name: str, raise_on_404: bool = False) -> dict: try: v1_api = client.CoreV1Api() v1_api.delete_namespaced_config_map(namespace=namespace, name=cm_name) except ApiException as ae: error_msg = str(ae) logger.debug(msg=error_msg) if raise_on_404: if int(ae.status) == 404 and not raise_on_404: return raise RuntimeError(error_msg)