def start_benchmark()

in esrally/driver/driver.py [0:0]


    def start_benchmark(self):
        """
        Start the benchmark: attach telemetry, compute client allocations and launch the workers.

        An ``Allocator`` is built from ``self.challenge.schedule`` and its allocations, step count and
        tasks per join point are stored on this instance. The result of
        ``calculate_worker_assignments(self.load_driver_hosts, allocator.clients)`` is then iterated: each
        assignment names a ``host`` and a list of client groups (``workers``). For every non-empty group a worker
        is created via ``self.driver_actor.create_client`` and started via ``self.driver_actor.start_worker``.
        Worker ids are assigned consecutively, starting at zero; empty groups do not consume a worker id.

        If the default client option ``create_api_key_per_client`` is truthy, ``self.create_api_key`` is called
        once per client and the ``id`` / ``api_key`` fields of its response are stored on the client's
        ``ClientContext`` as an ``ApiKey``.

        Side effects: sets ``self.allocations``, ``self.number_of_steps`` and ``self.tasks_per_join_point``;
        adds entries to ``self.clients_per_worker`` (client id -> worker id) and ``self.client_contexts``
        (worker id -> {client id -> ``ClientContext``}); appends every created worker to ``self.workers``.
        """
        self.logger.info("Benchmark is about to start.")
        # ensure relative time starts when the benchmark starts.
        self.reset_relative_time()
        self.logger.info("Attaching cluster-level telemetry devices.")
        self.telemetry.on_benchmark_start()
        self.logger.info("Cluster-level telemetry devices are now attached.")

        allocator = Allocator(self.challenge.schedule)
        self.allocations = allocator.allocations
        self.number_of_steps = len(allocator.join_points) - 1
        self.tasks_per_join_point = allocator.tasks_per_joinpoint

        self.logger.info(
            "Benchmark consists of [%d] steps executed by [%d] clients.",
            self.number_of_steps,
            len(self.allocations),  # type: ignore[arg-type]  # TODO remove this ignore when introducing type hints
        )
        # avoid flooding the log if there are too many clients
        if allocator.clients < 128:
            self.logger.debug("Allocation matrix:\n%s", "\n".join([str(a) for a in self.allocations]))

        default_client_options = self.config.opts("client", "options").all_client_options["default"]
        create_api_keys = default_client_options.get("create_api_key_per_client", None)
        worker_assignments = calculate_worker_assignments(self.load_driver_hosts, allocator.clients)
        worker_id = 0
        for assignment in worker_assignments:
            host = assignment["host"]
            for clients in assignment["workers"]:
                # don't assign workers without any clients (and don't consume a worker id for them)
                if len(clients) == 0:
                    continue

                self.logger.debug("Allocating worker [%d] on [%s] with [%d] clients.", worker_id, host, len(clients))
                worker = self.driver_actor.create_client(host, self.config, worker_id)

                client_allocations = ClientAllocations()
                worker_client_contexts = {}
                # Register the mapping once per worker; it is filled in place by the loop below, so there is
                # no need to re-assign the same dict for every client.
                self.client_contexts[worker_id] = worker_client_contexts
                for client_id in clients:
                    client_allocations.add(client_id, self.allocations[client_id])
                    self.clients_per_worker[client_id] = worker_id
                    client_context = ClientContext(client_id=client_id, parent_worker_id=worker_id)

                    if create_api_keys:
                        resp = self.create_api_key(self.default_sync_es_client, client_id)
                        client_context.api_key = ApiKey(id=resp["id"], secret=resp["api_key"])

                    worker_client_contexts[client_id] = client_context

                self.driver_actor.start_worker(
                    worker, worker_id, self.config, self.track, client_allocations, client_contexts=worker_client_contexts
                )
                self.workers.append(worker)
                worker_id += 1

        self.update_progress_message()