utils/es.py (186 lines of code) (raw):

"""
All functions related to the ES client are placed here.
"""
import json
import os.path
import sys
from typing import Optional

from elasticsearch import Elasticsearch
from elasticsearch.client import IngestClient

from utils.tsdb import *

# Placeholder printed (and used as a marker) when a dimension field is absent from a document.
_MISSING_VALUE = "(Missing value)"


def get_client(elasticsearch_host, elasticsearch_ca_path, elasticsearch_user, elasticsearch_pwd, cloud_id, cloud_pwd):
    """
    Create ES client. If cloud values are provided, they will take priority over the local deployment.
    :param elasticsearch_host: ES host.
    :param elasticsearch_ca_path: Path to ES certificate.
    :param elasticsearch_user: Name of the ES user.
    :param elasticsearch_pwd: Password for ES.
    :param cloud_id: Cloud ID. Leave empty (or None) to use the local deployment.
    :param cloud_pwd: Password for the elastic cloud. Leave empty (or None) to use the local deployment.
    :return: ES client.
    """
    # Truthiness check so that both "" and None mean "no cloud credentials given".
    if cloud_id and cloud_pwd:
        print("Client will connect to the cloud.")
        return Elasticsearch(
            cloud_id=cloud_id,
            basic_auth=("elastic", cloud_pwd)
        )
    return Elasticsearch(
        hosts=elasticsearch_host,
        ca_certs=elasticsearch_ca_path,
        basic_auth=(elasticsearch_user, elasticsearch_pwd)
    )


def add_doc_from_file(client: Elasticsearch, index_name: str, doc_path: str):
    """
    Given a JSON file, add the document to the index.
    This function does not check if the file exists, since that requirement was already checked before it was called.
    :param client: ES client.
    :param index_name: name of the index to place the document.
    :param doc_path: path to the document to add.
    """
    # The context manager guarantees the file is closed even if the JSON is malformed.
    with open(doc_path, encoding="utf-8") as file:
        content = json.load(file)
    client.index(index=index_name, document=content)


def place_documents(client: Elasticsearch, index_name: str, folder_docs: str):
    """
    Place all documents from folder to an index. The program ends if the index or the folder does not exist.
    Only regular files directly inside the folder are added; sub-directories are skipped.
    :param client: ES client.
    :param index_name: name of the index to add the documents.
    :param folder_docs: path to the folder with the documents to add.
    """
    print(f"Placing documents on the index {index_name}...")
    if not client.indices.exists(index=index_name):
        print(f"Index {index_name} does not exist. Program will end.")
        # Exit status 0 is kept for compatibility with existing callers.
        sys.exit(0)

    if not os.path.isdir(folder_docs):
        print(f"Folder {folder_docs} does not exist. Documents cannot be placed. Program will end.")
        sys.exit(0)

    for doc in os.listdir(folder_docs):
        doc_path = os.path.join(folder_docs, doc)
        if os.path.isfile(doc_path):
            add_doc_from_file(client, index_name, doc_path)

    # From Elastic docs: Use the refresh API to explicitly make all operations performed on one or more indices since
    # the last refresh available for search. If the request targets a data stream, it refreshes the stream's backing
    # indices.
    client.indices.refresh(index=index_name)

    # track_total_hits=True asks for an exact total; otherwise the reported value is capped (10,000 by default).
    resp = client.search(index=index_name, query={"match_all": {}}, track_total_hits=True)
    n_docs = resp['hits']['total']['value']
    print(f"Successfully placed {n_docs} documents on the index {index_name}.\n")


def create_index(client: Elasticsearch, index_name: str, mappings: Optional[dict] = None,
                 settings: Optional[dict] = None):
    """
    Create new ES index. If the index already exists, it will be deleted and a new one created.
    :param client: ES client.
    :param index_name: name of the index.
    :param mappings: mappings to be used for the new index. If not specified, default ones will be used.
    :param settings: settings to be used for the new index. If not specified, default ones will be used.
    """
    # None defaults avoid sharing one mutable dict between calls; an empty dict is still what gets sent.
    mappings = {} if mappings is None else mappings
    settings = {} if settings is None else settings

    if client.indices.exists(index=index_name):
        client.indices.delete(index=index_name)
    client.indices.create(index=index_name, mappings=mappings, settings=settings)
    print(f"Index {index_name} successfully created.\n")


def create_index_missing_for_docs(client: Elasticsearch):
    """
    Create an index to place all the documents that were updated at least one time.
    The documents are reindexed from the TSDB index through an ingest pipeline that drops every document whose
    version is 1, so only the overwritten ones are kept.
    :param client: ES client.
    """
    create_index(client, overwritten_docs_index)

    pipelines = IngestClient(client)
    pipeline_name = 'get-missing-docs'
    pipelines.put_pipeline(id=pipeline_name, body={
        'description': "Drop all documents that were not overwritten.",
        "processors": [
            {
                "drop": {
                    "if": "ctx._version == 1"
                }
            }
        ]
    })

    dest = {
        "index": overwritten_docs_index,
        "version_type": "external",
        "pipeline": pipeline_name
    }
    client.reindex(source={"index": tsdb_index}, dest=dest, refresh=True)


def build_query(dimensions_exist: dict, dimensions_missing: list):
    """
    Build query to retrieve document based on the dimensions.
    :param dimensions_exist: Dictionary containing field and field value of the dimensions that exist in the document.
    Each entry becomes a "term" clause that must match.
    :param dimensions_missing: List of the dimension fields missing in the document. Each entry becomes an "exists"
    clause that must not match.
    :return: query.
    """
    return {
        "bool": {
            "must": [{"term": {field: value}} for field, value in dimensions_exist.items()],
            "must_not": [{"exists": {"field": field}} for field in dimensions_missing]
        }
    }


def get_and_place_documents(client: Elasticsearch, data_stream: str, dir_name: str, dimensions_values: dict,
                            dimensions_missing: list, n: int, number_of_docs):
    """
    Given the dimensions, place the documents in the directory. Every hit is written, as returned by the search, to
    a file named after its document ID.
    :param client: ES client.
    :param data_stream: Name of the data stream.
    :param dir_name: Name of the parent directory.
    :param dimensions_values: Values for the dimension fields that exist in the document.
    :param dimensions_missing: List of dimension fields missing in the document.
    :param n: Number of the directory inside the parent directory. Example: 1 would create dir_name/1.
    :param number_of_docs: Number of documents to get with that set of dimensions.
    """
    query = build_query(dimensions_values, dimensions_missing)
    res = client.search(index=data_stream, query=query, sort={"@timestamp": "asc"}, size=number_of_docs)

    dir_for_docs = os.path.join(dir_name, str(n))
    os.mkdir(dir_for_docs)

    for doc in res["hits"]["hits"]:
        name = doc["_id"] + ".json"
        with open(os.path.join(dir_for_docs, name), 'w') as file:
            json.dump(doc, file, indent=4)


def _get_dimension_value(source, dimension: str):
    """
    Walk a dotted field path (example: "host.name") through a nested document.
    :param source: document source to look into.
    :param dimension: dotted path of the dimension field.
    :return: the value found, or the missing-value placeholder if any part of the path does not exist.
    """
    value = source
    for key in dimension.split("."):
        # A non-dict intermediate value means the rest of the path cannot exist.
        if not isinstance(value, dict) or key not in value:
            return _MISSING_VALUE
        value = value[key]
    return value


def get_missing_docs_info(client: Elasticsearch, data_stream: str, display_docs: int, dir,
                          get_overlapping_files: bool, copy_docs_per_dimension: int):
    """
    Display the dimensions of the first @display_docs documents.
    If @get_overlapping_files is set to True, then @copy_docs_per_dimension documents will be placed in a directory
    (if the directory does not exist!).
    :param client: ES client.
    :param data_stream: name of the data stream to search for the documents sharing the same dimensions.
    :param display_docs: number of documents to display.
    :param dir: name of the directory.
    :param get_overlapping_files: true if you want to place fields in the directory, false otherwise.
    :param copy_docs_per_dimension: number of documents to get for a set of dimensions.
    """
    if get_overlapping_files:
        if os.path.exists(dir):
            print(f"WARNING: The directory {dir} exists. Please delete it. Documents will not be placed.\n")
            get_overlapping_files = False
        else:
            os.mkdir(dir)

    # Number of the next sub-directory to create inside @dir.
    n = 1

    body = {'size': display_docs, 'query': {'match_all': {}}}
    res = client.search(index=overwritten_docs_index, body=body)
    dimensions = time_series_fields["dimension"]

    print(f"The timestamp and dimensions of the first {display_docs} overwritten documents are:")
    for doc in res["hits"]["hits"]:
        source = doc["_source"]
        dimensions_values = {"@timestamp": source["@timestamp"]}
        dimensions_missing = []

        print("- Timestamp {}:".format(source["@timestamp"]))
        for dimension in dimensions:
            el = _get_dimension_value(source, dimension)
            print(f"\t{dimension}: {el}")
            if el != _MISSING_VALUE:
                dimensions_values[dimension] = el
            else:
                dimensions_missing.append(dimension)

        if get_overlapping_files:
            get_and_place_documents(client, data_stream, dir, dimensions_values, dimensions_missing, n,
                                    copy_docs_per_dimension)
            n += 1


def copy_docs_from_to(client: Elasticsearch, source_index: str, dest_index: str, max_docs: int):
    """
    Copy documents from one index to the other. The program ends if the source index does not exist.
    :param client: ES client.
    :param source_index: source index with the documents to be copied to a new index.
    :param dest_index: destination index for the documents.
    :param max_docs: max number of documents to copy. Use -1 to copy all of them.
    :return: True if no document was reported as updated by the reindex operation, False otherwise. An updated
    document is reported to the user as discarded.
    """
    print(f"Copying documents from {source_index} to {dest_index}...")

    if not client.indices.exists(index=source_index):
        print(f"Source index {source_index} does not exist. Program will end.")
        # Exit status 0 is kept for compatibility with existing callers.
        sys.exit(0)

    # max_docs is only sent when a limit was actually requested.
    limit = {} if max_docs == -1 else {"max_docs": max_docs}
    resp = client.reindex(source={"index": source_index}, dest={"index": dest_index}, refresh=True, **limit)

    if resp["updated"] > 0:
        print("WARNING: Out of {} documents from the index {}, {} of them were discarded.\n".format(
            resp["total"], source_index, resp["updated"]))
        return False

    print("All {} documents taken from index {} were successfully placed to index {}.\n".format(
        resp["total"], source_index, dest_index))
    return True


def get_tsdb_config(client: Elasticsearch, data_stream_name: str, docs_index: int, settings_mappings_index: int):
    """
    Get the index name where documents are placed, and mappings and settings for the new TSDB index.
    The program ends if one of the index numbers is not smaller than the number of indexes in the data stream.
    :param client: ES client.
    :param data_stream_name: name of the data stream.
    :param docs_index: number of the index in the data stream with the documents to be moved to the TSDB index.
    Use -1 to select the first index.
    :param settings_mappings_index: number of the index for the settings and mappings for the TSDB index.
    Use -1 to select the last index.
    :return: documents index name, mappings and settings for the TSDB index.
    """
    data_stream = client.indices.get_data_stream(name=data_stream_name)
    indices = data_stream["data_streams"][0]["indices"]
    n_indexes = len(indices)

    # Get the index to use for document retrieval
    if docs_index == -1:
        docs_index = 0
    elif docs_index >= n_indexes:
        print("ERROR: Data stream {} has {} indexes. Documents index number {} is not valid.".format(
            data_stream_name, n_indexes, docs_index))
        # Exit status 0 is kept for compatibility with existing callers.
        sys.exit(0)

    # Get index to use for settings/mappings
    if settings_mappings_index == -1:
        settings_mappings_index = n_indexes - 1
    elif settings_mappings_index >= n_indexes:
        print("ERROR: Data stream {} has {} indexes. Settings/mappings index number {} is not valid.".format(
            data_stream_name, n_indexes, settings_mappings_index))
        sys.exit(0)

    docs_index_name = indices[docs_index]["index_name"]
    settings_mappings_index_name = indices[settings_mappings_index]["index_name"]

    print(f"Index being used for the documents is {docs_index_name}.")
    print(f"Index being used for the settings and mappings is {settings_mappings_index_name}.")
    print()

    mappings = client.indices.get_mapping(index=settings_mappings_index_name)[settings_mappings_index_name]["mappings"]
    settings = client.indices.get_settings(index=settings_mappings_index_name)[settings_mappings_index_name]["settings"]
    settings = get_tsdb_settings(mappings, settings)

    return docs_index_name, mappings, settings


def copy_from_data_stream(client: Elasticsearch, data_stream_name: str, docs_index: int,
                          settings_mappings_index: int, max_docs: int):
    """
    Given a data stream, it copies the documents retrieved from the given index and places them in a new
    index with TSDB enabled. The program ends if the data stream does not exist.
    :param client: ES client.
    :param data_stream_name: name of the data stream.
    :param docs_index: number of the index to use to retrieve the documents.
    :param settings_mappings_index: number of the index to use to get the mappings and settings for the TSDB index.
    :param max_docs: maximum documents to be reindexed.
    :return: True if the number of documents placed to the TSDB index remained the same. False otherwise.
    """
    print(f"Testing data stream {data_stream_name}.")

    if not client.indices.exists(index=data_stream_name):
        print(f"\tData stream {data_stream_name} does not exist. Program will end.")
        # Exit status 0 is kept for compatibility with existing callers.
        sys.exit(0)

    source_index, mappings, settings = get_tsdb_config(client, data_stream_name, docs_index, settings_mappings_index)

    create_index(client, tsdb_index, mappings, settings)

    return copy_docs_from_to(client, source_index, tsdb_index, max_docs)