in redshift_connector/plugin/jwt_credentials_provider.py [0:0]
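def refresh(self: "JwtCredentialsProvider") -> None:
    """
    Exchange the configured JWT for temporary AWS credentials and cache them.

    The token in ``self.jwt`` is passed through ``process_jwt`` and sent to
    STS ``AssumeRoleWithWebIdentity``. The returned ``Credentials`` are wrapped
    in a ``CredentialsHolder``, tagged with ``read_metadata()``, and stored in
    ``self.cache`` under ``get_cache_key()``. ``self.db_user`` is derived from
    the decoded token as a side effect.

    Raises
    ------
    InterfaceError
        If no JWT has been set on the provider.
    Exception
        Any error raised by the STS client is logged and re-raised unchanged.
    """
    import boto3  # type: ignore

    client = boto3.client("sts")

    try:
        # The JWT is a bearer credential: anyone who can read the logs could
        # replay it until it expires, so log only whether one is present.
        _logger.debug("JWT provided: %s", self.jwt is not None)
        if self.jwt is None:
            raise InterfaceError("Unable to refresh, no jwt provided")

        jwt: str = self.process_jwt(self.jwt)
        decoded_jwt: typing.Optional[typing.List[typing.Union[str, bytes]]] = self.decode_jwt(self.jwt)
        self.db_user = self.derive_database_user(decoded_jwt)

        request: typing.Dict[str, typing.Any] = {
            "RoleArn": self.role_arn,
            "RoleSessionName": self.role_session_name,
            "WebIdentityToken": jwt,
        }
        # botocore's parameter validation rejects None for DurationSeconds, so
        # the key is left out when no positive duration is configured and STS
        # applies its own default.
        if self.duration is not None and self.duration > 0:
            request["DurationSeconds"] = self.duration

        response = client.assume_role_with_web_identity(**request)

        # NOTE(review): this fallback runs after the STS call above, so the
        # request that just completed still carried the previous session name;
        # only later refreshes see db_user. Session names are what ties an
        # assumed-role session back to a user in audit logs - confirm whether
        # this was meant to run before the call, and that db_user is always a
        # valid RoleSessionName, before moving it.
        if (
            self.role_session_name is None
            or self.role_session_name == ""
            or self.role_session_name == JwtCredentialsProvider.DEFAULT_ROLE_SESSION_NAME
        ):
            # Use user name as role session name for security purposes
            self.role_session_name = self.db_user

        stscred: typing.Dict[str, typing.Any] = response["Credentials"]
        credentials: CredentialsHolder = CredentialsHolder(stscred)
        credentials.set_metadata(self.read_metadata())

        key: str = self.get_cache_key()
        self.cache[key] = credentials

    except client.exceptions.MalformedPolicyDocumentException as e:
        _logger.error("MalformedPolicyDocumentException: %s", e)
        raise
    except client.exceptions.PackedPolicyTooLargeException as e:
        _logger.error("PackedPolicyTooLargeException: %s", e)
        raise
    except client.exceptions.IDPRejectedClaimException as e:
        _logger.error("IDPRejectedClaimException: %s", e)
        raise
    except client.exceptions.InvalidIdentityTokenException as e:
        _logger.error("InvalidIdentityTokenException: %s", e)
        raise
    except client.exceptions.ExpiredTokenException as e:
        _logger.error("ExpiredTokenException: %s", e)
        raise
    except client.exceptions.RegionDisabledException as e:
        _logger.error("RegionDisabledException: %s", e)
        raise
    except Exception as e:
        # Also reached by the InterfaceError raised above when no JWT is set.
        _logger.error("Other Exception: %s", e)
        raise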