python/alibabacloud_ha3engine/client.py (548 lines of code) (raw):

# -*- coding: utf-8 -*- # This file is auto-generated, don't edit it. Thanks. import time from Tea.exceptions import TeaException, UnretryableException from Tea.request import TeaRequest from Tea.core import TeaCore from alibabacloud_darabonba_encode_util.encoder import Encoder from typing import Dict, Any, List from alibabacloud_ha3engine import models as ha_3engine_models from alibabacloud_tea_util.client import Client as UtilClient from alibabacloud_tea_util import models as util_models from alibabacloud_darabonba_string.client import Client as StringClient from alibabacloud_darabonba_map.client import Client as MapClient class Client: _endpoint: str = None _instance_id: str = None _protocol: str = None _user_agent: str = None _credential: str = None _domainsuffix: str = None def __init__( self, config: ha_3engine_models.Config, ): if UtilClient.is_unset(config): raise TeaException({ 'name': 'ParameterMissing', 'message': "'config' can not be unset" }) self._credential = self.get_realm_sign_str(config.access_user_name, config.access_pass_word) self._endpoint = config.endpoint self._instance_id = config.instance_id self._protocol = config.protocol self._user_agent = config.user_agent self._domainsuffix = 'ha.aliyuncs.com' def _request( self, method: str, pathname: str, query: Dict[str, Any], headers: Dict[str, str], body: Any, runtime: util_models.RuntimeOptions, ) -> Dict[str, Any]: runtime.validate() _runtime = { 'timeouted': 'retry', 'readTimeout': runtime.read_timeout, 'connectTimeout': runtime.connect_timeout, 'httpProxy': runtime.http_proxy, 'httpsProxy': runtime.https_proxy, 'noProxy': runtime.no_proxy, 'maxIdleConns': runtime.max_idle_conns, 'retry': { 'retryable': runtime.autoretry, 'maxAttempts': UtilClient.default_number(runtime.max_attempts, 5) }, 'backoff': { 'policy': UtilClient.default_string(runtime.backoff_policy, 'no'), 'period': UtilClient.default_number(runtime.backoff_period, 1) }, 'ignoreSSL': runtime.ignore_ssl } _last_request = None _last_exception = None _now = time.time() _retry_times = 0 while TeaCore.allow_retry(_runtime.get('retry'), _retry_times, _now): if _retry_times > 0: _backoff_time = TeaCore.get_backoff_time(_runtime.get('backoff'), _retry_times) if _backoff_time > 0: TeaCore.sleep(_backoff_time) _retry_times = _retry_times + 1 try: _request = TeaRequest() _request.protocol = UtilClient.default_string(self._protocol, 'HTTP') _request.method = method _request.pathname = pathname _request.headers = TeaCore.merge({ 'user-agent': self.get_user_agent(), 'host': UtilClient.default_string(self._endpoint, f'{self._instance_id}.{self._domainsuffix}'), 'authorization': f'Basic {self._credential}', 'content-type': 'application/json; charset=utf-8' }, headers) if not UtilClient.is_unset(query): _request.query = UtilClient.stringify_map_value(query) _request.headers['X-Opensearch-Request-ID'] = UtilClient.get_nonce() if not UtilClient.is_unset(body): _request.headers['X-Opensearch-Swift-Request-ID'] = UtilClient.get_nonce() _request.body = UtilClient.to_jsonstring(body) _last_request = _request _response = TeaCore.do_action(_request, _runtime) obj_str = UtilClient.read_as_string(_response.body) if UtilClient.is_4xx(_response.status_code) or UtilClient.is_5xx(_response.status_code): raw_msg = None try: raw_msg = UtilClient.parse_json(obj_str) except Exception as err: raw_msg = obj_str raw_map = { 'errors': raw_msg } raise TeaException({ 'message': _response.status_message, 'data': raw_map, 'code': _response.status_code }) if UtilClient.empty(obj_str): rawbody_map = { 'status': _response.status_message, 'code': _response.status_code } return { 'body': UtilClient.to_jsonstring(rawbody_map), 'headers': _response.headers } return { 'body': obj_str, 'headers': _response.headers } except Exception as e: if TeaCore.is_retryable(e): _last_exception = e continue raise e raise UnretryableException(_last_request, _last_exception) async def _request_async( self, method: str, pathname: str, query: Dict[str, Any], headers: Dict[str, str], body: Any, runtime: util_models.RuntimeOptions, ) -> Dict[str, Any]: runtime.validate() _runtime = { 'timeouted': 'retry', 'readTimeout': runtime.read_timeout, 'connectTimeout': runtime.connect_timeout, 'httpProxy': runtime.http_proxy, 'httpsProxy': runtime.https_proxy, 'noProxy': runtime.no_proxy, 'maxIdleConns': runtime.max_idle_conns, 'retry': { 'retryable': runtime.autoretry, 'maxAttempts': UtilClient.default_number(runtime.max_attempts, 5) }, 'backoff': { 'policy': UtilClient.default_string(runtime.backoff_policy, 'no'), 'period': UtilClient.default_number(runtime.backoff_period, 1) }, 'ignoreSSL': runtime.ignore_ssl } _last_request = None _last_exception = None _now = time.time() _retry_times = 0 while TeaCore.allow_retry(_runtime.get('retry'), _retry_times, _now): if _retry_times > 0: _backoff_time = TeaCore.get_backoff_time(_runtime.get('backoff'), _retry_times) if _backoff_time > 0: TeaCore.sleep(_backoff_time) _retry_times = _retry_times + 1 try: _request = TeaRequest() _request.protocol = UtilClient.default_string(self._protocol, 'HTTP') _request.method = method _request.pathname = pathname _request.headers = TeaCore.merge({ 'user-agent': self.get_user_agent(), 'host': UtilClient.default_string(self._endpoint, f'{self._instance_id}.{self._domainsuffix}'), 'authorization': f'Basic {self._credential}', 'content-type': 'application/json; charset=utf-8' }, headers) if not UtilClient.is_unset(query): _request.query = UtilClient.stringify_map_value(query) _request.headers['X-Opensearch-Request-ID'] = UtilClient.get_nonce() if not UtilClient.is_unset(body): _request.headers['X-Opensearch-Swift-Request-ID'] = UtilClient.get_nonce() _request.body = UtilClient.to_jsonstring(body) _last_request = _request _response = await TeaCore.async_do_action(_request, _runtime) obj_str = await UtilClient.read_as_string_async(_response.body) if UtilClient.is_4xx(_response.status_code) or UtilClient.is_5xx(_response.status_code): raw_msg = None try: raw_msg = UtilClient.parse_json(obj_str) except Exception as err: raw_msg = obj_str raw_map = { 'errors': raw_msg } raise TeaException({ 'message': _response.status_message, 'data': raw_map, 'code': _response.status_code }) if UtilClient.empty(obj_str): rawbody_map = { 'status': _response.status_message, 'code': _response.status_code } return { 'body': UtilClient.to_jsonstring(rawbody_map), 'headers': _response.headers } return { 'body': obj_str, 'headers': _response.headers } except Exception as e: if TeaCore.is_retryable(e): _last_exception = e continue raise e raise UnretryableException(_last_request, _last_exception) def set_user_agent( self, user_agent: str, ) -> None: """ 设置Client UA 配置. """ self._user_agent = user_agent def append_user_agent( self, user_agent: str, ) -> None: """ 添加Client UA 配置. """ self._user_agent = f'{self._user_agent} {user_agent}' def get_user_agent(self) -> str: """ 获取Client 配置 UA 配置. """ user_agent = UtilClient.get_user_agent(self._user_agent) return user_agent def get_realm_sign_str( self, access_user_name: str, access_pass_word: str, ) -> str: """ 计算用户请求识别特征, 遵循 Basic Auth 生成规范. """ access_user_name_str = StringClient.trim(access_user_name) access_pass_word_str = StringClient.trim(access_pass_word) realm_str = f'{access_user_name_str}:{access_pass_word_str}' return Encoder.base_64encode_to_string(StringClient.to_bytes(realm_str, 'UTF-8')) def build_ha_search_query( self, haquery: ha_3engine_models.HaQuery, ) -> str: if UtilClient.is_unset(haquery.query): raise TeaException({ 'name': 'ParameterMissing', 'message': "'HaQuery.query' can not be unset" }) temp_string = f'query={haquery.query}' config_str = self.build_ha_queryconfig_clause_str(haquery.config) temp_string = f"{temp_string}&&cluster={UtilClient.default_string(haquery.cluster, 'general')}" temp_string = f'{temp_string}&&config={config_str}' if not UtilClient.is_unset(haquery.filter): filter_str = haquery.filter if not UtilClient.empty(filter_str): field_value_trimed = StringClient.trim(filter_str) temp_string = f'{temp_string}&&filter={field_value_trimed}' if not UtilClient.is_unset(haquery.custom_query): for key_field in MapClient.key_set(haquery.custom_query): field_value = haquery.custom_query.get(key_field) if not UtilClient.empty(field_value): field_value_trimed = StringClient.trim(field_value) key_field_trimed = StringClient.trim(key_field) temp_string = f'{temp_string}&&{key_field_trimed}={field_value_trimed}' if not UtilClient.is_unset(haquery.sort): sort_str = self.build_ha_query_sort_clause_str(haquery.sort) if not UtilClient.empty(sort_str): temp_string = f'{temp_string}&&sort={sort_str}' if not UtilClient.is_unset(haquery.aggregate): aggregate_clause_str = self.build_ha_query_aggregate_clause_str(haquery.aggregate) if not UtilClient.empty(aggregate_clause_str): temp_string = f'{temp_string}&&aggregate={aggregate_clause_str}' if not UtilClient.is_unset(haquery.distinct): distinct_clause_str = self.build_ha_query_distinct_clause_str(haquery.distinct) if not UtilClient.empty(distinct_clause_str): temp_string = f'{temp_string}&&distinct={distinct_clause_str}' kvpairs = self.build_searc_kv_pair_clause_str(haquery.kvpairs) if not UtilClient.empty(kvpairs): temp_string = f'{temp_string}&&kvpairs={kvpairs}' return temp_string def build_ha_query_aggregate_clause_str( self, clause: List[ha_3engine_models.HaQueryAggregateClause], ) -> str: temp_clause_string = '' for aggregate_clause in clause: temp_aggregate_clause_string = '' if UtilClient.is_unset(aggregate_clause.group_key) or UtilClient.is_unset(aggregate_clause.agg_fun): raise TeaException({ 'name': 'ParameterMissing', 'message': "'HaQueryAggregateClause.groupKey/aggFun' can not be unset" }) if not UtilClient.empty(aggregate_clause.group_key) and not UtilClient.empty(aggregate_clause.agg_fun): group_key_trimed = StringClient.trim(aggregate_clause.group_key) agg_fun_trimed = StringClient.trim(aggregate_clause.agg_fun) temp_aggregate_clause_string = f'group_key:{group_key_trimed},agg_fun:{agg_fun_trimed}' if not UtilClient.empty(aggregate_clause.range): range_trimed = StringClient.trim(aggregate_clause.range) temp_aggregate_clause_string = f'{temp_aggregate_clause_string},range:{range_trimed}' if not UtilClient.empty(aggregate_clause.max_group): max_group_trimed = StringClient.trim(aggregate_clause.max_group) temp_aggregate_clause_string = f'{temp_aggregate_clause_string},max_group:{max_group_trimed}' if not UtilClient.empty(aggregate_clause.agg_filter): agg_filter_trimed = StringClient.trim(aggregate_clause.agg_filter) temp_aggregate_clause_string = f'{temp_aggregate_clause_string},agg_filter:{agg_filter_trimed}' if not UtilClient.empty(aggregate_clause.agg_sampler_thres_hold): agg_sampler_thres_hold_trimed = StringClient.trim(aggregate_clause.agg_sampler_thres_hold) temp_aggregate_clause_string = f'{temp_aggregate_clause_string},agg_sampler_threshold:{agg_sampler_thres_hold_trimed}' if not UtilClient.empty(aggregate_clause.agg_sampler_step): agg_sampler_step_trimed = StringClient.trim(aggregate_clause.agg_sampler_step) temp_aggregate_clause_string = f'{temp_aggregate_clause_string},agg_sampler_step:{agg_sampler_step_trimed}' if not UtilClient.empty(temp_clause_string): temp_clause_string = f'{temp_clause_string};{temp_aggregate_clause_string}' else: temp_clause_string = f'{temp_aggregate_clause_string}' return temp_clause_string def build_ha_query_distinct_clause_str( self, clause: List[ha_3engine_models.HaQueryDistinctClause], ) -> str: temp_clause_string = '' for distinct_clause in clause: temp_distinct_clause_string = '' if UtilClient.is_unset(distinct_clause.dist_key): raise TeaException({ 'name': 'ParameterMissing', 'message': "'HaQueryDistinctClause.distKey' can not be unset" }) if not UtilClient.empty(distinct_clause.dist_key): dist_key_trimed = StringClient.trim(distinct_clause.dist_key) temp_distinct_clause_string = f'dist_key:{dist_key_trimed}' if not UtilClient.empty(distinct_clause.dist_count): dist_count_trimed = StringClient.trim(distinct_clause.dist_count) temp_distinct_clause_string = f'{temp_distinct_clause_string},dist_count:{dist_count_trimed}' if not UtilClient.empty(distinct_clause.dist_times): dist_times_trimed = StringClient.trim(distinct_clause.dist_times) temp_distinct_clause_string = f'{temp_distinct_clause_string},dist_times:{dist_times_trimed}' if not UtilClient.empty(distinct_clause.reserved): reserved_trimed = StringClient.trim(distinct_clause.reserved) temp_distinct_clause_string = f'{temp_distinct_clause_string},reserved:{reserved_trimed}' if not UtilClient.empty(distinct_clause.dist_filter): dist_filter_trimed = StringClient.trim(distinct_clause.dist_filter) temp_distinct_clause_string = f'{temp_distinct_clause_string},dist_filter:{dist_filter_trimed}' if not UtilClient.empty(distinct_clause.update_total_hit): update_total_hit_trimed = StringClient.trim(distinct_clause.update_total_hit) temp_distinct_clause_string = f'{temp_distinct_clause_string},update_total_hit:{update_total_hit_trimed}' if not UtilClient.empty(distinct_clause.grade): grade_trimed = StringClient.trim(distinct_clause.grade) temp_distinct_clause_string = f'{temp_distinct_clause_string},grade:{grade_trimed}' if not UtilClient.empty(temp_clause_string): temp_clause_string = f'{temp_clause_string};{temp_distinct_clause_string}' else: temp_clause_string = f'{temp_distinct_clause_string}' return temp_clause_string def build_ha_query_sort_clause_str( self, clause: List[ha_3engine_models.HaQuerySortClause], ) -> str: temp_clause_string = '' for sort_clause in clause: field_value_trimed = StringClient.trim(sort_clause.sort_order) key_field_trimed = StringClient.trim(sort_clause.sort_key) if UtilClient.equal_string(field_value_trimed, '+') or UtilClient.equal_string(field_value_trimed, '-'): if not UtilClient.empty(field_value_trimed) and not UtilClient.empty(key_field_trimed): if UtilClient.empty(temp_clause_string): temp_clause_string = f'{field_value_trimed}{key_field_trimed}' else: temp_clause_string = f'{temp_clause_string};{field_value_trimed}{key_field_trimed}' return temp_clause_string def build_ha_queryconfig_clause_str( self, clause: ha_3engine_models.HaQueryconfigClause, ) -> str: temp_clause_string = '' if UtilClient.is_unset(clause): raise TeaException({ 'name': 'ParameterMissing', 'message': "'HaQueryconfigClause' can not be unset" }) if UtilClient.is_unset(clause.start): clause.start = None if UtilClient.is_unset(clause.hit): clause.hit = None if UtilClient.is_unset(clause.format): clause.format = None temp_clause_string = f"start:{UtilClient.default_string(clause.start, '0')}" temp_clause_string = f"{temp_clause_string},hit:{UtilClient.default_string(clause.hit, '10')}" temp_clause_string = f"{temp_clause_string},format:{StringClient.to_lower(UtilClient.default_string(clause.format, 'json'))}" if not UtilClient.is_unset(clause.custom_config): for key_field in MapClient.key_set(clause.custom_config): field_value = clause.custom_config.get(key_field) if not UtilClient.empty(field_value): field_value_trimed = StringClient.trim(field_value) key_field_trimed = StringClient.trim(key_field) if not UtilClient.empty(temp_clause_string): temp_clause_string = f'{temp_clause_string},{key_field_trimed}:{field_value_trimed}' else: temp_clause_string = f'{key_field_trimed}:{field_value_trimed}' return temp_clause_string def build_sqlsearch_query( self, sqlquery: ha_3engine_models.SQLQuery, ) -> str: if UtilClient.is_unset(sqlquery.query): raise TeaException({ 'name': 'ParameterMissing', 'message': "'SQLQuery.query' can not be unset" }) temp_string = f'query={sqlquery.query}' kvpairs = self.build_searc_kv_pair_clause_str(sqlquery.kvpairs) if not UtilClient.empty(kvpairs): temp_string = f'{temp_string}&&kvpair={kvpairs}' return temp_string def build_searc_kv_pair_clause_str( self, kv_pair: Dict[str, str], ) -> str: tempkvpairs_string = f'__ops_request_id:{UtilClient.get_nonce()}' if not UtilClient.is_unset(kv_pair): for key_field in MapClient.key_set(kv_pair): field_value = kv_pair.get(key_field) if not UtilClient.empty(field_value): field_value_trimed = StringClient.trim(field_value) key_field_trimed = StringClient.trim(key_field) tempkvpairs_string = f'{tempkvpairs_string},{key_field_trimed}:{field_value_trimed}' return tempkvpairs_string def search_ex( self, request: ha_3engine_models.SearchRequestModel, runtime: util_models.RuntimeOptions, ) -> ha_3engine_models.SearchResponseModel: """ 系统提供了丰富的搜索语法以满足用户各种场景下的搜索需求。 """ return TeaCore.from_map( ha_3engine_models.SearchResponseModel(), self._request('GET', f'/query', TeaCore.to_map(request.query), request.headers, None, runtime) ) async def search_ex_async( self, request: ha_3engine_models.SearchRequestModel, runtime: util_models.RuntimeOptions, ) -> ha_3engine_models.SearchResponseModel: """ 系统提供了丰富的搜索语法以满足用户各种场景下的搜索需求。 """ return TeaCore.from_map( ha_3engine_models.SearchResponseModel(), await self._request_async('GET', f'/query', TeaCore.to_map(request.query), request.headers, None, runtime) ) def search( self, request: ha_3engine_models.SearchRequestModel, ) -> ha_3engine_models.SearchResponseModel: """ 系统提供了丰富的搜索语法以满足用户各种场景下的搜索需求。 """ runtime = util_models.RuntimeOptions( connect_timeout=5000, read_timeout=10000, autoretry=False, ignore_ssl=False, max_idle_conns=50 ) return self.search_with_options(request, runtime) async def search_async( self, request: ha_3engine_models.SearchRequestModel, ) -> ha_3engine_models.SearchResponseModel: """ 系统提供了丰富的搜索语法以满足用户各种场景下的搜索需求。 """ runtime = util_models.RuntimeOptions( connect_timeout=5000, read_timeout=10000, autoretry=False, ignore_ssl=False, max_idle_conns=50 ) return await self.search_with_options_async(request, runtime) def search_with_options( self, request: ha_3engine_models.SearchRequestModel, runtime: util_models.RuntimeOptions, ) -> ha_3engine_models.SearchResponseModel: """ 系统提供了丰富的搜索语法以满足用户各种场景下的搜索需求,及传入运行时参数. """ return self.search_ex(request, runtime) async def search_with_options_async( self, request: ha_3engine_models.SearchRequestModel, runtime: util_models.RuntimeOptions, ) -> ha_3engine_models.SearchResponseModel: """ 系统提供了丰富的搜索语法以满足用户各种场景下的搜索需求,及传入运行时参数. """ return await self.search_ex_async(request, runtime) def push_document_ex( self, data_source_name: str, request: ha_3engine_models.PushDocumentsRequestModel, runtime: util_models.RuntimeOptions, ) -> ha_3engine_models.PushDocumentsResponseModel: """ 支持新增、更新、删除 等操作,以及对应批量操作 """ return TeaCore.from_map( ha_3engine_models.PushDocumentsResponseModel(), self._request('POST', f'/update/{data_source_name}/actions/bulk', None, request.headers, request.body, runtime) ) async def push_document_ex_async( self, data_source_name: str, request: ha_3engine_models.PushDocumentsRequestModel, runtime: util_models.RuntimeOptions, ) -> ha_3engine_models.PushDocumentsResponseModel: """ 支持新增、更新、删除 等操作,以及对应批量操作 """ return TeaCore.from_map( ha_3engine_models.PushDocumentsResponseModel(), await self._request_async('POST', f'/update/{data_source_name}/actions/bulk', None, request.headers, request.body, runtime) ) def push_documents( self, data_source_name: str, key_field: str, request: ha_3engine_models.PushDocumentsRequestModel, ) -> ha_3engine_models.PushDocumentsResponseModel: """ 支持新增、更新、删除 等操作,以及对应批量操作 """ runtime = util_models.RuntimeOptions( connect_timeout=5000, read_timeout=10000, autoretry=False, ignore_ssl=False, max_idle_conns=50 ) return self.push_documents_with_options(data_source_name, key_field, request, runtime) async def push_documents_async( self, data_source_name: str, key_field: str, request: ha_3engine_models.PushDocumentsRequestModel, ) -> ha_3engine_models.PushDocumentsResponseModel: """ 支持新增、更新、删除 等操作,以及对应批量操作 """ runtime = util_models.RuntimeOptions( connect_timeout=5000, read_timeout=10000, autoretry=False, ignore_ssl=False, max_idle_conns=50 ) return await self.push_documents_with_options_async(data_source_name, key_field, request, runtime) def push_documents_with_options( self, data_source_name: str, key_field: str, request: ha_3engine_models.PushDocumentsRequestModel, runtime: util_models.RuntimeOptions, ) -> ha_3engine_models.PushDocumentsResponseModel: """ 支持新增、更新、删除 等操作,以及对应批量操作,及传入运行时参数. """ request.headers = { 'X-Opensearch-Swift-PK-Field': key_field } return self.push_document_ex(data_source_name, request, runtime) async def push_documents_with_options_async( self, data_source_name: str, key_field: str, request: ha_3engine_models.PushDocumentsRequestModel, runtime: util_models.RuntimeOptions, ) -> ha_3engine_models.PushDocumentsResponseModel: """ 支持新增、更新、删除 等操作,以及对应批量操作,及传入运行时参数. """ request.headers = { 'X-Opensearch-Swift-PK-Field': key_field } return await self.push_document_ex_async(data_source_name, request, runtime)