esrally/tracker/tracker.py (75 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import os

from elastic_transport import ApiError, TransportError
from jinja2 import Environment, FileSystemLoader

from esrally import PROGRAM_NAME, types
from esrally.client import factory
from esrally.tracker import corpus, index
from esrally.utils import console, io


def process_template(templates_path, template_filename, template_vars, output_path):
    """Render a Jinja2 template and write the result to a file.

    :param templates_path: Directory the template is loaded from.
    :param template_filename: Name of the template file inside ``templates_path``.
    :param template_vars: Mapping of variables passed to ``Template.render``.
    :param output_path: Path of the file to (over)write with the rendered output.
    """
    env = Environment(loader=FileSystemLoader(templates_path))
    template = env.get_template(template_filename)
    # FileSystemLoader reads templates as UTF-8 by default; write the output as UTF-8 too
    # instead of relying on the platform's locale encoding.
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(template.render(template_vars))


def extract_indices_from_data_streams(client, data_streams_to_extract):
    """Collect the indices returned for each of the given data streams.

    A data stream whose lookup raises ``ApiError`` or ``TransportError`` is logged and
    skipped, so one bad name does not abort the whole extraction.

    :param client: Elasticsearch client passed through to ``index.extract_indices_from_data_stream``.
    :param data_streams_to_extract: Iterable of data stream names.
    :return: Concatenation of the results of ``index.extract_indices_from_data_stream``
        for every data stream that could be resolved (the caller uses them as index names).
    """
    indices = []
    # Resolve each data stream independently; a failure for one name only skips that name.
    for data_stream_name in data_streams_to_extract:
        try:
            indices += index.extract_indices_from_data_stream(client, data_stream_name)
        except (ApiError, TransportError):
            logging.getLogger(__name__).exception("Failed to extract indices from data stream [%s]", data_stream_name)
    return indices


def extract_mappings_and_corpora(client, output_path, indices_to_extract, batch_size):
    """Extract index metadata for all requested indices, then their document corpora.

    :param client: Elasticsearch client.
    :param output_path: Directory the extracted files are written to.
    :param indices_to_extract: Iterable of index names (or patterns) to extract.
    :param batch_size: Batch size passed to ``corpus.extract``.
    :return: Tuple ``(indices, corpora)``. ``indices`` holds the results of ``index.extract``
        (each item has a ``"name"`` key). ``corpora`` holds the non-empty results of
        ``corpus.extract``.
    """
    indices = []
    corpora = []
    # first extract index metadata (which is cheap) and defer extracting data to reduce the potential for
    # errors due to invalid index names late in the process.
    for index_name in indices_to_extract:
        try:
            indices += index.extract(client, output_path, index_name)
        except (ApiError, TransportError):
            logging.getLogger(__name__).exception("Failed to extract index [%s]", index_name)

    # That list only contains valid indices (with index patterns already resolved)
    for i in indices:
        c = corpus.extract(client, output_path, i["name"], batch_size)
        if c:
            corpora.append(c)
    return indices, corpora


def create_track(cfg: types.Config):
    """Create a Rally track from the indices or data streams of a running cluster.

    Reads the track name, target indices/data streams, output path, batch size and client
    settings from ``cfg``. It then connects to the cluster and extracts index metadata and
    corpora. Finally it renders ``track.json``, ``challenges/default.json`` and
    ``operations/default.json`` into ``<output.path>/<track name>``.

    If ``generator.data_streams`` is set, the indices resolved from those data streams
    replace ``generator.indices``.

    :param cfg: Rally configuration.
    :raises RuntimeError: If no index could be extracted.
    """
    logger = logging.getLogger(__name__)

    track_name = cfg.opts("track", "track.name")
    indices = cfg.opts("generator", "indices")
    root_path = cfg.opts("generator", "output.path")
    target_hosts = cfg.opts("client", "hosts").default
    client_options = cfg.opts("client", "options").default
    data_streams = cfg.opts("generator", "data_streams")
    batch_size = int(cfg.opts("generator", "batch_size"))

    distribution_flavor, distribution_version, _, _ = factory.cluster_distribution_version(target_hosts, client_options)
    client = factory.EsClientFactory(
        hosts=target_hosts,
        client_options=client_options,
        distribution_version=distribution_version,
        distribution_flavor=distribution_flavor,
    ).create()

    console.info(f"Connected to Elasticsearch cluster version [{distribution_version}] flavor [{distribution_flavor}] \n", logger=logger)

    output_path = os.path.abspath(os.path.join(io.normalize_path(root_path), track_name))
    io.ensure_dir(output_path)
    challenge_path = os.path.abspath(os.path.join(output_path, "challenges"))
    io.ensure_dir(challenge_path)
    operations_path = os.path.abspath(os.path.join(output_path, "operations"))
    io.ensure_dir(operations_path)

    if data_streams is not None:
        logger.info("Creating track [%s] matching data streams [%s]", track_name, data_streams)
        # Data streams take precedence: their resolved indices replace the configured ones.
        indices = extract_indices_from_data_streams(client, data_streams)

    logger.info("Creating track [%s] matching indices [%s]", track_name, indices)

    indices, corpora = extract_mappings_and_corpora(client, output_path, indices, batch_size)
    if not indices:
        raise RuntimeError("Failed to extract any indices for track!")

    template_vars = {"track_name": track_name, "indices": indices, "corpora": corpora}

    track_path = os.path.join(output_path, "track.json")
    default_challenges = os.path.join(challenge_path, "default.json")
    default_operations = os.path.join(operations_path, "default.json")
    templates_path = os.path.join(cfg.opts("node", "rally.root"), "resources")
    process_template(templates_path, "track.json.j2", template_vars, track_path)
    process_template(templates_path, "challenges.json.j2", template_vars, default_challenges)
    process_template(templates_path, "operations.json.j2", template_vars, default_operations)

    console.println("")
    console.info(f"Track {track_name} has been created. Run it with: {PROGRAM_NAME} race --track-path={output_path}")