test-runner/adapters/abstract_iothub_apis.py (179 lines of code) (raw):
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import six
import abc
@six.add_metaclass(abc.ABCMeta)
class ServiceConnectDisconnect(object):
@abc.abstractmethod
def connect(self, connection_string):
pass
@abc.abstractmethod
def disconnect(self):
pass
@six.add_metaclass(abc.ABCMeta)
class Connect(object):
@abc.abstractmethod
def connect(self, transport, connection_string, ca_certificate):
pass
@abc.abstractmethod
def disconnect(self):
pass
@abc.abstractmethod
def create_from_connection_string(
self, transport, connection_string, ca_certificate
):
pass
@abc.abstractmethod
def create_from_x509(self, transport, x509):
pass
@abc.abstractmethod
def connect2(self):
pass
@abc.abstractmethod
def reconnect(self, force_password_renewal=False):
pass
@abc.abstractmethod
def disconnect2(self):
pass
@abc.abstractmethod
def destroy(self):
pass
@six.add_metaclass(abc.ABCMeta)
class DeviceConnect(object):
@abc.abstractmethod
def create_from_symmetric_key(self, transport, device_id, hostname, symmetric_key):
pass
@six.add_metaclass(abc.ABCMeta)
class ModuleConnect(object):
@abc.abstractmethod
def connect_from_environment(self, transport):
pass
@abc.abstractmethod
def create_from_environment(self, transport):
pass
@abc.abstractmethod
def create_from_symmetric_key(
self, transport, device_id, module_id, hostname, symmetric_key
):
pass
@six.add_metaclass(abc.ABCMeta)
class Twin(object):
@abc.abstractmethod
def enable_twin(self):
pass
@abc.abstractmethod
def get_twin(self):
pass
@abc.abstractmethod
def patch_twin(self, patch):
pass
@abc.abstractmethod
def wait_for_desired_property_patch(self):
pass
@six.add_metaclass(abc.ABCMeta)
class HandleMethods(object):
@abc.abstractmethod
def enable_methods(self):
pass
@abc.abstractmethod
def wait_for_method_and_return_response(
self, method_name, status_code, request_payload, response_payload
):
pass
@six.add_metaclass(abc.ABCMeta)
class Telemetry(object):
@abc.abstractmethod
def send_event(self, body):
pass
@six.add_metaclass(abc.ABCMeta)
class InputsAndOutputs(object):
@abc.abstractmethod
def enable_input_messages(self):
pass
@abc.abstractmethod
def send_output_event(self, output_name, body):
pass
@abc.abstractmethod
def wait_for_input_event(self, input_name):
pass
@six.add_metaclass(abc.ABCMeta)
class InvokeMethods(object):
@abc.abstractmethod
def call_module_method(self, device_id, module_id, method_invoke_parameters):
pass
@abc.abstractmethod
def call_device_method(self, device_id, method_invoke_parameters):
pass
@six.add_metaclass(abc.ABCMeta)
class ConnectionStatus(object):
@abc.abstractmethod
def get_connection_status(self):
pass
@abc.abstractmethod
def wait_for_connection_status_change(self, connection_status):
pass
@six.add_metaclass(abc.ABCMeta)
class C2d(object):
@abc.abstractmethod
def enable_c2d(self):
pass
@abc.abstractmethod
def wait_for_c2d_message(self):
pass
@six.add_metaclass(abc.ABCMeta)
class ServiceSideOfTwin(object):
@abc.abstractmethod
def get_module_twin(self, device_id, module_id):
pass
@abc.abstractmethod
def patch_module_twin(self, device_id, module_id, patch):
pass
@abc.abstractmethod
def get_device_twin(self, device_id):
pass
@abc.abstractmethod
def patch_device_twin(self, device_id, patch):
pass
@six.add_metaclass(abc.ABCMeta)
class BlobUpload(object):
@abc.abstractmethod
def get_storage_info_for_blob(self, blob_name):
pass
@abc.abstractmethod
def notify_blob_upload_status(
self, correlation_id, is_success, status_code, status_description
):
pass
@six.add_metaclass(abc.ABCMeta)
class AbstractModuleApi(
Connect,
ModuleConnect,
Telemetry,
Twin,
InputsAndOutputs,
HandleMethods,
InvokeMethods,
ConnectionStatus,
):
pass
@six.add_metaclass(abc.ABCMeta)
class AbstractDeviceApi(
Connect,
DeviceConnect,
C2d,
Telemetry,
Twin,
HandleMethods,
ConnectionStatus,
BlobUpload,
):
pass
@six.add_metaclass(abc.ABCMeta)
class AbstractRegistryApi(ServiceConnectDisconnect, ServiceSideOfTwin):
pass
@six.add_metaclass(abc.ABCMeta)
class AbstractServiceApi(ServiceConnectDisconnect, InvokeMethods):
@abc.abstractmethod
def send_c2d(self, device_id, message):
pass
@abc.abstractmethod
def get_blob_upload_status(self):
pass