uamqp/connection.py (208 lines of code) (raw):
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import logging
import threading
import time
import uuid
import uamqp
from uamqp import c_uamqp, compat, errors, utils
_logger = logging.getLogger(__name__)
class Connection(object):
"""An AMQP Connection. A single Connection can have multiple Sessions, and
can be shared between multiple Clients.
:ivar max_frame_size: Maximum AMQP frame size. Default is 63488 bytes.
:vartype max_frame_size: int
:ivar channel_max: Maximum number of Session channels in the Connection.
:vartype channel_max: int
:ivar idle_timeout: Timeout in milliseconds after which the Connection will close
if there is no further activity.
:vartype idle_timeout: int
:ivar properties: Connection properties.
:vartype properties: dict
:param hostname: The hostname of the AMQP service with which to establish
a connection.
:type hostname: bytes or str
:param sasl: Authentication for the connection. If none is provided SASL Annoymous
authentication will be used.
:type sasl: ~uamqp.authentication.common.AMQPAuth
:param container_id: The name for the client, also known as the Container ID.
If no name is provided, a random GUID will be used.
:type container_id: str or bytes
:param max_frame_size: Maximum AMQP frame size. Default is 63488 bytes.
:type max_frame_size: int
:param channel_max: Maximum number of Session channels in the Connection.
:type channel_max: int
:param idle_timeout: Timeout in milliseconds after which the Connection will close
if there is no further activity.
:type idle_timeout: int
:param properties: Connection properties.
:type properties: dict
:param remote_idle_timeout_empty_frame_send_ratio: Ratio of empty frames to
idle time for Connections with no activity. Value must be between
0.0 and 1.0 inclusive. Default is 0.5.
:type remote_idle_timeout_empty_frame_send_ratio: float
:param debug: Whether to turn on network trace logs. If `True`, trace logs
will be logged at INFO level. Default is `False`.
:type debug: bool
:param encoding: The encoding to use for parameters supplied as strings.
Default is 'UTF-8'
:type encoding: str
"""
def __init__(self, hostname, sasl,
container_id=False,
max_frame_size=None,
channel_max=None,
idle_timeout=None,
properties=None,
remote_idle_timeout_empty_frame_send_ratio=None,
error_policy=None,
debug=False,
encoding='UTF-8'):
uamqp._Platform.initialize() # pylint: disable=protected-access
self.container_id = container_id if container_id else str(uuid.uuid4())
if isinstance(self.container_id, str):
self.container_id = self.container_id.encode(encoding)
self.hostname = hostname.encode(encoding) if isinstance(hostname, str) else hostname
self.auth = sasl
self._cbs = None
self.error_policy = error_policy or errors.ErrorPolicy()
self._debug = debug
self._conn = self._create_connection(sasl)
self._sessions = []
self._lock = threading.Lock()
self._state = c_uamqp.ConnectionState.UNKNOWN
self._encoding = encoding
self._settings = {}
self._error = None
self._closing = False
if max_frame_size:
self._settings['max_frame_size'] = max_frame_size
self.max_frame_size = max_frame_size
if channel_max:
self._settings['channel_max'] = channel_max
self.channel_max = channel_max
if idle_timeout:
self._settings['idle_timeout'] = idle_timeout
self.idle_timeout = idle_timeout
if properties:
self._settings['properties'] = properties
self.properties = properties
if remote_idle_timeout_empty_frame_send_ratio:
self._conn.remote_idle_timeout_empty_frame_send_ratio = remote_idle_timeout_empty_frame_send_ratio
def __enter__(self):
"""Open the Connection in a context manager."""
return self
def __exit__(self, *args):
"""Close the Connection when exiting a context manager."""
self.destroy()
def _create_connection(self, sasl):
if sasl.consumed:
raise ValueError("The supplied authentication has already been consumed by "
"another connection. Please create a fresh instance.")
sasl.consumed = True
conn = c_uamqp.create_connection(
sasl.sasl_client.get_client(),
self.hostname,
self.container_id,
self)
conn.set_trace(self._debug)
conn.subscribe_to_close_event(self)
return conn
def _close(self):
_logger.info("Shutting down connection %r.", self.container_id)
self._closing = True
if self._cbs:
self.auth.close_authenticator()
self._cbs = None
self._conn.destroy()
self.auth.close()
_logger.info("Connection shutdown complete %r.", self.container_id)
def _close_received(self, error):
"""Callback called when a connection CLOSE frame is received.
This callback will process the received CLOSE error to determine if
the connection is recoverable or whether it should be shutdown.
:param error: The error information from the close
frame.
:type error: ~uamqp.errors.ErrorResponse
"""
if error:
condition = error.condition
description = error.description
info = error.info
else:
condition = b"amqp:unknown-error"
description = None
info = None
_logger.info("Received Connection close event: %r\nConnection: %r\nDescription: %r\nDetails: %r",
condition, self.container_id, description, info)
self._error = errors._process_connection_error(self.error_policy, condition, description, info) # pylint: disable=protected-access
def _state_changed(self, previous_state, new_state):
"""Callback called whenever the underlying Connection undergoes
a change of state. This function wraps the states as Enums for logging
purposes.
:param previous_state: The previous Connection state.
:type previous_state: int
:param new_state: The new Connection state.
:type new_state: int
"""
try:
try:
_previous_state = c_uamqp.ConnectionState(previous_state)
except ValueError:
_previous_state = c_uamqp.ConnectionState.UNKNOWN
try:
_new_state = c_uamqp.ConnectionState(new_state)
except ValueError:
_new_state = c_uamqp.ConnectionState.UNKNOWN
self._state = _new_state
_logger.info("Connection %r state changed from %r to %r", self.container_id, _previous_state, _new_state)
if (_new_state == c_uamqp.ConnectionState.END and _previous_state != c_uamqp.ConnectionState.CLOSE_RCVD) or\
_new_state == c_uamqp.ConnectionState.ERROR:
if not self._closing and not self._error:
_logger.info("Connection with ID %r unexpectedly in an error state. Closing: %r, Error: %r",
self.container_id, self._closing, self._error)
condition = b"amqp:unknown-error"
description = b"Connection in an unexpected error state."
self._error = errors._process_connection_error(self.error_policy, condition, description, None) # pylint: disable=protected-access
except KeyboardInterrupt:
_logger.error("Received shutdown signal while updating connection state from {} to {}".format(
previous_state, new_state))
self._error = errors.AMQPClientShutdown()
def lock(self, timeout=3.0):
try:
if not self._lock.acquire(timeout=timeout): # pylint: disable=unexpected-keyword-arg
raise compat.TimeoutException("Failed to acquire connection lock.")
except TypeError: # Timeout isn't supported in Py2.7
self._lock.acquire()
def release(self):
try:
self._lock.release()
except (RuntimeError, threading.ThreadError):
pass
except:
_logger.debug("Got error when attempting to release connection lock.")
try:
self._lock.release()
except (RuntimeError, threading.ThreadError):
pass
raise
def destroy(self):
"""Close the connection, and close any associated
CBS authentication session.
"""
try:
self.lock()
_logger.debug("Unlocked connection %r to close.", self.container_id)
self._close()
finally:
self.release()
uamqp._Platform.deinitialize() # pylint: disable=protected-access
def redirect(self, redirect_error, auth):
"""Redirect the connection to an alternative endpoint.
:param redirect: The Link DETACH redirect details.
:type redirect: ~uamqp.errors.LinkRedirect
:param auth: Authentication credentials to the redirected endpoint.
:type auth: ~uamqp.authentication.common.AMQPAuth
"""
try:
self.lock()
_logger.info("Redirecting connection %r.", self.container_id)
if self.hostname == redirect_error.hostname:
return
if self._state != c_uamqp.ConnectionState.END:
_logger.info("Connection not closed yet - shutting down.")
self._close()
self.hostname = redirect_error.hostname
self.auth = auth
self._conn = self._create_connection(auth)
for setting, value in self._settings.items():
setattr(self, setting, value)
self._error = None
self._closing = False
finally:
_logger.info("Finished redirecting connection %r.", self.container_id)
self.release()
def work(self):
"""Perform a single Connection iteration."""
try:
raise self._error
except TypeError:
pass
except Exception as e:
_logger.warning("%r", e)
raise
try:
self.lock()
self._conn.do_work()
except compat.TimeoutException:
_logger.debug("Connection %r timed out while waiting for lock acquisition.", self.container_id)
finally:
self.release()
def sleep(self, seconds):
"""Lock the connection for a given number of seconds.
:param seconds: Length of time to lock the connection.
:type seconds: int
"""
try:
self.lock()
time.sleep(seconds)
except compat.TimeoutException:
_logger.debug("Connection %r timed out while waiting for lock acquisition.", self.container_id)
finally:
self.release()
@property
def max_frame_size(self):
return self._conn.max_frame_size
@max_frame_size.setter
def max_frame_size(self, value):
self._conn.max_frame_size = int(value)
@property
def channel_max(self):
return self._conn.channel_max
@channel_max.setter
def channel_max(self, value):
self._conn.channel_max = int(value)
@property
def idle_timeout(self):
return self._conn.idle_timeout
@idle_timeout.setter
def idle_timeout(self, value):
self._conn.idle_timeout = int(value)
@property
def properties(self):
return self._conn.properties
@properties.setter
def properties(self, value):
if not isinstance(value, dict):
raise TypeError("Connection properties must be a dictionary.")
self._conn.properties = utils.data_factory(value, encoding=self._encoding)
@property
def remote_max_frame_size(self):
return self._conn.remote_max_frame_size