alibabacloud_credentials/provider/ecs_ram_role.py (210 lines of code) (raw):

import calendar import json import time import signal import logging from alibabacloud_credentials.provider.refreshable import Credentials, RefreshResult, StaleValueBehavior, \ RefreshCachedSupplier, NonBlocking from alibabacloud_credentials.http import HttpOptions from Tea.core import TeaCore from apscheduler.schedulers.background import BackgroundScheduler from alibabacloud_credentials_api import ICredentialsProvider from alibabacloud_credentials.utils import auth_util as au from alibabacloud_credentials.utils import parameter_helper as ph from alibabacloud_credentials.exceptions import CredentialException log = logging.getLogger(__name__) log.setLevel(logging.DEBUG) class EcsRamRoleCredentialsProvider(ICredentialsProvider): DEFAULT_METADATA_TOKEN_DURATION = 21600 DEFAULT_CONNECT_TIMEOUT = 1000 DEFAULT_READ_TIMEOUT = 1000 def __init__(self, *, role_name: str = None, disable_imds_v1: bool = None, http_options: HttpOptions = None, async_update_enabled: bool = True): if au.environment_ecs_metadata_disabled.lower() == 'true': raise ValueError('IMDS credentials is disabled') self.__url_in_ecs_metadata = '/latest/meta-data/ram/security-credentials/' self.__url_in_ecs_metadata_token = '/latest/api/token' self.__ecs_metadata_fetch_error_msg = 'Failed to get RAM session credentials from ECS metadata service.' self.__ecs_metadata_token_fetch_error_msg = 'Failed to get token from ECS Metadata Service.' self.__metadata_service_host = '100.100.100.200' self._should_refresh = False self._role_name = role_name if role_name is not None else au.environment_ecs_metadata self._disable_imds_v1 = disable_imds_v1 if disable_imds_v1 is not None else au.environment_imds_v1_disabled.lower() == 'true' self._http_options = http_options if http_options is not None else HttpOptions() self._runtime_options = { 'connectTimeout': self._http_options.connect_timeout if self._http_options.connect_timeout is not None else EcsRamRoleCredentialsProvider.DEFAULT_CONNECT_TIMEOUT, 'readTimeout': self._http_options.read_timeout if self._http_options.read_timeout is not None else EcsRamRoleCredentialsProvider.DEFAULT_READ_TIMEOUT, 'httpProxy': self._http_options.proxy } if async_update_enabled: self._credentials_cache = RefreshCachedSupplier( refresh_callable=self._refresh_credentials, refresh_callable_async=self._refresh_credentials_async, stale_value_behavior=StaleValueBehavior.ALLOW, prefetch_strategy=NonBlocking() ) scheduler = BackgroundScheduler() def refresh_task(): if self._should_refresh: log.debug(f'Begin checking or refreshing credentials asynchronously') self.get_credentials() scheduler.add_job(refresh_task, 'interval', minutes=1) scheduler.start() def shutdown_handler(signum, frame): log.debug(f'Shutting down scheduler...') scheduler.shutdown(wait=False) signal.signal(signal.SIGINT, shutdown_handler) signal.signal(signal.SIGTERM, shutdown_handler) else: self._credentials_cache = RefreshCachedSupplier( refresh_callable=self._refresh_credentials, refresh_callable_async=self._refresh_credentials_async, stale_value_behavior=StaleValueBehavior.ALLOW ) def get_credentials(self) -> Credentials: return self._credentials_cache._sync_call() async def get_credentials_async(self) -> Credentials: return await self._credentials_cache._async_call() def _get_role_name(self, url: str = None) -> str: tea_request = ph.get_new_request() tea_request.headers['host'] = url if url else self.__metadata_service_host metadata_token = self._get_metadata_token(url) if metadata_token is not None: tea_request.headers['X-aliyun-ecs-metadata-token'] = metadata_token if not url: tea_request.pathname = self.__url_in_ecs_metadata response = TeaCore.do_action(tea_request, self._runtime_options) if response.status_code != 200: raise CredentialException(self.__ecs_metadata_fetch_error_msg + ' HttpCode=' + str(response.status_code)) return response.body.decode('utf-8') async def _get_role_name_async(self, url: str = None) -> str: tea_request = ph.get_new_request() tea_request.headers['host'] = url if url else self.__metadata_service_host metadata_token = await self._get_metadata_token_async(url) if metadata_token is not None: tea_request.headers['X-aliyun-ecs-metadata-token'] = metadata_token if not url: tea_request.pathname = self.__url_in_ecs_metadata response = await TeaCore.async_do_action(tea_request, self._runtime_options) if response.status_code != 200: raise CredentialException(self.__ecs_metadata_fetch_error_msg + ' HttpCode=' + str(response.status_code)) return response.body.decode('utf-8') def _get_metadata_token(self, url: str = None) -> str: tea_request = ph.get_new_request() tea_request.method = 'PUT' tea_request.headers['host'] = url if url else self.__metadata_service_host tea_request.headers['X-aliyun-ecs-metadata-token-ttl-seconds'] = str( EcsRamRoleCredentialsProvider.DEFAULT_METADATA_TOKEN_DURATION) if not url: tea_request.pathname = self.__url_in_ecs_metadata_token try: response = TeaCore.do_action(tea_request, self._runtime_options) if response.status_code != 200: raise CredentialException( self.__ecs_metadata_token_fetch_error_msg + ' HttpCode=' + str(response.status_code)) return response.body.decode('utf-8') except Exception as e: if self._disable_imds_v1: raise e return None async def _get_metadata_token_async(self, url: str = None) -> str: tea_request = ph.get_new_request() tea_request.method = 'PUT' tea_request.headers['host'] = url if url else self.__metadata_service_host tea_request.headers['X-aliyun-ecs-metadata-token-ttl-seconds'] = str( EcsRamRoleCredentialsProvider.DEFAULT_METADATA_TOKEN_DURATION) if not url: tea_request.pathname = self.__url_in_ecs_metadata_token try: response = await TeaCore.async_do_action(tea_request, self._runtime_options) if response.status_code != 200: raise CredentialException( self.__ecs_metadata_token_fetch_error_msg + ' HttpCode=' + str(response.status_code)) return response.body.decode('utf-8') except Exception as e: if self._disable_imds_v1: raise e return None def _refresh_credentials(self, url: str = None) -> RefreshResult[Credentials]: role_name = self._role_name if self._role_name is None or self._role_name == '': role_name = self._get_role_name(url) tea_request = ph.get_new_request() tea_request.headers['host'] = url if url else self.__metadata_service_host metadata_token = self._get_metadata_token(url) if metadata_token is not None: tea_request.headers['X-aliyun-ecs-metadata-token'] = metadata_token if not url: tea_request.pathname = self.__url_in_ecs_metadata + role_name # request response = TeaCore.do_action(tea_request, self._runtime_options) if response.status_code != 200: raise CredentialException(self.__ecs_metadata_fetch_error_msg + ' HttpCode=' + str(response.status_code)) dic = json.loads(response.body.decode('utf-8')) content_code = dic.get('Code') content_access_key_id = dic.get('AccessKeyId') content_access_key_secret = dic.get('AccessKeySecret') content_security_token = dic.get('SecurityToken') content_expiration = dic.get('Expiration') if content_code != 'Success': raise CredentialException(self.__ecs_metadata_fetch_error_msg) # 先转换为时间数组 time_array = time.strptime(content_expiration, '%Y-%m-%dT%H:%M:%SZ') # 转换为时间戳 expiration = calendar.timegm(time_array) credentials = Credentials( access_key_id=content_access_key_id, access_key_secret=content_access_key_secret, security_token=content_security_token, expiration=expiration, provider_name=self.get_provider_name() ) self._should_refresh = True return RefreshResult(value=credentials, stale_time=self._get_stale_time(expiration), prefetch_time=self._get_prefetch_time(expiration)) async def _refresh_credentials_async(self, url: str = None) -> RefreshResult[Credentials]: role_name = self._role_name if self._role_name is None: role_name = await self._get_role_name_async(url) tea_request = ph.get_new_request() tea_request.headers['host'] = url if url else self.__metadata_service_host metadata_token = await self._get_metadata_token_async(url) if metadata_token is not None: tea_request.headers['X-aliyun-ecs-metadata-token'] = metadata_token if not url: tea_request.pathname = self.__url_in_ecs_metadata + role_name # request response = await TeaCore.async_do_action(tea_request, self._runtime_options) if response.status_code != 200: raise CredentialException(self.__ecs_metadata_fetch_error_msg + ' HttpCode=' + str(response.status_code)) dic = json.loads(response.body.decode('utf-8')) content_code = dic.get('Code') content_access_key_id = dic.get('AccessKeyId') content_access_key_secret = dic.get('AccessKeySecret') content_security_token = dic.get('SecurityToken') content_expiration = dic.get('Expiration') if content_code != 'Success': raise CredentialException(self.__ecs_metadata_fetch_error_msg) # 先转换为时间数组 time_array = time.strptime(content_expiration, '%Y-%m-%dT%H:%M:%SZ') # 转换为时间戳 expiration = calendar.timegm(time_array) credentials = Credentials( access_key_id=content_access_key_id, access_key_secret=content_access_key_secret, security_token=content_security_token, expiration=expiration, provider_name=self.get_provider_name() ) self._should_refresh = True return RefreshResult(value=credentials, stale_time=self._get_stale_time(expiration), prefetch_time=self._get_prefetch_time(expiration)) def _get_stale_time(self, expiration: int) -> int: if expiration < 0: return int(time.mktime(time.localtime())) + 60 * 60 return expiration - 15 * 60 def _get_prefetch_time(self, expiration: int) -> int: if expiration < 0: return int(time.mktime(time.localtime())) + 5 * 60 return int(time.mktime(time.localtime())) + 60 * 60 def get_provider_name(self) -> str: return 'ecs_ram_role'