azext_iot/iothub/providers/message_endpoint.py (540 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from typing import Optional from knack.log import get_logger from azure.cli.core.azclierror import ( ArgumentUsageError, RequiredArgumentMissingError, ResourceNotFoundError, InvalidArgumentValueError, MutuallyExclusiveArgumentError ) from azext_iot.common.embedded_cli import EmbeddedCLI from azext_iot.common.utility import handle_service_exception from azext_iot.iothub.common import ( BYTES_PER_MEGABYTE, FORCE_DELETE_WARNING, INVALID_CLI_CORE_FOR_COSMOS, NULL_WARNING, SYSTEM_ASSIGNED_IDENTITY, AuthenticationType, EncodingFormat, EndpointType, IoTHubSDKVersion ) from azext_iot.iothub.providers.base import IoTHubProvider from azext_iot.common._azure import parse_cosmos_db_connection_string from azure.mgmt.iothub.models import ManagedIdentity from azure.core.exceptions import HttpResponseError logger = get_logger(__name__) class MessageEndpoint(IoTHubProvider): def __init__( self, cmd, hub_name: str, rg: Optional[str] = None, ): super(MessageEndpoint, self).__init__(cmd, hub_name, rg, dataplane=False) # Temporary flag to check for which cosmos property to look for. self.support_cosmos = IoTHubSDKVersion.NoCosmos.value if hasattr(self.hub_resource.properties.routing.endpoints, "cosmos_db_sql_collections"): self.support_cosmos = IoTHubSDKVersion.CosmosCollections.value if hasattr(self.hub_resource.properties.routing.endpoints, "cosmos_db_sql_containers"): self.support_cosmos = IoTHubSDKVersion.CosmosContainers.value self.cli = EmbeddedCLI(cli_ctx=self.cmd.cli_ctx) def create( self, endpoint_name: str, endpoint_type: str, endpoint_account_name: Optional[str] = None, endpoint_resource_group: Optional[str] = None, endpoint_subscription_id: Optional[str] = None, endpoint_policy_name: Optional[str] = None, connection_string: Optional[str] = None, container_name: Optional[str] = None, encoding: Optional[str] = None, batch_frequency: int = 300, chunk_size_window: int = 300, file_name_format: str = '{iothub}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm}', endpoint_uri: Optional[str] = None, entity_path: Optional[str] = None, database_name: Optional[str] = None, primary_key: Optional[str] = None, secondary_key: Optional[str] = None, partition_key_name: Optional[str] = None, partition_key_template: Optional[str] = None, identity: Optional[str] = None ): if not endpoint_resource_group: endpoint_resource_group = self.hub_resource.additional_properties["resourcegroup"] if not endpoint_subscription_id: endpoint_subscription_id = self.hub_resource.additional_properties['subscriptionid'] if connection_string and identity: raise MutuallyExclusiveArgumentError("Please use either --connection-string or --identity, both were provided.") authentication_type = AuthenticationType.KeyBased.value endpoint_identity = None if identity: authentication_type = AuthenticationType.IdentityBased.value if identity != SYSTEM_ASSIGNED_IDENTITY: endpoint_identity = ManagedIdentity( user_assigned_identity=identity ) elif not connection_string: # check for args to get the connection string self._connection_string_retrieval_args_check( endpoint_type=endpoint_type, endpoint_account_name=endpoint_account_name, entity_path=entity_path, endpoint_policy_name=endpoint_policy_name ) # Base props shared among all endpoints new_endpoint = { "connectionString": connection_string, "name": endpoint_name, "subscriptionId": endpoint_subscription_id, "resourceGroup": endpoint_resource_group, "authenticationType": authentication_type, "endpointUri": endpoint_uri, "identity": endpoint_identity } fetch_connection_string = identity is None and not connection_string endpoints = self.hub_resource.properties.routing.endpoints if endpoint_type.lower() == EndpointType.EventHub.value: if fetch_connection_string: new_endpoint["connectionString"] = get_eventhub_cstring( cmd=self.cli, namespace_name=endpoint_account_name, eventhub_name=entity_path, policy_name=endpoint_policy_name, rg=endpoint_resource_group, sub=endpoint_subscription_id ) elif entity_path: new_endpoint["entityPath"] = entity_path.replace("~", "/") endpoints.event_hubs.append(new_endpoint) elif endpoint_type.lower() == EndpointType.ServiceBusQueue.value: if fetch_connection_string: new_endpoint["connectionString"] = get_servicebus_queue_cstring( cmd=self.cli, namespace_name=endpoint_account_name, queue_name=entity_path, policy_name=endpoint_policy_name, rg=endpoint_resource_group, sub=endpoint_subscription_id ) if entity_path: new_endpoint["entityPath"] = entity_path endpoints.service_bus_queues.append(new_endpoint) elif endpoint_type.lower() == EndpointType.ServiceBusTopic.value: if fetch_connection_string: new_endpoint["connectionString"] = get_servicebus_topic_cstring( cmd=self.cli, namespace_name=endpoint_account_name, topic_name=entity_path, policy_name=endpoint_policy_name, rg=endpoint_resource_group, sub=endpoint_subscription_id ) if entity_path: new_endpoint["entityPath"] = entity_path endpoints.service_bus_topics.append(new_endpoint) elif endpoint_type.lower() == EndpointType.CosmosDBContainer.value: if fetch_connection_string: # try to get connection string - this will be used to get keys + uri connection_string = get_cosmos_db_cstring( cmd=self.cli, account_name=endpoint_account_name, rg=endpoint_resource_group, sub=endpoint_subscription_id ) if connection_string: # parse out key from connection string if not primary_key and not secondary_key: parsed_cs = parse_cosmos_db_connection_string(connection_string) primary_key = parsed_cs["AccountKey"] secondary_key = parsed_cs["AccountKey"] # parse out endpoint uri from connection string if not endpoint_uri: new_endpoint["endpointUri"] = parsed_cs["AccountEndpoint"] if authentication_type != AuthenticationType.IdentityBased.value and not any([primary_key, secondary_key]): raise RequiredArgumentMissingError( "Primary key via --primary-key, secondary key via --secondary-key, or connection string via " "--connection-string is required." ) if primary_key and not secondary_key: secondary_key = primary_key if secondary_key and not primary_key: primary_key = secondary_key if not new_endpoint["endpointUri"]: raise RequiredArgumentMissingError( "Endpoint uri via --endpoint-uri or connection string via --connection-string is required." ) if partition_key_name and not partition_key_template: partition_key_template = '{deviceid}-{YYYY}-{MM}' # cosmos db doesn't need connection strings del new_endpoint["connectionString"] new_endpoint.update({ "databaseName": database_name, "primaryKey": primary_key, "secondaryKey": secondary_key, "partitionKeyName": partition_key_name, "partitionKeyTemplate": partition_key_template, }) # @vilit - None checks for when the service breaks things if self.support_cosmos == IoTHubSDKVersion.CosmosContainers.value: new_endpoint["containerName"] = container_name if endpoints.cosmos_db_sql_containers is None: endpoints.cosmos_db_sql_containers = [] endpoints.cosmos_db_sql_containers.append(new_endpoint) if self.support_cosmos == IoTHubSDKVersion.CosmosCollections.value: new_endpoint["collectionName"] = container_name if endpoints.cosmos_db_sql_collections is None: endpoints.cosmos_db_sql_collections = [] endpoints.cosmos_db_sql_collections.append(new_endpoint) elif endpoint_type.lower() == EndpointType.AzureStorageContainer.value: if fetch_connection_string: # try to get connection string new_endpoint["connectionString"] = get_storage_cstring( cmd=self.cli, account_name=endpoint_account_name, rg=endpoint_resource_group, sub=endpoint_subscription_id ) if not container_name: raise RequiredArgumentMissingError("Container name is required.") new_endpoint.update({ "containerName": container_name, "encoding": encoding.lower() if encoding else EncodingFormat.AVRO.value, "fileNameFormat": file_name_format, "batchFrequencyInSeconds": batch_frequency, "maxChunkSizeInBytes": (chunk_size_window * BYTES_PER_MEGABYTE), }) endpoints.storage_containers.append(new_endpoint) try: return self.discovery.client.begin_create_or_update( self.hub_resource.additional_properties["resourcegroup"], self.hub_resource.name, self.hub_resource, if_match=self.hub_resource.etag ) except HttpResponseError as e: handle_service_exception(e) def update( self, endpoint_name: str, endpoint_type: str, endpoint_resource_group: Optional[str] = None, endpoint_subscription_id: Optional[str] = None, connection_string: Optional[str] = None, batch_frequency: Optional[int] = None, chunk_size_window: Optional[int] = None, file_name_format: Optional[str] = None, endpoint_uri: Optional[str] = None, entity_path: Optional[str] = None, database_name: Optional[str] = None, primary_key: Optional[str] = None, secondary_key: Optional[str] = None, partition_key_name: Optional[str] = None, partition_key_template: Optional[str] = None, identity: Optional[str] = None ): # if nothing is provided -> should we block? # have the user say the type. Will make args easier (as in we do not need to check for unneeded args) original_endpoint = self._show_by_type(endpoint_name=endpoint_name, endpoint_type=endpoint_type) if any([connection_string, primary_key, secondary_key]) and identity: cosmos_db = endpoint_type.lower() == EndpointType.CosmosDBContainer.value optional_msg = ", --primary-key and/or --secondary-key," if cosmos_db else "" error_msg = "Please use either --connection-string" + optional_msg + " or --identity." raise MutuallyExclusiveArgumentError(error_msg) # Properties for all endpoint types if endpoint_resource_group: original_endpoint.resource_group = endpoint_resource_group if endpoint_subscription_id: original_endpoint.subscription_id = endpoint_subscription_id if endpoint_uri: # Handle this later with cosmos db connection string parsing original_endpoint.endpoint_uri = endpoint_uri # Identity/Connection String schenanigans # If Identity and Connection String args are provided, Identity wins if identity: if endpoint_type.lower() == EndpointType.CosmosDBContainer.value: if original_endpoint.primary_key or original_endpoint.secondary_key: logger.warning(NULL_WARNING.format("Primary and secondary keys")) original_endpoint.primary_key = None original_endpoint.secondary_key = None else: if original_endpoint.connection_string: logger.warning(NULL_WARNING.format("The connection string")) original_endpoint.connection_string = None original_endpoint.authentication_type = AuthenticationType.IdentityBased.value if identity == SYSTEM_ASSIGNED_IDENTITY: original_endpoint.identity = None else: original_endpoint.identity = ManagedIdentity( user_assigned_identity=identity ) elif any([connection_string, primary_key, secondary_key]): if original_endpoint.identity: logger.warning(NULL_WARNING.format("The managed identity property")) original_endpoint.identity = None original_endpoint.authentication_type = AuthenticationType.KeyBased.value if endpoint_type.lower() != EndpointType.CosmosDBContainer.value: original_endpoint.endpoint_uri = None if hasattr(original_endpoint, "entity_path"): if original_endpoint.entity_path: logger.warning(NULL_WARNING.format("The entity path")) original_endpoint.entity_path = None if endpoint_type.lower() != EndpointType.CosmosDBContainer.value: original_endpoint.connection_string = connection_string else: if primary_key: original_endpoint.primary_key = primary_key if secondary_key: original_endpoint.secondary_key = secondary_key # Properties by specific types if endpoint_type in [ EndpointType.EventHub.value, EndpointType.ServiceBusQueue.value, EndpointType.ServiceBusTopic.value ] and entity_path and not connection_string: # only set entity_path if no connection string original_endpoint.entity_path = entity_path if endpoint_type == EndpointType.AzureStorageContainer.value: if file_name_format: original_endpoint.file_name_format = file_name_format if batch_frequency: original_endpoint.batch_frequency_in_seconds = batch_frequency if chunk_size_window: original_endpoint.max_chunk_size_in_bytes = (chunk_size_window * BYTES_PER_MEGABYTE) elif endpoint_type == EndpointType.CosmosDBContainer.value: if connection_string: # parse out key from connection string parsed_cs = parse_cosmos_db_connection_string(connection_string) if not primary_key and not secondary_key: original_endpoint.primary_key = parsed_cs["AccountKey"] original_endpoint.secondary_key = parsed_cs["AccountKey"] # parse out endpoint uri from connection string if not endpoint_uri: original_endpoint.endpoint_uri = parsed_cs["AccountEndpoint"] if database_name: original_endpoint.database_name = database_name if partition_key_name: original_endpoint.partition_key_name = None if partition_key_name == "" else partition_key_name if partition_key_template: original_endpoint.partition_key_template = None if partition_key_template == "" else partition_key_template return self.discovery.client.begin_create_or_update( self.hub_resource.additional_properties["resourcegroup"], self.hub_resource.name, self.hub_resource, if_match=self.hub_resource.etag ) def _connection_string_retrieval_args_check( self, endpoint_type: str, endpoint_account_name: Optional[str] = None, entity_path: Optional[str] = None, endpoint_policy_name: Optional[str] = None, ): error_msg = "Please provide a connection string '--connection-string/-c'" if not ( endpoint_account_name and entity_path and endpoint_policy_name ) and endpoint_type.lower() in [ EndpointType.EventHub.value, EndpointType.ServiceBusQueue.value, EndpointType.ServiceBusTopic.value ]: raise ArgumentUsageError( error_msg + " or endpoint namespace '--endpoint-namespace', endpoint " "entity path '--entity-path', and policy name '--policy-name'." ) elif not endpoint_account_name and endpoint_type.lower() in [ EndpointType.AzureStorageContainer.value, EndpointType.CosmosDBContainer.value ]: raise ArgumentUsageError( error_msg + " or endpoint account '--endpoint-account'." ) def _show_by_type(self, endpoint_name: str, endpoint_type: Optional[str] = None): endpoints = self.hub_resource.properties.routing.endpoints endpoint_list = [] if endpoint_type is None or endpoint_type.lower() == EndpointType.EventHub.value: endpoint_list.extend(endpoints.event_hubs) if endpoint_type is None or endpoint_type.lower() == EndpointType.ServiceBusQueue.value: endpoint_list.extend(endpoints.service_bus_queues) if endpoint_type is None or endpoint_type.lower() == EndpointType.ServiceBusTopic.value: endpoint_list.extend(endpoints.service_bus_topics) if endpoint_type is None or endpoint_type.lower() == EndpointType.AzureStorageContainer.value: endpoint_list.extend(endpoints.storage_containers) if (endpoint_type is None or endpoint_type.lower() == EndpointType.CosmosDBContainer.value): if self.support_cosmos == IoTHubSDKVersion.CosmosContainers.value: endpoint_list.extend(endpoints.cosmos_db_sql_containers) elif self.support_cosmos == IoTHubSDKVersion.CosmosCollections.value: endpoint_list.extend(endpoints.cosmos_db_sql_collections) for endpoint in endpoint_list: if endpoint.name.lower() == endpoint_name.lower(): return endpoint if endpoint_type: raise ResourceNotFoundError( f"{endpoint_type} endpoint {endpoint_name} not found in IoT Hub {self.hub_resource.name}." ) raise ResourceNotFoundError(f"Endpoint {endpoint_name} not found in IoT Hub {self.hub_resource.name}.") def show(self, endpoint_name: str): return self._show_by_type(endpoint_name=endpoint_name) def list(self, endpoint_type: Optional[str] = None): endpoints = self.hub_resource.properties.routing.endpoints if not endpoint_type: return endpoints endpoint_type = endpoint_type.lower() if EndpointType.EventHub.value == endpoint_type: return endpoints.event_hubs elif EndpointType.ServiceBusQueue.value == endpoint_type: return endpoints.service_bus_queues elif EndpointType.ServiceBusTopic.value == endpoint_type: return endpoints.service_bus_topics elif EndpointType.CosmosDBContainer.value == endpoint_type: if self.support_cosmos == IoTHubSDKVersion.CosmosContainers.value: return endpoints.cosmos_db_sql_containers elif self.support_cosmos == IoTHubSDKVersion.CosmosCollections.value: return endpoints.cosmos_db_sql_collections elif EndpointType.CosmosDBContainer.value == endpoint_type: raise InvalidArgumentValueError(INVALID_CLI_CORE_FOR_COSMOS) elif EndpointType.AzureStorageContainer.value == endpoint_type: return endpoints.storage_containers def delete( self, endpoint_name: Optional[str] = None, endpoint_type: Optional[str] = None, force: bool = False ): endpoints = self.hub_resource.properties.routing.endpoints if endpoint_type: endpoint_type = endpoint_type.lower() if ( EndpointType.CosmosDBContainer.value == endpoint_type and self.support_cosmos == IoTHubSDKVersion.NoCosmos.value ): raise InvalidArgumentValueError(INVALID_CLI_CORE_FOR_COSMOS) if self.hub_resource.properties.routing.enrichments or self.hub_resource.properties.routing.routes: # collect endpoints to remove endpoint_names = [] if endpoint_name: # use show to check if this endpoint "exists" in the current extension state try: self.show(endpoint_name) endpoint_names.append(endpoint_name) except ResourceNotFoundError: pass else: if not endpoint_type or endpoint_type == EndpointType.EventHub.value: endpoint_names.extend([e.name for e in endpoints.event_hubs]) if not endpoint_type or endpoint_type == EndpointType.ServiceBusQueue.value: endpoint_names.extend([e.name for e in endpoints.service_bus_queues]) if not endpoint_type or endpoint_type == EndpointType.ServiceBusTopic.value: endpoint_names.extend([e.name for e in endpoints.service_bus_topics]) if not endpoint_type or endpoint_type == EndpointType.CosmosDBContainer.value: if self.support_cosmos == IoTHubSDKVersion.CosmosContainers.value: endpoint_names.extend([e.name for e in endpoints.cosmos_db_sql_containers]) if self.support_cosmos == IoTHubSDKVersion.CosmosCollections.value: endpoint_names.extend([e.name for e in endpoints.cosmos_db_sql_collections]) if not endpoint_type or endpoint_type == EndpointType.AzureStorageContainer.value: endpoint_names.extend([e.name for e in endpoints.storage_containers]) # only do the routing and enrichment checks if there are endpoints to check. if force and endpoint_names: # remove enrichments if self.hub_resource.properties.routing.enrichments: enrichments = self.hub_resource.properties.routing.enrichments enrichments = [e for e in enrichments if not any(n for n in e.endpoint_names if n in endpoint_names)] self.hub_resource.properties.routing.enrichments = enrichments # remove routes if self.hub_resource.properties.routing.routes: routes = self.hub_resource.properties.routing.routes routes = [r for r in routes if r.endpoint_names[0] not in endpoint_names] self.hub_resource.properties.routing.routes = routes elif endpoint_names: # warn if needed: conflicts = [] if self.hub_resource.properties.routing.enrichments: enrichments = self.hub_resource.properties.routing.enrichments num_enrichments = len( [e for e in enrichments if any(n for n in e.endpoint_names if n in endpoint_names)] ) if num_enrichments > 0: enrichment_msg = f"{num_enrichments} message enrichment" + ("s" if num_enrichments > 1 else "") conflicts.append(enrichment_msg) if self.hub_resource.properties.routing.routes: routes = self.hub_resource.properties.routing.routes num_routes = len([r for r in routes if r.endpoint_names[0] in endpoint_names]) if num_routes > 0: enrichment_msg = f"{num_routes} route" + ("s" if num_routes > 1 else "") conflicts.append(enrichment_msg) if conflicts: logger.warn(FORCE_DELETE_WARNING.format(" and ".join(conflicts))) if endpoint_name: # Delete endpoint by name endpoint_name = endpoint_name.lower() if not endpoint_type or EndpointType.EventHub.value == endpoint_type: endpoints.event_hubs = [e for e in endpoints.event_hubs if e.name.lower() != endpoint_name] if not endpoint_type or EndpointType.ServiceBusQueue.value == endpoint_type: endpoints.service_bus_queues = [e for e in endpoints.service_bus_queues if e.name.lower() != endpoint_name] if not endpoint_type or EndpointType.ServiceBusTopic.value == endpoint_type: endpoints.service_bus_topics = [e for e in endpoints.service_bus_topics if e.name.lower() != endpoint_name] if not endpoint_type or endpoint_type == EndpointType.CosmosDBContainer.value: if self.support_cosmos == IoTHubSDKVersion.CosmosContainers.value: cosmos_db_endpoints = endpoints.cosmos_db_sql_containers if endpoints.cosmos_db_sql_containers else [] endpoints.cosmos_db_sql_containers = [ e for e in cosmos_db_endpoints if e.name.lower() != endpoint_name ] if self.support_cosmos == IoTHubSDKVersion.CosmosCollections.value: cosmos_db_endpoints = endpoints.cosmos_db_sql_collections if endpoints.cosmos_db_sql_collections else [] endpoints.cosmos_db_sql_collections = [ e for e in cosmos_db_endpoints if e.name.lower() != endpoint_name ] if not endpoint_type or EndpointType.AzureStorageContainer.value == endpoint_type: endpoints.storage_containers = [e for e in endpoints.storage_containers if e.name.lower() != endpoint_name] elif endpoint_type: # Delete all endpoints in type if EndpointType.EventHub.value == endpoint_type: endpoints.event_hubs = [] elif EndpointType.ServiceBusQueue.value == endpoint_type: endpoints.service_bus_queues = [] elif EndpointType.ServiceBusTopic.value == endpoint_type: endpoints.service_bus_topics = [] elif EndpointType.CosmosDBContainer.value == endpoint_type: if self.support_cosmos == IoTHubSDKVersion.CosmosContainers.value: endpoints.cosmos_db_sql_containers = [] elif self.support_cosmos == IoTHubSDKVersion.CosmosCollections.value: endpoints.cosmos_db_sql_collections = [] elif EndpointType.AzureStorageContainer.value == endpoint_type: endpoints.storage_containers = [] else: # Delete all endpoints endpoints.event_hubs = [] endpoints.service_bus_queues = [] endpoints.service_bus_topics = [] if self.support_cosmos == IoTHubSDKVersion.CosmosContainers.value: endpoints.cosmos_db_sql_containers = [] if self.support_cosmos == IoTHubSDKVersion.CosmosCollections.value: endpoints.cosmos_db_sql_collections = [] endpoints.storage_containers = [] try: return self.discovery.client.begin_create_or_update( self.hub_resource.additional_properties["resourcegroup"], self.hub_resource.name, self.hub_resource, if_match=self.hub_resource.etag ) except HttpResponseError as e: handle_service_exception(e) def get_eventhub_cstring( cmd, namespace_name: str, eventhub_name: str, policy_name: str, rg: str, sub: str ) -> str: return cmd.invoke( "eventhubs eventhub authorization-rule keys list --namespace-name {} --resource-group {} " "--eventhub-name {} --name {}".format( namespace_name, rg, eventhub_name, policy_name ), subscription=sub ).as_json()["primaryConnectionString"] def get_servicebus_topic_cstring( cmd, namespace_name: str, topic_name: str, policy_name: str, rg: str, sub: str ) -> str: return cmd.invoke( "servicebus topic authorization-rule keys list --namespace-name {} --resource-group {} " "--topic-name {} --name {}".format( namespace_name, rg, topic_name, policy_name ), subscription=sub ).as_json()["primaryConnectionString"] def get_servicebus_queue_cstring( cmd, namespace_name: str, queue_name: str, policy_name: str, rg: str, sub: str ) -> str: return cmd.invoke( "servicebus queue authorization-rule keys list --namespace-name {} --resource-group {} " "--queue-name {} --name {}".format( namespace_name, rg, queue_name, policy_name ), subscription=sub ).as_json()["primaryConnectionString"] def get_cosmos_db_cstring( cmd, account_name: str, rg: str, sub: str ) -> str: output = cmd.invoke( 'cosmosdb keys list --resource-group {} --name {} --type connection-strings'.format( rg, account_name ), subscription=sub ).as_json() for cs_object in output["connectionStrings"]: if cs_object["description"] == "Primary SQL Connection String": return cs_object["connectionString"] def get_storage_cstring(cmd, account_name: str, rg: str, sub: str) -> str: return cmd.invoke( "storage account show-connection-string -n {} -g {}".format( account_name, rg ), subscription=sub ).as_json()["connectionString"]