esrally/rallyd.py (87 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import argparse import logging import os import sys import time from esrally import ( BANNER, PROGRAM_NAME, actor, check_python_version, doc_link, exceptions, log, version, ) from esrally.utils import console, process def start(args): if actor.actor_system_already_running(): raise exceptions.RallyError("An actor system appears to be already running.") actor.bootstrap_actor_system(local_ip=args.node_ip, coordinator_ip=args.coordinator_ip) console.info(f"Successfully started actor system on node [{args.node_ip}] with coordinator node IP [{args.coordinator_ip}].") if console.RALLY_RUNNING_IN_DOCKER: console.info(f"Running with PID: {os.getpid()}") while process.wait_for_child_processes( callback=lambda process: console.info(f"Actor with PID [{process.pid}] terminated with status [{process.returncode}]."), list_callback=lambda children: console.info(f"Waiting for child processes ({[p.pid for p in children]}) to terminate..."), ): pass console.info("All actors terminated, exiting.") def stop(raise_errors=True): if actor.actor_system_already_running(): # noinspection PyBroadException try: # TheSpian writes the following warning upon start (at least) on Mac OS X: # # WARNING:root:Unable to get address info for address 103.1.168.192.in-addr.arpa (AddressFamily.AF_INET,\ # SocketKind.SOCK_DGRAM, 17, 0): <class 'socket.gaierror'> [Errno 8] nodename nor servname provided, or not known # # Therefore, we will not show warnings but only errors. logging.basicConfig(level=logging.ERROR) running_system = actor.bootstrap_actor_system(try_join=True) running_system.shutdown() # await termination... console.info("Shutting down actor system.", end="", flush=True) while actor.actor_system_already_running(): console.println(".", end="", flush=True) time.sleep(1) console.println(" [OK]") except BaseException: console.error("Could not shut down actor system.") if raise_errors: # raise again so user can see the error raise elif raise_errors: console.error("Could not shut down actor system: Actor system is not running.") sys.exit(1) def status(): if actor.actor_system_already_running(): console.println("Running") else: console.println("Stopped") def main(): check_python_version() log.install_default_log_config() log.configure_logging() console.init(assume_tty=False) parser = argparse.ArgumentParser( prog=PROGRAM_NAME, description=BANNER + "\n\n Rally daemon to support remote benchmarks", epilog=f"Find out more about Rally at {console.format.link(doc_link())}", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--version", action="version", version="%(prog)s " + version.version()) subparsers = parser.add_subparsers(title="subcommands", dest="subcommand", help="") subparsers.required = True start_command = subparsers.add_parser("start", help="Starts the Rally daemon") restart_command = subparsers.add_parser("restart", help="Restarts the Rally daemon") for p in [start_command, restart_command]: p.add_argument("--node-ip", required=True, help="The IP of this node.") p.add_argument("--coordinator-ip", required=True, help="The IP of the coordinator node.") subparsers.add_parser("stop", help="Stops the Rally daemon") subparsers.add_parser("status", help="Shows the current status of the local Rally daemon") args = parser.parse_args() if args.subcommand == "start": start(args) elif args.subcommand == "stop": stop() elif args.subcommand == "status": status() elif args.subcommand == "restart": stop(raise_errors=False) start(args) else: raise exceptions.RallyError("Unknown subcommand [%s]" % args.subcommand) if __name__ == "__main__": main()