in uamqp/authentication/common.py [0:0]
def set_wsio(self, hostname, port, http_proxy):
    """Configure a WebSocket IO layer (over the default TLS IO) and SASL client.

    Builds a WebSocket IO configuration that wraps a TLS IO configuration
    for the same host and port, optionally attaches a proxy configuration,
    hands certificate data to the resulting XIO and finally creates the
    SASL client on top of it.

    :param hostname: The endpoint hostname.
    :type hostname: bytes
    :param port: The WSS port.
    :type port: int
    :param http_proxy: Proxy settings forwarded to
     ``self._build_proxy_config``. A falsy value means no proxy
     configuration is applied.
    """
    # WebSocket layer settings.
    ws_settings = c_uamqp.WSIOConfig()
    ws_settings.hostname = hostname
    ws_settings.port = port

    # TLS layer settings, targeting the same endpoint as the WebSocket layer.
    tls_interface = c_uamqp.get_default_tlsio()
    tls_settings = c_uamqp.TLSIOConfig()
    tls_settings.hostname = hostname
    tls_settings.port = port
    if http_proxy:
        tls_settings.set_proxy_config(
            self._build_proxy_config(hostname, port, http_proxy))

    ws_settings.set_tlsio_config(tls_interface, tls_settings)
    transport = c_uamqp.xio_from_wsioconfig(ws_settings)

    # Use the explicitly configured certificate file if there is one,
    # otherwise fall back to the path reported by certifi.
    cert_path = self.cert_file or certifi.where()
    with open(cert_path, 'rb') as cert_stream:
        cert_bytes = cert_stream.read()

    # Best effort: a ValueError from the transport is logged, not raised.
    try:
        transport.set_certificates(cert_bytes)
    except ValueError:
        _logger.warning('Unable to set external certificates.')

    self.sasl_client = _SASLClient(transport, self.sasl)  # pylint: disable=attribute-defined-outside-init
    self.consumed = False  # pylint: disable=attribute-defined-outside-init