test-runner/adapters/rest/rest_iothub_apis.py (379 lines of code) (raw):
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import json
import time
from .generated.e2erestapi.aio import (
AzureIOTEndToEndTestWrapperRestApi as GeneratedAsyncApi,
)
from .generated.e2erestapi.models import MethodInvoke, EventBody
from .. import adapter_config
from .rest_decorators import log_entry_and_exit
from ..abstract_iothub_apis import (
AbstractDeviceApi,
AbstractModuleApi,
AbstractServiceApi,
AbstractRegistryApi,
)
class ServiceConnectDisconnect(object):
@log_entry_and_exit(print_args=False)
async def connect(self, connection_string):
result = await self.rest_endpoint.connect(
connection_string, timeout=adapter_config.default_api_timeout
)
self.connection_id = result.connection_id
@log_entry_and_exit
async def disconnect(self):
if self.connection_id:
await self.rest_endpoint.disconnect(
self.connection_id, timeout=adapter_config.default_api_timeout
)
self.connection_id = ""
class Connect(object):
@log_entry_and_exit(print_args=False)
async def connect(self, transport, connection_string, ca_certificate):
result = await self.rest_endpoint.connect(
transport,
connection_string,
ca_certificate=ca_certificate,
timeout=adapter_config.default_api_timeout,
)
self.connection_id = result.connection_id
@log_entry_and_exit
async def disconnect(self):
if self.connection_id:
await self.rest_endpoint.disconnect(
self.connection_id, timeout=adapter_config.default_api_timeout
)
self.connection_id = ""
# give edgeHub a chance to disconnect MessagingServiceClient from IoTHub. It does this lazily after the module disconnects from edgeHub
time.sleep(2)
@log_entry_and_exit(print_args=False)
async def create_from_connection_string(
self, transport, connection_string, ca_certificate
):
result = await self.rest_endpoint.create_from_connection_string(
transport,
connection_string,
ca_certificate=ca_certificate,
timeout=adapter_config.default_api_timeout,
)
assert not self.connection_id
self.connection_id = result.connection_id
@log_entry_and_exit(print_args=False)
async def create_from_x509(self, transport, x509):
result = await self.rest_endpoint.create_from_x509(
transport, x509, timeout=adapter_config.default_api_timeout
)
assert not self.connection_id
self.connection_id = result.connection_id
@log_entry_and_exit
async def connect2(self):
if self.connection_id:
await self.rest_endpoint.connect2(
self.connection_id, timeout=adapter_config.default_api_timeout
)
@log_entry_and_exit
async def reconnect(self, force_password_renewal=False):
if self.connection_id:
await self.rest_endpoint.reconnect(
self.connection_id,
force_password_renewal,
timeout=adapter_config.default_api_timeout,
)
@log_entry_and_exit
async def disconnect2(self):
if self.connection_id:
await self.rest_endpoint.disconnect2(
self.connection_id, timeout=adapter_config.default_api_timeout
)
# give edgeHub a chance to disconnect MessagingServiceClient from IoTHub. It does this lazily after the module disconnects from edgeHub
time.sleep(2)
@log_entry_and_exit
async def destroy(self):
if self.connection_id:
await self.rest_endpoint.destroy(
self.connection_id, timeout=adapter_config.default_api_timeout
)
self.connection_id = ""
# give edgeHub a chance to disconnect MessagingServiceClient from IoTHub. It does this lazily after the module disconnects from edgeHub
time.sleep(2)
class DeviceConnect(object):
@log_entry_and_exit
async def create_from_symmetric_key(
self, transport, device_id, hostname, symmetric_key
):
result = await self.rest_endpoint.create_from_symmetric_key(
transport,
device_id,
hostname,
symmetric_key,
timeout=adapter_config.default_api_timeout,
)
assert not self.connection_id
self.connection_id = result.connection_id
class ModuleConnect(object):
@log_entry_and_exit
async def connect_from_environment(self, transport):
result = await self.rest_endpoint.connect_from_environment(
transport, timeout=adapter_config.default_api_timeout
)
assert not self.connection_id
self.connection_id = result.connection_id
@log_entry_and_exit
async def create_from_environment(self, transport):
result = await self.rest_endpoint.create_from_environment(
transport, timeout=adapter_config.default_api_timeout
)
assert not self.connection_id
self.connection_id = result.connection_id
@log_entry_and_exit
async def create_from_symmetric_key(
self, transport, device_id, module_id, hostname, symmetric_key
):
result = await self.rest_endpoint.create_from_symmetric_key(
transport,
device_id,
module_id,
hostname,
symmetric_key,
timeout=adapter_config.default_api_timeout,
)
assert not self.connection_id
self.connection_id = result.connection_id
class Twin(object):
@log_entry_and_exit
async def enable_twin(self):
return await self.rest_endpoint.enable_twin(
self.connection_id, timeout=adapter_config.default_api_timeout
)
@log_entry_and_exit
async def get_twin(self):
twin = await self.rest_endpoint.get_twin(
self.connection_id, timeout=adapter_config.default_api_timeout
)
return twin.as_dict()
@log_entry_and_exit
async def patch_twin(self, patch):
await self.rest_endpoint.patch_twin(
self.connection_id, patch, timeout=adapter_config.default_api_timeout
)
@log_entry_and_exit
async def wait_for_desired_property_patch(self):
patch = await self.rest_endpoint.wait_for_desired_properties_patch(
self.connection_id, timeout=adapter_config.default_api_timeout
)
return patch.as_dict()
class HandleMethods(object):
@log_entry_and_exit
async def enable_methods(self):
return await self.rest_endpoint.enable_methods(
self.connection_id, timeout=adapter_config.default_api_timeout
)
"""
wait_for_method_and_return_response
Description: This is a poorly named method. It is essentially create a
method callback and then wait for a method call.
"""
@log_entry_and_exit
async def wait_for_method_and_return_response(
self, method_name, status_code, request_payload, response_payload
):
request_and_response = {
"requestPayload": request_payload,
"responsePayload": response_payload,
"statusCode": status_code,
}
return await self.rest_endpoint.wait_for_method_and_return_response(
self.connection_id,
method_name,
request_and_response,
timeout=adapter_config.default_api_timeout,
)
class Telemetry(object):
@log_entry_and_exit
async def send_event(self, body):
await self.rest_endpoint.send_event(
self.connection_id,
EventBody(body=body),
timeout=adapter_config.default_api_timeout,
)
class InputsAndOutputs(object):
@log_entry_and_exit
async def send_output_event(self, output_name, body):
await self.rest_endpoint.send_output_event(
self.connection_id,
output_name,
EventBody(body=body),
timeout=adapter_config.default_api_timeout,
)
@log_entry_and_exit
async def enable_input_messages(self):
return await self.rest_endpoint.enable_input_messages(
self.connection_id, timeout=adapter_config.default_api_timeout
)
@log_entry_and_exit
async def wait_for_input_event(self, input_name):
return await self.rest_endpoint.wait_for_input_message(
self.connection_id, input_name, timeout=adapter_config.default_api_timeout
)
class InvokeMethods(object):
@log_entry_and_exit
async def call_module_method(self, device_id, module_id, method_invoke_parameters):
method_invoke = MethodInvoke(
method_name=method_invoke_parameters["methodName"],
payload=method_invoke_parameters["payload"],
response_timeout_in_seconds=method_invoke_parameters[
"responseTimeoutInSeconds"
],
connect_timeout_in_seconds=method_invoke_parameters[
"connectTimeoutInSeconds"
],
)
return await self.rest_endpoint.invoke_module_method(
self.connection_id,
device_id,
module_id,
method_invoke,
timeout=adapter_config.default_api_timeout,
)
@log_entry_and_exit
async def call_device_method(self, device_id, method_invoke_parameters):
method_invoke = MethodInvoke(
method_name=method_invoke_parameters["methodName"],
payload=method_invoke_parameters["payload"],
response_timeout_in_seconds=method_invoke_parameters[
"responseTimeoutInSeconds"
],
connect_timeout_in_seconds=method_invoke_parameters[
"connectTimeoutInSeconds"
],
)
return await self.rest_endpoint.invoke_device_method(
self.connection_id,
device_id,
method_invoke,
timeout=adapter_config.default_api_timeout,
)
class ConnectionStatus(object):
@log_entry_and_exit
async def get_connection_status(self):
status = await self.rest_endpoint.get_connection_status(
self.connection_id, timeout=adapter_config.default_api_timeout
)
try:
return json.loads(status)
except ValueError:
return status
@log_entry_and_exit
async def wait_for_connection_status_change(self, connection_status):
status = await self.rest_endpoint.wait_for_connection_status_change(
self.connection_id,
connection_status,
timeout=adapter_config.default_api_timeout,
)
try:
return json.loads(status)
except ValueError:
return status
class C2d(object):
@log_entry_and_exit
async def enable_c2d(self):
await self.rest_endpoint.enable_c2d_messages(
self.connection_id, timeout=adapter_config.default_api_timeout
)
@log_entry_and_exit
async def wait_for_c2d_message(self):
return await self.rest_endpoint.wait_for_c2d_message(
self.connection_id, timeout=adapter_config.default_api_timeout
)
class ServiceSideOfTwin(object):
@log_entry_and_exit
async def get_module_twin(self, device_id, module_id):
twin = await self.rest_endpoint.get_module_twin(
self.connection_id,
device_id,
module_id,
timeout=adapter_config.default_api_timeout,
)
return twin.as_dict()
@log_entry_and_exit
async def patch_module_twin(self, device_id, module_id, patch):
await self.rest_endpoint.patch_module_twin(
self.connection_id,
device_id,
module_id,
patch,
timeout=adapter_config.default_api_timeout,
)
@log_entry_and_exit
async def get_device_twin(self, device_id):
twin = await self.rest_endpoint.get_device_twin(
self.connection_id, device_id, timeout=adapter_config.default_api_timeout
)
return twin.as_dict()
@log_entry_and_exit
async def patch_device_twin(self, device_id, patch):
await self.rest_endpoint.patch_device_twin(
self.connection_id,
device_id,
patch,
timeout=adapter_config.default_api_timeout,
)
class BlobUpload(object):
@log_entry_and_exit
async def get_storage_info_for_blob(self, blob_name):
return await self.rest_endpoint.get_storage_info_for_blob(
self.connection_id, blob_name
)
@log_entry_and_exit
async def notify_blob_upload_status(
self, correlation_id, is_success, status_code, status_description
):
await self.rest_endpoint.notify_blob_upload_status(
self.connection_id,
correlation_id,
is_success,
status_code,
status_description,
)
class DeviceApi(
Connect,
DeviceConnect,
C2d,
Telemetry,
Twin,
HandleMethods,
ConnectionStatus,
BlobUpload,
AbstractDeviceApi,
):
def __init__(self, hostname):
self.rest_endpoint = GeneratedAsyncApi(hostname).device
self.rest_endpoint.config.retry_policy.retries = 0
self.connection_id = ""
class ModuleApi(
Connect,
ModuleConnect,
Telemetry,
Twin,
InputsAndOutputs,
HandleMethods,
InvokeMethods,
ConnectionStatus,
AbstractModuleApi,
):
def __init__(self, hostname):
self.rest_endpoint = GeneratedAsyncApi(hostname).module
self.rest_endpoint.config.retry_policy.retries = 0
self.connection_id = ""
class RegistryApi(ServiceConnectDisconnect, ServiceSideOfTwin, AbstractRegistryApi):
def __init__(self, hostname):
self.rest_endpoint = GeneratedAsyncApi(hostname).registry
self.rest_endpoint.config.retry_policy.retries = 0
self.connection_id = ""
class ServiceApi(ServiceConnectDisconnect, InvokeMethods, AbstractServiceApi):
def __init__(self, hostname):
self.rest_endpoint = GeneratedAsyncApi(hostname).service
self.rest_endpoint.config.retry_policy.retries = 0
self.connection_id = ""
@log_entry_and_exit
async def send_c2d(self, device_id, event_body):
await self.rest_endpoint.send_c2d(
self.connection_id,
device_id,
EventBody(body=event_body),
timeout=adapter_config.default_api_timeout,
)
@log_entry_and_exit
async def get_blob_upload_status(self):
raise NotImplementedError()