alibabacloud_credentials/provider/cli_profile.py (148 lines of code) (raw):

import os import json from typing import Any, Dict import aiofiles from alibabacloud_credentials.provider import StaticAKCredentialsProvider, EcsRamRoleCredentialsProvider, \ RamRoleArnCredentialsProvider, OIDCRoleArnCredentialsProvider, StaticSTSCredentialsProvider from .refreshable import Credentials from alibabacloud_credentials_api import ICredentialsProvider from alibabacloud_credentials.utils import auth_constant as ac from alibabacloud_credentials.utils import auth_util as au from alibabacloud_credentials.exceptions import CredentialException async def _load_config_async(file_path: str) -> Any: async with aiofiles.open(file_path, mode='r') as f: content = await f.read() return json.loads(content) def _load_config(file_path: str) -> Any: with open(file_path, mode='r') as f: content = f.read() return json.loads(content) class CLIProfileCredentialsProvider(ICredentialsProvider): def __init__(self, *, profile_name: str = None): self._profile_file = os.path.join(ac.HOME, ".aliyun/config.json") self._profile_name = profile_name or au.environment_profile_name self.__innerProvider = None def _should_reload_credentials_provider(self) -> bool: if self.__innerProvider is None: return True return False def get_credentials(self) -> Credentials: if au.environment_cli_profile_disabled.lower() == "true": raise CredentialException('cli credentials file is disabled') if self._should_reload_credentials_provider(): if not os.path.exists(self._profile_file) or not os.path.isfile(self._profile_file): raise CredentialException(f'unable to open credentials file: {self._profile_file}') try: config = _load_config(self._profile_file) except Exception as e: raise CredentialException( f'failed to parse credential form cli credentials file: {self._profile_file}') if config is None: raise CredentialException( f'failed to parse credential form cli credentials file: {self._profile_file}') profile_name = self._profile_name if self._profile_name is None or self._profile_name == '': profile_name = config.get('current') self.__innerProvider = self._get_credentials_provider(config, profile_name) cre = self.__innerProvider.get_credentials() credentials = Credentials( access_key_id=cre.get_access_key_id(), access_key_secret=cre.get_access_key_secret(), security_token=cre.get_security_token(), provider_name=f'{self.get_provider_name()}/{cre.get_provider_name()}' ) return credentials async def get_credentials_async(self) -> Credentials: if au.environment_cli_profile_disabled.lower() == "true": raise CredentialException('cli credentials file is disabled') if self._should_reload_credentials_provider(): if not os.path.exists(self._profile_file) or not os.path.isfile(self._profile_file): raise CredentialException(f'unable to open credentials file: {self._profile_file}') try: config = await _load_config_async(self._profile_file) except Exception as e: raise CredentialException( f'failed to parse credential form cli credentials file: {self._profile_file}') if config is None: raise CredentialException( f'failed to parse credential form cli credentials file: {self._profile_file}') profile_name = self._profile_name if self._profile_name is None or self._profile_name == '': profile_name = config.get('current') self.__innerProvider = self._get_credentials_provider(config, profile_name) cre = await self.__innerProvider.get_credentials_async() credentials = Credentials( access_key_id=cre.get_access_key_id(), access_key_secret=cre.get_access_key_secret(), security_token=cre.get_security_token(), provider_name=f'{self.get_provider_name()}/{cre.get_provider_name()}' ) return credentials def _get_credentials_provider(self, config: Dict, profile_name: str) -> ICredentialsProvider: if profile_name is None or profile_name == '': raise CredentialException('invalid profile name') profiles = config.get('profiles', []) if not profiles: raise CredentialException(f"unable to get profile with '{profile_name}' form cli credentials file.") for profile in profiles: if profile.get('name') is not None and profile['name'] == profile_name: mode = profile.get('mode') if mode == "AK": return StaticAKCredentialsProvider( access_key_id=profile.get('access_key_id'), access_key_secret=profile.get('access_key_secret') ) elif mode == "StsToken": return StaticSTSCredentialsProvider( access_key_id=profile.get('access_key_id'), access_key_secret=profile.get('access_key_secret'), security_token=profile.get('sts_token') ) elif mode == "RamRoleArn": pre_provider = StaticAKCredentialsProvider( access_key_id=profile.get('access_key_id'), access_key_secret=profile.get('access_key_secret') ) return RamRoleArnCredentialsProvider( credentials_provider=pre_provider, role_arn=profile.get('ram_role_arn'), role_session_name=profile.get('ram_session_name'), duration_seconds=profile.get('expired_seconds'), policy=profile.get('policy'), external_id=profile.get('external_id'), sts_region_id=profile.get('sts_region'), enable_vpc=profile.get('enable_vpc'), ) elif mode == "EcsRamRole": return EcsRamRoleCredentialsProvider( role_name=profile.get('ram_role_name') ) elif mode == "OIDC": return OIDCRoleArnCredentialsProvider( role_arn=profile.get('ram_role_arn'), oidc_provider_arn=profile.get('oidc_provider_arn'), oidc_token_file_path=profile.get('oidc_token_file'), role_session_name=profile.get('role_session_name'), duration_seconds=profile.get('expired_seconds'), policy=profile.get('policy'), sts_region_id=profile.get('sts_region'), enable_vpc=profile.get('enable_vpc'), ) elif mode == "ChainableRamRoleArn": previous_provider = self._get_credentials_provider(config, profile.get('source_profile')) return RamRoleArnCredentialsProvider( credentials_provider=previous_provider, role_arn=profile.get('ram_role_arn'), role_session_name=profile.get('ram_session_name'), duration_seconds=profile.get('expired_seconds'), policy=profile.get('policy'), external_id=profile.get('external_id'), sts_region_id=profile.get('sts_region'), enable_vpc=profile.get('enable_vpc'), ) else: raise CredentialException(f"unsupported profile mode '{mode}' form cli credentials file.") raise CredentialException(f"unable to get profile with '{profile_name}' form cli credentials file.") def get_provider_name(self) -> str: return 'cli_profile'