test-runner/drop_scenario.py (117 lines of code) (raw):
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import pytest
import asyncio
import pytest_asyncio
from adapters import adapter_config
from horton_settings import settings
from horton_logging import logger
try:
async_fixture = pytest_asyncio.fixture
except AttributeError:
async_fixture = pytest.fixture
class DropScenarioBaseClass(object):
@pytest.fixture(scope="class", autouse=True)
def extend_rest_timeout(self, request):
previous_timeout = adapter_config.default_api_timeout
adapter_config.default_api_timeout = max(300, previous_timeout)
logger(
"Starting test class: Adjusting REST timeout to {} seconds".format(
adapter_config.default_api_timeout
)
)
def fin():
adapter_config.default_api_timeout = previous_timeout
logger(
"Finishing test class: Replacing old REST timeout of {} seconds".format(
previous_timeout
)
)
request.addfinalizer(fin)
@pytest.fixture(
params=[
pytest.param("DROP", id="Drop using iptables DROP"),
pytest.param("REJECT", id="Drop using iptables REJECT"),
]
)
def drop_mechanism(self, request):
"""
Parametrized fixture which lets our tests run against the full set
of dropping mechanisms. Every test in this file will run using each value
for this array of parameters.
"""
return request.param
@pytest.fixture
def test_module_transport(self):
return settings.test_module.transport
@async_fixture(autouse=True)
async def reconnect_after_each_test(self, system_control):
# if this test is going to drop packets, add a finalizer to make sure we always stop
# stop dropping it when we're done. Calling reconnect twice in a row is allowed.
try:
yield
finally:
logger("in finalizer: no longer dropping packets")
await system_control.reconnect_network()
async def start_dropping(
self, *, system_control, drop_mechanism, test_module_transport
):
logger("start drop packets")
await system_control.disconnect_network(drop_mechanism)
async def wait_for_disconnection_event(self, *, client):
status = await client.get_connection_status()
assert status == "connected"
logger("waiting for client disconnection event")
await client.wait_for_connection_status_change("disconnected")
logger("client disconnection event received")
async def stop_dropping(self, *, system_control):
logger("stop dropping packets")
await system_control.reconnect_network()
async def wait_for_reconnection_event(self, *, client):
logger("waiting for client reconnection event")
await client.wait_for_connection_status_change("connected")
logger("client reconnection event received")
class NetworkGlitchClientConnected(DropScenarioBaseClass):
"""
Tests using this class will disconnect the network before making
the API call that we're testing. This way, the SDK thinks it's still
connected, but it's not. The initial network call should fail, and the
SDK retry logic should catch this and retry the operation until it succeeds.
"""
@pytest.fixture
def before_api_call(
self, client, drop_mechanism, system_control, test_module_transport
):
async def func():
await client.connect2()
assert await client.get_connection_status() == "connected"
await self.start_dropping(
system_control=system_control,
drop_mechanism=drop_mechanism,
test_module_transport=test_module_transport,
)
return func
@pytest.fixture
def after_api_call(self, client, system_control):
async def func():
await self.wait_for_disconnection_event(client=client)
await asyncio.sleep(5)
await self.stop_dropping(system_control=system_control)
await self.wait_for_reconnection_event(client=client)
await asyncio.sleep(5)
return func
class NetworkGlitchClientDisconnected(DropScenarioBaseClass):
"""
Tests using this class will disconnect the network and wait for the
SDK to notice the disconnection before making the API call that we're
testing. This way, the SDK knows that it's not connected. The SDK
should try to reconnect the client and fail. Retry logic should catch
this and retry the operation until it succeeds.
"""
@pytest.fixture(
params=[
pytest.param("DROP", id="Drop using iptables DROP"),
# REJECT tests no longer work here after state machine changes
# pytest.param("REJECT", id="Drop using iptables REJECT"),
]
)
def drop_mechanism(self, request):
"""
Parametrized fixture which lets our tests run against the full set
of dropping mechanisms. Every test in this file will run using each value
for this array of parameters.
"""
return request.param
@pytest.fixture
def before_api_call(
self, client, drop_mechanism, system_control, test_module_transport
):
async def func():
await client.connect2()
assert await client.get_connection_status() == "connected"
await self.start_dropping(
system_control=system_control,
drop_mechanism=drop_mechanism,
test_module_transport=test_module_transport,
)
await self.wait_for_disconnection_event(client=client)
return func
@pytest.fixture
def after_api_call(self, client, system_control):
async def func():
await asyncio.sleep(5)
await self.stop_dropping(system_control=system_control)
await self.wait_for_reconnection_event(client=client)
await asyncio.sleep(5)
return func