uamqp/authentication/common.py (132 lines of code) (raw):
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
# pylint: disable=super-init-not-called,no-self-use
import logging
import certifi
from uamqp import c_uamqp, constants
from uamqp.constants import TransportType
_logger = logging.getLogger(__name__)
class AMQPAuth(object):
"""AMQP authentication mixin.
:param hostname: The AMQP endpoint hostname.
:type hostname: str or bytes
:param port: The TLS port - default for AMQP is 5671.
:type port: int
:param verify: The path to a user-defined certificate.
:type verify: str
:param http_proxy: HTTP proxy configuration. This should be a dictionary with
the following keys present: 'proxy_hostname' and 'proxy_port'. Additional optional
keys are 'username' and 'password'.
:type http_proxy: dict
:param transport_type: The transport protocol type - default is ~uamqp.TransportType.Amqp.
~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the
transport type is explicitly requested.
:type transport_type: ~uamqp.TransportType
:param encoding: The encoding to use if hostname is provided as a str.
Default is 'UTF-8'.
:type encoding: str
"""
def __init__(self, hostname, port=None, verify=None, http_proxy=None,
transport_type=TransportType.Amqp, encoding='UTF-8'):
self._encoding = encoding
self.hostname = self._encode(hostname)
self.cert_file = verify
self.sasl = _SASL()
self.set_io(self.hostname, port, http_proxy, transport_type)
def _build_proxy_config(self, hostname, port, proxy_settings):
config = c_uamqp.HTTPProxyConfig()
config.hostname = hostname
config.port = port
for key, value in proxy_settings.items():
proxy_settings[key] = self._encode(value)
config.proxy_hostname = proxy_settings['proxy_hostname']
config.proxy_port = proxy_settings['proxy_port']
username = proxy_settings.get('username')
if username:
config.username = username
password = proxy_settings.get('password')
if password:
config.password = password
return config
def _encode(self, value):
return value.encode(self._encoding) if isinstance(value, str) else value
def set_io(self, hostname, port, http_proxy, transport_type):
if transport_type and transport_type.value == TransportType.AmqpOverWebsocket.value or http_proxy is not None:
self.set_wsio(hostname, port or constants.DEFAULT_AMQP_WSS_PORT, http_proxy)
else:
self.set_tlsio(hostname, port or constants.DEFAULT_AMQPS_PORT)
def set_wsio(self, hostname, port, http_proxy):
"""Setup the default underlying Web Socket IO layer.
:param hostname: The endpoint hostname.
:type hostname: bytes
:param port: The WSS port.
:type port: int
"""
_wsio_config = c_uamqp.WSIOConfig()
_wsio_config.hostname = hostname
_wsio_config.port = port
_default_tlsio = c_uamqp.get_default_tlsio()
_tlsio_config = c_uamqp.TLSIOConfig()
_tlsio_config.hostname = hostname
_tlsio_config.port = port
if http_proxy:
proxy_config = self._build_proxy_config(hostname, port, http_proxy)
_tlsio_config.set_proxy_config(proxy_config)
_wsio_config.set_tlsio_config(_default_tlsio, _tlsio_config)
_underlying_xio = c_uamqp.xio_from_wsioconfig(_wsio_config) # pylint: disable=attribute-defined-outside-init
cert = self.cert_file or certifi.where()
with open(cert, 'rb') as cert_handle:
cert_data = cert_handle.read()
try:
_underlying_xio.set_certificates(cert_data)
except ValueError:
_logger.warning('Unable to set external certificates.')
self.sasl_client = _SASLClient(_underlying_xio, self.sasl) # pylint: disable=attribute-defined-outside-init
self.consumed = False # pylint: disable=attribute-defined-outside-init
def set_tlsio(self, hostname, port):
"""Setup the default underlying TLS IO layer. On Windows this is
Schannel, on Linux and MacOS this is OpenSSL.
:param hostname: The endpoint hostname.
:type hostname: bytes
:param port: The TLS port.
:type port: int
"""
_default_tlsio = c_uamqp.get_default_tlsio()
_tlsio_config = c_uamqp.TLSIOConfig()
_tlsio_config.hostname = hostname
_tlsio_config.port = int(port)
_underlying_xio = c_uamqp.xio_from_tlsioconfig(_default_tlsio, _tlsio_config) # pylint: disable=attribute-defined-outside-init
cert = self.cert_file or certifi.where()
with open(cert, 'rb') as cert_handle:
cert_data = cert_handle.read()
try:
_underlying_xio.set_certificates(cert_data)
except ValueError:
_logger.warning('Unable to set external certificates.')
self.sasl_client = _SASLClient(_underlying_xio, self.sasl) # pylint: disable=attribute-defined-outside-init
self.consumed = False # pylint: disable=attribute-defined-outside-init
def close(self):
"""Close the authentication layer and cleanup
all the authentication wrapper objects.
"""
self.sasl_client.close()
self.sasl.close()
class SASLPlain(AMQPAuth):
"""SASL Plain AMQP authentication.
This is SASL authentication using a basic username and password.
:param hostname: The AMQP endpoint hostname.
:type hostname: str or bytes
:param username: The authentication username.
:type username: bytes or str
:param password: The authentication password.
:type password: bytes or str
:param port: The TLS port - default for AMQP is 5671.
:type port: int
:param verify: The path to a user-defined certificate.
:type verify: str
:param http_proxy: HTTP proxy configuration. This should be a dictionary with
the following keys present: 'proxy_hostname' and 'proxy_port'. Additional optional
keys are 'username' and 'password'.
:type http_proxy: dict
:param transport_type: The transport protocol type - default is ~uamqp.TransportType.Amqp.
~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the
transport type is explicitly requested.
:type transport_type: ~uamqp.TransportType
:param encoding: The encoding to use if hostname and credentials
are provided as a str. Default is 'UTF-8'.
:type encoding: str
"""
def __init__(
self, hostname, username, password, port=None,
verify=None, http_proxy=None, transport_type=TransportType.Amqp, encoding='UTF-8'):
self._encoding = encoding
self.hostname = self._encode(hostname)
self.username = self._encode(username)
self.password = self._encode(password)
self.cert_file = verify
self.sasl = _SASLPlain(self.username, self.password)
self.set_io(self.hostname, port, http_proxy, transport_type)
class SASLAnonymous(AMQPAuth):
"""SASL Annoymous AMQP authentication mixin.
SASL connection with no credentials. If intending to use annoymous
auth to set up a CBS session once connected, use SASTokenAuth
or the CBSAuthMixin instead.
:param hostname: The AMQP endpoint hostname.
:type hostname: str or bytes
:param port: The TLS port - default for AMQP is 5671.
:type port: int
:param verify: The path to a user-defined certificate.
:type verify: str
:param http_proxy: HTTP proxy configuration. This should be a dictionary with
the following keys present: 'proxy_hostname' and 'proxy_port'. Additional optional
keys are 'username' and 'password'.
:type http_proxy: dict
:param transport_type: The transport protocol type - default is ~uamqp.TransportType.Amqp.
~uamqp.TransportType.AmqpOverWebsocket is applied when http_proxy is set or the
transport type is explicitly requested.
:type transport_type: ~uamqp.TransportType
:param encoding: The encoding to use if hostname is provided as a str.
Default is 'UTF-8'.
:type encoding: str
"""
def __init__(self, hostname, port=None, verify=None,
http_proxy=None, transport_type=TransportType.Amqp, encoding='UTF-8'):
self._encoding = encoding
self.hostname = self._encode(hostname)
self.cert_file = verify
self.sasl = _SASLAnonymous()
self.set_io(self.hostname, port, http_proxy, transport_type)
class _SASLClient(object):
def __init__(self, io, sasl):
"""Create a SASLClient.
This will own the input "io" and be responsible for its destruction.
"""
self._underlying_io = io
self._sasl = sasl
self._io_config = c_uamqp.SASLClientIOConfig(io, self._sasl.mechanism)
self._xio = c_uamqp.xio_from_saslioconfig(self._io_config)
def get_client(self):
return self._xio
def close(self):
self._xio.destroy()
self._underlying_io.destroy()
class _SASL(object):
def __init__(self):
self._interface = self._get_interface() # pylint: disable=assignment-from-none
self.mechanism = self._get_mechanism()
def _get_interface(self):
return None
def _get_mechanism(self):
return c_uamqp.get_sasl_mechanism()
def close(self):
self.mechanism.destroy()
class _SASLAnonymous(_SASL):
def _get_interface(self):
return c_uamqp.saslanonymous_get_interface()
def _get_mechanism(self):
return c_uamqp.get_sasl_mechanism(self._interface)
class _SASLPlain(_SASL):
def __init__(self, authcid, passwd, authzid=None):
self._sasl_config = c_uamqp.SASLPlainConfig()
self._sasl_config.authcid = authcid
self._sasl_config.passwd = passwd
if authzid:
self._sasl_config.authzid = authzid
super(_SASLPlain, self).__init__()
def _get_interface(self):
return c_uamqp.saslplain_get_interface()
def _get_mechanism(self):
return c_uamqp.get_plain_sasl_mechanism(self._interface, self._sasl_config)