in esrally/driver/driver.py [0:0]
def receiveMsg_WakeupMessage(self, msg, sender):
# it would be better if we could send ourselves a message at a specific time, simulate this with a boolean...
if self.start_driving:
self.start_driving = False
self.drive()
else:
current_samples = self.send_samples()
if self.cancel.is_set():
self.logger.info("Worker[%s] has detected that benchmark has been cancelled. Notifying master...", str(self.worker_id))
self.send(self.driver_actor, actor.BenchmarkCancelled())
elif self.executor_future is not None and self.executor_future.done():
e = self.executor_future.exception(timeout=0)
if e:
self.logger.exception(
"Worker[%s] has detected a benchmark failure. Notifying master...", str(self.worker_id), exc_info=e
)
# the exception might be user-defined and not be on the load path of the master driver. Hence, it cannot be
# deserialized on the receiver so we convert it here to a plain string.
self.send(self.driver_actor, actor.BenchmarkFailure(f"Error in load generator [{self.worker_id}]", str(e)))
else:
self.logger.debug("Worker[%s] is ready for the next task.", str(self.worker_id))
self.executor_future = None
self.drive()
else:
if current_samples and len(current_samples) > 0:
most_recent_sample = current_samples[-1]
if most_recent_sample.percent_completed is not None:
self.logger.debug(
"Worker[%s] is executing [%s] (%.2f%% complete).",
str(self.worker_id),
most_recent_sample.task,
most_recent_sample.percent_completed * 100.0,
)
else:
# TODO: This could be misleading given that one worker could execute more than one task...
self.logger.debug(
"Worker[%s] is executing [%s] (dependent eternal task).", str(self.worker_id), most_recent_sample.task
)
else:
self.logger.debug("Worker[%s] is executing (no samples).", str(self.worker_id))
self.wakeupAfter(datetime.timedelta(seconds=self.wakeup_interval))