in uamqp/authentication/cbs_auth_async.py [0:0]
def __init__(self, audience, uri,
get_token,
expires_in=datetime.timedelta(seconds=constants.AUTH_EXPIRATION_SECS),
expires_at=None,
port=None,
timeout=10,
retry_policy=TokenRetryPolicy(),
verify=None,
token_type=b"jwt",
http_proxy=None,
transport_type=TransportType.Amqp,
encoding='UTF-8',
**kwargs): # pylint: disable=no-member
self._retry_policy = retry_policy
self._encoding = encoding
self._refresh_window = kwargs.pop("refresh_window", 0)
self._prev_token = None
self.uri = uri
parsed = compat.urlparse(uri) # pylint: disable=no-member
self.cert_file = verify
self.hostname = (kwargs.get("custom_endpoint_hostname") or parsed.hostname).encode(self._encoding)
is_coroutine(get_token)
self.get_token = get_token
self.audience = self._encode(audience)
self.token_type = self._encode(token_type)
self.token = None
self.expires_at, self.expires_in = self._set_expiry(expires_at, expires_in)
self.timeout = timeout
self.retries = 0
self.sasl = _SASL()
self.set_io(self.hostname, port, http_proxy, transport_type)