in esrally/rally.py [0:0]
def with_actor_system(runnable, cfg: types.Config):
logger = logging.getLogger(__name__)
already_running = actor.actor_system_already_running()
logger.info("Actor system already running locally? [%s]", str(already_running))
try:
actors = actor.bootstrap_actor_system(try_join=already_running, prefer_local_only=not already_running)
# We can only support remote benchmarks if we have a dedicated daemon that is not only bound to 127.0.0.1
cfg.add(config.Scope.application, "system", "remote.benchmarking.supported", already_running)
# This happens when the admin process could not be started, e.g. because it could not open a socket.
except thespian.actors.InvalidActorAddress:
logger.info("Falling back to offline actor system.")
actor.use_offline_actor_system()
actors = actor.bootstrap_actor_system(try_join=True)
except KeyboardInterrupt:
raise exceptions.UserInterrupted("User has cancelled the benchmark (detected whilst bootstrapping actor system).") from None
except Exception as e:
logger.exception("Could not bootstrap actor system.")
if str(e) == "Unable to determine valid external socket address.":
console.warn(
"Could not determine a socket address. Are you running without any network? Switching to degraded mode.", logger=logger
)
logger.info("Falling back to offline actor system.")
actor.use_offline_actor_system()
actors = actor.bootstrap_actor_system(try_join=True)
else:
raise
try:
runnable(cfg)
finally:
# We only shutdown the actor system if it was not already running before
if not already_running:
shutdown_complete = False
times_interrupted = 0
while not shutdown_complete and times_interrupted < 2:
try:
# give some time for any outstanding messages to be delivered to the actor system
time.sleep(3)
logger.info("Attempting to shutdown internal actor system.")
actors.shutdown()
# note that this check will only evaluate to True for a TCP-based actor system.
timeout = 15
while actor.actor_system_already_running() and timeout > 0:
logger.info("Actor system is still running. Waiting...")
time.sleep(1)
timeout -= 1
if timeout > 0:
shutdown_complete = True
logger.info("Shutdown completed.")
else:
logger.warning("Shutdown timed out. Actor system is still running.")
break
except KeyboardInterrupt:
times_interrupted += 1
logger.warning("User interrupted shutdown of internal actor system.")
console.info("Please wait a moment for Rally's internal components to shutdown.")
if not shutdown_complete and times_interrupted > 0:
logger.warning("Terminating after user has interrupted actor system shutdown explicitly for [%d] times.", times_interrupted)
console.println("")
console.warn("Terminating now at the risk of leaving child processes behind.")
console.println("")
console.warn("The next race may fail due to an unclean shutdown.")
console.println("")
console.println(SKULL)
console.println("")
raise exceptions.UserInterrupted(
f"User has cancelled the benchmark (shutdown not complete as user interrupted " f"{times_interrupted} times)."
) from None
if not shutdown_complete:
console.warn(
"Could not terminate all internal processes within timeout. Please check and force-terminate all Rally processes."
)