esrally/mechanic/provisioner.py (405 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import glob
import json
import logging
import os
import shutil
import uuid

import jinja2

from esrally import exceptions, types
from esrally.mechanic import java_resolver, team
from esrally.utils import console, convert, io, process


def local(cfg: types.Config, car, plugins, ip, http_port, all_node_ips, all_node_names, target_root, node_name):
    """
    Creates a provisioner for a locally installed (tar/zip distribution) Elasticsearch node.

    :return: A ``BareProvisioner`` configured for the given car, plugins and node topology.
    """
    distribution_version = cfg.opts("mechanic", "distribution.version", mandatory=False)
    cluster_name = cfg.opts("mechanic", "cluster.name")
    node_root_dir = os.path.join(target_root, node_name)

    runtime_jdk_bundled = convert.to_bool(car.mandatory_var("runtime.jdk.bundled"))
    runtime_jdk = car.mandatory_var("runtime.jdk")
    _, java_home = java_resolver.java_home(runtime_jdk, cfg.opts("mechanic", "runtime.jdk"), runtime_jdk_bundled)

    es_installer = ElasticsearchInstaller(
        car, java_home, node_name, cluster_name, node_root_dir, all_node_ips, all_node_names, ip, http_port
    )
    plugin_installers = [PluginInstaller(plugin, java_home) for plugin in plugins]
    return BareProvisioner(es_installer, plugin_installers, distribution_version=distribution_version)


def docker(cfg: types.Config, car, ip, http_port, target_root, node_name):
    """
    Creates a provisioner for a Docker-based Elasticsearch node.

    :return: A ``DockerProvisioner`` configured for the given car and node.
    """
    distribution_version = cfg.opts("mechanic", "distribution.version", mandatory=False)
    cluster_name = cfg.opts("mechanic", "cluster.name")
    rally_root = cfg.opts("node", "rally.root")
    node_root_dir = os.path.join(target_root, node_name)
    return DockerProvisioner(car, node_name, cluster_name, ip, http_port, node_root_dir, distribution_version, rally_root)


class NodeConfiguration:
    """
    Describes a single provisioned node so that later lifecycle phases (launch, teardown) can
    locate its installation, data paths and runtime JDK requirements.
    """

    def __init__(self, build_type, car_runtime_jdks, car_provides_bundled_jdk, ip, node_name, node_root_path, binary_path, data_paths):
        self.build_type = build_type
        self.car_runtime_jdks = car_runtime_jdks
        self.car_provides_bundled_jdk = car_provides_bundled_jdk
        self.ip = ip
        self.node_name = node_name
        self.node_root_path = node_root_path
        self.binary_path = binary_path
        self.data_paths = data_paths

    def as_dict(self):
        """Returns a JSON-serializable representation of this node configuration."""
        return {
            "build-type": self.build_type,
            "car-runtime-jdks": self.car_runtime_jdks,
            "car-provides-bundled-jdk": self.car_provides_bundled_jdk,
            "ip": self.ip,
            "node-name": self.node_name,
            "node-root-path": self.node_root_path,
            "binary-path": self.binary_path,
            "data-paths": self.data_paths,
        }

    @staticmethod
    def from_dict(d):
        """Reconstructs a ``NodeConfiguration`` from a dict previously produced by ``as_dict``."""
        return NodeConfiguration(
            d["build-type"],
            d["car-runtime-jdks"],
            d["car-provides-bundled-jdk"],
            d["ip"],
            d["node-name"],
            d["node-root-path"],
            d["binary-path"],
            d["data-paths"],
        )


def save_node_configuration(path, n):
    """Persists node configuration ``n`` as ``node-config.json`` in directory ``path``."""
    with open(os.path.join(path, "node-config.json"), "w") as f:
        json.dump(n.as_dict(), f, indent=2)


def load_node_configuration(path):
    """Loads a previously saved ``NodeConfiguration`` from ``node-config.json`` in directory ``path``."""
    with open(os.path.join(path, "node-config.json")) as f:
        return NodeConfiguration.from_dict(json.load(f))


class ConfigLoader:
    # NOTE(review): this class has no behavior and no visible callers in this file; it looks like
    # a leftover stub. Kept as-is because removing a public name could break external callers.
    def __init__(self):
        pass

    def load(self):
        pass


def _render_template(env, variables, file_name):
    """
    Renders the Jinja template ``file_name`` from environment ``env`` with ``variables``.

    :raises exceptions.InvalidSyntax: If the template contains a Jinja syntax error.
    :raises exceptions.SystemSetupError: On any other rendering failure.
    """
    try:
        template = env.get_template(io.basename(file_name))
        # force a new line at the end. Jinja seems to remove it.
        return template.render(variables) + "\n"
    except jinja2.exceptions.TemplateSyntaxError as e:
        raise exceptions.InvalidSyntax("%s in %s" % (str(e), file_name)) from e
    # was `except BaseException`, which also swallowed KeyboardInterrupt / SystemExit; narrowed
    # to Exception and chained the cause so the original traceback is preserved.
    except Exception as e:
        raise exceptions.SystemSetupError("%s in %s" % (str(e), file_name)) from e


def plain_text(file):
    """Returns ``True`` if ``file`` should be treated as a text-based config template (by extension)."""
    _, ext = io.splitext(file)
    return ext in [".ini", ".txt", ".json", ".yml", ".yaml", ".options", ".properties"]


def cleanup(preserve, install_dir, data_paths):
    """
    Removes a benchmark candidate installation and its data paths, unless ``preserve`` is set,
    in which case the installation location is only announced to the user.
    """

    def delete_path(p):
        if os.path.exists(p):
            try:
                logger.debug("Deleting [%s].", p)
                shutil.rmtree(p)
            except OSError:
                # best-effort cleanup: log and continue instead of failing the whole teardown
                logger.exception("Could not delete [%s]. Skipping...", p)

    logger = logging.getLogger(__name__)

    if preserve:
        console.info(f"Preserving benchmark candidate installation at [{install_dir}].", logger=logger)
    else:
        logger.info("Wiping benchmark candidate installation at [%s].", install_dir)
        for path in data_paths:
            delete_path(path)
        delete_path(install_dir)


def _apply_config(source_root_path, target_root_path, config_vars):
    """
    Walks all config templates below ``source_root_path``, renders text files with ``config_vars``
    and appends them to the corresponding file below ``target_root_path``; binary files are copied verbatim.
    """
    logger = logging.getLogger(__name__)
    for root, _, files in os.walk(source_root_path):
        env = jinja2.Environment(loader=jinja2.FileSystemLoader(root))

        relative_root = root[len(source_root_path) + 1 :]
        absolute_target_root = os.path.join(target_root_path, relative_root)
        io.ensure_dir(absolute_target_root)

        for name in files:
            source_file = os.path.join(root, name)
            target_file = os.path.join(absolute_target_root, name)
            if plain_text(source_file):
                logger.info("Reading config template file [%s] and writing to [%s].", source_file, target_file)
                # automatically merge config snippets from plugins (e.g. if they want to add config to elasticsearch.yml)
                with open(target_file, mode="a", encoding="utf-8") as f:
                    f.write(_render_template(env, config_vars, source_file))
            else:
                logger.info("Treating [%s] as binary and copying as is to [%s].", source_file, target_file)
                shutil.copy(source_file, target_file)


class BareProvisioner:
    """
    The provisioner prepares the runtime environment for running the benchmark.
    It prepares all configuration files and copies the binary of the benchmark candidate to the appropriate place.
    """

    def __init__(self, es_installer, plugin_installers, distribution_version=None, apply_config=_apply_config):
        self.es_installer = es_installer
        self.plugin_installers = plugin_installers
        self.distribution_version = distribution_version
        self.apply_config = apply_config
        self.logger = logging.getLogger(__name__)

    def prepare(self, binary):
        """
        Installs Elasticsearch and all plugins, applies their configuration and runs post-install hooks.

        :param binary: A dict mapping "elasticsearch" and plugin names to their binary locations.
        :return: A ``NodeConfiguration`` describing the provisioned node.
        """
        self.es_installer.install(binary["elasticsearch"])
        # we need to immediately delete it as plugins may copy their configuration during installation.
        self.es_installer.delete_pre_bundled_configuration()

        # determine after installation because some variables will depend on the install directory
        target_root_path = self.es_installer.es_home_path
        provisioner_vars = self._provisioner_variables()
        for p in self.es_installer.config_source_paths:
            self.apply_config(p, target_root_path, provisioner_vars)

        for installer in self.plugin_installers:
            installer.install(target_root_path, binary.get(installer.plugin_name))
            for plugin_config_path in installer.config_source_paths:
                self.apply_config(plugin_config_path, target_root_path, provisioner_vars)

        # Never let install hooks modify our original provisioner variables and just provide a copy!
        self.es_installer.invoke_install_hook(team.BootstrapPhase.post_install, provisioner_vars.copy())
        for installer in self.plugin_installers:
            installer.invoke_install_hook(team.BootstrapPhase.post_install, provisioner_vars.copy())

        return NodeConfiguration(
            "tar",
            self.es_installer.car.mandatory_var("runtime.jdk"),
            convert.to_bool(self.es_installer.car.mandatory_var("runtime.jdk.bundled")),
            self.es_installer.node_ip,
            self.es_installer.node_name,
            self.es_installer.node_root_dir,
            self.es_installer.es_home_path,
            self.es_installer.data_paths,
        )

    def _provisioner_variables(self):
        """Merges Elasticsearch and plugin template variables plus derived cluster settings."""
        plugin_variables = {}
        mandatory_plugins = []
        for installer in self.plugin_installers:
            plugin_variables.update(installer.variables)
            if installer.plugin.moved_to_module:
                self.logger.info(
                    "Skipping adding plugin [%s] to cluster setting 'plugin.mandatory' as it has been moved to a module",
                    installer.plugin_name,
                )
            else:
                mandatory_plugins.append(installer.plugin_name)

        cluster_settings = {}
        if mandatory_plugins:
            # as a safety measure, prevent the cluster to startup if something went wrong during plugin installation which
            # we did not detect already here. This ensures we fail fast.
            #
            # https://www.elastic.co/guide/en/elasticsearch/plugins/current/_plugins_directory.html#_mandatory_plugins
            cluster_settings["plugin.mandatory"] = mandatory_plugins

        provisioner_vars = {}
        provisioner_vars.update(self.es_installer.variables)
        provisioner_vars.update(plugin_variables)
        provisioner_vars["cluster_settings"] = cluster_settings

        return provisioner_vars


class ElasticsearchInstaller:
    """Installs an Elasticsearch distribution locally and exposes its configuration template variables."""

    def __init__(
        self,
        car,
        java_home,
        node_name,
        cluster_name,
        node_root_dir,
        all_node_ips,
        all_node_names,
        ip,
        http_port,
        hook_handler_class=team.BootstrapHookHandler,
    ):
        self.car = car
        self.java_home = java_home
        self.node_name = node_name
        self.cluster_name = cluster_name
        self.node_root_dir = node_root_dir
        self.install_dir = os.path.join(node_root_dir, "install")
        self.node_log_dir = os.path.join(node_root_dir, "logs", "server")
        self.heap_dump_dir = os.path.join(node_root_dir, "heapdump")
        self.all_node_ips = all_node_ips
        self.all_node_names = all_node_names
        self.node_ip = ip
        self.http_port = http_port
        self.hook_handler = hook_handler_class(self.car)
        if self.hook_handler.can_load():
            self.hook_handler.load()
        # both are only known after install() has run
        self.es_home_path = None
        self.data_paths = None
        self.logger = logging.getLogger(__name__)

    def install(self, binary):
        """Decompresses the distribution archive ``binary`` into the install directory."""
        self.logger.info("Preparing candidate locally in [%s].", self.install_dir)
        io.ensure_dir(self.install_dir)
        io.ensure_dir(self.node_log_dir)
        io.ensure_dir(self.heap_dump_dir)

        self.logger.info("Unzipping %s to %s", binary, self.install_dir)
        io.decompress(binary, self.install_dir)
        self.es_home_path = glob.glob(os.path.join(self.install_dir, "elasticsearch*"))[0]
        self.data_paths = self._data_paths()

    def delete_pre_bundled_configuration(self):
        """Removes the config directory shipped with the distribution; Rally provides its own config."""
        # TODO remove the below ignore when introducing type hints
        config_path = os.path.join(self.es_home_path, "config")  # type: ignore[arg-type]
        self.logger.info("Deleting pre-bundled Elasticsearch configuration at [%s]", config_path)
        shutil.rmtree(config_path)

    def invoke_install_hook(self, phase, variables):
        """Invokes the bootstrap hook for ``phase`` with ``variables`` and JAVA_HOME set if known."""
        env = {}
        if self.java_home:
            env["JAVA_HOME"] = self.java_home
        self.hook_handler.invoke(phase.name, variables=variables, env=env)

    @property
    def variables(self):
        # bind as specifically as possible
        network_host = self.node_ip

        defaults = {
            "cluster_name": self.cluster_name,
            "node_name": self.node_name,
            "data_paths": self.data_paths,
            "log_path": self.node_log_dir,
            "heap_dump_path": self.heap_dump_dir,
            # this is the node's IP address as specified by the user when invoking Rally
            "node_ip": self.node_ip,
            # this is the IP address that the node will be bound to. Rally will bind to the node's IP address (but not to 0.0.0.0). The
            # reason is that we use the node's IP address as subject alternative name in x-pack.
            "network_host": network_host,
            "http_port": str(self.http_port),
            "transport_port": str(self.http_port + 100),
            "all_node_ips": '["%s"]' % '","'.join(self.all_node_ips),
            "all_node_names": '["%s"]' % '","'.join(self.all_node_names),
            # at the moment we are strict and enforce that all nodes are master eligible nodes
            "minimum_master_nodes": len(self.all_node_ips),
            "install_root_path": self.es_home_path,
        }
        variables = {}
        variables.update(self.car.variables)
        variables.update(defaults)
        return variables

    @property
    def config_source_paths(self):
        return self.car.config_paths

    def _data_paths(self):
        """Resolves the node's data paths from the car or falls back to the default ``data`` directory."""
        if "data_paths" in self.car.variables:
            data_paths = self.car.variables["data_paths"]
            if isinstance(data_paths, str):
                return [data_paths]
            elif isinstance(data_paths, list):
                return data_paths
            else:
                raise exceptions.SystemSetupError("Expected [data_paths] to be either a string or a list but was [%s]." % type(data_paths))
        else:
            # TODO remove the below ignore when introducing type hints
            return [os.path.join(self.es_home_path, "data")]  # type: ignore[arg-type]


class PluginInstaller:
    """Installs a single Elasticsearch plugin via the ``elasticsearch-plugin`` CLI and runs its hooks."""

    def __init__(self, plugin, java_home, hook_handler_class=team.BootstrapHookHandler):
        self.plugin = plugin
        self.java_home = java_home
        self.hook_handler = hook_handler_class(self.plugin)
        if self.hook_handler.can_load():
            self.hook_handler.load()
        self.logger = logging.getLogger(__name__)

    def install(self, es_home_path, plugin_url=None):
        """
        Installs the plugin into the Elasticsearch installation at ``es_home_path``.

        :param plugin_url: An optional explicit plugin location; by name otherwise.
        :raises exceptions.SystemSetupError: If the plugin is unknown (installer exit code 64).
        :raises exceptions.SupplyError: On I/O errors during installation (installer exit code 74).
        :raises exceptions.RallyError: On any other installer failure.
        """
        installer_binary_path = os.path.join(es_home_path, "bin", "elasticsearch-plugin")
        if plugin_url:
            self.logger.info("Installing [%s] into [%s] from [%s]", self.plugin_name, es_home_path, plugin_url)
            install_cmd = '%s install --batch "%s"' % (installer_binary_path, plugin_url)
        else:
            self.logger.info("Installing [%s] into [%s]", self.plugin_name, es_home_path)
            install_cmd = '%s install --batch "%s"' % (installer_binary_path, self.plugin_name)

        return_code = process.run_subprocess_with_logging(install_cmd, env=self.env())
        # see: https://www.elastic.co/guide/en/elasticsearch/plugins/current/_other_command_line_parameters.html
        if return_code == 0:
            self.logger.info("Successfully installed [%s].", self.plugin_name)
        elif return_code == 64:
            # most likely this is an unknown plugin
            raise exceptions.SystemSetupError("Unknown plugin [%s]" % self.plugin_name)
        elif return_code == 74:
            raise exceptions.SupplyError("I/O error while trying to install [%s]" % self.plugin_name)
        else:
            raise exceptions.RallyError(
                "Unknown error while trying to install [%s] (installer return code [%s]). Please check the logs."
                % (self.plugin_name, str(return_code))
            )

    def invoke_install_hook(self, phase, variables):
        self.hook_handler.invoke(phase.name, variables=variables, env=self.env())

    def env(self):
        env = {}
        if self.java_home:
            env["JAVA_HOME"] = self.java_home
        return env

    @property
    def variables(self):
        return self.plugin.variables

    @property
    def config_source_paths(self):
        return self.plugin.config_paths

    @property
    def plugin_name(self):
        return self.plugin.name

    @property
    def sub_plugin_name(self):
        # if a plugin consists of multiple plugins (e.g. x-pack) we're interested in that name
        return self.variables.get("plugin_name", self.plugin_name)


class DockerProvisioner:
    """Prepares a Docker-based benchmark candidate: renders configs and a docker-compose file."""

    def __init__(self, car, node_name, cluster_name, ip, http_port, node_root_dir, distribution_version, rally_root):
        self.car = car
        self.node_name = node_name
        self.cluster_name = cluster_name
        self.node_ip = ip
        self.http_port = http_port
        self.node_root_dir = node_root_dir
        self.node_log_dir = os.path.join(node_root_dir, "logs", "server")
        self.heap_dump_dir = os.path.join(node_root_dir, "heapdump")
        self.distribution_version = distribution_version
        self.rally_root = rally_root
        self.binary_path = os.path.join(node_root_dir, "install")
        # use a random subdirectory to isolate multiple runs because an external (non-root) user cannot clean it up.
        self.data_paths = [os.path.join(node_root_dir, "data", str(uuid.uuid4()))]
        self.logger = logging.getLogger(__name__)

        provisioner_defaults = {
            "cluster_name": self.cluster_name,
            "node_name": self.node_name,
            # we bind-mount the directories below on the host to these ones.
            "install_root_path": "/usr/share/elasticsearch",
            "data_paths": ["/usr/share/elasticsearch/data"],
            "log_path": "/var/log/elasticsearch",
            "heap_dump_path": "/usr/share/elasticsearch/heapdump",
            # Docker container needs to expose service on external interfaces
            "network_host": "0.0.0.0",
            "discovery_type": "single-node",
            "http_port": str(self.http_port),
            "transport_port": str(self.http_port + 100),
            "cluster_settings": {},
        }

        self.config_vars = {}
        self.config_vars.update(self.car.variables)
        self.config_vars.update(provisioner_defaults)

    def prepare(self, binary):
        """
        Creates all host-side directories and config files and renders the docker-compose file.

        :param binary: Unused for Docker (the image is pulled by Docker itself).
        :return: A ``NodeConfiguration`` describing the provisioned node.
        """
        # we need to allow other users to write to these directories due to Docker.
        #
        # Although os.mkdir passes 0o777 by default, mkdir(2) uses `mode & ~umask & 0777` to determine the final flags and
        # hence we need to modify the process' umask here. For details see https://linux.die.net/man/2/mkdir.
        previous_umask = os.umask(0)
        try:
            io.ensure_dir(self.binary_path)
            io.ensure_dir(self.node_log_dir)
            io.ensure_dir(self.heap_dump_dir)
            io.ensure_dir(self.data_paths[0])
        finally:
            os.umask(previous_umask)

        mounts = {}

        for car_config_path in self.car.config_paths:
            for root, _, files in os.walk(car_config_path):
                env = jinja2.Environment(loader=jinja2.FileSystemLoader(root))

                relative_root = root[len(car_config_path) + 1 :]
                absolute_target_root = os.path.join(self.binary_path, relative_root)
                io.ensure_dir(absolute_target_root)

                for name in files:
                    source_file = os.path.join(root, name)
                    target_file = os.path.join(absolute_target_root, name)
                    mounts[target_file] = os.path.join("/usr/share/elasticsearch", relative_root, name)
                    if plain_text(source_file):
                        self.logger.info("Reading config template file [%s] and writing to [%s].", source_file, target_file)
                        with open(target_file, mode="a", encoding="utf-8") as f:
                            f.write(_render_template(env, self.config_vars, source_file))
                    else:
                        self.logger.info("Treating [%s] as binary and copying as is to [%s].", source_file, target_file)
                        shutil.copy(source_file, target_file)

        docker_cfg = self._render_template_from_file(self.docker_vars(mounts))
        self.logger.info("Starting Docker container with configuration:\n%s", docker_cfg)

        with open(os.path.join(self.binary_path, "docker-compose.yml"), mode="w", encoding="utf-8") as f:
            f.write(docker_cfg)

        return NodeConfiguration(
            "docker",
            self.car.mandatory_var("runtime.jdk"),
            convert.to_bool(self.car.mandatory_var("runtime.jdk.bundled")),
            self.node_ip,
            self.node_name,
            self.node_root_dir,
            self.binary_path,
            self.data_paths,
        )

    def docker_vars(self, mounts):
        """Returns the variables needed to render the docker-compose template."""
        v = {
            "es_version": self.distribution_version,
            "docker_image": self.car.mandatory_var("docker_image"),
            "node_ip": self.node_ip,
            "http_port": self.http_port,
            "es_data_dir": self.data_paths[0],
            "es_log_dir": self.node_log_dir,
            "es_heap_dump_dir": self.heap_dump_dir,
            "mounts": mounts,
        }
        self._add_if_defined_for_car(v, "docker_mem_limit")
        self._add_if_defined_for_car(v, "docker_cpu_count")
        return v

    def _add_if_defined_for_car(self, variables, key):
        # optional car variables are only propagated when the car actually defines them
        if key in self.car.variables:
            variables[key] = self.car.variables[key]

    def _render_template(self, loader, template_name, variables):
        """
        Renders ``template_name`` via ``loader`` with ``variables`` exposed as Jinja globals.

        :raises exceptions.InvalidSyntax: If the template contains a Jinja syntax error.
        :raises exceptions.SystemSetupError: On any other rendering failure.
        """
        try:
            env = jinja2.Environment(loader=loader)
            for k, v in variables.items():
                env.globals[k] = v
            template = env.get_template(template_name)
            return template.render()
        except jinja2.exceptions.TemplateSyntaxError as e:
            raise exceptions.InvalidSyntax("%s in %s" % (str(e), template_name)) from e
        # was `except BaseException`, which also swallowed KeyboardInterrupt / SystemExit; narrowed
        # to Exception and chained the cause so the original traceback is preserved.
        except Exception as e:
            raise exceptions.SystemSetupError("%s in %s" % (str(e), template_name)) from e

    def _render_template_from_file(self, variables):
        compose_file = os.path.join(self.rally_root, "resources", "docker-compose.yml.j2")
        return self._render_template(
            loader=jinja2.FileSystemLoader(io.dirname(compose_file)), template_name=io.basename(compose_file), variables=variables
        )