test-runner/adapters/direct_azure_rest/eventhub_connection_string.py (66 lines of code) (raw):
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import time
from base64 import b64encode, b64decode
from hashlib import sha256
from hmac import HMAC
import asyncio
from urllib.parse import urlencode, quote_plus
from uamqp import ReceiveClient, Source
from uamqp.errors import LinkRedirect
# code from https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub/samples/async_samples/iot_hub_connection_string_receive_async.py
def generate_sas_token(uri, policy, key, expiry=None):
"""Create a shared access signiture token as a string literal.
:returns: SAS token as string literal.
:rtype: str
"""
if not expiry:
expiry = time.time() + 3600
encoded_uri = quote_plus(uri)
ttl = int(expiry)
sign_key = "%s\n%d" % (encoded_uri, ttl)
signature = b64encode(
HMAC(b64decode(key), sign_key.encode("utf-8"), sha256).digest()
)
result = {"sr": uri, "sig": signature, "se": str(ttl)}
if policy:
result["skn"] = policy
return "SharedAccessSignature " + urlencode(result)
def parse_iot_conn_str(iothub_conn_str):
hostname = None
shared_access_key_name = None
shared_access_key = None
for element in iothub_conn_str.split(";"):
key, _, value = element.partition("=")
if key.lower() == "hostname":
hostname = value.rstrip("/")
elif key.lower() == "sharedaccesskeyname":
shared_access_key_name = value
elif key.lower() == "sharedaccesskey":
shared_access_key = value
if not all([hostname, shared_access_key_name, shared_access_key]):
raise ValueError("Invalid connection string")
return hostname, shared_access_key_name, shared_access_key
def convert_iothub_to_eventhub_conn_str(iothub_conn_str):
hostname, shared_access_key_name, shared_access_key = parse_iot_conn_str(
iothub_conn_str
)
iot_hub_name = hostname.split(".")[0]
operation = "/messages/events/ConsumerGroups/{}/Partitions/{}".format("$Default", 0)
username = "{}@sas.root.{}".format(shared_access_key_name, iot_hub_name)
sas_token = generate_sas_token(hostname, shared_access_key_name, shared_access_key)
uri = "amqps://{}:{}@{}{}".format(
quote_plus(username), quote_plus(sas_token), hostname, operation
)
source_uri = Source(uri)
receive_client = ReceiveClient(source_uri)
try:
receive_client.receive_message_batch(max_batch_size=1)
except LinkRedirect as redirect:
# Once a redirect error is received, close the original client and recreate a new one to the re-directed address
receive_client.close()
fully_qualified_name = redirect.hostname.decode("utf-8")
return "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
fully_qualified_name,
shared_access_key_name,
shared_access_key,
iot_hub_name,
)
except Exception as exp:
raise ValueError(
"{} is not an invalid IoT Hub connection string. The underlying exception is {}".format(
iothub_conn_str, exp
)
)