azext_iot/iothub/providers/message_route.py (164 lines of code) (raw):

# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from typing import Optional
from knack.log import get_logger
from azure.cli.core.azclierror import ResourceNotFoundError
from azext_iot.common.utility import handle_service_exception, process_json_arg
from azext_iot.iothub.common import RouteSourceType
from azext_iot.iothub.providers.base import IoTHubProvider
from azure.core.exceptions import HttpResponseError


logger = get_logger(__name__)


class MessageRoute(IoTHubProvider):
    """Manage the message routes stored in an IoT Hub's routing configuration.

    Every mutating operation edits ``self.hub_resource`` in memory and then
    submits the whole hub description back through
    ``self.discovery.client.begin_create_or_update``, passing the resource's
    etag as ``if_match``.
    """

    def __init__(
        self,
        cmd,
        hub_name: str,
        rg: Optional[str] = None,
    ):
        """Initialise the underlying provider with ``dataplane=False``.

        Args:
            cmd: Command context forwarded to ``IoTHubProvider``.
            hub_name: Name of the IoT Hub to operate on.
            rg: Optional resource group name forwarded to ``IoTHubProvider``.
        """
        super().__init__(cmd, hub_name, rg, dataplane=False)

    def _apply_hub_changes(self):
        """Submit the in-memory hub description to the service.

        Returns:
            Whatever ``begin_create_or_update`` returns (presumably a
            long-running-operation poller -- confirm against the SDK).
            An ``HttpResponseError`` raised by the call is handed to
            ``handle_service_exception``; if that helper returns normally
            this method returns ``None``.
        """
        try:
            return self.discovery.client.begin_create_or_update(
                resource_group_name=self.hub_resource.additional_properties['resourcegroup'],
                resource_name=self.hub_resource.name,
                iot_hub_description=self.hub_resource,
                if_match=self.hub_resource.etag
            )
        except HttpResponseError as e:
            handle_service_exception(e)

    def _test_source(self, source_type: str, route_message: dict):
        """Call ``test_all_routes`` for one routing source with the given message."""
        test_all_routes_input = {
            "routingSource": source_type,
            "message": route_message,
            "twin": None
        }
        return self.discovery.client.test_all_routes(
            iot_hub_name=self.hub_resource.name,
            resource_group_name=self.hub_resource.additional_properties['resourcegroup'],
            input=test_all_routes_input
        )

    def create(
        self,
        route_name: str,
        source_type: str,
        endpoint_name: str,
        enabled: bool = True,
        condition: str = "true",
    ):
        """Append a new route to the hub and submit the updated hub description.

        Args:
            route_name: Name of the new route.
            source_type: Value stored as the route's ``source``.
            endpoint_name: Endpoint name(s); split on whitespace to build the
                route's ``endpointNames`` list.
            enabled: Value stored as the route's ``isEnabled`` flag.
            condition: Value stored as the route's ``condition``.

        Returns:
            The result of the hub update call.
        """
        self.hub_resource.properties.routing.routes.append(
            {
                "source": source_type,
                "name": route_name,
                "endpointNames": endpoint_name.split(),
                "condition": condition,
                "isEnabled": enabled
            }
        )
        return self._apply_hub_changes()

    def update(
        self,
        route_name: str,
        source_type: Optional[str] = None,
        endpoint_name: Optional[str] = None,
        enabled: Optional[bool] = None,
        condition: Optional[str] = None,
    ):
        """Modify an existing route in place and submit the updated hub description.

        Only arguments that are not ``None`` overwrite the stored value.

        Args:
            route_name: Name of the route to modify (matched case-insensitively).
            source_type: New ``source`` value, if given.
            endpoint_name: New endpoint name(s), split on whitespace, if given.
            enabled: New ``is_enabled`` value, if given.
            condition: New ``condition`` value, if given.

        Returns:
            The result of the hub update call.

        Raises:
            ResourceNotFoundError: If no route is named ``route_name``.
        """
        route = self.show(route_name=route_name)
        if source_type is not None:
            route.source = source_type
        if endpoint_name is not None:
            route.endpoint_names = endpoint_name.split()
        if condition is not None:
            route.condition = condition
        if enabled is not None:
            route.is_enabled = enabled
        return self._apply_hub_changes()

    def show(self, route_name: str):
        """Return the route whose name matches ``route_name`` case-insensitively.

        Raises:
            ResourceNotFoundError: If no route matches.
        """
        wanted = route_name.lower()
        for route in self.hub_resource.properties.routing.routes:
            if route.name.lower() == wanted:
                return route
        raise ResourceNotFoundError("No route found.")

    def list(self, source_type: Optional[str] = None):
        """Return the hub's routes, optionally filtered by source type.

        Args:
            source_type: When truthy, only routes whose ``source`` matches it
                case-insensitively are returned; otherwise all routes are.
        """
        routes = self.hub_resource.properties.routing.routes
        if source_type:
            wanted = source_type.lower()
            return [route for route in routes if route.source.lower() == wanted]
        return routes

    def delete(self, route_name: Optional[str] = None, source_type: Optional[str] = None):
        """Remove routes and submit the updated hub description.

        With neither argument, all routes are removed. ``route_name`` takes
        precedence over ``source_type`` when both are supplied. Matching is
        case-insensitive. No error is raised here when nothing matches.

        Returns:
            The result of the hub update call.
        """
        routing = self.hub_resource.properties.routing
        if route_name:
            unwanted = route_name.lower()
            routing.routes = [route for route in routing.routes if route.name.lower() != unwanted]
        elif source_type:
            unwanted = source_type.lower()
            routing.routes = [route for route in routing.routes if route.source.lower() != unwanted]
        else:
            routing.routes = []
        return self._apply_hub_changes()

    def test(
        self,
        route_name: Optional[str] = None,
        source_type: Optional[str] = None,
        body: Optional[str] = None,
        app_properties: Optional[str] = None,
        system_properties: Optional[str] = None
    ):
        """Test a sample message against one route, one source type, or all source types.

        Args:
            route_name: When given, only this route is tested (via ``test_route``).
            source_type: When given (and ``route_name`` is not), all routes of
                this source are tested (via ``test_all_routes``).
            body: Message body placed in the test message.
            app_properties: Passed through ``process_json_arg`` when truthy.
            system_properties: Passed through ``process_json_arg`` when truthy.

        Returns:
            The client's result for the single-route and single-source cases;
            otherwise a dict ``{"routes": [...]}`` aggregated over every type
            from ``RouteSourceType.list_valid_types()``.

        Raises:
            ResourceNotFoundError: If ``route_name`` is given but not found.
        """
        if app_properties:
            app_properties = process_json_arg(content=app_properties, argument_name="app_properties")
        if system_properties:
            system_properties = process_json_arg(content=system_properties, argument_name="system_properties")
        route_message = {
            "body": body,
            "appProperties": app_properties,
            "systemProperties": system_properties
        }

        if route_name:
            route = self.show(route_name)
            test_route_input = {
                "message": route_message,
                "twin": None,
                "route": route
            }
            return self.discovery.client.test_route(
                iot_hub_name=self.hub_resource.name,
                resource_group_name=self.hub_resource.additional_properties['resourcegroup'],
                input=test_route_input
            )

        if source_type:
            return self._test_source(source_type, route_message)

        # No filter given: each source type has to be tested separately.
        routes = []
        fallback = None
        for source in RouteSourceType.list_valid_types():
            result = self._test_source(source, route_message).routes
            # A lone "$fallback" entry is kept aside and only reported when
            # no other route matched for any source type.
            if len(result) == 1 and result[0].properties.name == "$fallback":
                fallback = result
            else:
                routes.extend(result)

        if not routes and fallback:
            routes = fallback
        return {"routes": routes}

    def show_fallback(self):
        """Return the hub's fallback route from the in-memory hub description."""
        return self.hub_resource.properties.routing.fallback_route

    def set_fallback(self, enabled: bool):
        """Enable or disable the fallback route and submit the updated hub description.

        The update goes through the same error-handling path as ``create``,
        ``update`` and ``delete``. The result of the update call is not
        awaited or returned.

        Args:
            enabled: New ``is_enabled`` value for the fallback route.

        Returns:
            The fallback route object from the in-memory hub description.
        """
        fallback_route = self.hub_resource.properties.routing.fallback_route
        fallback_route.is_enabled = enabled
        self._apply_hub_changes()
        return self.show_fallback()