uamqp/async_ops/connection_async.py (123 lines of code) (raw):
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import asyncio
import logging
import uamqp
from uamqp import c_uamqp, connection
from uamqp.async_ops.utils import get_dict_with_loop_if_needed
_logger = logging.getLogger(__name__)
class ConnectionAsync(connection.Connection):
"""An Asynchronous AMQP Connection. A single Connection can have multiple
Sessions, and can be shared between multiple Clients.
:ivar max_frame_size: Maximum AMQP frame size. Default is 63488 bytes.
:vartype max_frame_size: int
:ivar channel_max: Maximum number of Session channels in the Connection.
:vartype channel_max: int
:ivar idle_timeout: Timeout in milliseconds after which the Connection will close
if there is no further activity.
:vartype idle_timeout: int
:ivar properties: Connection properties.
:vartype properties: dict
:param hostname: The hostname of the AMQP service with which to establish
a connection.
:type hostname: bytes or str
:param sasl: Authentication for the connection. If none is provided SASL Annoymous
authentication will be used.
:type sasl: ~uamqp.authentication.common.AMQPAuth
:param container_id: The name for the client, also known as the Container ID.
If no name is provided, a random GUID will be used.
:type container_id: str or bytes
:param max_frame_size: Maximum AMQP frame size. Default is 63488 bytes.
:type max_frame_size: int
:param channel_max: Maximum number of Session channels in the Connection.
:type channel_max: int
:param idle_timeout: Timeout in milliseconds after which the Connection will close
if there is no further activity.
:type idle_timeout: int
:param properties: Connection properties.
:type properties: dict
:param remote_idle_timeout_empty_frame_send_ratio: Ratio of empty frames to
idle time for Connections with no activity. Value must be between
0.0 and 1.0 inclusive. Default is 0.5.
:type remote_idle_timeout_empty_frame_send_ratio: float
:param debug: Whether to turn on network trace logs. If `True`, trace logs
will be logged at INFO level. Default is `False`.
:type debug: bool
:param encoding: The encoding to use for parameters supplied as strings.
Default is 'UTF-8'
:type encoding: str
"""
def __init__(self, hostname, sasl,
container_id=False,
max_frame_size=None,
channel_max=None,
idle_timeout=None,
properties=None,
remote_idle_timeout_empty_frame_send_ratio=None,
error_policy=None,
debug=False,
encoding='UTF-8',
loop=None):
self._internal_kwargs = get_dict_with_loop_if_needed(loop)
super(ConnectionAsync, self).__init__(
hostname, sasl,
container_id=container_id,
max_frame_size=max_frame_size,
channel_max=channel_max,
idle_timeout=idle_timeout,
properties=properties,
remote_idle_timeout_empty_frame_send_ratio=remote_idle_timeout_empty_frame_send_ratio,
error_policy=error_policy,
debug=debug,
encoding=encoding)
self._async_lock = asyncio.Lock(**self._internal_kwargs)
async def __aenter__(self):
"""Open the Connection in an async context manager."""
return self
async def __aexit__(self, *args):
"""Close the Connection when exiting an async context manager."""
_logger.debug("Exiting connection %r context.", self.container_id)
await self.destroy_async()
_logger.debug("Finished exiting connection %r context.", self.container_id)
async def _close_async(self):
_logger.info("Shutting down connection %r.", self.container_id)
self._closing = True
if self._cbs:
await self.auth.close_authenticator_async()
self._cbs = None
self._conn.destroy()
self.auth.close()
_logger.info("Connection shutdown complete %r.", self.container_id)
@property
def loop(self):
return self._internal_kwargs.get("loop")
async def lock_async(self, timeout=3.0):
await asyncio.wait_for(self._async_lock.acquire(), timeout=timeout, **self._internal_kwargs)
def release_async(self):
try:
self._async_lock.release()
except RuntimeError:
pass
except:
_logger.debug("Got error when attempting to release async connection lock.")
try:
self._async_lock.release()
except RuntimeError:
pass
raise
async def work_async(self):
"""Perform a single Connection iteration asynchronously."""
try:
raise self._error
except TypeError:
pass
except Exception as e:
_logger.warning("%r", e)
raise
try:
await self.lock_async()
if self._closing:
_logger.debug("Connection unlocked but shutting down.")
return
await asyncio.sleep(0, **self._internal_kwargs)
self._conn.do_work()
except asyncio.TimeoutError:
_logger.debug("Connection %r timed out while waiting for lock acquisition.", self.container_id)
finally:
await asyncio.sleep(0, **self._internal_kwargs)
self.release_async()
async def sleep_async(self, seconds):
"""Lock the connection for a given number of seconds.
:param seconds: Length of time to lock the connection.
:type seconds: int
"""
try:
await self.lock_async()
await asyncio.sleep(seconds, **self._internal_kwargs)
except asyncio.TimeoutError:
_logger.debug("Connection %r timed out while waiting for lock acquisition.", self.container_id)
finally:
self.release_async()
async def redirect_async(self, redirect_error, auth):
"""Redirect the connection to an alternative endpoint.
:param redirect: The Link DETACH redirect details.
:type redirect: ~uamqp.errors.LinkRedirect
:param auth: Authentication credentials to the redirected endpoint.
:type auth: ~uamqp.authentication.common.AMQPAuth
"""
_logger.info("Redirecting connection %r.", self.container_id)
try:
await self.lock_async()
if self.hostname == redirect_error.hostname:
return
if self._state != c_uamqp.ConnectionState.END:
await self._close_async()
self.hostname = redirect_error.hostname
self.auth = auth
self._conn = self._create_connection(auth)
for setting, value in self._settings.items():
setattr(self, setting, value)
self._error = None
self._closing = False
except asyncio.TimeoutError:
_logger.debug("Connection %r timed out while waiting for lock acquisition.", self.container_id)
finally:
self.release_async()
async def destroy_async(self):
"""Close the connection asynchronously, and close any associated
CBS authentication session.
"""
try:
await self.lock_async()
_logger.debug("Unlocked connection %r to close.", self.container_id)
await self._close_async()
except asyncio.TimeoutError:
_logger.debug(
"Connection %r timed out while waiting for lock acquisition on destroy. Destroying anyway.",
self.container_id)
await self._close_async()
finally:
self.release_async()
uamqp._Platform.deinitialize() # pylint: disable=protected-access