test-runner/conftest.py (200 lines of code) (raw):

# Copyright (c) Microsoft. All rights reserved. # Licensed under the MIT license. See LICENSE file in the project root for # full license information. import pytest import sys import logging from adapters import adapter_config from dump_object import dump_object import runtime_capabilities import scenarios from distutils.version import LooseVersion from horton_logging import set_logger from horton_settings import settings, ObjectWithAdapter from fixtures import ( # noqa: F401 eventhub, registry, friend, leaf_device, service, test_device, test_module, system_control, telemetry_payload, ) from hooks import ( # noqa: F401 pytest_runtest_logstart, pytest_runtest_logfinish, pytest_runtest_teardown, pytest_pyfunc_call, pytest_runtestloop, ) # default to logging.INFO logging.basicConfig(level=logging.INFO) # AMQP is chatty at INFO level. Dial this down to WARNING. logging.getLogger("uamqp").setLevel(level=logging.WARNING) logging.getLogger("paho").setLevel(level=logging.DEBUG) logging.getLogger("adapters.direct_azure_rest.amqp_service_client").setLevel( level=logging.WARNING ) # info level can leak credentials into the log logging.getLogger("azure.iot.device").setLevel(level=logging.INFO) logging.getLogger("azure.eventhub").setLevel(level=logging.DEBUG) logging.getLogger("asyncio").setLevel(logging.DEBUG) class Unbuffered(object): def __init__(self, stream): self.stream = stream def write(self, data): self.stream.write(data) self.stream.flush() def writelines(self, datas): self.stream.writelines(datas) self.stream.flush() def __getattr__(self, attr): return getattr(self.stream, attr) sys.stdout = Unbuffered(sys.stdout) sys.stderr = Unbuffered(sys.stderr) def pytest_addoption(parser): parser.addoption( "--scenario", help="scenario to run", required=True, type=str, choices=scenarios.scenarios.keys(), ) parser.addoption( "--local", action="store_true", default=False, help="run tests against local module (probably in debugger)", ) parser.addoption( "--transport", action="store", default="mqtt", help="transport to use for test", choices=["mqtt", "mqttws", "amqp", "amqpws"], ) parser.addoption( "--debug-container", action="store_true", default=False, help="adjust run for container debugging (disable timeouts)", ) parser.addoption( "--async", action="store_true", default=False, help="run async tests (currently pythonv2 only)", ) skip_for_c_connection_string = set( ["invokesModuleMethodCalls", "invokesDeviceMethodCalls"] ) def _get_marker(item, marker): if LooseVersion(pytest.__version__) < LooseVersion("3.6"): return item.get_marker(marker) else: return item.get_closest_marker(marker) def remove_tests_not_in_marker_list(items, markers): """ remove all items that don't have one of the specified markers set """ remaining = [] for item in items: keep = False for marker in markers: if _get_marker(item, marker): keep = True if keep: remaining.append(item) items[:] = remaining def skip_tests_by_marker(items, skiplist, reason): skip_me = pytest.mark.skip(reason=reason) for markname in skiplist: print("skipping {} because {}:".format(markname, reason)) for item in items: if markname in item.keywords: item.add_marker(skip_me) print(" " + str(item)) __tracebackhide__ = True def set_transport(transport): print("Using " + transport) settings.friend_module.transport = "mqtt" settings.horton.transport = transport settings.test_module.transport = transport settings.leaf_device.transport = transport settings.test_device.transport = transport def set_local_system_control(): if settings.system_control.adapter_address: settings.system_control.adapter_address = "http://localhost:{}".format( settings.system_control.container_port ) def set_local(): print("Running against local module") if settings.test_module.connection_type == "environment": settings.test_module.connection_type = "connection_string_with_edge_gateway" # any objects that were previously using the test module host port now use # the test module container port. for obj in (settings.test_module, settings.test_device, settings.leaf_device): if obj.host_port == settings.test_module.host_port: obj.adapter_address = "http://localhost:{}".format( settings.test_module.container_port ) set_local_system_control() def set_async(): if settings.test_module.device_id: settings.test_module.wrapper_api.set_flags_sync({"test_async": True}) else: raise Exception("--async specified, but test module does not support async") def add_service_settings(): class ServiceSettings: pass settings.eventhub = ObjectWithAdapter("eventhub", "eventhub") settings.eventhub.connection_string = settings.iothub.connection_string settings.eventhub.adapter_address = "direct_rest" settings.registry = ObjectWithAdapter("registry", "iothub_registry") settings.registry.connection_string = settings.iothub.connection_string settings.registry.adapter_address = settings.test_module.adapter_address settings.service = ObjectWithAdapter("service", "iothub_service") settings.service.connection_string = settings.iothub.connection_string settings.service.adapter_address = settings.test_module.adapter_address def adjust_surfaces_for_missing_implementations(): if ( settings.test_module.language not in runtime_capabilities.language_has_service_client ): settings.registry.adapter_address = "direct_rest" settings.service.adapter_address = "direct_rest" if ( settings.test_module.language not in runtime_capabilities.language_has_leaf_device_client ): settings.leaf_device.adapter_address = settings.friend_module.adapter_address settings.leaf_device.container_port = settings.friend_module.container_port settings.leaf_device.host_port = settings.friend_module.host_port settings.leaf_device.language = settings.friend_module.language # adapter has changed. recollect capabilities. runtime_capabilities.collect_capabilities(settings.leaf_device) if ( settings.test_module.language not in runtime_capabilities.language_has_full_device_client ): settings.test_device.adapter_address = None def only_include_scenario_tests(items, scenario_name): markers = scenarios.scenarios[scenario_name] remove_tests_not_in_marker_list(items, markers) def pytest_collection_modifyitems(config, items): print("") if not settings.test_module.connection_string: raise Exception( "settings are missing credentials. Please run `horton get_credentials` and try again" ) set_transport(config.getoption("--transport")) if config.getoption("--local"): set_local() set_logger() runtime_capabilities.collect_all_capabilities() if config.getoption("--async"): set_async() add_service_settings() adjust_surfaces_for_missing_implementations() only_include_scenario_tests(items, config.getoption("--scenario")) if getattr(config, "_origargs", None): adapter_config.logger("HORTON: starting run: {}".format(config._origargs)) elif getattr(config, "invocation_params", None): adapter_config.logger( "HORTON: starting run: {}".format(config.invocation_params.args) ) if config.getoption("--debug-container"): print("Debugging the container. Removing all timeouts") adapter_config.default_api_timeout = 3600 config._env_timeout = 0 dump_object(settings)