awswrangler/opensearch/_write.py (319 lines of code) (raw):

# mypy: disable-error-code=name-defined """Amazon OpenSearch Write Module (PRIVATE).""" from __future__ import annotations import ast import json import logging from typing import TYPE_CHECKING, Any, Generator, Iterable, Mapping, cast import boto3 import numpy as np from pandas import notna import awswrangler.pandas as pd from awswrangler import _utils, exceptions from awswrangler._utils import parse_path from awswrangler.opensearch._utils import _get_distribution, _get_version_major, _is_serverless if TYPE_CHECKING: try: import jsonpath_ng except ImportError: pass else: jsonpath_ng = _utils.import_optional_dependency("jsonpath_ng") if TYPE_CHECKING: try: import opensearchpy except ImportError: pass else: opensearchpy = _utils.import_optional_dependency("opensearchpy") if TYPE_CHECKING: try: import progressbar except ImportError: pass else: progressbar = _utils.import_optional_dependency("progressbar") _logger: logging.Logger = logging.getLogger(__name__) _DEFAULT_REFRESH_INTERVAL = "1s" def _selected_keys(document: Mapping[str, Any], keys_to_write: list[str] | None) -> Mapping[str, Any]: if keys_to_write is None: keys_to_write = list(document.keys()) keys_to_write = list(filter(lambda x: x != "_id", keys_to_write)) return {key: document[key] for key in keys_to_write} def _actions_generator( documents: Iterable[dict[str, Any]] | Iterable[Mapping[str, Any]], index: str, doc_type: str | None, keys_to_write: list[str] | None, id_keys: list[str] | None, bulk_size: int = 10000, ) -> Generator[list[dict[str, Any]], None, None]: bulk_chunk_documents = [] for i, document in enumerate(documents): if id_keys: _id = "-".join([str(document[id_key]) for id_key in id_keys]) else: _id = cast(str, document.get("_id")) bulk_chunk_documents.append( { "_index": index, "_type": doc_type, "_id": _id, "_source": _selected_keys(document, keys_to_write), } ) if (i + 1) % bulk_size == 0: yield bulk_chunk_documents bulk_chunk_documents = [] if len(bulk_chunk_documents) > 0: yield bulk_chunk_documents def _df_doc_generator(df: pd.DataFrame) -> Generator[dict[str, Any], None, None]: def _deserialize(v: Any) -> Any: if isinstance(v, str): v = v.strip() if v.startswith("{") and v.endswith("}") or v.startswith("[") and v.endswith("]"): try: v = json.loads(v) except json.decoder.JSONDecodeError: try: v = ast.literal_eval(v) # if properties are enclosed with single quotes if not isinstance(v, dict): _logger.warning("could not convert string to json: %s", v) except SyntaxError as e: _logger.warning("could not convert string to json: %s", v) _logger.warning(e) return v df_iter = df.iterrows() for _, document in df_iter: yield {k: _deserialize(v) for k, v in document.items() if np.array(notna(v)).any()} def _file_line_generator(path: str, is_json: bool = False) -> Generator[Any, None, None]: with open(path) as fp: for line in fp: if is_json: yield json.loads(line) else: yield line.strip() @_utils.check_optional_dependency(jsonpath_ng, "jsonpath_ng") def _get_documents_w_json_path(documents: list[Mapping[str, Any]], json_path: str) -> list[Any]: from jsonpath_ng.exceptions import JsonPathParserError try: jsonpath_expression = jsonpath_ng.parse(json_path) except JsonPathParserError as e: _logger.error("invalid json_path: %s", json_path) raise e output_documents = [] for doc in documents: for match in jsonpath_expression.find(doc): match_value = match.value if isinstance(match_value, list): output_documents += match_value elif isinstance(match_value, dict): output_documents.append(match_value) else: msg = f"expected json_path value to be a list/dict. received type {type(match_value)} ({match_value})" raise ValueError(msg) return output_documents def _get_refresh_interval(client: "opensearchpy.OpenSearch", index: str) -> Any: url = f"/{index}/_settings" try: response = client.transport.perform_request("GET", url) index_settings = response.get(index, {}).get("index", {}) refresh_interval = index_settings.get("refresh_interval", _DEFAULT_REFRESH_INTERVAL) return refresh_interval except opensearchpy.exceptions.NotFoundError: return _DEFAULT_REFRESH_INTERVAL def _set_refresh_interval(client: "opensearchpy.OpenSearch", index: str, refresh_interval: Any | None) -> Any: url = f"/{index}/_settings" body = {"index": {"refresh_interval": refresh_interval}} try: return client.transport.perform_request("PUT", url, headers={"content-type": "application/json"}, body=body) except opensearchpy.exceptions.RequestError: return None def _disable_refresh_interval( client: "opensearchpy.OpenSearch", index: str, ) -> Any: return _set_refresh_interval(client=client, index=index, refresh_interval="-1") @_utils.check_optional_dependency(opensearchpy, "opensearchpy") def create_index( client: "opensearchpy.OpenSearch", index: str, doc_type: str | None = None, settings: dict[str, Any] | None = None, mappings: dict[str, Any] | None = None, ) -> dict[str, Any]: """Create an index. Parameters ---------- client instance of opensearchpy.OpenSearch to use. index Name of the index. doc_type Name of the document type (for Elasticsearch versions 5.x and earlier). settings Index settings https://opensearch.org/docs/opensearch/rest-api/create-index/#index-settings mappings Index mappings https://opensearch.org/docs/opensearch/rest-api/create-index/#mappings Returns ------- OpenSearch rest api response https://opensearch.org/docs/opensearch/rest-api/create-index/#response. Examples -------- Creating an index. >>> import awswrangler as wr >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT') >>> response = wr.opensearch.create_index( ... client=client, ... index="sample-index1", ... mappings={ ... "properties": { ... "age": { "type" : "integer" } ... } ... }, ... settings={ ... "index": { ... "number_of_shards": 2, ... "number_of_replicas": 1 ... } ... } ... ) """ body = {} if mappings: if _get_distribution(client) == "opensearch" or _get_version_major(client) >= 7: body["mappings"] = mappings # doc type deprecated elif doc_type: body["mappings"] = {doc_type: mappings} else: body["mappings"] = {index: mappings} if settings: body["settings"] = settings if not body: body = None # type: ignore[assignment] # ignore 400 cause by IndexAlreadyExistsException when creating an index response: dict[str, Any] = client.indices.create(index, body=body, ignore=400) if "error" in response: _logger.warning(response) if str(response["error"]).startswith("MapperParsingException"): raise ValueError(response["error"]) return response @_utils.check_optional_dependency(opensearchpy, "opensearchpy") def delete_index(client: "opensearchpy.OpenSearch", index: str) -> dict[str, Any]: """Delete an index. Parameters ---------- client instance of opensearchpy.OpenSearch to use. index Name of the index. Returns ------- OpenSearch rest api response Examples -------- Deleting an index. >>> import awswrangler as wr >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT') >>> response = wr.opensearch.delete_index( ... client=client, ... index="sample-index1" ... ) """ # ignore 400/404 IndexNotFoundError exception response: dict[str, Any] = client.indices.delete(index, ignore=[400, 404]) if "error" in response: _logger.warning(response) return response @_utils.check_optional_dependency(opensearchpy, "opensearchpy") def index_json( client: "opensearchpy.OpenSearch", path: str, index: str, doc_type: str | None = None, boto3_session: boto3.Session | None = boto3.Session(), json_path: str | None = None, use_threads: bool | int = False, **kwargs: Any, ) -> Any: """Index all documents from JSON file to OpenSearch index. The JSON file should be in a JSON-Lines text format (newline-delimited JSON) - https://jsonlines.org/ OR if the is a single large JSON please provide `json_path`. Parameters ---------- client instance of opensearchpy.OpenSearch to use. path s3 or local path to the JSON file which contains the documents. index Name of the index. doc_type Name of the document type (for Elasticsearch versions 5.x and earlier). json_path JsonPath expression to specify explicit path to a single name element in a JSON hierarchical data structure. Read more about `JsonPath <https://jsonpath.com>`_ boto3_session Boto3 Session to be used to access S3 if **path** is provided. The default boto3 session will be used if **boto3_session** is ``None``. use_threads True to enable concurrent requests, False to disable multiple threads. If enabled os.cpu_count() will be used as the max number of threads. If integer is provided, specified number is used. **kwargs KEYWORD arguments forwarded to :func:`~awswrangler.opensearch.index_documents` which is used to execute the operation Returns ------- Response payload https://opensearch.org/docs/opensearch/rest-api/document-apis/bulk/#response. Examples -------- Writing contents of JSON file >>> import awswrangler as wr >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT') >>> wr.opensearch.index_json( ... client=client, ... path='docs.json', ... index='sample-index1' ... ) """ _logger.debug("indexing %s from %s", index, path) if boto3_session is None: raise ValueError("boto3_session cannot be None") if path.startswith("s3://"): bucket, key = parse_path(path) s3 = boto3_session.client("s3") obj = s3.get_object(Bucket=bucket, Key=key) body = obj["Body"].read() lines = body.splitlines() documents = [json.loads(line) for line in lines] if json_path: documents = _get_documents_w_json_path(documents, json_path) else: # local path documents = list(_file_line_generator(path, is_json=True)) if json_path: documents = _get_documents_w_json_path(documents, json_path) return index_documents( client=client, documents=documents, index=index, doc_type=doc_type, use_threads=use_threads, **kwargs ) @_utils.check_optional_dependency(opensearchpy, "opensearchpy") def index_csv( client: "opensearchpy.OpenSearch", path: str, index: str, doc_type: str | None = None, pandas_kwargs: dict[str, Any] | None = None, use_threads: bool | int = False, **kwargs: Any, ) -> Any: """Index all documents from a CSV file to OpenSearch index. Parameters ---------- client instance of opensearchpy.OpenSearch to use. path S3 or local path to the CSV file which contains the documents. index Name of the index. doc_type Name of the document type (for Elasticsearch versions 5.x and earlier). pandas_kwargs Dictionary of arguments forwarded to pandas.read_csv(). e.g. pandas_kwargs={'sep': '|', 'na_values': ['null', 'none']} https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html Note: these params values are enforced: `skip_blank_lines=True` use_threads True to enable concurrent requests, False to disable multiple threads. If enabled os.cpu_count() will be used as the max number of threads. If integer is provided, specified number is used. **kwargs KEYWORD arguments forwarded to :func:`~awswrangler.opensearch.index_documents` which is used to execute the operation Returns ------- Response payload https://opensearch.org/docs/opensearch/rest-api/document-apis/bulk/#response. Examples -------- Writing contents of CSV file >>> import awswrangler as wr >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT') >>> wr.opensearch.index_csv( ... client=client, ... path='docs.csv', ... index='sample-index1' ... ) Writing contents of CSV file using pandas_kwargs >>> import awswrangler as wr >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT') >>> wr.opensearch.index_csv( ... client=client, ... path='docs.csv', ... index='sample-index1', ... pandas_kwargs={'sep': '|', 'na_values': ['null', 'none']} ... ) """ _logger.debug("indexing %s from %s", index, path) if pandas_kwargs is None: pandas_kwargs = {} enforced_pandas_params = { "skip_blank_lines": True, # 'na_filter': True # will generate Nan value for empty cells. We remove Nan keys in _df_doc_generator # Note: if the user will pass na_filter=False null fields will be indexed as well ({"k1": null, "k2": null}) } pandas_kwargs.update(enforced_pandas_params) df = pd.read_csv(path, **pandas_kwargs) return index_df(client, df=df, index=index, doc_type=doc_type, use_threads=use_threads, **kwargs) @_utils.check_optional_dependency(opensearchpy, "opensearchpy") def index_df( client: "opensearchpy.OpenSearch", df: pd.DataFrame, index: str, doc_type: str | None = None, use_threads: bool | int = False, **kwargs: Any, ) -> Any: """Index all documents from a DataFrame to OpenSearch index. Parameters ---------- client instance of opensearchpy.OpenSearch to use. df `Pandas DataFrame <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html>`_ index Name of the index. doc_type Name of the document type (for Elasticsearch versions 5.x and earlier). use_threads True to enable concurrent requests, False to disable multiple threads. If enabled os.cpu_count() will be used as the max number of threads. If integer is provided, specified number is used. **kwargs KEYWORD arguments forwarded to :func:`~awswrangler.opensearch.index_documents` which is used to execute the operation Returns ------- Response payload https://opensearch.org/docs/opensearch/rest-api/document-apis/bulk/#response. Examples -------- Writing rows of DataFrame >>> import awswrangler as wr >>> import pandas as pd >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT') >>> wr.opensearch.index_df( ... client=client, ... df=pd.DataFrame([{'_id': '1'}, {'_id': '2'}, {'_id': '3'}]), ... index='sample-index1', ... ) """ return index_documents( client=client, documents=_df_doc_generator(df), index=index, doc_type=doc_type, use_threads=use_threads, **kwargs, ) @_utils.check_optional_dependency(opensearchpy, "opensearchpy") def index_documents( client: "opensearchpy.OpenSearch", documents: Iterable[Mapping[str, Any]], index: str, doc_type: str | None = None, keys_to_write: list[str] | None = None, id_keys: list[str] | None = None, ignore_status: list[Any] | tuple[Any] | None = None, bulk_size: int = 1000, chunk_size: int | None = 500, max_chunk_bytes: int | None = 100 * 1024 * 1024, max_retries: int | None = None, initial_backoff: int | None = None, max_backoff: int | None = None, use_threads: bool | int = False, enable_refresh_interval: bool = True, **kwargs: Any, ) -> dict[str, Any]: """ Index all documents to OpenSearch index. Note ---- `max_retries`, `initial_backoff`, and `max_backoff` are not supported with parallel bulk (when `use_threads` is set to True). Note ---- Some of the args are referenced from opensearch-py client library (bulk helpers) https://opensearch-py.readthedocs.io/en/latest/helpers.html#opensearchpy.helpers.bulk https://opensearch-py.readthedocs.io/en/latest/helpers.html#opensearchpy.helpers.streaming_bulk If you receive `Error 429 (Too Many Requests) /_bulk` please to to decrease `bulk_size` value. Please also consider modifying the cluster size and instance type - Read more here: https://aws.amazon.com/premiumsupport/knowledge-center/resolve-429-error-es/ Parameters ---------- client instance of opensearchpy.OpenSearch to use. documents List which contains the documents that will be inserted. index Name of the index. doc_type Name of the document type (for Elasticsearch versions 5.x and earlier). keys_to_write list of keys to index. If not provided all keys will be indexed id_keys list of keys that compound document unique id. If not provided will use `_id` key if exists, otherwise will generate unique identifier for each document. ignore_status list of HTTP status codes that you want to ignore (not raising an exception) bulk_size number of docs in each _bulk request (default: 1000) chunk_size number of docs in one chunk sent to es (default: 500) max_chunk_bytes the maximum size of the request in bytes (default: 100MB) max_retries maximum number of times a document will be retried when ``429`` is received, set to 0 (default) for no retries on ``429`` (default: 2) initial_backoff number of seconds we should wait before the first retry. Any subsequent retries will be powers of ``initial_backoff*2**retry_number`` (default: 2) max_backoff maximum number of seconds a retry will wait (default: 600) use_threads True to enable concurrent requests, False to disable multiple threads. If enabled os.cpu_count() will be used as the max number of threads. If integer is provided, specified number is used. enable_refresh_interval True (default) to enable ``refresh_interval`` modification to ``-1`` (disabled) while indexing documents **kwargs KEYWORD arguments forwarded to bulk operation elasticsearch >= 7.10.2 / opensearch: \ https://opensearch.org/docs/opensearch/rest-api/document-apis/bulk/#url-parameters elasticsearch < 7.10.2: \ https://opendistro.github.io/for-elasticsearch-docs/docs/elasticsearch/rest-api-reference/#url-parameters Returns ------- Response payload https://opensearch.org/docs/opensearch/rest-api/document-apis/bulk/#response. Examples -------- Writing documents >>> import awswrangler as wr >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT') >>> wr.opensearch.index_documents( ... documents=[{'_id': '1', 'value': 'foo'}, {'_id': '2', 'value': 'bar'}], ... index='sample-index1' ... ) """ if "refresh" in kwargs and _is_serverless(client): raise exceptions.NotSupported("Refresh policy not supported in OpenSearch Serverless.") if use_threads and any([max_retries, initial_backoff, max_backoff]): raise exceptions.InvalidArgumentCombination( "`max_retries`, `initial_backoff`, and `max_backoff` are not supported when `use_threads` is set to True" ) if not isinstance(documents, list): documents = list(documents) total_documents = len(documents) _logger.debug("indexing %s documents into %s", total_documents, index) actions = _actions_generator( documents, index, doc_type, keys_to_write=keys_to_write, id_keys=id_keys, bulk_size=bulk_size ) success = 0 errors: list[Any] = [] refresh_interval = None try: if progressbar: widgets = [ progressbar.Percentage(), # type: ignore[no-untyped-call] progressbar.SimpleProgress(format=" (%(value_s)s/%(max_value_s)s)"), # type: ignore[no-untyped-call] progressbar.Bar(), # type: ignore[no-untyped-call] progressbar.Timer(), # type: ignore[no-untyped-call] ] progress_bar = progressbar.ProgressBar( widgets=widgets, max_value=total_documents, prefix="Indexing: " ).start() for i, bulk_chunk_documents in enumerate(actions): if i == 1 and enable_refresh_interval: # second bulk iteration, in case the index didn't exist before refresh_interval = _get_refresh_interval(client, index) _disable_refresh_interval(client, index) _logger.debug("running bulk index of %s documents", len(bulk_chunk_documents)) bulk_kwargs = { "ignore_status": ignore_status, "chunk_size": chunk_size, "max_chunk_bytes": max_chunk_bytes, "request_timeout": 30, **kwargs, } _logger.debug("running bulk with kwargs: %s", bulk_kwargs) if use_threads: # Parallel bulk does not support max_retries, initial_backoff & max_backoff for _success, _errors in opensearchpy.helpers.parallel_bulk( client, bulk_chunk_documents, **bulk_kwargs ): success += _success errors += _errors else: # Defaults bulk_kwargs["max_retries"] = 5 if not max_retries else max_retries bulk_kwargs["initial_backoff"] = 2 if not initial_backoff else initial_backoff bulk_kwargs["max_backoff"] = 600 if not max_backoff else max_backoff _success, _errors = opensearchpy.helpers.bulk(client, bulk_chunk_documents, **bulk_kwargs) success += _success errors += _errors _logger.debug("indexed %s documents (%s/%s)", _success, success, total_documents) if progressbar: progress_bar.update(success, force=True) except opensearchpy.TransportError as e: if str(e.status_code) == "429": # Too Many Requests _logger.error( "Error 429 (Too Many Requests):" "Try to tune bulk_size parameter." "Read more here: https://aws.amazon.com/premiumsupport/knowledge-center/resolve-429-error-es" ) raise e finally: if enable_refresh_interval: _set_refresh_interval(client, index, refresh_interval) return {"success": success, "errors": errors}