azext_iot/iothub/commands_message_route.py (114 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from typing import Optional from azext_iot.iothub.providers.message_route import MessageRoute from knack.log import get_logger logger = get_logger(__name__) def message_route_create( cmd, hub_name: str, route_name: str, source_type: str, endpoint_name: str, enabled: bool = True, condition: str = "true", resource_group_name: Optional[str] = None, ): message_route_provider = MessageRoute( cmd=cmd, hub_name=hub_name, rg=resource_group_name ) return message_route_provider.create( route_name=route_name, source_type=source_type, endpoint_name=endpoint_name, enabled=enabled, condition=condition ) def message_route_update( cmd, hub_name: str, route_name: str, source_type: Optional[str] = None, endpoint_name: Optional[str] = None, enabled: Optional[bool] = None, condition: Optional[str] = None, resource_group_name: Optional[str] = None, ): message_route_provider = MessageRoute( cmd=cmd, hub_name=hub_name, rg=resource_group_name ) return message_route_provider.update( route_name=route_name, source_type=source_type, endpoint_name=endpoint_name, enabled=enabled, condition=condition ) def message_route_show( cmd, hub_name: str, route_name: str, resource_group_name: Optional[str] = None, ): message_route_provider = MessageRoute( cmd=cmd, hub_name=hub_name, rg=resource_group_name ) return message_route_provider.show(route_name=route_name) def message_route_list( cmd, hub_name: str, source_type: Optional[str] = None, resource_group_name: Optional[str] = None, ): message_route_provider = MessageRoute( cmd=cmd, hub_name=hub_name, rg=resource_group_name ) return message_route_provider.list(source_type=source_type) def message_route_delete( cmd, hub_name: str, route_name: Optional[str] = None, source_type: Optional[str] = None, resource_group_name: Optional[str] = None, ): message_route_provider = MessageRoute( cmd=cmd, hub_name=hub_name, rg=resource_group_name ) return message_route_provider.delete(route_name=route_name, source_type=source_type) def message_route_test( cmd, hub_name: str, route_name: Optional[str] = None, source_type: Optional[str] = None, body: Optional[str] = None, app_properties: Optional[str] = None, system_properties: Optional[str] = None, resource_group_name: Optional[str] = None, ): message_route_provider = MessageRoute( cmd=cmd, hub_name=hub_name, rg=resource_group_name ) return message_route_provider.test( route_name=route_name, source_type=source_type, body=body, app_properties=app_properties, system_properties=system_properties ) def message_fallback_route_show( cmd, hub_name: str, resource_group_name: Optional[str] = None, ): message_route_provider = MessageRoute( cmd=cmd, hub_name=hub_name, rg=resource_group_name ) return message_route_provider.show_fallback() def message_fallback_route_set( cmd, hub_name: str, enabled: bool, resource_group_name: Optional[str] = None, ): message_route_provider = MessageRoute( cmd=cmd, hub_name=hub_name, rg=resource_group_name ) return message_route_provider.set_fallback(enabled=enabled)