test-runner/connections.py (119 lines of code) (raw):
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import adapters
import base64
from horton_settings import settings
from horton_logging import logger
def separator(message=""):
    """
    return the message centered in a 132-character line padded with "-"
    """
    line_width = 132
    fill_char = "-"
    return message.center(line_width, fill_char)
def get_ca_cert(settings_object):
    """
    build the CA certificate argument for the given settings object

    Returns {"cert": <decoded text>} when the object connects with
    "connection_string_with_edge_gateway" and the global iotedge settings
    carry a base64-encoded CA certificate.  Returns an empty dict otherwise.
    """
    if settings_object.connection_type != "connection_string_with_edge_gateway":
        return {}

    # the certificate comes from the global settings, not from settings_object
    encoded_cert = settings.iotedge.ca_cert_base64
    if not encoded_cert:
        return {}

    cert_text = base64.b64decode(encoded_cert).decode("utf-8")
    return {"cert": cert_text}
async def _get_module_client_adapter(settings_object):
    """
    create a module_client adapter for the given settings object

    Returns None unless the settings object has both a device_id and a
    module_id.  Otherwise the new adapter is tagged with both ids and returned.
    """
    if not (settings_object.device_id and settings_object.module_id):
        return None

    module_adapter = adapters.create_adapter(
        settings_object.adapter_address, "module_client"
    )
    module_adapter.device_id = settings_object.device_id
    module_adapter.module_id = settings_object.module_id
    return module_adapter
async def _get_device_client_adapter(settings_object):
    """
    create a device_client adapter for the given settings object

    Returns None when the settings object has no device_id.  Otherwise the
    new adapter is tagged with the device_id and returned.
    """
    target_device_id = settings_object.device_id
    if not target_device_id:
        return None

    device_adapter = adapters.create_adapter(
        settings_object.adapter_address, "device_client"
    )
    device_adapter.device_id = settings_object.device_id
    return device_adapter
async def _get_eventhub_client_adapter(settings_object):
    """
    create an eventhub adapter and initialize it from the connection string
    stored in the given settings object
    """
    address = settings_object.adapter_address
    eventhub_adapter = adapters.create_adapter(address, "eventhub")
    await eventhub_adapter.create_from_connection_string(
        settings_object.connection_string
    )
    return eventhub_adapter
async def _get_registry_client_adapter(settings_object):
    """
    create a registry adapter and connect it using the connection string
    stored in the given settings object
    """
    address = settings_object.adapter_address
    registry_adapter = adapters.create_adapter(address, "registry")
    await registry_adapter.connect(settings_object.connection_string)
    return registry_adapter
async def _get_service_client_adapter(settings_object):
    """
    create a service adapter, connect it using the connection string stored
    in the given settings object, and return the adapter
    """
    address = settings_object.adapter_address
    service_adapter = adapters.create_adapter(address, "service")
    await service_adapter.connect(settings_object.connection_string)
    return service_adapter
async def _get_system_control_adapter(settings_object):
    """
    create a system_control adapter and set its network destination

    The destination comes from settings_object.test_destination, while the
    transport is read from the global test_module settings.
    """
    control_adapter = adapters.create_adapter(
        settings_object.adapter_address, "system_control"
    )
    destination = settings_object.test_destination
    transport = settings.test_module.transport
    await control_adapter.set_network_destination(destination, transport)
    return control_adapter
async def get_adapter(settings_object):
    """
    get a client adapter object for the given settings object

    Returns None if the settings object has no adapter_address, or if the
    helper for its object_type could not build an adapter (the device and
    module helpers return None when the required ids are missing).

    Otherwise the adapter is given the capabilities and settings of
    settings_object, stored in settings_object.adapter, and returned.

    Raises ValueError if settings_object.object_type is not recognized.
    """
    if not settings_object.adapter_address:
        return None

    object_type = settings_object.object_type
    if object_type in ["iothub_device", "leaf_device"]:
        adapter = await _get_device_client_adapter(settings_object)
    elif object_type in ["iothub_module", "iotedge_module"]:
        adapter = await _get_module_client_adapter(settings_object)
    elif object_type == "iothub_registry":
        adapter = await _get_registry_client_adapter(settings_object)
    elif object_type == "iothub_service":
        adapter = await _get_service_client_adapter(settings_object)
    elif object_type == "eventhub":
        adapter = await _get_eventhub_client_adapter(settings_object)
    elif object_type == "system_control":
        adapter = await _get_system_control_adapter(settings_object)
    else:
        # Asserting a non-empty string always passes, so the previous code
        # fell through with `adapter` unbound.  Fail explicitly instead.
        raise ValueError("invalid object_type: {}".format(object_type))

    # The device/module helpers return None when ids are missing; there is
    # nothing to decorate in that case.
    if adapter is None:
        return None

    adapter.capabilities = settings_object.capabilities
    adapter.settings = settings_object
    settings_object.adapter = adapter
    return adapter
async def create_client(settings_object):
    """
    create or connect the client behind settings_object.adapter

    The call made on the adapter depends on settings_object.connection_type:
    "symmetric_key" always uses create_from_symmetric_key.  For "environment"
    and "connection_string*" types, adapters whose capabilities report
    v2_connect_group use the create_from_* calls and all others use the
    connect* calls.  Any other connection_type results in no call.
    """
    adapter = settings_object.adapter
    connection_type = settings_object.connection_type

    if connection_type == "symmetric_key":
        await adapter.create_from_symmetric_key(
            settings_object.transport,
            hostname=settings_object.iothub_host_name,
            symmetric_key=settings_object.symmetric_key,
            device_id=settings_object.device_id,
        )
        return

    uses_v2_group = settings_object.capabilities.v2_connect_group

    if connection_type == "environment":
        # only look up the method that will actually be called
        from_environment = (
            adapter.create_from_environment
            if uses_v2_group
            else adapter.connect_from_environment
        )
        await from_environment(settings_object.transport)
    elif connection_type.startswith("connection_string"):
        from_connection_string = (
            adapter.create_from_connection_string
            if uses_v2_group
            else adapter.connect
        )
        await from_connection_string(
            settings_object.transport,
            settings_object.connection_string,
            get_ca_cert(settings_object),
        )
async def cleanup_adapter(settings_object):
    """
    finalize the adapter attached to the given settings object, if any

    An adapter whose capabilities report v2_connect_group is destroyed.
    Otherwise it is disconnected if it has a disconnect attribute, or
    destroyed if it only has a destroy attribute.  Exceptions raised while
    finalizing are logged instead of propagated, and settings_object.adapter
    is reset to None afterwards in every case.
    """
    if not settings_object.adapter:
        return

    adapter = settings_object.adapter
    object_name = settings_object.name
    logger(separator("{} finalizer".format(object_name)))
    try:
        # a missing capabilities object or missing flag counts as "not v2"
        capabilities = getattr(adapter, "capabilities", None)
        if getattr(capabilities, "v2_connect_group", False):
            logger("Destroying")
            await adapter.destroy()
        elif hasattr(adapter, "disconnect"):
            logger("Disconnecting")
            await adapter.disconnect()
        elif hasattr(adapter, "destroy"):
            logger("Destroying")
            await adapter.destroy()
        logger("done finalizing {}".format(object_name))
    except Exception as e:
        # best-effort cleanup: report the failure and keep going
        logger("exception disconnecting {} module: {}".format(object_name, e))
    finally:
        settings_object.adapter = None