azext_edge/edge/providers/orchestration/resources/brokers.py (468 lines of code) (raw):

# coding=utf-8 # ---------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License file in the project root for license information. # ---------------------------------------------------------------------------------------------- from collections import defaultdict from typing import TYPE_CHECKING, Iterable, List, Optional from azure.cli.core.azclierror import InvalidArgumentValueError from azure.core.exceptions import ResourceNotFoundError from knack.log import get_logger from rich.console import Console from ....util.az_client import wait_for_terminal_state from ....util.common import parse_kvp_nargs, should_continue_prompt, parse_dot_notation, upsert_by_discriminator from ....util.queryable import Queryable from .instances import Instances from .reskit import GetInstanceExtLoc, get_file_config logger = get_logger(__name__) if TYPE_CHECKING: from ....vendor.clients.iotopsmgmt.operations import ( BrokerAuthenticationOperations, BrokerAuthorizationOperations, BrokerListenerOperations, BrokerOperations, ) console = Console() class Brokers(Queryable): def __init__(self, cmd): super().__init__(cmd=cmd) self.instances = Instances(cmd=cmd) self.iotops_mgmt_client = self.instances.iotops_mgmt_client self.ops: "BrokerOperations" = self.iotops_mgmt_client.broker self.listeners = BrokerListeners(self.iotops_mgmt_client.broker_listener, self.instances.get_ext_loc) self.authns = BrokerAuthn(self.iotops_mgmt_client.broker_authentication, self.instances.get_ext_loc) self.authzs = BrokerAuthz(self.iotops_mgmt_client.broker_authorization, self.instances.get_ext_loc) def show(self, name: str, instance_name: str, resource_group_name: str) -> dict: return self.ops.get(resource_group_name=resource_group_name, instance_name=instance_name, broker_name=name) def list(self, instance_name: str, resource_group_name: str) -> Iterable[dict]: return self.ops.list_by_resource_group(resource_group_name=resource_group_name, instance_name=instance_name) def delete( self, name: str, instance_name: str, resource_group_name: str, confirm_yes: Optional[bool] = None, **kwargs ): should_bail = not should_continue_prompt(confirm_yes=confirm_yes) if should_bail: return with console.status("Working..."): poller = self.ops.begin_delete( resource_group_name=resource_group_name, instance_name=instance_name, broker_name=name, ) return wait_for_terminal_state(poller, **kwargs) class BrokerListeners: def __init__(self, ops: "BrokerListenerOperations", get_ext_loc: GetInstanceExtLoc): self.ops = ops self.get_ext_loc = get_ext_loc def apply( self, name: str, broker_name: str, instance_name: str, resource_group_name: str, config_file: Optional[str] = None, **kwargs, ) -> dict: listener_config = get_file_config(config_file) resource = {} resource["extendedLocation"] = self.get_ext_loc(name=instance_name, resource_group_name=resource_group_name) resource["properties"] = listener_config with console.status("Working..."): poller = self.ops.begin_create_or_update( resource_group_name=resource_group_name, instance_name=instance_name, broker_name=broker_name, listener_name=name, resource=resource, ) return wait_for_terminal_state(poller, **kwargs) def _build_tls_config( self, tls_auto_issuer_ref: Optional[str] = None, tls_auto_duration: Optional[str] = None, tls_auto_key_algo: Optional[str] = None, tls_auto_key_rotation_policy: Optional[str] = None, tls_auto_renew_before: Optional[str] = None, tls_auto_san_dns: Optional[List[str]] = None, tls_auto_san_ip: Optional[List[str]] = None, tls_auto_secret_name: Optional[str] = None, tls_manual_secret_ref: Optional[str] = None, ): config = {} cm_config = defaultdict(dict) man_config = defaultdict(dict) if tls_auto_issuer_ref: tls_auto_issuer_ref = parse_kvp_nargs(tls_auto_issuer_ref) issuer_config = {} for key in ["group", "kind", "name"]: if key in tls_auto_issuer_ref: issuer_config[key] = tls_auto_issuer_ref[key] if "group" not in issuer_config: issuer_config["group"] = "cert-manager.io" cm_config["issuerRef"] = issuer_config if tls_auto_duration: cm_config["duration"] = tls_auto_duration if tls_auto_key_algo: cm_config["privateKey"]["algorithm"] = tls_auto_key_algo if tls_auto_key_rotation_policy: cm_config["privateKey"]["rotationPolicy"] = tls_auto_key_rotation_policy if tls_auto_renew_before: cm_config["renewBefore"] = tls_auto_renew_before if tls_auto_san_dns: cm_config["san"]["dns"] = tls_auto_san_dns if tls_auto_san_ip: cm_config["san"]["ip"] = tls_auto_san_ip if tls_auto_secret_name: cm_config["secretName"] = tls_auto_secret_name if tls_manual_secret_ref: man_config["secretRef"] = tls_manual_secret_ref if all([cm_config, man_config]): raise InvalidArgumentValueError("TLS may be setup with an automatic or manual config, not both.") if cm_config: config["tls"] = {"mode": "Automatic", "certManagerCertificateSpec": dict(cm_config)} if man_config: config["tls"] = {"mode": "Manual", "manual": dict(man_config)} return config def add_port( self, port: int, listener_name: str, broker_name: str, instance_name: str, resource_group_name: str, service_name: Optional[str] = None, service_type: Optional[str] = "LoadBalancer", authn_ref: Optional[str] = None, authz_ref: Optional[str] = None, protocol: Optional[str] = None, nodeport: Optional[int] = None, tls_auto_issuer_ref: Optional[str] = None, tls_auto_duration: Optional[str] = None, tls_auto_key_algo: Optional[str] = None, tls_auto_key_rotation_policy: Optional[str] = None, tls_auto_renew_before: Optional[str] = None, tls_auto_san_dns: Optional[List[str]] = None, tls_auto_san_ip: Optional[List[str]] = None, tls_auto_secret_name: Optional[str] = None, tls_manual_secret_ref: Optional[str] = None, show_config: Optional[bool] = None, **kwargs, ) -> dict: port_config = {"port": port} if authn_ref: port_config["authenticationRef"] = authn_ref if authz_ref: port_config["authorizationRef"] = authz_ref if protocol: port_config["protocol"] = protocol if nodeport: port_config["nodePort"] = nodeport tls_config = self._build_tls_config( tls_auto_issuer_ref=tls_auto_issuer_ref, tls_auto_duration=tls_auto_duration, tls_auto_key_algo=tls_auto_key_algo, tls_auto_key_rotation_policy=tls_auto_key_rotation_policy, tls_auto_renew_before=tls_auto_renew_before, tls_auto_san_dns=tls_auto_san_dns, tls_auto_san_ip=tls_auto_san_ip, tls_auto_secret_name=tls_auto_secret_name, tls_manual_secret_ref=tls_manual_secret_ref, ) port_config.update(tls_config) listener = {} try: listener = self.show( name=listener_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, ) except ResourceNotFoundError: pass if not listener: listener["name"] = listener_name listener["extendedLocation"] = self.get_ext_loc( name=instance_name, resource_group_name=resource_group_name ) listener["properties"] = {"serviceType": str(service_type)} if service_name: listener["properties"]["serviceName"] = service_name port_configs: List[dict] = listener["properties"].get("ports", []) listener["properties"]["ports"] = upsert_by_discriminator( initial=port_configs, disc_key="port", config=port_config ) if show_config: return listener["properties"] with console.status("Working..."): poller = self.ops.begin_create_or_update( resource_group_name=resource_group_name, instance_name=instance_name, broker_name=broker_name, listener_name=listener_name, resource=listener, ) return wait_for_terminal_state(poller, **kwargs) def remove_port( self, port: int, listener_name: str, broker_name: str, instance_name: str, resource_group_name: str, confirm_yes: Optional[bool] = None, **kwargs, ): listener = self.show( name=listener_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, ) port_configs = listener["properties"].get("ports", []) orig_configs_len = len(port_configs) port_configs = [port_config for port_config in port_configs if port_config["port"] != port] mod_configs_len = len(port_configs) listener["properties"]["ports"] = port_configs if orig_configs_len == mod_configs_len: logger.warning("No port modification detected.") return if not len(port_configs): logger.warning("Listener resource will be deleted as it will no longer have any ports configured.") self.delete( name=listener_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, confirm_yes=confirm_yes, **kwargs, ) return should_bail = not should_continue_prompt(confirm_yes=confirm_yes) if should_bail: return with console.status("Working..."): poller = self.ops.begin_create_or_update( resource_group_name=resource_group_name, instance_name=instance_name, broker_name=broker_name, listener_name=listener_name, resource=listener, ) return wait_for_terminal_state(poller, **kwargs) def show(self, name: str, broker_name: str, instance_name: str, resource_group_name: str) -> dict: return self.ops.get( listener_name=name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, ) def list(self, broker_name: str, instance_name: str, resource_group_name: str) -> Iterable[dict]: return self.ops.list_by_resource_group( resource_group_name=resource_group_name, instance_name=instance_name, broker_name=broker_name ) def delete( self, name: str, broker_name: str, instance_name: str, resource_group_name: str, confirm_yes: Optional[bool] = None, **kwargs, ): should_bail = not should_continue_prompt(confirm_yes=confirm_yes) if should_bail: return with console.status("Working..."): poller = self.ops.begin_delete( listener_name=name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, ) return wait_for_terminal_state(poller, **kwargs) class BrokerAuthn: def __init__(self, ops: "BrokerAuthenticationOperations", get_ext_loc: GetInstanceExtLoc): self.ops = ops self.get_ext_loc = get_ext_loc def apply( self, name: str, broker_name: str, instance_name: str, resource_group_name: str, config_file: str, **kwargs ): authn_config = get_file_config(config_file) resource = {} resource["extendedLocation"] = self.get_ext_loc(name=instance_name, resource_group_name=resource_group_name) resource["properties"] = authn_config with console.status("Working..."): poller = self.ops.begin_create_or_update( resource_group_name=resource_group_name, instance_name=instance_name, broker_name=broker_name, authentication_name=name, resource=resource, ) return wait_for_terminal_state(poller, **kwargs) def _build_authn_methods( self, sat_audiences: Optional[List[str]] = None, x509_client_ca_cm: Optional[str] = None, x509_attrs: Optional[List[str]] = None, custom_endpoint: Optional[str] = None, custom_ca_cm: Optional[str] = None, custom_x509_secret_ref: Optional[str] = None, custom_http_headers: Optional[List[str]] = None, ) -> List[dict]: methods = [] if sat_audiences: sat_config = {"method": "ServiceAccountToken", "serviceAccountTokenSettings": {"audiences": sat_audiences}} methods.append(sat_config) x509_config = defaultdict(dict) if x509_client_ca_cm: x509_config["x509Settings"]["trustedClientCaCert"] = x509_client_ca_cm if x509_attrs: x509_config["x509Settings"]["authorizationAttributes"] = parse_dot_notation(x509_attrs) if x509_config: x509_config["method"] = "X509" methods.append(dict(x509_config)) custom_config = defaultdict(dict) if custom_endpoint: custom_config["customSettings"]["endpoint"] = custom_endpoint if custom_ca_cm: custom_config["customSettings"]["caCertConfigMap"] = custom_ca_cm if custom_x509_secret_ref: custom_config["customSettings"]["auth"] = {"x509": {"secretRef": custom_x509_secret_ref}} if custom_http_headers: custom_config["customSettings"]["headers"] = parse_kvp_nargs(custom_http_headers) if custom_config: custom_config["method"] = "Custom" methods.append(dict(custom_config)) if not methods: raise InvalidArgumentValueError("At least one authn config is required.") return methods def add_method( self, name: str, broker_name: str, instance_name: str, resource_group_name: str, sat_audiences: Optional[List[str]] = None, x509_client_ca_cm: Optional[str] = None, x509_attrs: Optional[List[str]] = None, custom_endpoint: Optional[str] = None, custom_ca_cm: Optional[str] = None, custom_x509_secret_ref: Optional[str] = None, custom_http_headers: Optional[List[str]] = None, show_config: Optional[bool] = None, **kwargs, ): methods = self._build_authn_methods( sat_audiences=sat_audiences, x509_client_ca_cm=x509_client_ca_cm, x509_attrs=x509_attrs, custom_endpoint=custom_endpoint, custom_ca_cm=custom_ca_cm, custom_x509_secret_ref=custom_x509_secret_ref, custom_http_headers=custom_http_headers, ) authn = {} try: authn = self.show( name=name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, ) except ResourceNotFoundError: pass if not authn: authn["name"] = name authn["extendedLocation"] = self.get_ext_loc(name=instance_name, resource_group_name=resource_group_name) authn["properties"] = {} authn_methods: List[dict] = authn["properties"].get("authenticationMethods", []) authn_methods.extend(methods) authn["properties"]["authenticationMethods"] = authn_methods if show_config: return authn["properties"] with console.status("Working..."): poller = self.ops.begin_create_or_update( resource_group_name=resource_group_name, instance_name=instance_name, broker_name=broker_name, authentication_name=name, resource=authn, ) return wait_for_terminal_state(poller, **kwargs) def show(self, name: str, broker_name: str, instance_name: str, resource_group_name: str) -> dict: return self.ops.get( authentication_name=name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, ) def list(self, broker_name: str, instance_name: str, resource_group_name: str) -> Iterable[dict]: return self.ops.list_by_resource_group( resource_group_name=resource_group_name, instance_name=instance_name, broker_name=broker_name ) def delete( self, name: str, broker_name: str, instance_name: str, resource_group_name: str, confirm_yes: Optional[bool] = None, **kwargs, ): should_bail = not should_continue_prompt(confirm_yes=confirm_yes) if should_bail: return with console.status("Working..."): poller = self.ops.begin_delete( authentication_name=name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, ) return wait_for_terminal_state(poller, **kwargs) class BrokerAuthz: def __init__(self, ops: "BrokerAuthorizationOperations", get_ext_loc: GetInstanceExtLoc): self.ops = ops self.get_ext_loc = get_ext_loc def apply( self, name: str, broker_name: str, instance_name: str, resource_group_name: str, config_file: str, **kwargs ): authz_config = get_file_config(config_file) resource = {} resource["extendedLocation"] = self.get_ext_loc(name=instance_name, resource_group_name=resource_group_name) resource["properties"] = authz_config with console.status("Working..."): poller = self.ops.begin_create_or_update( resource_group_name=resource_group_name, instance_name=instance_name, broker_name=broker_name, authorization_name=name, resource=resource, ) return wait_for_terminal_state(poller, **kwargs) def show(self, name: str, broker_name: str, instance_name: str, resource_group_name: str) -> dict: return self.ops.get( authorization_name=name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, ) def list(self, broker_name: str, instance_name: str, resource_group_name: str) -> Iterable[dict]: return self.ops.list_by_resource_group( resource_group_name=resource_group_name, instance_name=instance_name, broker_name=broker_name ) def delete( self, name: str, broker_name: str, instance_name: str, resource_group_name: str, confirm_yes: Optional[bool] = None, **kwargs, ): should_bail = not should_continue_prompt(confirm_yes=confirm_yes) if should_bail: return with console.status("Working..."): poller = self.ops.begin_delete( authorization_name=name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, ) return wait_for_terminal_state(poller, **kwargs)