def _init_tasks_and_requests()

in src/lighteval/pipeline.py [0:0]


    def _init_tasks_and_requests(self, tasks: str):
        """Load the requested tasks and their documents, then group the documents by sampling method.

        Sets `self.tasks_dict`, `self.documents_dict` and `self.sampling_docs`, and finally
        logs the task configurations through the evaluation tracker.

        Args:
            tasks: Task specification forwarded to `Registry.get_tasks_configs`.
        """
        # With the nanotron launcher the loading runs inside `local_ranks_zero_first()`;
        # any other launcher gets a no-op context.
        if self.launcher_type == ParallelismManager.NANOTRON:
            loading_context = local_ranks_zero_first()
        else:
            loading_context = nullcontext()

        with loading_context:
            logger.info("--- LOADING TASKS ---")

            # The registry is built with the custom tasks directory from the pipeline parameters.
            custom_tasks = self.pipeline_parameters.custom_tasks_directory
            registry = Registry(custom_tasks=custom_tasks)

            # Resolve the task configs, build the tasks from them, then load their datasets.
            task_configs: list[LightevalTaskConfig] = registry.get_tasks_configs(tasks)
            self.tasks_dict: dict[str, LightevalTask] = registry.get_tasks_from_configs(task_configs)
            LightevalTask.load_datasets(self.tasks_dict, self.pipeline_parameters.dataset_loading_processes)

            # Documents of every task, keyed by the task's full name.
            self.documents_dict = {
                task.full_name: task.get_docs(self.pipeline_parameters.max_samples)
                for task in self.tasks_dict.values()
            }

            # Group documents by sampling method; a document listing several
            # sampling methods is appended once under each of them.
            self.sampling_docs = collections.defaultdict(list)
            for task_docs in self.documents_dict.values():
                for document in task_docs:
                    for method in document.sampling_methods:
                        self.sampling_docs[method].append(document)

            # When metric options are set (e.g. from the yaml file), let
            # `_update_num_samples` review whether the tasks need updating.
            if self._metric_options:
                loaded_tasks = list(self.tasks_dict.values())
                self._update_num_samples(loaded_tasks)

            self.evaluation_tracker.task_config_logger.log(self.tasks_dict)