uamqp/address.py (108 lines of code) (raw):
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import logging
from uamqp import c_uamqp, compat, constants, utils
_logger = logging.getLogger(__name__)
class Address(object):
    """Represents an AMQP endpoint.

    The supplied URL is parsed once at construction. The value handed to
    the C layer is rebuilt from the parsed scheme, hostname and path only.

    :ivar address: The endpoint URL.
    :vartype address: str
    :ivar durable: Whether the endpoint is durable.
    :vartype durable: bool
    :ivar expiry_policy: The endpoint expiry policy
    :ivar timeout: The endpoint timeout in seconds.
    :vartype timeout: float
    :ivar dynamic: Whether the endpoint is dynamic.
    :vartype dynamic: bool
    :ivar distribution_mode: The endpoint distribution mode.
    :vartype distribution_mode: str

    :param address: An AMQP endpoint URL.
    :type address: str or bytes
    :param encoding: The encoding used if address is supplied
     as a str rather than bytes. Default is UTF-8.
    """

    def __init__(self, address, encoding='UTF-8'):
        # All parsing is done on bytes; text input is encoded first.
        address = address.encode(encoding) if isinstance(address, str) else address
        self.parsed_address = self._validate_address(address)
        self._encoding = encoding
        # The underlying C endpoint object. It stays None here; the
        # endpoint properties below (address, durable, ...) read through it,
        # so they are only usable once something has assigned it.
        self._address = None

        # Rebuild the address from scheme + hostname + path only.
        addr = self.parsed_address.path
        if self.parsed_address.hostname:
            addr = self.parsed_address.hostname + addr
        if self.parsed_address.scheme:
            addr = self.parsed_address.scheme + b"://" + addr
        self._c_address = c_uamqp.string_value(addr)

    @classmethod
    def from_c_obj(cls, c_value, encoding='UTF-8'):
        """Build an instance around an existing C endpoint object.

        The instance is constructed from ``c_value.address`` and then
        ``c_value`` itself is installed as the underlying endpoint object.

        :param c_value: The C endpoint object to wrap.
        :param encoding: The encoding to use for text conversions.
         Default is UTF-8.
        :type encoding: str
        :returns: A new instance of the class this was called on.
        """
        address = c_value.address
        py_obj = cls(address, encoding=encoding)
        py_obj._address = c_value  # pylint: disable=protected-access
        return py_obj

    def __repr__(self):
        """Get the Address as a URL.

        ``repr()`` requires a native ``str``. The parsed URL is held as
        bytes, so on Python 3 it is decoded with the instance encoding
        rather than returned raw (which would raise ``TypeError``).

        :rtype: str
        """
        url = self.parsed_address.geturl()
        if not isinstance(url, str):
            url = url.decode(self._encoding)
        return url

    def __str__(self):
        """Get the Address as a URL.

        :rtype: str
        """
        return self.parsed_address.geturl().decode(self._encoding)

    @property
    def hostname(self):
        """The hostname of the parsed URL, or None if the URL has none.

        :rtype: str or None
        """
        # A path-only address is accepted by __init__, in which case the
        # parsed hostname is None; mirror username/password and return None.
        host = self.parsed_address.hostname
        if host:
            return host.decode(self._encoding)
        return None

    @property
    def scheme(self):
        """The scheme of the parsed URL, decoded with the instance encoding.

        :rtype: str
        """
        return self.parsed_address.scheme.decode(self._encoding)

    @property
    def username(self):
        """The username of the parsed URL, or None if the URL has none.

        :rtype: str or None
        """
        if self.parsed_address.username:
            return self.parsed_address.username.decode(self._encoding)
        return None

    @property
    def password(self):
        """The password of the parsed URL, or None if the URL has none.

        :rtype: str or None
        """
        if self.parsed_address.password:
            return self.parsed_address.password.decode(self._encoding)
        return None

    @property
    def address(self):
        """The address held by the underlying endpoint object, decoded.

        :rtype: str
        """
        return self._address.address.decode(self._encoding)

    @property
    def durable(self):
        """The ``durable`` value of the underlying endpoint object."""
        return self._address.durable

    @durable.setter
    def durable(self, value):
        self._address.durable = value

    @property
    def expiry_policy(self):
        """The ``expiry_policy`` value of the underlying endpoint object."""
        return self._address.expiry_policy

    @expiry_policy.setter
    def expiry_policy(self, value):
        self._address.expiry_policy = value

    @property
    def timeout(self):
        """The ``timeout`` value of the underlying endpoint object."""
        return self._address.timeout

    @timeout.setter
    def timeout(self, value):
        self._address.timeout = value

    @property
    def dynamic(self):
        """The ``dynamic`` value of the underlying endpoint object."""
        return self._address.dynamic

    @dynamic.setter
    def dynamic(self, value):
        self._address.dynamic = value

    @property
    def distribution_mode(self):
        """The distribution mode of the underlying endpoint object, decoded.

        The setter accepts str (encoded with the instance encoding) or
        any other value, which is passed through unchanged.

        :rtype: str
        """
        return self._address.distribution_mode.decode(self._encoding)

    @distribution_mode.setter
    def distribution_mode(self, value):
        mode = value.encode(self._encoding) if isinstance(value, str) else value
        self._address.distribution_mode = mode

    def _validate_address(self, address):
        """Parse the supplied address and confirm that it has a path.

        Only the presence of a path is checked; the scheme is not
        inspected here.

        :param address: The endpoint URL.
        :type address: bytes
        :returns: The result of ``compat.urlparse`` for the address.
        :raises ValueError: If the parsed address has an empty path.
        """
        parsed = compat.urlparse(address)
        if not parsed.path:
            raise ValueError("Invalid {} address: {}".format(
                self.__class__.__name__, parsed))
        return parsed
class Source(Address):
    """An AMQP Source endpoint.

    Wraps a C source object created via ``c_uamqp.create_source()`` and
    adds helpers for reading and writing the filter set on it.

    :ivar address: The endpoint URL.
    :vartype address: str
    :ivar durable: Whether the endpoint is durable.
    :vartype durable: bool
    :ivar expiry_policy: The endpoint expiry policy
    :ivar timeout: The endpoint timeout in seconds.
    :vartype timeout: float
    :ivar dynamic: Whether the endpoint is dynamic.
    :vartype dynamic: bool
    :ivar distribution_mode: The endpoint distribution mode.
    :vartype distribution_mode: str

    :param address: An AMQP endpoint URL.
    :type address: str or bytes
    :param encoding: The encoding used if address is supplied
     as a str rather than bytes. Default is UTF-8.
    """

    def __init__(self, address, encoding='UTF-8'):
        super(Source, self).__init__(address, encoding)
        self.filter_key = constants.STRING_FILTER
        # Create the C source, then point it at the address value that the
        # base initializer prepared.
        c_source = c_uamqp.create_source()
        self._address = c_source
        c_source.address = self._c_address

    def get_filter(self, name=constants.STRING_FILTER):
        """Look up a filter on the source by name.

        :param name: The name of the filter. This will be encoded as
         an AMQP Symbol. By default this is set to b'apache.org:selector-filter:string'.
        :type name: bytes
        :returns: The ``value`` of the matching filter entry, or None when
         the lookup raises ``TypeError`` or ``KeyError``.
        """
        try:
            symbol = c_uamqp.symbol_value(name)
            entry = self._address.filter_set[symbol]
            return entry.value
        except (TypeError, KeyError):
            return None

    def set_filter(self, value, name=constants.STRING_FILTER, descriptor=constants.STRING_FILTER):
        """Replace the filter set on the endpoint with a single filter.

        Only one filter can be applied to an endpoint.

        :param value: The filter to apply to the endpoint. Set to None for a NULL filter.
        :type value: bytes or str or None
        :param name: The name of the filter. This will be encoded as
         an AMQP Symbol. By default this is set to b'apache.org:selector-filter:string'.
        :type name: bytes
        :param descriptor: The descriptor used if the filter is to be encoded as a described value.
         This will be encoded as an AMQP Symbol. By default this is set to b'apache.org:selector-filter:string'.
         Set to None if the filter should not be encoded as a described value.
        :type descriptor: bytes or None
        """
        if isinstance(value, str):
            value = value.encode(self._encoding)

        filters = c_uamqp.dict_value()
        symbol = c_uamqp.symbol_value(name)
        payload = utils.data_factory(value, encoding=self._encoding)

        # Wrap as a described value only when there is both a real filter
        # value and a descriptor to describe it with.
        if not (value is None or descriptor is None):
            descriptor_symbol = c_uamqp.symbol_value(descriptor)
            payload = c_uamqp.described_value(descriptor_symbol, payload)

        filters[symbol] = payload
        self._address.filter_set = filters
class Target(Address):
    """An AMQP Target endpoint.

    Wraps a C target object created via ``c_uamqp.create_target()``.

    :ivar address: The endpoint URL.
    :vartype address: str
    :ivar durable: Whether the endpoint is durable.
    :vartype durable: bool
    :ivar expiry_policy: The endpoint expiry policy
    :ivar timeout: The endpoint timeout in seconds.
    :vartype timeout: float
    :ivar dynamic: Whether the endpoint is dynamic.
    :vartype dynamic: bool
    :ivar distribution_mode: The endpoint distribution mode.
    :vartype distribution_mode: str

    :param address: An AMQP endpoint URL.
    :type address: str or bytes
    :param encoding: The encoding used if address is supplied
     as a str rather than bytes. Default is UTF-8.
    """

    def __init__(self, address, encoding='UTF-8'):
        super(Target, self).__init__(address, encoding)
        # Create the C target, then point it at the address value that the
        # base initializer prepared.
        c_target = c_uamqp.create_target()
        self._address = c_target
        c_target.address = self._c_address