def __init__()

in alibabacloud_credentials/provider/ecs_ram_role.py [0:0]


    def __init__(self, *,
                 role_name: str = None,
                 disable_imds_v1: bool = None,
                 http_options: HttpOptions = None,
                 async_update_enabled: bool = True):
        """Configure the provider and, optionally, start background refreshing.

        :param role_name: RAM role name. When ``None``, the value of
            ``au.environment_ecs_metadata`` is used instead.
        :param disable_imds_v1: Explicit IMDSv1 switch. When ``None``, it is
            true only if ``au.environment_imds_v1_disabled`` equals ``'true'``
            (case-insensitive).
        :param http_options: Source of the connect timeout, read timeout and
            proxy. When ``None``, a default ``HttpOptions()`` is created.
            Timeouts left as ``None`` fall back to ``DEFAULT_CONNECT_TIMEOUT``
            and ``DEFAULT_READ_TIMEOUT`` of this class.
        :param async_update_enabled: When true, the credentials cache is built
            with a ``NonBlocking`` prefetch strategy and a background scheduler
            job runs once a minute, calling ``get_credentials()`` whenever
            ``self._should_refresh`` is set.
        :raises ValueError: If ``au.environment_ecs_metadata_disabled`` equals
            ``'true'`` (case-insensitive).
        """
        if au.environment_ecs_metadata_disabled.lower() == 'true':
            raise ValueError('IMDS credentials is disabled')

        self.__url_in_ecs_metadata = '/latest/meta-data/ram/security-credentials/'
        self.__url_in_ecs_metadata_token = '/latest/api/token'
        self.__ecs_metadata_fetch_error_msg = 'Failed to get RAM session credentials from ECS metadata service.'
        self.__ecs_metadata_token_fetch_error_msg = 'Failed to get token from ECS Metadata Service.'
        self.__metadata_service_host = '100.100.100.200'
        self._should_refresh = False

        self._role_name = role_name if role_name is not None else au.environment_ecs_metadata
        self._disable_imds_v1 = disable_imds_v1 if disable_imds_v1 is not None else au.environment_imds_v1_disabled.lower() == 'true'
        self._http_options = http_options if http_options is not None else HttpOptions()
        self._runtime_options = {
            'connectTimeout': self._http_options.connect_timeout if self._http_options.connect_timeout is not None else EcsRamRoleCredentialsProvider.DEFAULT_CONNECT_TIMEOUT,
            'readTimeout': self._http_options.read_timeout if self._http_options.read_timeout is not None else EcsRamRoleCredentialsProvider.DEFAULT_READ_TIMEOUT,
            'httpProxy': self._http_options.proxy
        }

        # Both modes share the same supplier arguments; only the asynchronous
        # mode adds a prefetch strategy.
        cache_kwargs = {
            'refresh_callable': self._refresh_credentials,
            'refresh_callable_async': self._refresh_credentials_async,
            'stale_value_behavior': StaleValueBehavior.ALLOW,
        }
        if async_update_enabled:
            cache_kwargs['prefetch_strategy'] = NonBlocking()
        self._credentials_cache = RefreshCachedSupplier(**cache_kwargs)

        if not async_update_enabled:
            return

        import os  # local import: only needed to re-deliver a signal below

        scheduler = BackgroundScheduler()

        def refresh_task():
            # Only touch the cache once a refresh has been requested.
            if self._should_refresh:
                log.debug('Begin checking or refreshing credentials asynchronously')
                self.get_credentials()

        scheduler.add_job(refresh_task, 'interval', minutes=1)
        scheduler.start()

        # Handlers that were active before this provider installed its own,
        # keyed by signal number, so they can still run afterwards.
        previous_handlers = {}

        def shutdown_handler(signum, frame):
            log.debug('Shutting down scheduler...')
            # A repeated signal must not fail on an already stopped scheduler.
            if scheduler.running:
                scheduler.shutdown(wait=False)

            previous = previous_handlers.get(signum)
            if callable(previous):
                # Keep the application's (or another provider's) handler
                # working, e.g. the default SIGINT handler that raises
                # KeyboardInterrupt.
                previous(signum, frame)
            elif previous == signal.SIG_DFL:
                # Restore the default disposition and re-deliver the signal so
                # the process still terminates as it would without this
                # provider.
                signal.signal(signum, signal.SIG_DFL)
                os.kill(os.getpid(), signum)
            # SIG_IGN / None: the signal was ignored before; keep ignoring it.

        for signum in (signal.SIGINT, signal.SIGTERM):
            try:
                previous_handlers[signum] = signal.signal(signum, shutdown_handler)
            except ValueError:
                # signal.signal() may only be called from the main thread;
                # constructing the provider elsewhere must not fail.
                log.debug('Skip registering scheduler shutdown signal handlers outside the main thread')
                break