utils/tsdb.py (64 lines of code) (raw):

""" This file has everything related to TSDB automation. Some of these functions might need to be updated in the future. For example: discard_unknown_settings discard settings that are not currently accepted in the ES Python client. If the situation changes, the function will no longer be accurate. """ # This is a dictionary for all the time series fields accepted as of today (29.June.2023). # routing_path is also part of the dictionary since it is mandatory to have it for a time series index. time_series_fields = { "dimension": [], "counter": [], "gauge": [], "routing_path": [] } # We need to set the routing path to create a TSDB index. # As of today (29.June.2023), only keyword fields are accepted. accepted_fields_for_routing = ["keyword"] # The name of the index with TSDB enabled. tsdb_index = "tsdb-index-enabled" # This is the index in which we will store the documents that were overwritten - ie, the ones that caused us # to lose data. overwritten_docs_index = "tsdb-overwritten-docs" # Some settings cause an error as they are not known to ElasticSearch Python client. # This function discards the ones that were causing me error (there might be more!). def discard_unknown_settings(settings: []): settings["index"].pop("provided_name", None) settings["index"].pop("uuid", None) settings["index"].pop("creation_date", None) if "version" in settings["index"]: settings["index"]["version"].pop("created", None) return settings def get_time_series_fields(mappings: {}): """ Place all fields in the time time_series_fields dictionary. :param mappings: Mappings dictionary. """ fields = mappings["properties"] # A function to flatten the name of the fields def get_all_fields(fields: {}, common: str, result: {}): def join_strings(str1: str, str2: str): if str1 == "": return str2 return str1 + "." + str2 for key in fields: if "properties" in fields[key]: get_all_fields(fields[key]["properties"], join_strings(common, key), result) else: new_key = join_strings(common, key) result[new_key] = fields[key] result = {} get_all_fields(fields, "", result) # Split the time series fields according to metric / dimension def cluster_fields_by_type(fields: {}): for field in fields: if "time_series_dimension" in fields[field] and fields[field]["time_series_dimension"]: time_series_fields["dimension"].append(field) if fields[field]["type"] in accepted_fields_for_routing: time_series_fields["routing_path"].append(field) if "time_series_metric" in fields[field]: metric = fields[field]["time_series_metric"] time_series_fields[metric].append(field) cluster_fields_by_type(result) if len(time_series_fields["routing_path"]) == 0: print("Routing path is empty. Program will end.") exit(0) print("The time series fields for the TSDB index are: ") for key in time_series_fields: if len(time_series_fields[key]) > 0: print("\t- {} ({} fields):".format(key, len(time_series_fields[key]))) for value in time_series_fields[key]: print("\t\t- {}".format(value)) print() def get_tsdb_settings(mappings: {}, settings: {}): """ Modify the settings, so they fit the TSDB enabled mode. Get all time series metrics using the mappings. :param mappings: mappings. :param settings: settings. :return: modified settings for the TSDB index. """ # Some settings cause an error on the ES client. This function removes them. discard_unknown_settings(settings) # Add the time_series mode settings["index"] |= {"mode": "time_series"} # Get all time series fields get_time_series_fields(mappings) # Set a new window to avoid time series end / start time errors time_series = { "time_series": { "end_time": "2100-06-08T14:41:54.000Z", "start_time": "1900-06-08T09:54:18.000Z" } } settings["index"] |= time_series settings["index"] |= {"routing_path": time_series_fields["routing_path"]} return settings