azext_iot/common/_azure.py (68 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from azure.cli.core.azclierror import AzureResponseError, CLIInternalError from azext_iot.common.utility import validate_key_value_pairs from azext_iot.common.auth import get_aad_token IOT_SERVICE_CS_TEMPLATE = "HostName={};SharedAccessKeyName={};SharedAccessKey={}" def _parse_connection_string(cs, validate=None, cstring_type="entity"): decomposed = validate_key_value_pairs(cs) decomposed_lower = dict((k.lower(), v) for k, v in decomposed.items()) if validate: for k in validate: if not any([decomposed.get(k), decomposed_lower.get(k.lower())]): raise ValueError( "{} connection string has missing property: {}".format( cstring_type, k ) ) return decomposed def parse_iot_hub_connection_string(cs): validate = ["HostName", "SharedAccessKeyName", "SharedAccessKey"] return _parse_connection_string(cs, validate, "IoT Hub") def parse_iot_dps_connection_string(cs): validate = ["HostName", "SharedAccessKeyName", "SharedAccessKey"] return _parse_connection_string(cs, validate, "IoT DPS") def parse_iot_device_connection_string(cs): validate = ["HostName", "DeviceId", "SharedAccessKey"] return _parse_connection_string(cs, validate, "Device") def parse_iot_device_module_connection_string(cs): validate = ["HostName", "DeviceId", "ModuleId", "SharedAccessKey"] return _parse_connection_string(cs, validate, "Module") def parse_iot_hub_message_endpoint_connection_string(cs): validate = ["Endpoint", "SharedAccessKeyName", "SharedAccessKey"] return _parse_connection_string(cs, validate, "Endpoint") def parse_storage_container_connection_string(cs): validate = ["AccountName", "AccountKey"] return _parse_connection_string(cs, validate, "Storage Container") def parse_cosmos_db_connection_string(cs): validate = ["AccountEndpoint", "AccountKey"] return _parse_connection_string(cs, validate, "Cosmos DB Collection") def parse_event_hub_connection_string(cs): # note that this is for an event hub instance, not namespace validate = ["Endpoint", "SharedAccessKeyName", "SharedAccessKey", "EntityPath"] return _parse_connection_string(cs, validate, "Event Hub") def get_iot_central_tokens(cmd, app_id, token, central_dns_suffix): import requests if not token: aad_token = get_aad_token(cmd.cli_ctx, resource="https://apps.azureiotcentral.com")[ "accessToken" ] token = "Bearer {}".format(aad_token) url = "https://{}.{}/system/iothubs/generateEventSasTokens".format( app_id, central_dns_suffix ) response = requests.post(url, headers={"Authorization": token}) tokens = response.json() additional_help = ( "Please ensure that the user is logged through the `az login` command, " "has the correct tenant set (the users home tenant) and " "has access to the application through http://apps.azureiotcentral.com" ) if tokens.get("error"): error_message = tokens["error"]["message"] if tokens["error"]["code"].startswith("403.043.004."): error_message = "{} {}".format(error_message, additional_help) raise AzureResponseError( "Error {} getting tokens. {}".format(tokens["error"]["code"], error_message) ) if tokens.get("message"): error_message = "{} {}".format(tokens["message"], additional_help) raise CLIInternalError(error_message) return tokens