util/python/alibabacloud_opensearch_util/opensearch_util.py (74 lines of code) (raw):

# -*- coding: utf-8 -*-
import base64
import hashlib
import hmac
import time
import urllib.parse as parse

from typing import List, Dict, Any

from Tea.request import TeaRequest


def _get_canonicalized_headers(headers):
    """
    Build the canonicalized ``x-opensearch-*`` header part of the string to sign.

    Header names are matched case-insensitively, emitted in lower case and
    ordered by their lower-cased name, so the result does not depend on how
    the caller capitalised the names.

    @param headers: mapping of header name to header value
    @return: one "name:value" entry per matching header, each terminated by a
             line feed; an empty string when no header matches
    """
    canon_keys = sorted(
        (k for k in headers if k.lower().startswith('x-opensearch-')),
        # Order by the emitted (lower-cased) name. The original spelling is
        # only a tie-breaker that keeps the output deterministic when two
        # names differ by case alone.
        key=lambda k: (k.lower(), k),
    )
    return ''.join(f'{k.lower()}:{headers[k]}\n' for k in canon_keys)


def _get_canonicalized_resource(pathname, query):
    """
    Build the canonicalized resource (path plus sorted query) of the string to sign.

    @param pathname: the request path
    @param query: mapping of query parameter name to value; None is treated as
                  an empty mapping
    @return: the percent-encoded path, followed by "?" and the "&"-joined
             encoded parameters when at least one parameter has a non-None value
    """
    # "/", "?", "=" and "&" stay literal in the path; every other character
    # that quote() does not consider safe is percent-encoded.
    canonicalized = parse.quote(pathname, safe='/?=&')
    query = query or {}

    params_to_sign = []
    for key in sorted(query):
        value = query[key]
        if value is None:
            # Parameters without a value are left out of the signature.
            continue
        quoted_key = parse.quote(str(key))
        if value == "":
            # An empty value is signed as the bare key, without "=".
            params_to_sign.append(quoted_key)
        else:
            # Unlike the key, the value also has "/" percent-encoded.
            quoted_value = parse.quote(str(value), safe='')
            params_to_sign.append(f'{quoted_key}={quoted_value}')

    if params_to_sign:
        return canonicalized + "?" + "&".join(params_to_sign)
    return canonicalized


def _get_header(headers, key, default_value=None):
    """
    Look up a header by its exact (case-sensitive) name.

    @param headers: mapping of header name to header value
    @param key: the header name
    @param default_value: returned when the header is missing or None
    @return: the header value, or default_value
    """
    value = headers.get(key)
    return default_value if value is None else value


class OpensearchUtil:
    """
    This module used for OpenSearch SDK
    """

    @staticmethod
    def append(strings: List[str], item: str) -> List[str]:
        """
        Append a string into a string array without modifying the input list.

        @example append(["a", "b"], "c") => ["a", "b", "c"]
        @param strings: the string list
        @param item: the string to be appended
        @return: new array
        """
        string_list = strings.copy()
        string_list.append(item)
        return string_list

    @staticmethod
    def keys(m: Dict[str, Any]) -> List[str]:
        """
        Get the map's keys.

        @param m: the map
        @return: new array
        """
        return list(m.keys())

    @staticmethod
    def join(strings: List[str], separator: str) -> str:
        """
        Join array elements with a separator string.

        @example join(["a", "b"], ".") => "a.b"
        @param strings: the string list
        @param separator: the separator
        @return: the joined string
        """
        return separator.join(strings)

    @staticmethod
    def get_date() -> str:
        """
        Get the current UTC date.

        @example 2006-01-02T15:04:05Z
        @return: date string
        """
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    @staticmethod
    def get_signature(request: TeaRequest, access_key_id: str, access_key_secret: str) -> str:
        """
        Get signature with request, accessKeyId, accessKeySecret.

        The string to sign is made of the method, the Content-MD5,
        Content-Type and Date headers (empty when absent), the canonicalized
        x-opensearch-* headers and the canonicalized resource. It is signed
        with HMAC-SHA1 and the digest is base64-encoded.

        @param request: the object of tea request
        @param access_key_id: accesskey id
        @param access_key_secret: accesskey secret
        @return: signature string, formatted as "OPENSEARCH <access_key_id>:<signature>"
        """
        method, pathname, headers, query = request.method, request.pathname, request.headers, request.query

        content_md5 = _get_header(headers, "Content-MD5", "")
        content_type = _get_header(headers, "Content-Type", "")
        date = _get_header(headers, "Date", "")

        canon_headers = _get_canonicalized_headers(headers)
        canon_resource = _get_canonicalized_resource(pathname, query)

        # canon_headers already ends with a line feed when it is not empty,
        # so no separator is added before the resource.
        sign_str = f'{method}\n{content_md5}\n{content_type}\n{date}\n{canon_headers}{canon_resource}'

        hash_val = hmac.new(access_key_secret.encode('utf-8'), sign_str.encode('utf-8'), hashlib.sha1).digest()
        signature = base64.b64encode(hash_val).decode('utf-8')
        return f'OPENSEARCH {access_key_id}:{signature}'

    @staticmethod
    def get_content_md5(content: str) -> str:
        """
        Get content MD5.

        @param content: the string which will be calculated (encoded as UTF-8)
        @return: md5 string, as a hexadecimal digest
        """
        md5 = hashlib.md5(content.encode('utf-8'))
        return md5.hexdigest()

    @staticmethod
    def map_to_string(origin: Dict[str, str], sep: str) -> str:
        """
        Splice origin into string with sep.

        Each entry becomes "<key><sep><value>" (a None value is written as an
        empty string) and the entries are joined with ",".

        @param origin: the source map
        @param sep: the separator placed between each key and its value
        @return: the spliced string
        """
        ret = [f'{k}{sep}{v if v is not None else ""}' for k, v in origin.items()]
        return ','.join(ret)