alibabacloud_credentials/provider/default.py (74 lines of code) (raw):

from . import EnvironmentVariableCredentialsProvider, EcsRamRoleCredentialsProvider, \ OIDCRoleArnCredentialsProvider, URLCredentialsProvider, CLIProfileCredentialsProvider, ProfileCredentialsProvider from alibabacloud_credentials.provider.refreshable import Credentials from alibabacloud_credentials_api import ICredentialsProvider from alibabacloud_credentials.utils import auth_util as au from alibabacloud_credentials.exceptions import CredentialException class DefaultCredentialsProvider(ICredentialsProvider): def __init__(self, *, reuse_last_provider_enabled: bool = True): self.__reuse_last_provider_enabled = reuse_last_provider_enabled self.__last_used_provider = None self.__providers_chain = [ EnvironmentVariableCredentialsProvider() ] if au.enable_oidc_credential: self.__providers_chain.append(OIDCRoleArnCredentialsProvider()) self.__providers_chain.append(CLIProfileCredentialsProvider()) self.__providers_chain.append(ProfileCredentialsProvider()) if au.environment_ecs_metadata_disabled.lower() != 'true': self.__providers_chain.append(EcsRamRoleCredentialsProvider()) if au.environment_credentials_uri is not None and au.environment_credentials_uri != '': self.__providers_chain.append(URLCredentialsProvider()) def get_credentials(self) -> Credentials: if self.__reuse_last_provider_enabled and self.__last_used_provider is not None: credentials = self.__last_used_provider.get_credentials() return Credentials( access_key_id=credentials.get_access_key_id(), access_key_secret=credentials.get_access_key_secret(), security_token=credentials.get_security_token(), provider_name=f'{self.get_provider_name()}/{credentials.get_provider_name()}' ) error_messages = [] for provider in self.__providers_chain: try: credentials = provider.get_credentials() if credentials is not None: self.__last_used_provider = provider return Credentials( access_key_id=credentials.get_access_key_id(), access_key_secret=credentials.get_access_key_secret(), security_token=credentials.get_security_token(), provider_name=f'{self.get_provider_name()}/{credentials.get_provider_name()}' ) except Exception as e: error_messages.append(f'{type(provider).__name__}: {str(e)}') raise CredentialException( f'unable to load credentials from any of the providers in the chain: {error_messages}') async def get_credentials_async(self) -> Credentials: if self.__reuse_last_provider_enabled and self.__last_used_provider is not None: credentials = await self.__last_used_provider.get_credentials_async() return Credentials( access_key_id=credentials.get_access_key_id(), access_key_secret=credentials.get_access_key_secret(), security_token=credentials.get_security_token(), provider_name=f'{self.get_provider_name()}/{credentials.get_provider_name()}' ) error_messages = [] for provider in self.__providers_chain: try: credentials = await provider.get_credentials_async() if credentials is not None: self.__last_used_provider = provider return Credentials( access_key_id=credentials.get_access_key_id(), access_key_secret=credentials.get_access_key_secret(), security_token=credentials.get_security_token(), provider_name=f'{self.get_provider_name()}/{credentials.get_provider_name()}' ) except Exception as e: error_messages.append(f'{type(provider).__name__}: {str(e)}') raise CredentialException( f'unable to load credentials from any of the providers in the chain: {error_messages}') def get_provider_name(self) -> str: return 'default'