def _refresh_credentials()

in alibabacloud_credentials/provider/ecs_ram_role.py [0:0]


    def _refresh_credentials(self, url: str = None) -> RefreshResult[Credentials]:
        """Request fresh credentials and wrap them in a ``RefreshResult``.

        The role name is taken from ``self._role_name``; when that is ``None``
        or empty it is looked up through ``self._get_role_name(url)``.

        Args:
            url: Optional override. When truthy it is used as the ``host``
                header and no pathname is set on the request; otherwise the
                default metadata host and the role-specific path are used.

        Returns:
            A ``RefreshResult`` holding the new ``Credentials`` together with
            the stale and prefetch times derived from the expiration.

        Raises:
            CredentialException: If the HTTP status is not 200, or the
                ``Code`` field of the JSON response is not ``'Success'``.
        """
        role = self._role_name
        if role is None or role == '':
            role = self._get_role_name(url)

        request = ph.get_new_request()
        request.headers['host'] = url or self.__metadata_service_host

        # The metadata token header is attached only when a token was obtained.
        token = self._get_metadata_token(url)
        if token is not None:
            request.headers['X-aliyun-ecs-metadata-token'] = token

        # Without an explicit url, address the role's entry on the default host.
        if not url:
            request.pathname = self.__url_in_ecs_metadata + role

        response = TeaCore.do_action(request, self._runtime_options)
        status = response.status_code
        if status != 200:
            raise CredentialException(self.__ecs_metadata_fetch_error_msg + ' HttpCode=' + str(status))

        payload = json.loads(response.body.decode('utf-8'))
        if payload.get('Code') != 'Success':
            raise CredentialException(self.__ecs_metadata_fetch_error_msg)

        # Parse the expiration string into a struct_time, then convert it to
        # an epoch timestamp (calendar.timegm interprets the struct as UTC).
        parsed_expiration = time.strptime(payload.get('Expiration'), '%Y-%m-%dT%H:%M:%SZ')
        expires_at = calendar.timegm(parsed_expiration)

        fresh = Credentials(
            access_key_id=payload.get('AccessKeyId'),
            access_key_secret=payload.get('AccessKeySecret'),
            security_token=payload.get('SecurityToken'),
            expiration=expires_at,
            provider_name=self.get_provider_name(),
        )

        self._should_refresh = True
        stale_at = self._get_stale_time(expires_at)
        prefetch_at = self._get_prefetch_time(expires_at)
        return RefreshResult(value=fresh, stale_time=stale_at, prefetch_time=prefetch_at)