alibabacloud_credentials/providers.py (575 lines of code) (raw):

import calendar import configparser import json import os import time import requests from Tea.core import TeaCore from alibabacloud_credentials import credentials from alibabacloud_credentials.exceptions import CredentialException from alibabacloud_credentials.models import Config from alibabacloud_credentials.utils import auth_constant as ac from alibabacloud_credentials.utils import auth_util as au from alibabacloud_credentials.utils import parameter_helper as ph class AlibabaCloudCredentialsProvider: """BaseProvider class""" duration_seconds = 3600 timeout = 3000 def __init__(self, config=None): if isinstance(config, Config): self.type = config.type self.access_key_id = config.access_key_id self.access_key_secret = config.access_key_secret self.role_arn = config.role_arn self.role_session_name = config.role_session_name self.public_key_id = config.public_key_id self.role_name = config.role_name self.disable_imds_v1 = config.disable_imds_v1 self.oidc_provider_arn = config.oidc_provider_arn self.oidc_token_file_path = config.oidc_token_file_path self.private_key_file = config.private_key_file self.bearer_token = config.bearer_token self.security_token = config.security_token self.host = config.host self.timeout = config.timeout or AlibabaCloudCredentialsProvider.timeout self.connect_timeout = config.connect_timeout or AlibabaCloudCredentialsProvider.timeout self.proxy = config.proxy self.sts_endpoint = config.sts_endpoint def _set_arg(self, key, value): if value is not None: setattr(self, key, value) val = getattr(self, key, None) if val is None: setattr(self, key, None) def _verify_empty_args(self, *args, config): if None in args and config is None: raise CredentialException( '"%s" needs to receive a "model.Config" object or other necessary args' % self.__class__ ) def get_credentials(self): raise NotImplementedError('get_credentials() must be overridden') class DefaultCredentialsProvider(AlibabaCloudCredentialsProvider): def __init__(self): super().__init__() self.user_configuration_providers = [ EnvironmentVariableCredentialsProvider() ] if au.enable_oidc_credential: self.user_configuration_providers.append(OIDCRoleArnCredentialProvider( role_session_name=au.environment_role_session_name, role_arn=au.environment_role_arn, oidc_provider_arn=au.environment_oidc_provider_arn, oidc_token_file_path=au.environment_oidc_token_file )) self.user_configuration_providers.append(ProfileCredentialsProvider()) role_name = au.environment_ECSMeta_data if role_name is not None: self.user_configuration_providers.append(EcsRamRoleCredentialProvider(role_name)) self.user_configuration_providers.append(CredentialsUriProvider()) def get_credentials(self): for provider in self.user_configuration_providers: credential = provider.get_credentials() if credential is not None: return credential raise CredentialException("not found credentials") def add_credentials_provider(self, p): self.user_configuration_providers.append(p) def remove_credentials_provider(self, p): self.user_configuration_providers.remove(p) def contains_credentials_provider(self, p): return self.user_configuration_providers.__contains__(p) def clear_credentials_provider(self): self.user_configuration_providers.clear() class EcsRamRoleCredentialProvider(AlibabaCloudCredentialsProvider): """EcsRamRoleCredentialProvider""" default_metadata_token_duration = 21600 def __init__(self, role_name=None, config=None): self._verify_empty_args(role_name, config=config) super().__init__(config) self.__url_in_ecs_metadata = "/latest/meta-data/ram/security-credentials/" self.__url_in_ecs_metadata_token = "/latest/api/token" self.__ecs_metadata_fetch_error_msg = "Failed to get RAM session credentials from ECS metadata service." self.__ecs_metadata_token_fetch_error_msg = "Failed to get token from ECS Metadata Service." self.__metadata_service_host = "100.100.100.200" self._set_arg('role_name', role_name) self.disable_imds_v1 = au.environment_imds_v1_disabled and au.environment_imds_v1_disabled.lower() == 'true' if isinstance(config, Config): self.disable_imds_v1 = config.disable_imds_v1 is not None and config.disable_imds_v1 == True def _get_role_name(self, url=None): tea_request = ph.get_new_request() tea_request.headers['host'] = url if url else self.__metadata_service_host metadata_token = self._get_metadata_token(url) if metadata_token is not None: tea_request.headers['X-aliyun-ecs-metadata-token'] = metadata_token if not url: tea_request.pathname = self.__url_in_ecs_metadata response = TeaCore.do_action(tea_request) if response.status_code != 200: raise CredentialException(self.__ecs_metadata_fetch_error_msg + " HttpCode=" + str(response.status_code)) self.role_name = response.body.decode('utf-8') async def _get_role_name_async(self, url=None): tea_request = ph.get_new_request() tea_request.headers['host'] = url if url else self.__metadata_service_host metadata_token = await self._get_metadata_token_async(url) if metadata_token is not None: tea_request.headers['X-aliyun-ecs-metadata-token'] = metadata_token if not url: tea_request.pathname = self.__url_in_ecs_metadata response = await TeaCore.async_do_action(tea_request) if response.status_code != 200: raise CredentialException(self.__ecs_metadata_fetch_error_msg + " HttpCode=" + str(response.status_code)) self.role_name = response.body.decode('utf-8') def _get_metadata_token(self, url=None): tea_request = ph.get_new_request() tea_request.method = 'PUT' tea_request.headers['host'] = url if url else self.__metadata_service_host tea_request.headers['X-aliyun-ecs-metadata-token-ttl-seconds'] = str(self.default_metadata_token_duration) if not url: tea_request.pathname = self.__url_in_ecs_metadata_token try: response = TeaCore.do_action(tea_request) if response.status_code != 200: raise CredentialException( self.__ecs_metadata_token_fetch_error_msg + " HttpCode=" + str(response.status_code)) return response.body.decode('utf-8') except Exception as e: if self.disable_imds_v1: raise e return None async def _get_metadata_token_async(self, url=None): tea_request = ph.get_new_request() tea_request.method = 'PUT' tea_request.headers['host'] = url if url else self.__metadata_service_host tea_request.headers['X-aliyun-ecs-metadata-token-ttl-seconds'] = str(self.default_metadata_token_duration) if not url: tea_request.pathname = self.__url_in_ecs_metadata_token try: response = await TeaCore.async_do_action(tea_request) if response.status_code != 200: raise CredentialException( self.__ecs_metadata_token_fetch_error_msg + " HttpCode=" + str(response.status_code)) return response.body.decode('utf-8') except Exception as e: if self.disable_imds_v1: raise e return None def _create_credential(self, url=None): tea_request = ph.get_new_request() tea_request.headers['host'] = url if url else self.__metadata_service_host metadata_token = self._get_metadata_token(url) if metadata_token is not None: tea_request.headers['X-aliyun-ecs-metadata-token'] = metadata_token if not url: tea_request.pathname = self.__url_in_ecs_metadata + self.role_name # request response = TeaCore.do_action(tea_request) if response.status_code != 200: raise CredentialException(self.__ecs_metadata_fetch_error_msg + " HttpCode=" + str(response.status_code)) dic = json.loads(response.body.decode('utf-8')) content_code = dic.get('Code') content_access_key_id = dic.get('AccessKeyId') content_access_key_secret = dic.get('AccessKeySecret') content_security_token = dic.get('SecurityToken') content_expiration = dic.get('Expiration') if content_code != "Success": raise CredentialException(self.__ecs_metadata_fetch_error_msg) # 先转换为时间数组 time_array = time.strptime(content_expiration, "%Y-%m-%dT%H:%M:%SZ") # 转换为时间戳 time_stamp = calendar.timegm(time_array) return credentials.EcsRamRoleCredential(content_access_key_id, content_access_key_secret, content_security_token, time_stamp, self) def get_credentials(self): if self.role_name == "": self._get_role_name() return self._create_credential() async def _create_credential_async(self, url=None): tea_request = ph.get_new_request() tea_request.headers['host'] = url if url else self.__metadata_service_host metadata_token = await self._get_metadata_token_async(url) if metadata_token is not None: tea_request.headers['X-aliyun-ecs-metadata-token'] = metadata_token if not url: tea_request.pathname = self.__url_in_ecs_metadata + self.role_name # request response = await TeaCore.async_do_action(tea_request) if response.status_code != 200: raise CredentialException(self.__ecs_metadata_fetch_error_msg + " HttpCode=" + str(response.status_code)) dic = json.loads(response.body.decode('utf-8')) content_code = dic.get('Code') content_access_key_id = dic.get('AccessKeyId') content_access_key_secret = dic.get('AccessKeySecret') content_security_token = dic.get('SecurityToken') content_expiration = dic.get('Expiration') if content_code != "Success": raise CredentialException(self.__ecs_metadata_fetch_error_msg) # 先转换为时间数组 time_array = time.strptime(content_expiration, "%Y-%m-%dT%H:%M:%SZ") # 转换为时间戳 time_stamp = calendar.timegm(time_array) return credentials.EcsRamRoleCredential(content_access_key_id, content_access_key_secret, content_security_token, time_stamp, self) async def get_credentials_async(self): if self.role_name == "": await self._get_role_name_async() return await self._create_credential_async() class RamRoleArnCredentialProvider(AlibabaCloudCredentialsProvider): """RamRoleArnCredentialProvider""" def __init__(self, access_key_id=None, access_key_secret=None, role_session_name=None, role_arn=None, region_id=None, policy=None, config=None): self._verify_empty_args(access_key_id, access_key_secret, config=config) super().__init__(config) self._set_arg('role_arn', role_arn) self._set_arg('access_key_id', access_key_id) self._set_arg('access_key_secret', access_key_secret) self._set_arg('region_id', region_id) self._set_arg('role_session_name', role_session_name) self._set_arg('policy', policy) if region_id is None and au.environment_sts_region is not None: self._set_arg('region_id', au.environment_sts_region) if self.region_id is not None: self._set_arg('sts_endpoint', f'sts.{self.region_id}.aliyuncs.com') else: self._set_arg('sts_endpoint', 'sts.aliyuncs.com' if config is None or config.sts_endpoint is None else config.sts_endpoint) def get_credentials(self): return self._create_credentials() def _create_credentials(self): # 获取credential 先实现签名用工具类 tea_request = ph.get_new_request() tea_request.query = { 'Action': 'AssumeRole', 'Format': 'JSON', 'Version': '2015-04-01', 'DurationSeconds': str(self.duration_seconds), 'RoleArn': self.role_arn, 'AccessKeyId': self.access_key_id, 'RoleSessionName': self.role_session_name, 'SignatureMethod': 'HMAC-SHA1', 'SignatureVersion': '1.0' } tea_request.query["Timestamp"] = ph.get_iso_8061_date() tea_request.query["SignatureNonce"] = ph.get_uuid() if self.policy is not None: tea_request.query["Policy"] = self.policy string_to_sign = ph.compose_string_to_sign("GET", tea_request.query) signature = ph.sign_string(string_to_sign, self.access_key_secret + "&") tea_request.query["Signature"] = signature tea_request.protocol = 'https' tea_request.headers['host'] = self.sts_endpoint # request response = TeaCore.do_action(tea_request) if response.status_code == 200: dic = json.loads(response.body.decode('utf-8')) if "Credentials" in dic: cre = dic.get("Credentials") # 先转换为时间数组 time_array = time.strptime(cre.get("Expiration"), "%Y-%m-%dT%H:%M:%SZ") # 转换为时间戳 expiration = calendar.timegm(time_array) return credentials.RamRoleArnCredential(cre.get("AccessKeyId"), cre.get("AccessKeySecret"), cre.get("SecurityToken"), expiration, self) raise CredentialException(response.body.decode('utf-8')) async def get_credentials_async(self): return await self._create_credentials_async() async def _create_credentials_async(self): # 获取credential 先实现签名用工具类 tea_request = ph.get_new_request() tea_request.query = { 'Action': 'AssumeRole', 'Format': 'JSON', 'Version': '2015-04-01', 'DurationSeconds': str(self.duration_seconds), 'RoleArn': self.role_arn, 'AccessKeyId': self.access_key_id, 'RoleSessionName': self.role_session_name, 'SignatureMethod': 'HMAC-SHA1', 'SignatureVersion': '1.0' } tea_request.query["Timestamp"] = ph.get_iso_8061_date() tea_request.query["SignatureNonce"] = ph.get_uuid() if self.policy is not None: tea_request.query["Policy"] = self.policy string_to_sign = ph.compose_string_to_sign("GET", tea_request.query) signature = ph.sign_string(string_to_sign, self.access_key_secret + "&") tea_request.query["Signature"] = signature tea_request.protocol = 'https' tea_request.headers['host'] = self.sts_endpoint # request response = await TeaCore.async_do_action(tea_request) if response.status_code == 200: dic = json.loads(response.body.decode('utf-8')) if "Credentials" in dic: cre = dic.get("Credentials") # 先转换为时间数组 time_array = time.strptime(cre.get("Expiration"), "%Y-%m-%dT%H:%M:%SZ") # 转换为时间戳 expiration = calendar.timegm(time_array) return credentials.RamRoleArnCredential(cre.get("AccessKeyId"), cre.get("AccessKeySecret"), cre.get("SecurityToken"), expiration, self) raise CredentialException(response.body.decode('utf-8')) class OIDCRoleArnCredentialProvider(AlibabaCloudCredentialsProvider): """OIDCRoleArnCredentialProvider""" def __init__(self, role_session_name=None, role_arn=None, oidc_provider_arn=None, oidc_token_file_path=None, region_id=None, policy=None, config=None): self._verify_empty_args(role_arn, oidc_provider_arn, oidc_token_file_path, config=config) super().__init__(config) self._set_arg('role_arn', role_arn) self._set_arg('oidc_provider_arn', oidc_provider_arn) if oidc_token_file_path is not None: self._set_arg('oidc_token_file_path', oidc_token_file_path) elif config.oidc_token_file_path is not None: self._set_arg('oidc_token_file_path', oidc_token_file_path) elif au.environment_oidc_token_file is not None: self._set_arg('oidc_token_file_path', au.environment_oidc_token_file) else: raise CredentialException( 'The oidc_token_file_path does not exist and env ALIBABA_CLOUD_OIDC_TOKEN_FILE is none.') self._set_arg('region_id', region_id) self._set_arg('role_session_name', role_session_name) self._set_arg('policy', policy) if region_id is None and au.environment_sts_region is not None: self._set_arg('region_id', au.environment_sts_region) if self.region_id is not None: self._set_arg('sts_endpoint', f'sts.{self.region_id}.aliyuncs.com') else: self._set_arg('sts_endpoint', 'sts.aliyuncs.com' if config is None or config.sts_endpoint is None else config.sts_endpoint) def get_credentials(self): return self._create_credentials() def _create_credentials(self): # 获取credential 先实现签名用工具类 oidc_token = au.get_private_key(self.oidc_token_file_path) tea_request = ph.get_new_request() tea_request.query = { 'Action': 'AssumeRoleWithOIDC', 'Format': 'JSON', 'Version': '2015-04-01', 'DurationSeconds': str(self.duration_seconds), 'RoleArn': self.role_arn, 'OIDCProviderArn': self.oidc_provider_arn, 'OIDCToken': oidc_token, 'RoleSessionName': self.role_session_name or 'defaultSessionName' } tea_request.query["Timestamp"] = ph.get_iso_8061_date() tea_request.query["SignatureNonce"] = ph.get_uuid() if self.policy is not None: tea_request.query["Policy"] = self.policy tea_request.protocol = 'https' tea_request.headers['host'] = self.sts_endpoint # request response = TeaCore.do_action(tea_request) if response.status_code == 200: dic = json.loads(response.body.decode('utf-8')) if "Credentials" in dic: cre = dic.get("Credentials") # 先转换为时间数组 time_array = time.strptime(cre.get("Expiration"), "%Y-%m-%dT%H:%M:%SZ") # 转换为时间戳 expiration = calendar.timegm(time_array) return credentials.OIDCRoleArnCredential(cre.get("AccessKeyId"), cre.get("AccessKeySecret"), cre.get("SecurityToken"), expiration, self) raise CredentialException(response.body.decode('utf-8')) async def get_credentials_async(self): return await self._create_credentials_async() async def _create_credentials_async(self): # 获取credential 先实现签名用工具类 oidc_token = au.get_private_key(self.oidc_token_file_path) tea_request = ph.get_new_request() tea_request.query = { 'Action': 'AssumeRoleWithOIDC', 'Format': 'JSON', 'Version': '2015-04-01', 'DurationSeconds': str(self.duration_seconds), 'RoleArn': self.role_arn, 'OIDCProviderArn': self.oidc_provider_arn, 'OIDCToken': oidc_token, 'RoleSessionName': self.role_session_name or 'defaultSessionName' } tea_request.query["Timestamp"] = ph.get_iso_8061_date() tea_request.query["SignatureNonce"] = ph.get_uuid() if self.policy is not None: tea_request.query["Policy"] = self.policy tea_request.protocol = 'https' tea_request.headers['host'] = self.sts_endpoint # request response = await TeaCore.async_do_action(tea_request) if response.status_code == 200: dic = json.loads(response.body.decode('utf-8')) if "Credentials" in dic: cre = dic.get("Credentials") # 先转换为时间数组 time_array = time.strptime(cre.get("Expiration"), "%Y-%m-%dT%H:%M:%SZ") # 转换为时间戳 expiration = calendar.timegm(time_array) return credentials.OIDCRoleArnCredential(cre.get("AccessKeyId"), cre.get("AccessKeySecret"), cre.get("SecurityToken"), expiration, self) raise CredentialException(response.body.decode('utf-8')) class RsaKeyPairCredentialProvider(AlibabaCloudCredentialsProvider): def __init__(self, access_key_id=None, access_key_secret=None, region_id=None, config=None): self._verify_empty_args(access_key_id, access_key_secret, config=config) super().__init__(config) self._set_arg('access_key_id', access_key_id) self._set_arg('access_key_secret', access_key_secret) self._set_arg('region_id', region_id) async def get_credentials_async(self): return await self._create_credential_async() async def _create_credential_async(self, turl=None): tea_request = ph.get_new_request() tea_request.query = { 'Action': 'GenerateSessionAccessKey', 'Format': 'JSON', 'Version': '2015-04-01', 'DurationSeconds': str(self.duration_seconds), 'AccessKeyId': self.access_key_id, 'SignatureMethod': 'HMAC-SHA1', 'SignatureVersion': '1.0' } tea_request.query["Timestamp"] = ph.get_iso_8061_date() tea_request.query["SignatureNonce"] = ph.get_uuid() str_to_sign = ph.compose_string_to_sign('GET', tea_request.query) signature = ph.sign_string(str_to_sign, self.access_key_id + '&') tea_request.query['Signature'] = signature tea_request.protocol = 'https' tea_request.headers['host'] = turl if turl else 'sts.aliyuncs.com' # request response = await TeaCore.async_do_action(tea_request) if response.status_code == 200: dic = json.loads(response.body.decode('utf-8')) if "SessionAccessKey" in dic: cre = dic.get("SessionAccessKey") time_array = time.strptime(cre.get("Expiration"), "%Y-%m-%dT%H:%M:%SZ") expiration = calendar.timegm(time_array) return credentials.RsaKeyPairCredential(cre.get("SessionAccessKeyId"), cre.get("SessionAccessKeySecret"), expiration, self) raise CredentialException(response.body.decode('utf-8')) def get_credentials(self): return self._create_credential() def _create_credential(self, turl=None): tea_request = ph.get_new_request() tea_request.query = { 'Action': 'GenerateSessionAccessKey', 'Format': 'JSON', 'Version': '2015-04-01', 'DurationSeconds': str(self.duration_seconds), 'AccessKeyId': self.access_key_id, 'SignatureMethod': 'HMAC-SHA1', 'SignatureVersion': '1.0' } tea_request.query["Timestamp"] = ph.get_iso_8061_date() tea_request.query["SignatureNonce"] = ph.get_uuid() str_to_sign = ph.compose_string_to_sign('GET', tea_request.query) signature = ph.sign_string(str_to_sign, self.access_key_id + '&') tea_request.query['Signature'] = signature tea_request.protocol = 'https' tea_request.headers['host'] = turl if turl else 'sts.aliyuncs.com' # request response = TeaCore.do_action(tea_request) if response.status_code == 200: dic = json.loads(response.body.decode('utf-8')) if "SessionAccessKey" in dic: cre = dic.get("SessionAccessKey") time_array = time.strptime(cre.get("Expiration"), "%Y-%m-%dT%H:%M:%SZ") expiration = calendar.timegm(time_array) return credentials.RsaKeyPairCredential(cre.get("SessionAccessKeyId"), cre.get("SessionAccessKeySecret"), expiration, self) raise CredentialException(response.body.decode('utf-8')) class ProfileCredentialsProvider(AlibabaCloudCredentialsProvider): def __init__(self, path=None): super().__init__() self._set_arg('file_path', path) def parse_ini(self): file_path = self.file_path if self.file_path else au.environment_credentials_file if file_path is None: if not ac.HOME: return if os.path.exists(os.path.join(ac.HOME, "/.alibabacloud/credentials.ini")): # Support '/.alibabacloud/credentials.ini' is due to historical mistakes. # Please try to use '~/.alibabacloud/credentials.ini'. file_path = os.path.join(ac.HOME, "/.alibabacloud/credentials.ini") elif os.path.exists(os.path.join(ac.HOME, ".alibabacloud/credentials.ini")): file_path = os.path.join(ac.HOME, ".alibabacloud/credentials.ini") if file_path is None: return elif len(file_path) == 0: raise CredentialException("The specified credentials file is empty") # loads ini conf = configparser.ConfigParser() conf.read(file_path, encoding='utf-8') ini_map = dict(conf._sections) for k in dict(conf._sections): option = dict(ini_map[k]) for key, value in dict(ini_map[k]).items(): if '#' in value: option[key] = value.split('#')[0].strip() else: option[key] = value.strip() ini_map[k] = option client_config = ini_map.get(au.client_type) return client_config def get_credentials(self): client_config = self.parse_ini() if client_config is None: return return self._create_credential(client_config) def _create_credential(self, config): config_type = config.get(ac.INI_TYPE) if not config_type: raise CredentialException("The configured client type is empty") elif ac.INI_TYPE_ARN == config_type: return self._get_sts_assume_role_session_provider(config).get_credentials() elif ac.INI_TYPE_OIDC == config_type: return self._get_sts_oidc_role_session_provider(config).get_credentials() elif ac.INI_TYPE_KEY_PAIR == config_type: return self._get_sts_get_session_access_key_provider(config).get_credentials() elif ac.INI_TYPE_RAM == config_type: return self._get_instance_profile_provider(config).get_credentials() access_key_id = config.get(ac.INI_ACCESS_KEY_ID) access_key_secret = config.get(ac.INI_ACCESS_KEY_IDSECRET) if not access_key_id or not access_key_secret: return return credentials.AccessKeyCredential(access_key_id, access_key_secret) @staticmethod def _get_sts_assume_role_session_provider(config): access_key_id = config.get(ac.INI_ACCESS_KEY_ID) access_key_secret = config.get(ac.INI_ACCESS_KEY_IDSECRET) role_session_name = config.get(ac.INI_ROLE_SESSION_NAME) role_arn = config.get(ac.INI_ROLE_ARN) region_id = config.get(ac.DEFAULT_REGION) policy = config.get(ac.INI_POLICY) if not access_key_id or not access_key_secret: raise CredentialException("The configured access_key_id or access_key_secret is empty") if not role_session_name or not role_arn: raise CredentialException("The configured role_session_name or role_arn is empty") return RamRoleArnCredentialProvider( access_key_id, access_key_secret, role_session_name, role_arn, region_id, policy ) @staticmethod def _get_sts_oidc_role_session_provider(config): role_session_name = config.get(ac.INI_ROLE_SESSION_NAME) role_arn = config.get(ac.INI_ROLE_ARN) oidc_provider_arn = config.get(ac.INI_OIDC_PROVIDER_ARN) oidc_token_file_path = config.get(ac.INI_OIDC_TOKEN_FILE_PATH) region_id = config.get(ac.DEFAULT_REGION) policy = config.get(ac.INI_POLICY) if not role_arn: raise CredentialException("The configured role_arn is empty") if not oidc_provider_arn: raise CredentialException("The configured oidc_provider_arn is empty") return OIDCRoleArnCredentialProvider( role_session_name, role_arn, oidc_provider_arn, oidc_token_file_path, region_id, policy ) @staticmethod def _get_sts_get_session_access_key_provider(config): public_key_id = config.get(ac.INI_PUBLIC_KEY_ID) private_key_file = config.get(ac.INI_PRIVATE_KEY_FILE) if not private_key_file: raise CredentialException("The configured private_key_file is empty") private_key = au.get_private_key(private_key_file) if not public_key_id or not private_key: raise CredentialException("The configured public_key_id or private_key_file content is empty") return RsaKeyPairCredentialProvider(public_key_id, private_key) @staticmethod def _get_instance_profile_provider(config): role_name = config.get(ac.INI_ROLE_NAME) if not role_name: raise CredentialException("The configured role_name is empty") return EcsRamRoleCredentialProvider(role_name) class EnvironmentVariableCredentialsProvider(AlibabaCloudCredentialsProvider): def get_credentials(self): if 'default' != au.client_type: return access_key_id = au.environment_access_key_id access_key_secret = au.environment_access_key_secret security_token = au.environment_security_token if access_key_id is None or access_key_secret is None: return if len(access_key_id) == 0: raise CredentialException("Environment variable accessKeyId cannot be empty") if len(access_key_secret) == 0: raise CredentialException("Environment variable accessKeySecret cannot be empty") if security_token is not None and len(security_token) > 0: return credentials.StsCredential(access_key_id, access_key_secret, security_token) return credentials.AccessKeyCredential(access_key_id, access_key_secret) class CredentialsUriProvider(AlibabaCloudCredentialsProvider): def get_credentials(self): credentials_uri = os.environ.get('ALIBABA_CLOUD_CREDENTIALS_URI') if credentials_uri is None: return None return credentials.CredentialsURICredential(credentials_uri)