in uamqp/authentication/cbs_auth.py [0:0]
def handle_token(self):
"""This function is called periodically to check the status of the current
token if there is one, and request a new one if needed.
If the token request fails, it will be retried according to the retry policy.
A token refresh will be attempted if the token will expire soon.
This function will return a tuple of two booleans. The first represents whether
the token authentication has not completed within it's given timeout window. The
second indicates whether the token negotiation is still in progress.
:raises: ~uamqp.errors.AuthenticationException if the token authentication fails.
:raises: ~uamqp.errors.TokenExpired if the token has expired and cannot be
refreshed.
:rtype: tuple[bool, bool]
"""
# pylint: disable=protected-access
timeout = False
in_progress = False
try:
self._connection.lock()
if self._connection._closing or self._connection._error:
return timeout, in_progress
auth_status = self._cbs_auth.get_status()
auth_status = constants.CBSAuthStatus(auth_status)
if auth_status == constants.CBSAuthStatus.Error:
if self.retries >= self._retry_policy.retries: # pylint: disable=no-member
_logger.warning("Authentication Put-Token failed. Retries exhausted.")
raise errors.TokenAuthFailure(*self._cbs_auth.get_failure_info())
error_code, error_description = self._cbs_auth.get_failure_info()
_logger.info("Authentication status: %r, description: %r", error_code, error_description)
_logger.info("Authentication Put-Token failed. Retrying.")
self.retries += 1 # pylint: disable=no-member
time.sleep(self._retry_policy.backoff)
self._cbs_auth.authenticate()
in_progress = True
elif auth_status == constants.CBSAuthStatus.Failure:
raise errors.AuthenticationException("Failed to open CBS authentication link.")
elif auth_status == constants.CBSAuthStatus.Expired:
raise errors.TokenExpired("CBS Authentication Expired.")
elif auth_status == constants.CBSAuthStatus.Timeout:
timeout = True
elif auth_status == constants.CBSAuthStatus.InProgress:
in_progress = True
elif auth_status == constants.CBSAuthStatus.RefreshRequired:
_logger.info("Token on connection %r will expire soon - attempting to refresh.",
self._connection.container_id)
self.update_token()
if self.token != self._prev_token:
self._cbs_auth.refresh(self.token, int(self.expires_at))
else:
_logger.info(
"The newly acquired token on connection %r is the same as the previous one,"
" will keep attempting to refresh",
self._connection.container_id
)
elif auth_status == constants.CBSAuthStatus.Idle:
self._cbs_auth.authenticate()
in_progress = True
elif auth_status != constants.CBSAuthStatus.Ok:
raise errors.AuthenticationException("Invalid auth state.")
except compat.TimeoutException:
_logger.debug("CBS auth timed out while waiting for lock acquisition.")
return None, None
except ValueError as e:
raise errors.AuthenticationException(
"Token authentication failed: {}".format(e))
finally:
self._connection.release()
return timeout, in_progress