in esrally/driver/driver.py [0:0]
def start_benchmark(self):
    """
    Kicks off the benchmark.

    Resets the relative clock, notifies telemetry that the benchmark starts, derives the client
    allocations from the challenge's schedule and then creates and starts one worker for every
    non-empty group of clients returned by ``calculate_worker_assignments``. If the default client
    options enable ``create_api_key_per_client``, an API key is created for each client and stored
    in its client context before the owning worker is started.
    """
    self.logger.info("Benchmark is about to start.")
    # relative time has to be measured from the moment the benchmark begins
    self.reset_relative_time()
    self.logger.info("Attaching cluster-level telemetry devices.")
    self.telemetry.on_benchmark_start()
    self.logger.info("Cluster-level telemetry devices are now attached.")

    allocator = Allocator(self.challenge.schedule)
    self.allocations = allocator.allocations
    self.number_of_steps = len(allocator.join_points) - 1
    self.tasks_per_join_point = allocator.tasks_per_joinpoint

    self.logger.info(
        "Benchmark consists of [%d] steps executed by [%d] clients.",
        self.number_of_steps,
        len(self.allocations),  # type: ignore[arg-type]  # TODO remove this ignore when introducing type hints
    )
    # the allocation matrix is only logged for smaller client counts to keep the log readable
    if allocator.clients < 128:
        self.logger.debug("Allocation matrix:\n%s", "\n".join(map(str, self.allocations)))

    client_options = self.config.opts("client", "options")
    default_client_options = client_options.all_client_options["default"]
    per_client_api_keys = default_client_options.get("create_api_key_per_client", None)

    next_worker_id = 0
    for assignment in calculate_worker_assignments(self.load_driver_hosts, allocator.clients):
        host = assignment["host"]
        for client_ids in assignment["workers"]:
            if len(client_ids) == 0:
                # a worker without clients has nothing to do, so none is created (and no id is consumed)
                continue

            worker_id = next_worker_id
            self.logger.debug("Allocating worker [%d] on [%s] with [%d] clients.", worker_id, host, len(client_ids))
            worker = self.driver_actor.create_client(host, self.config, worker_id)

            allocations_for_worker = ClientAllocations()
            contexts_for_worker = {}
            for client_id in client_ids:
                allocations_for_worker.add(client_id, self.allocations[client_id])
                self.clients_per_worker[client_id] = worker_id
                context = ClientContext(client_id=client_id, parent_worker_id=worker_id)
                if per_client_api_keys:
                    key_response = self.create_api_key(self.default_sync_es_client, client_id)
                    context.api_key = ApiKey(id=key_response["id"], secret=key_response["api_key"])
                contexts_for_worker[client_id] = context

            # record the contexts before the worker is started
            self.client_contexts[worker_id] = contexts_for_worker
            self.driver_actor.start_worker(
                worker, worker_id, self.config, self.track, allocations_for_worker, client_contexts=contexts_for_worker
            )
            self.workers.append(worker)
            next_worker_id = worker_id + 1

    self.update_progress_message()