azext_edge/edge/commands_mq.py (266 lines of code) (raw):

# coding=utf-8 # ---------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License file in the project root for license information. # ---------------------------------------------------------------------------------------------- from typing import Iterable, Optional, List from knack.log import get_logger from .providers.orchestration.resources import Brokers from .common import DEFAULT_BROKER from .providers.orchestration.common import MqServiceType logger = get_logger(__name__) def show_broker(cmd, broker_name: str, instance_name: str, resource_group_name: str) -> dict: return Brokers(cmd).show(name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name) def list_brokers(cmd, instance_name: str, resource_group_name: str) -> Iterable[dict]: return Brokers(cmd).list(instance_name=instance_name, resource_group_name=resource_group_name) def delete_broker( cmd, broker_name: str, instance_name: str, resource_group_name: str, confirm_yes: Optional[bool] = None, **kwargs ): return Brokers(cmd).delete( name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, confirm_yes=confirm_yes, **kwargs, ) def apply_broker_listener( cmd, listener_name: str, instance_name: str, resource_group_name: str, config_file: str, broker_name: str = DEFAULT_BROKER, **kwargs, ) -> dict: return Brokers(cmd).listeners.apply( name=listener_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, config_file=config_file, **kwargs, ) def add_broker_listener_port( cmd, port: int, listener_name: str, instance_name: str, resource_group_name: str, broker_name: str = DEFAULT_BROKER, service_name: Optional[str] = None, service_type: Optional[str] = MqServiceType.LOADBALANCER.value, authn_ref: Optional[str] = None, authz_ref: Optional[str] = None, protocol: Optional[str] = None, nodeport: Optional[int] = None, tls_auto_issuer_ref: Optional[str] = None, tls_auto_duration: Optional[str] = None, tls_auto_key_algo: Optional[str] = None, tls_auto_key_rotation_policy: Optional[str] = None, tls_auto_renew_before: Optional[str] = None, tls_auto_san_dns: Optional[List[str]] = None, tls_auto_san_ip: Optional[List[str]] = None, tls_auto_secret_name: Optional[str] = None, tls_manual_secret_ref: Optional[str] = None, show_config: Optional[bool] = None, **kwargs, ) -> dict: return Brokers(cmd).listeners.add_port( port=port, listener_name=listener_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, service_name=service_name, service_type=service_type, authn_ref=authn_ref, authz_ref=authz_ref, protocol=protocol, nodeport=nodeport, tls_auto_issuer_ref=tls_auto_issuer_ref, tls_auto_duration=tls_auto_duration, tls_auto_key_algo=tls_auto_key_algo, tls_auto_key_rotation_policy=tls_auto_key_rotation_policy, tls_auto_renew_before=tls_auto_renew_before, tls_auto_san_dns=tls_auto_san_dns, tls_auto_san_ip=tls_auto_san_ip, tls_auto_secret_name=tls_auto_secret_name, tls_manual_secret_ref=tls_manual_secret_ref, show_config=show_config, **kwargs, ) def remove_broker_listener_port( cmd, port: int, listener_name: str, instance_name: str, resource_group_name: str, broker_name: str = DEFAULT_BROKER, confirm_yes: Optional[bool] = None, **kwargs, ): return Brokers(cmd).listeners.remove_port( port=port, listener_name=listener_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, confirm_yes=confirm_yes, **kwargs, ) def show_broker_listener( cmd, listener_name: str, instance_name: str, resource_group_name: str, broker_name: str = DEFAULT_BROKER ) -> dict: return Brokers(cmd).listeners.show( name=listener_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, ) def list_broker_listeners( cmd, instance_name: str, resource_group_name: str, broker_name: str = DEFAULT_BROKER ) -> Iterable[dict]: return Brokers(cmd).listeners.list( broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name ) def delete_broker_listener( cmd, listener_name: str, instance_name: str, resource_group_name: str, broker_name: str = DEFAULT_BROKER, confirm_yes: Optional[bool] = None, **kwargs, ): return Brokers(cmd).listeners.delete( name=listener_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, confirm_yes=confirm_yes, **kwargs, ) def apply_broker_authn( cmd, authn_name: str, instance_name: str, resource_group_name: str, config_file: str, broker_name: str = DEFAULT_BROKER, **kwargs, ) -> dict: return Brokers(cmd).authns.apply( name=authn_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, config_file=config_file, **kwargs, ) def add_broker_authn_method( cmd, authn_name: str, instance_name: str, resource_group_name: str, sat_audiences: Optional[List[str]] = None, x509_client_ca_cm: Optional[str] = None, x509_attrs: Optional[List[str]] = None, custom_endpoint: Optional[str] = None, custom_ca_cm: Optional[str] = None, custom_x509_secret_ref: Optional[str] = None, custom_http_headers: Optional[List[str]] = None, broker_name: str = DEFAULT_BROKER, show_config: Optional[bool] = None, **kwargs, ) -> dict: return Brokers(cmd).authns.add_method( name=authn_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, sat_audiences=sat_audiences, x509_client_ca_cm=x509_client_ca_cm, x509_attrs=x509_attrs, custom_endpoint=custom_endpoint, custom_ca_cm=custom_ca_cm, custom_x509_secret_ref=custom_x509_secret_ref, custom_http_headers=custom_http_headers, show_config=show_config, **kwargs, ) def show_broker_authn( cmd, authn_name: str, instance_name: str, resource_group_name: str, broker_name: str = DEFAULT_BROKER ) -> dict: return Brokers(cmd).authns.show( name=authn_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, ) def list_broker_authns( cmd, instance_name: str, resource_group_name: str, broker_name: str = DEFAULT_BROKER ) -> Iterable[dict]: return Brokers(cmd).authns.list( broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name ) def delete_broker_authn( cmd, authn_name: str, instance_name: str, resource_group_name: str, broker_name: str = DEFAULT_BROKER, confirm_yes: Optional[bool] = None, **kwargs, ): return Brokers(cmd).authns.delete( name=authn_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, confirm_yes=confirm_yes, **kwargs, ) def apply_broker_authz( cmd, authz_name: str, instance_name: str, resource_group_name: str, config_file: str, broker_name: str = DEFAULT_BROKER, **kwargs, ) -> dict: return Brokers(cmd).authzs.apply( name=authz_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, config_file=config_file, **kwargs, ) def show_broker_authz( cmd, authz_name: str, instance_name: str, resource_group_name: str, broker_name: str = DEFAULT_BROKER ) -> dict: return Brokers(cmd).authzs.show( name=authz_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, ) def list_broker_authzs( cmd, instance_name: str, resource_group_name: str, broker_name: str = DEFAULT_BROKER ) -> Iterable[dict]: return Brokers(cmd).authzs.list( broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name ) def delete_broker_authz( cmd, authz_name: str, instance_name: str, resource_group_name: str, broker_name: str = DEFAULT_BROKER, confirm_yes: Optional[bool] = None, **kwargs, ): return Brokers(cmd).authzs.delete( name=authz_name, broker_name=broker_name, instance_name=instance_name, resource_group_name=resource_group_name, confirm_yes=confirm_yes, **kwargs, )