# iact3/plugin/base_plugin.py (195 lines of code) (raw)
import abc
import asyncio
import logging
import os
from Tea.core import TeaCore
from Tea.exceptions import TeaException
from Tea.model import TeaModel
from alibabacloud_credentials import providers
from alibabacloud_credentials.client import Client
from alibabacloud_credentials.utils import auth_util
from alibabacloud_credentials.utils.auth_constant import HOME
from alibabacloud_tea_openapi.models import Config
from alibabacloud_tea_util.models import RuntimeOptions
from oslo_utils import importutils
from iact3.plugin import error_code
from iact3.util import pascal_to_snake
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Credential INI files looked up under HOME, in the order they are probed by
# CredentialsProvider._get_credential_file_path.
DEFAULT_INI_CREDENTIAL_FILE_PATHS = [
    os.path.join(HOME, relative_path)
    for relative_path in (
        '.alibabacloud/credentials.ini',
        '.aliyun/credentials.ini',
    )
]

# Suffix stripped from a request class name to obtain the API name.
REQUEST_SUFFIX = 'Request'
# Suffixes appended (in this order) to the snake_case API name to build the
# name of the SDK client method that is invoked.
RUNTIME_OPTIONS_FLAG = '_with_options'
ASYNC_FLAG = '_async'
class CredentialsProvider(providers.DefaultCredentialsProvider):
    """Default credentials provider with a customised provider chain.

    After the base initialisation, ``user_configuration_providers`` is
    replaced with: environment variables, an INI profile file, optionally an
    ECS RAM role provider, and finally the credentials-URI provider.
    """

    def __init__(self):
        super().__init__()
        chain = [
            providers.EnvironmentVariableCredentialsProvider(),
            providers.ProfileCredentialsProvider(
                path=self._get_credential_file_path()
            ),
        ]
        # The ECS RAM role provider is only added when auth_util exposes a
        # role name.
        ecs_role = auth_util.environment_ECSMeta_data
        if ecs_role is not None:
            chain.append(providers.EcsRamRoleCredentialProvider(ecs_role))
        chain.append(providers.CredentialsUriProvider())
        self.user_configuration_providers = chain

    def _get_credential_file_path(self):
        """Return the credentials INI file path to use, or None.

        The path exposed by ``auth_util.environment_credentials_file`` wins
        when it is truthy; otherwise the first existing entry of
        ``DEFAULT_INI_CREDENTIAL_FILE_PATHS`` is returned. ``None`` is
        returned when none of the default files exists.
        """
        configured = auth_util.environment_credentials_file
        if configured:
            return configured
        existing = (
            candidate
            for candidate in DEFAULT_INI_CREDENTIAL_FILE_PATHS
            if os.path.exists(candidate)
        )
        return next(existing, None)
class CredentialClient(Client):
    """Credential client that falls back to ``CredentialsProvider``.

    With an explicit ``config`` the parent ``Client`` initialiser is used
    unchanged. Without one, the parent initialiser is skipped and
    ``cloud_credential`` is taken from ``CredentialsProvider().get_credentials()``.
    """

    def __init__(self, config=None):
        if config is not None:
            super().__init__(config)
            return
        # No config supplied: resolve credentials through the custom chain.
        self.cloud_credential = CredentialsProvider().get_credentials()
class TeaSDKPlugin(metaclass=abc.ABCMeta):
    """Abstract base for plugins that call an API through a Tea SDK client.

    Subclasses provide the SDK client class (``api_client``) and the import
    path of request model classes (``models_path``). This base class builds
    the SDK ``Config``, lazily creates the client, sends requests through the
    ``*_with_options_async`` client methods and offers paginated fetching.
    """

    # Product identifier; only used in log messages by this class.
    product = None

    def __init__(self,
                 region_id: str,
                 credential: CredentialClient = None,
                 config_kwargs: dict = None,
                 endpoint: str = None):
        """Build the SDK configuration and runtime options.

        Args:
            region_id: Region passed to ``Config`` and used as the default
                ``RegionId`` of every request.
            credential: Credential client; a default ``CredentialClient()``
                is created when falsy.
            config_kwargs: Extra keyword arguments for ``Config``. The
                caller's dict is not modified.
            endpoint: Optional endpoint overriding the one in ``Config``.
        """
        self.region_id = region_id
        if not credential:
            credential = CredentialClient()
        self.credential = credential
        # Work on a copy so region/credential/endpoint never leak back into
        # a dict the caller may reuse for another plugin.
        config_kwargs = dict(config_kwargs) if config_kwargs else {}
        config_kwargs.update(
            region_id=region_id,
            credential=credential
        )
        if endpoint:
            config_kwargs.update(endpoint=endpoint)
        self.config = Config(**config_kwargs)
        self.endpoint = self.config.endpoint
        self._client = None
        self.runtime_option = RuntimeOptions(**self.runtime_kwargs())

    @abc.abstractmethod
    def api_client(self):
        """Return the SDK client class (called with ``self.config``)."""
        raise NotImplementedError

    @abc.abstractmethod
    def models_path(self, action_name):
        """Return the dotted import path of the request class ``action_name``."""
        raise NotImplementedError

    def runtime_kwargs(self):
        """Return keyword arguments used to build ``RuntimeOptions``."""
        return {
            'autoretry': True,
            'max_attempts': 3
        }

    @property
    def client(self):
        """Return the SDK client, creating and caching it on first access.

        When no endpoint is configured, ``self.endpoint`` is filled from the
        client's ``_endpoint`` attribute (empty string if absent).
        """
        if not self._client:
            client = self.api_client()(self.config)
            if not self.endpoint:
                self.endpoint = getattr(client, '_endpoint', '')
            # Cache the instance; previously a second, uncached client was
            # built and returned on every access.
            self._client = client
        return self._client

    async def send_request(self, request_name: str, ignoreException: bool=False, **kwargs) -> dict:
        """Send one API request and return the ``body`` of the response.

        Args:
            request_name: API or request class name, with or without the
                ``Request`` suffix.
            ignoreException: When true, a ``TeaException`` is swallowed and
                its ``data`` is returned instead.
            **kwargs: Request parameters passed to the request model's
                ``from_map``.

        Returns:
            The ``body`` entry of the response map, or ``{}`` if missing.

        Raises:
            TeaException: On SDK errors (unless ``ignoreException``), or
                when the SDK returns something that is not a ``TeaModel``.
        """
        request = self._build_request(request_name, **kwargs)
        api_name = self._get_api_name(request_name)
        action_name = self._get_action_name(api_name)
        func = getattr(self.client, action_name)
        try:
            resp = await func(request, self.runtime_option)
        except TeaException as ex:
            LOG.debug(f'plugin exception: {self.product} {self.endpoint} {api_name} {request.to_map()} {ex.data}')
            if ignoreException:
                return ex.data
            raise
        if not isinstance(resp, TeaModel):
            LOG.error(f'plugin response: {self.product} {self.endpoint} {api_name} {request.to_map()} {resp}')
            raise TeaException(dict(
                code=error_code.UNKNOWN_ERROR,
                message='The response of TeaSDK is not TeaModel.',
                data=resp
            ))
        resp = TeaCore.to_map(resp)
        LOG.debug(f'plugin response: {self.product} {self.endpoint} {api_name} {request.to_map()} {resp}')
        return resp.get('body', {})

    def _get_api_name(self, request_name):
        """Return ``request_name`` without a trailing ``Request`` suffix."""
        if request_name.endswith(REQUEST_SUFFIX):
            return request_name[:-len(REQUEST_SUFFIX)]
        return request_name

    def _get_action_name(self, api_name):
        """Return the client method name for ``api_name``.

        The name is converted with ``pascal_to_snake`` and then suffixed
        with ``_with_options`` and ``_async`` when not already present.
        """
        action_name = pascal_to_snake(api_name)
        if not action_name.endswith(RUNTIME_OPTIONS_FLAG):
            action_name = f'{action_name}{RUNTIME_OPTIONS_FLAG}'
        if not action_name.endswith(ASYNC_FLAG):
            action_name = f'{action_name}{ASYNC_FLAG}'
        return action_name

    def _build_request(self, request_name, **kwargs):
        """Instantiate the request model for ``request_name`` from ``kwargs``.

        ``RegionId`` defaults to ``self.region_id`` and the ``Request``
        suffix is appended to ``request_name`` when missing.
        """
        kwargs.setdefault('RegionId', self.region_id)
        if not request_name.endswith(REQUEST_SUFFIX):
            request_name = f'{request_name}{REQUEST_SUFFIX}'
        class_path = self.models_path(request_name)
        request = importutils.import_class(class_path)()
        return request.from_map(kwargs)

    @staticmethod
    def _convert_tags(tags: dict, kwargs, tag_key='Tags'):
        """Store ``tags`` in ``kwargs[tag_key]`` as ``Key``/``Value`` dicts.

        Entries whose value is ``None`` are dropped. Nothing happens when
        ``tags`` is falsy.

        Raises:
            TypeError: If ``tags`` is truthy but not a dict.
        """
        if not tags:
            return
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if not isinstance(tags, dict):
            raise TypeError(f'tags must be a dict, got {type(tags).__name__}')
        kwargs[tag_key] = [dict(Key=k, Value=v) for k, v in tags.items() if v is not None]

    # Names of the pagination fields in requests and responses; subclasses
    # may override them.
    PAGE_NUMBER, PAGE_SIZE, TOTAL_COUNT, TOTAL_PAGES, TOTAL = \
        'PageNumber', 'PageSize', 'TotalCount', 'TotalPages', 'Total'
    # Optional response key under which the paginated payload is nested.
    PAGE_OUTER_KEY = None

    async def fetch_all(self, request, kwargs, *keys):
        """Fetch every page of a paginated API and return the merged items.

        The first page is fetched alone to learn the page count (from
        ``TotalCount``, ``Total`` or ``TotalPages``); remaining pages are
        fetched concurrently. Items are returned in page order.

        Args:
            request: Request name forwarded to ``send_request``.
            kwargs: Request parameters; not modified. ``PageSize`` defaults
                to 50.
            *keys: Path of keys leading to the item list in each response.

        Raises:
            Exception: The first error raised by any page request, after
                all page tasks have finished.
            KeyError: If the first response carries no pagination field.
        """
        kwargs = kwargs.copy()
        kwargs.setdefault(self.PAGE_SIZE, 50)

        async def fetch_page(page):
            # Each page gets its own parameter dict so tasks do not share state.
            params = kwargs.copy()
            params[self.PAGE_NUMBER] = page
            resp = await self.send_request(request, **params)
            if self.PAGE_OUTER_KEY:
                resp = resp.get(self.PAGE_OUTER_KEY)
            return resp

        # first fetch
        resp = await fetch_page(1)
        result = list(self._get_from_resp(resp, *keys))

        # calculate total pages
        if self.TOTAL_COUNT in resp:
            total_pages = (resp[self.TOTAL_COUNT] - 1) // kwargs[self.PAGE_SIZE] + 1
        elif self.TOTAL in resp:
            total_pages = (resp[self.TOTAL] - 1) // kwargs[self.PAGE_SIZE] + 1
        else:
            total_pages = resp[self.TOTAL_PAGES]
        if total_pages <= 1:
            return result

        # concurrent fetch
        ex = []

        async def fetch_one_page(page):
            # Errors are collected rather than raised so that every page
            # task runs to completion before the first error is re-raised.
            try:
                return self._get_from_resp(await fetch_page(page), *keys)
            except Exception as e:
                LOG.error(e)
                ex.append(e)
                return []

        tasks = [
            asyncio.create_task(fetch_one_page(page))
            for page in range(2, total_pages + 1)
        ]
        # gather() returns results in task order, so merging them afterwards
        # keeps items in page order regardless of completion order.
        pages = await asyncio.gather(*tasks)
        if ex:
            raise ex[0]
        for values in pages:
            result.extend(values)
        return result

    @staticmethod
    def _get_from_resp(resp, *keys):
        """Return the list found by following ``keys`` through ``resp``.

        Missing keys and ``None`` values at any level yield an empty list.
        An empty ``keys`` path also yields an empty list.
        """
        if not keys:
            return []
        for key in keys[:-1]:
            resp = (resp or {}).get(key)
        return list((resp or {}).get(keys[-1]) or [])