in test-runner/adapters/direct_azure_rest/eventhub_connection_string.py [0:0]
def convert_iothub_to_eventhub_conn_str(iothub_conn_str):
hostname, shared_access_key_name, shared_access_key = parse_iot_conn_str(
iothub_conn_str
)
iot_hub_name = hostname.split(".")[0]
operation = "/messages/events/ConsumerGroups/{}/Partitions/{}".format("$Default", 0)
username = "{}@sas.root.{}".format(shared_access_key_name, iot_hub_name)
sas_token = generate_sas_token(hostname, shared_access_key_name, shared_access_key)
uri = "amqps://{}:{}@{}{}".format(
quote_plus(username), quote_plus(sas_token), hostname, operation
)
source_uri = Source(uri)
receive_client = ReceiveClient(source_uri)
try:
receive_client.receive_message_batch(max_batch_size=1)
except LinkRedirect as redirect:
# Once a redirect error is received, close the original client and recreate a new one to the re-directed address
receive_client.close()
fully_qualified_name = redirect.hostname.decode("utf-8")
return "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
fully_qualified_name,
shared_access_key_name,
shared_access_key,
iot_hub_name,
)
except Exception as exp:
raise ValueError(
"{} is not an invalid IoT Hub connection string. The underlying exception is {}".format(
iothub_conn_str, exp
)
)