python/alibabacloud_opensearch/client.py (426 lines of code) (raw):

# -*- coding: utf-8 -*- # This file is auto-generated, don't edit it. Thanks. import time from Tea.exceptions import TeaException, UnretryableException from Tea.request import TeaRequest from Tea.core import TeaCore from alibabacloud_opensearch_util.opensearch_util import OpensearchUtil from typing import Dict, Any from alibabacloud_credentials.client import Client as CredentialClient from alibabacloud_opensearch import models as open_search_models from alibabacloud_tea_util.client import Client as UtilClient from alibabacloud_credentials import models as credential_models from alibabacloud_tea_util import models as util_models class Client: """ *\ """ _endpoint: str = None _protocol: str = None _user_agent: str = None _credential: CredentialClient = None def __init__( self, config: open_search_models.Config, ): if UtilClient.is_unset(config): raise TeaException({ 'name': 'ParameterMissing', 'message': "'config' can not be unset" }) if UtilClient.empty(config.type): config.type = 'access_key' credential_config = credential_models.Config( access_key_id=config.access_key_id, type=config.type, access_key_secret=config.access_key_secret, security_token=config.security_token ) self._credential = CredentialClient(credential_config) self._endpoint = config.endpoint self._protocol = config.protocol self._user_agent = config.user_agent def _request( self, method: str, pathname: str, query: Dict[str, Any], headers: Dict[str, str], body: Any, runtime: util_models.RuntimeOptions, ) -> Dict[str, Any]: runtime.validate() _runtime = { 'timeouted': 'retry', 'readTimeout': runtime.read_timeout, 'connectTimeout': runtime.connect_timeout, 'httpProxy': runtime.http_proxy, 'httpsProxy': runtime.https_proxy, 'noProxy': runtime.no_proxy, 'maxIdleConns': runtime.max_idle_conns, 'retry': { 'retryable': runtime.autoretry, 'maxAttempts': UtilClient.default_number(runtime.max_attempts, 3) }, 'backoff': { 'policy': UtilClient.default_string(runtime.backoff_policy, 'no'), 'period': UtilClient.default_number(runtime.backoff_period, 1) }, 'ignoreSSL': runtime.ignore_ssl } _last_request = None _last_exception = None _now = time.time() _retry_times = 0 while TeaCore.allow_retry(_runtime.get('retry'), _retry_times, _now): if _retry_times > 0: _backoff_time = TeaCore.get_backoff_time(_runtime.get('backoff'), _retry_times) if _backoff_time > 0: TeaCore.sleep(_backoff_time) _retry_times = _retry_times + 1 try: _request = TeaRequest() accesskey_id = self.get_access_key_id() access_key_secret = self.get_access_key_secret() _request.protocol = UtilClient.default_string(self._protocol, 'HTTP') _request.method = method _request.pathname = pathname _request.headers = TeaCore.merge({ 'user-agent': self.get_user_agent(), 'Date': OpensearchUtil.get_date(), 'host': UtilClient.default_string(self._endpoint, f'opensearch-cn-hangzhou.aliyuncs.com'), 'X-Opensearch-Nonce': UtilClient.get_nonce() }, headers) if not UtilClient.is_unset(query): _request.query = UtilClient.stringify_map_value(query) if not UtilClient.is_unset(body): req_body = UtilClient.to_jsonstring(body) _request.headers['Content-MD5'] = OpensearchUtil.get_content_md5(req_body) _request.headers['Content-Type'] = 'application/json' _request.body = req_body _request.headers['Authorization'] = OpensearchUtil.get_signature(_request, accesskey_id, access_key_secret) _last_request = _request _response = TeaCore.do_action(_request, _runtime) obj_str = UtilClient.read_as_string(_response.body) if UtilClient.is_4xx(_response.status_code) or UtilClient.is_5xx(_response.status_code): raise TeaException({ 'message': _response.status_message, 'data': obj_str, 'code': _response.status_code }) obj = UtilClient.parse_json(obj_str) res = UtilClient.assert_as_map(obj) return { 'body': res, 'headers': _response.headers } except Exception as e: if TeaCore.is_retryable(e): _last_exception = e continue raise e raise UnretryableException(_last_request, _last_exception) async def _request_async( self, method: str, pathname: str, query: Dict[str, Any], headers: Dict[str, str], body: Any, runtime: util_models.RuntimeOptions, ) -> Dict[str, Any]: runtime.validate() _runtime = { 'timeouted': 'retry', 'readTimeout': runtime.read_timeout, 'connectTimeout': runtime.connect_timeout, 'httpProxy': runtime.http_proxy, 'httpsProxy': runtime.https_proxy, 'noProxy': runtime.no_proxy, 'maxIdleConns': runtime.max_idle_conns, 'retry': { 'retryable': runtime.autoretry, 'maxAttempts': UtilClient.default_number(runtime.max_attempts, 3) }, 'backoff': { 'policy': UtilClient.default_string(runtime.backoff_policy, 'no'), 'period': UtilClient.default_number(runtime.backoff_period, 1) }, 'ignoreSSL': runtime.ignore_ssl } _last_request = None _last_exception = None _now = time.time() _retry_times = 0 while TeaCore.allow_retry(_runtime.get('retry'), _retry_times, _now): if _retry_times > 0: _backoff_time = TeaCore.get_backoff_time(_runtime.get('backoff'), _retry_times) if _backoff_time > 0: TeaCore.sleep(_backoff_time) _retry_times = _retry_times + 1 try: _request = TeaRequest() accesskey_id = await self.get_access_key_id_async() access_key_secret = await self.get_access_key_secret_async() _request.protocol = UtilClient.default_string(self._protocol, 'HTTP') _request.method = method _request.pathname = pathname _request.headers = TeaCore.merge({ 'user-agent': self.get_user_agent(), 'Date': OpensearchUtil.get_date(), 'host': UtilClient.default_string(self._endpoint, f'opensearch-cn-hangzhou.aliyuncs.com'), 'X-Opensearch-Nonce': UtilClient.get_nonce() }, headers) if not UtilClient.is_unset(query): _request.query = UtilClient.stringify_map_value(query) if not UtilClient.is_unset(body): req_body = UtilClient.to_jsonstring(body) _request.headers['Content-MD5'] = OpensearchUtil.get_content_md5(req_body) _request.headers['Content-Type'] = 'application/json' _request.body = req_body _request.headers['Authorization'] = OpensearchUtil.get_signature(_request, accesskey_id, access_key_secret) _last_request = _request _response = await TeaCore.async_do_action(_request, _runtime) obj_str = await UtilClient.read_as_string_async(_response.body) if UtilClient.is_4xx(_response.status_code) or UtilClient.is_5xx(_response.status_code): raise TeaException({ 'message': _response.status_message, 'data': obj_str, 'code': _response.status_code }) obj = UtilClient.parse_json(obj_str) res = UtilClient.assert_as_map(obj) return { 'body': res, 'headers': _response.headers } except Exception as e: if TeaCore.is_retryable(e): _last_exception = e continue raise e raise UnretryableException(_last_request, _last_exception) def search_ex( self, app_name: str, request: open_search_models.SearchRequestModel, runtime: util_models.RuntimeOptions, ) -> open_search_models.SearchResponseModel: """ 系统提供了丰富的搜索语法以满足用户各种场景下的搜索需求。 """ return TeaCore.from_map( open_search_models.SearchResponseModel(), self._request('GET', f'/v3/openapi/apps/{app_name}/search', TeaCore.to_map(request.query), request.headers, None, runtime) ) async def search_ex_async( self, app_name: str, request: open_search_models.SearchRequestModel, runtime: util_models.RuntimeOptions, ) -> open_search_models.SearchResponseModel: """ 系统提供了丰富的搜索语法以满足用户各种场景下的搜索需求。 """ return TeaCore.from_map( open_search_models.SearchResponseModel(), await self._request_async('GET', f'/v3/openapi/apps/{app_name}/search', TeaCore.to_map(request.query), request.headers, None, runtime) ) def search( self, app_name: str, request: open_search_models.SearchRequestModel, ) -> open_search_models.SearchResponseModel: """ 系统提供了丰富的搜索语法以满足用户各种场景下的搜索需求。 """ runtime = util_models.RuntimeOptions( connect_timeout=5000, read_timeout=10000, autoretry=False, ignore_ssl=False, max_idle_conns=50 ) return self.search_ex(app_name, request, runtime) async def search_async( self, app_name: str, request: open_search_models.SearchRequestModel, ) -> open_search_models.SearchResponseModel: """ 系统提供了丰富的搜索语法以满足用户各种场景下的搜索需求。 """ runtime = util_models.RuntimeOptions( connect_timeout=5000, read_timeout=10000, autoretry=False, ignore_ssl=False, max_idle_conns=50 ) return await self.search_ex_async(app_name, request, runtime) def suggest_ex( self, app_name: str, model_name: str, request: open_search_models.SuggestRequestModel, runtime: util_models.RuntimeOptions, ) -> open_search_models.SuggestResponseModel: """ 下拉提示是搜索服务的基础功能,在用户输入查询词的过程中,智能推荐候选query,减少用户输入,帮助用户尽快找到想要的内容。 """ return TeaCore.from_map( open_search_models.SuggestResponseModel(), self._request('GET', f'/v3/openapi/apps/{app_name}/suggest/{model_name}/search', TeaCore.to_map(request.query), request.headers, None, runtime) ) async def suggest_ex_async( self, app_name: str, model_name: str, request: open_search_models.SuggestRequestModel, runtime: util_models.RuntimeOptions, ) -> open_search_models.SuggestResponseModel: """ 下拉提示是搜索服务的基础功能,在用户输入查询词的过程中,智能推荐候选query,减少用户输入,帮助用户尽快找到想要的内容。 """ return TeaCore.from_map( open_search_models.SuggestResponseModel(), await self._request_async('GET', f'/v3/openapi/apps/{app_name}/suggest/{model_name}/search', TeaCore.to_map(request.query), request.headers, None, runtime) ) def suggest( self, app_name: str, model_name: str, request: open_search_models.SuggestRequestModel, ) -> open_search_models.SuggestResponseModel: """ 下拉提示是搜索服务的基础功能,在用户输入查询词的过程中,智能推荐候选query,减少用户输入,帮助用户尽快找到想要的内容。 """ runtime = util_models.RuntimeOptions( connect_timeout=5000, read_timeout=10000, autoretry=False, ignore_ssl=False, max_idle_conns=50 ) return self.suggest_ex(app_name, model_name, request, runtime) async def suggest_async( self, app_name: str, model_name: str, request: open_search_models.SuggestRequestModel, ) -> open_search_models.SuggestResponseModel: """ 下拉提示是搜索服务的基础功能,在用户输入查询词的过程中,智能推荐候选query,减少用户输入,帮助用户尽快找到想要的内容。 """ runtime = util_models.RuntimeOptions( connect_timeout=5000, read_timeout=10000, autoretry=False, ignore_ssl=False, max_idle_conns=50 ) return await self.suggest_ex_async(app_name, model_name, request, runtime) def push_document_ex( self, app_name: str, table_name: str, request: open_search_models.PushDocumentRequestModel, runtime: util_models.RuntimeOptions, ) -> open_search_models.PushDocumentResponseModel: """ 支持新增、更新、删除 等操作,以及对应批量操作 """ return TeaCore.from_map( open_search_models.PushDocumentResponseModel(), self._request('POST', f'/v3/openapi/apps/{app_name}/{table_name}/actions/bulk', None, request.headers, request.body, runtime) ) async def push_document_ex_async( self, app_name: str, table_name: str, request: open_search_models.PushDocumentRequestModel, runtime: util_models.RuntimeOptions, ) -> open_search_models.PushDocumentResponseModel: """ 支持新增、更新、删除 等操作,以及对应批量操作 """ return TeaCore.from_map( open_search_models.PushDocumentResponseModel(), await self._request_async('POST', f'/v3/openapi/apps/{app_name}/{table_name}/actions/bulk', None, request.headers, request.body, runtime) ) def push_document( self, app_name: str, table_name: str, request: open_search_models.PushDocumentRequestModel, ) -> open_search_models.PushDocumentResponseModel: """ 支持新增、更新、删除 等操作,以及对应批量操作 """ runtime = util_models.RuntimeOptions( connect_timeout=5000, read_timeout=10000, autoretry=False, ignore_ssl=False, max_idle_conns=50 ) return self.push_document_ex(app_name, table_name, request, runtime) async def push_document_async( self, app_name: str, table_name: str, request: open_search_models.PushDocumentRequestModel, ) -> open_search_models.PushDocumentResponseModel: """ 支持新增、更新、删除 等操作,以及对应批量操作 """ runtime = util_models.RuntimeOptions( connect_timeout=5000, read_timeout=10000, autoretry=False, ignore_ssl=False, max_idle_conns=50 ) return await self.push_document_ex_async(app_name, table_name, request, runtime) def collect_data_ex( self, app_name: str, collector_name: str, request: open_search_models.CollectDataRequestModel, runtime: util_models.RuntimeOptions, ) -> open_search_models.CollectDataResponseModel: """ 为了给客户提供更高质量的搜索效果,opensearch目前支持客户通过server端上传点击数据。 """ return TeaCore.from_map( open_search_models.CollectDataResponseModel(), self._request('POST', f'/v3/openapi/app-groups/{app_name}/data-collections/{collector_name}/actions/bulk', None, request.headers, request.body, runtime) ) async def collect_data_ex_async( self, app_name: str, collector_name: str, request: open_search_models.CollectDataRequestModel, runtime: util_models.RuntimeOptions, ) -> open_search_models.CollectDataResponseModel: """ 为了给客户提供更高质量的搜索效果,opensearch目前支持客户通过server端上传点击数据。 """ return TeaCore.from_map( open_search_models.CollectDataResponseModel(), await self._request_async('POST', f'/v3/openapi/app-groups/{app_name}/data-collections/{collector_name}/actions/bulk', None, request.headers, request.body, runtime) ) def collect_data( self, app_name: str, collector_name: str, request: open_search_models.CollectDataRequestModel, ) -> open_search_models.CollectDataResponseModel: """ 为了给客户提供更高质量的搜索效果,opensearch目前支持客户通过server端上传点击数据。 """ runtime = util_models.RuntimeOptions( connect_timeout=5000, read_timeout=10000, autoretry=False, ignore_ssl=False, max_idle_conns=50 ) return self.collect_data_ex(app_name, collector_name, request, runtime) async def collect_data_async( self, app_name: str, collector_name: str, request: open_search_models.CollectDataRequestModel, ) -> open_search_models.CollectDataResponseModel: """ 为了给客户提供更高质量的搜索效果,opensearch目前支持客户通过server端上传点击数据。 """ runtime = util_models.RuntimeOptions( connect_timeout=5000, read_timeout=10000, autoretry=False, ignore_ssl=False, max_idle_conns=50 ) return await self.collect_data_ex_async(app_name, collector_name, request, runtime) def set_user_agent( self, user_agent: str, ) -> None: self._user_agent = user_agent def append_user_agent( self, user_agent: str, ) -> None: self._user_agent = f'{self._user_agent} {user_agent}' def get_user_agent(self) -> str: user_agent = UtilClient.get_user_agent(self._user_agent) return user_agent def get_access_key_id(self) -> str: if UtilClient.is_unset(self._credential): return '' access_key_id = self._credential.get_access_key_id() return access_key_id async def get_access_key_id_async(self) -> str: if UtilClient.is_unset(self._credential): return '' access_key_id = await self._credential.get_access_key_id_async() return access_key_id def get_access_key_secret(self) -> str: if UtilClient.is_unset(self._credential): return '' secret = self._credential.get_access_key_secret() return secret async def get_access_key_secret_async(self) -> str: if UtilClient.is_unset(self._credential): return '' secret = await self._credential.get_access_key_secret_async() return secret