azext_iot/monitor/builders/_common.py (46 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- import uamqp import urllib from azext_iot.monitor.models.target import Target DEBUG = False async def convert_token_to_target(tokens) -> Target: token_expiry = tokens["expiry"] event_hub_token = tokens["eventhubSasToken"] sas_token = event_hub_token["sasToken"] path = event_hub_token["entityPath"] raw_url = event_hub_token["hostname"] url = urllib.parse.urlparse(raw_url) hostname = url.hostname meta_data = await _query_meta_data_internal(hostname, path, sas_token, token_expiry) partition_count = meta_data[b"partition_count"] partitions = [str(i) for i in range(int(partition_count))] auth = _build_auth_container_from_token(hostname, path, sas_token, token_expiry) return Target(hostname=hostname, path=path, partitions=partitions, auth=auth) async def query_meta_data(address, path, auth): source = uamqp.address.Source(address) receive_client = uamqp.ReceiveClientAsync( source, auth=auth, timeout=30000, debug=DEBUG ) try: await receive_client.open_async() message = uamqp.Message(application_properties={"name": path}) response = await receive_client.mgmt_request_async( message, b"READ", op_type=b"com.microsoft:eventhub", status_code_field=b"status-code", description_fields=b"status-description", timeout=30000, ) test = response.get_data() return test finally: await receive_client.close_async() async def _query_meta_data_internal(hostname, path, sas_token, token_expiry): address = "amqps://{}/{}/$management".format(hostname, path) auth = _build_auth_container_from_token(hostname, path, sas_token, token_expiry) return await query_meta_data(address=address, path=path, auth=auth) def _build_auth_container_from_token(hostname, path, token, expiry): sas_uri = "sb://{}/{}".format(hostname, path) return uamqp.authentication.SASTokenAsync( audience=sas_uri, uri=sas_uri, expires_at=expiry, token=token )