def convert_iothub_to_eventhub_conn_str()

in test-runner/adapters/direct_azure_rest/eventhub_connection_string.py [0:0]


def convert_iothub_to_eventhub_conn_str(iothub_conn_str):
    hostname, shared_access_key_name, shared_access_key = parse_iot_conn_str(
        iothub_conn_str
    )
    iot_hub_name = hostname.split(".")[0]
    operation = "/messages/events/ConsumerGroups/{}/Partitions/{}".format("$Default", 0)
    username = "{}@sas.root.{}".format(shared_access_key_name, iot_hub_name)
    sas_token = generate_sas_token(hostname, shared_access_key_name, shared_access_key)
    uri = "amqps://{}:{}@{}{}".format(
        quote_plus(username), quote_plus(sas_token), hostname, operation
    )
    source_uri = Source(uri)
    receive_client = ReceiveClient(source_uri)
    try:
        receive_client.receive_message_batch(max_batch_size=1)
    except LinkRedirect as redirect:
        # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
        receive_client.close()
        fully_qualified_name = redirect.hostname.decode("utf-8")
        return "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
            fully_qualified_name,
            shared_access_key_name,
            shared_access_key,
            iot_hub_name,
        )
    except Exception as exp:
        raise ValueError(
            "{} is not an invalid IoT Hub connection string. The underlying exception is {}".format(
                iothub_conn_str, exp
            )
        )