pytest_rally/elasticsearch.py (85 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import errno
import json
import logging
import shlex
import socket
import subprocess
import time
import uuid

from pytest_rally import process


class TestCluster:
    """Drive a single local Elasticsearch node through the ``esrally`` CLI.

    The instance builds ``esrally install`` / ``start`` / ``stop`` command
    lines from its configuration. When ``debug`` is true the commands are
    only logged and never executed.
    """

    def __init__(self, distribution_version=None, revision="current", http_port=19200, node_name="rally-node",
                 car="4gheap,trial-license,x-pack-ml,lean-watermarks", debug=False):
        """Store the cluster configuration; no external command is run here.

        Args:
            distribution_version: Value for ``--distribution-version``. When
                ``None``, ``--revision`` is used instead.
            revision: Value for ``--revision`` (only used when
                ``distribution_version`` is ``None``).
            http_port: HTTP port of the node. The transport port is derived
                as ``http_port + 100``.
            node_name: Value for ``--node`` and ``--master-nodes``.
            car: Value for ``--car``.
            debug: When true, ``install``/``start``/``stop`` log the command
                and return without executing it.
        """
        # Populated by install() from the "installation-id" key of the command output.
        self.installation_id = None
        self.distribution_version = distribution_version
        self.revision = revision
        self.http_port = http_port
        self.transport_port = http_port + 100
        self.node_name = node_name
        self.car = car
        self.debug = debug
        self.logger = logging.getLogger(__name__)

    def wait_until_port_is_free(self, timeout=120):
        """Block until nothing accepts connections on ``127.0.0.1:http_port``.

        A connection attempt is made every 0.5 seconds; the method returns as
        soon as an attempt is refused.

        Args:
            timeout: Maximum number of seconds to keep polling.

        Raises:
            TimeoutError: If the port is still occupied after ``timeout`` seconds.
        """
        deadline = time.perf_counter() + timeout
        while time.perf_counter() < deadline:
            # The context manager closes the probe socket on every path,
            # including when connect_ex itself raises.
            with socket.socket() as probe:
                result = probe.connect_ex(("127.0.0.1", self.http_port))
            if result == errno.ECONNREFUSED:
                return
            time.sleep(0.5)
        raise TimeoutError(f"Port [{self.http_port}] is occupied after [{timeout}] seconds")

    def install(self):
        """Install Elasticsearch via ``esrally install`` and record the installation id.

        In debug mode the command is only logged and ``installation_id`` is
        left unchanged. Otherwise the method first waits for the HTTP port to
        be free, runs the command, parses its joined output as JSON and stores
        the value of the ``"installation-id"`` key.

        Raises:
            AssertionError: If the install command raises
                ``subprocess.CalledProcessError``.
            TimeoutError: If the HTTP port does not become free in time.
        """
        cmd = (f"esrally install --quiet "
               f"--http-port={self.http_port} --node={self.node_name} "
               f"--master-nodes={self.node_name} --car={self.car} "
               f'--seed-hosts="127.0.0.1:{self.transport_port}"')
        if self.distribution_version is not None:
            cmd += f" --distribution-version={self.distribution_version}"
        else:
            cmd += f" --revision={self.revision}"
        self.logger.debug("Installing Elasticsearch: [%s]", cmd)
        if self.debug:
            return
        try:
            self.wait_until_port_is_free()
            self.logger.info("Installing Elasticsearch: [%s]", cmd)
            # NOTE(review): run_command_with_output is assumed to return an
            # iterable of output strings - confirm in pytest_rally.process.
            output = process.run_command_with_output(cmd)
            self.installation_id = json.loads("".join(output))["installation-id"]
        except subprocess.CalledProcessError as e:
            raise AssertionError("Failed to install Elasticsearch", e) from e

    def start(self):
        """Start the installed node via ``esrally start`` with a fresh random race id.

        In debug mode the command is only logged.

        Raises:
            AssertionError: If the command exits with a non-zero status.
        """
        race = str(uuid.uuid4())
        cmd = f'esrally start --runtime-jdk=bundled --installation-id={self.installation_id} --race-id={race}'
        self.logger.info("Starting Elasticsearch: [%s]", cmd)
        if self.debug:
            return
        try:
            subprocess.run(shlex.split(cmd), check=True)
        except subprocess.CalledProcessError as e:
            raise AssertionError("Failed to start Elasticsearch test cluster.", e) from e

    def stop(self):
        """Stop the node via ``esrally stop``.

        In debug mode the command is only logged.

        Raises:
            AssertionError: If the command exits with a non-zero status.
        """
        cmd = f"esrally stop --installation-id={self.installation_id}"
        self.logger.info("Stopping Elasticsearch: [%s]", cmd)
        if self.debug:
            return
        try:
            # check=True is required for CalledProcessError to be raised at
            # all; without it a failed stop would go unnoticed.
            subprocess.run(shlex.split(cmd), check=True)
        except subprocess.CalledProcessError as e:
            raise AssertionError("Failed to stop Elasticsearch test cluster.", e) from e

    def __str__(self):
        """Return a short description containing the installation id."""
        return f"TestCluster[installation-id={self.installation_id}]"