esrally/tracker/index.py (62 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import json import logging import os INDEX_SETTINGS_EPHEMERAL_KEYS = ["uuid", "creation_date", "version", "provided_name", "store"] INDEX_SETTINGS_PARAMETERS = { "number_of_replicas": "{{{{number_of_replicas | default({orig})}}}}", "number_of_shards": "{{{{number_of_shards | default({orig})}}}}", } def filter_ephemeral_index_settings(settings): """ Some of the 'settings' reported by Elasticsearch for an index are ephemeral values, not useful for re-creating the index. :param settings: Index settings reported by index.get() :return: settings with ephemeral keys removed """ filtered = dict(settings) for s in INDEX_SETTINGS_EPHEMERAL_KEYS: filtered.pop(s, None) return filtered def update_index_setting_parameters(settings): for s, param in INDEX_SETTINGS_PARAMETERS.items(): if s in settings: orig_value = settings[s] settings[s] = param.format(orig=orig_value) def is_valid(index_name, index_pattern): if len(index_name) == 0: return False, "Index name is empty" # When the indices are requested directly (with --data-streams or --indices) then we honor the # request, even if it includes hidden indices. But when asking for all indices we skip hidden # indices as they could be system indices and restoring them to another cluster would break it. if index_pattern in ("_all", "*") and index_name.startswith("."): return False, f"Index [{index_name}] is hidden" return True, None def extract_index_mapping_and_settings(client, index_pattern): """ Calls index GET to retrieve mapping + settings, filtering settings so they can be used to re-create this index :param client: Elasticsearch client :param index_pattern: name of index :return: index creation dictionary """ results = {} logger = logging.getLogger(__name__) # the response might contain multiple indices if a wildcard was provided response = client.indices.get(index=index_pattern, params={"expand_wildcards": "all"}) for index, details in response.items(): valid, reason = is_valid(index, index_pattern) if valid: mappings = details["mappings"] index_settings = filter_ephemeral_index_settings(details.get("settings", {}).get("index", {})) update_index_setting_parameters(index_settings) results[index] = {"mappings": mappings, "settings": {"index": index_settings}} else: logger.info("Skipping index [%s] (reason: %s).", index, reason) return results def extract(client, outdir, index_pattern): """ Request index information to format in "index.json" for Rally :param client: Elasticsearch client :param outdir: destination directory :param index_pattern: name of index :return: Dict of template variables representing the index for use in track """ results = [] index_obj = extract_index_mapping_and_settings(client, index_pattern) for index, details in index_obj.items(): filename = f"{index}.json" outpath = os.path.join(outdir, filename) with open(outpath, "w") as outfile: json.dump(details, outfile, indent=4, sort_keys=True) outfile.write("\n") results.append( { "name": index, "path": outpath, "filename": filename, } ) return results def extract_indices_from_data_stream(client, data_stream_pattern): """ Calls Elasticsearch client get_data_stream function to retrieve list of indices :param client: Elasticsearch client :param data_stream_pattern: name of data stream :return: list of index names """ results = [] # the response might contain multiple indices if a wildcard was provided params_defined = {"expand_wildcards": "all", "filter_path": "data_streams.name"} results_data_streams = client.indices.get_data_stream(name=data_stream_pattern, params=params_defined) for indices in results_data_streams["data_streams"]: results.append(indices.get("name")) return results