docker_images/pythonv2/wrapper/python_glue/connection_status.py (47 lines of code) (raw):
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import logging
from threading import Event
logger = logging.getLogger(__name__)
class ConnectionStatus(object):
def _get_pipeline(self):
try:
return self.client._mqtt_pipeline
except AttributeError:
return self.client._iothub_pipeline
def _attach_connect_event_watcher(self):
"""
Since the iothub clients don't expose on_connected and on_disconnected events,
we have to add our own.
"""
self.connected = False
self.connected_event = Event()
self.disconnected_event = Event()
old_on_connected = self._get_pipeline().on_connected
def new_on_connected():
logger.info("new_on_connected")
self.connected = True
self.connected_event.set()
old_on_connected()
self._get_pipeline().on_connected = new_on_connected
old_on_disconnected = self._get_pipeline().on_disconnected
def new_on_disconnected():
logger.info("new_on_disconnected")
self.connected = False
self.disconnected_event.set()
old_on_disconnected()
self._get_pipeline().on_disconnected = new_on_disconnected
def get_connection_status_sync(self):
if self.connected:
return "connected"
else:
return "disconnected"
def wait_for_connection_status_change_sync(self, connection_status):
if self.get_connection_status_sync() == connection_status:
logger.info("Client is alredy {}. Returning.".format(connection_status))
return connection_status
if self.connected:
logger.info("Client appears connected. Waiting for client to disconenct")
self.disconnected_event.clear()
self.disconnected_event.wait()
logger.info("client is disconnected. completing.")
else:
logger.info("Client appears disconnected. Waiting for client to conenct")
self.connected_event.clear()
self.connected_event.wait()
logger.info("client is connected. completing.")
return self.get_connection_status_sync()