in src/lighteval/logging/info_loggers.py
def aggregate(self, task_dict: dict[str, LightevalTask], bootstrap_iters: int = 1000):  # noqa: C901
    """
    Aggregate the metrics for each task, then across all tasks.

    Args:
        task_dict (dict[str, LightevalTask]): used to determine which aggregation function to use for each metric
        bootstrap_iters (int, optional): Number of iterations used for the statistical bootstrap. Defaults to 1000.
    """
    for task_name, metrics in self.metrics_values.items():
        task = task_dict[task_name]
        skip_metric = []

        for metric_name, metric_values in metrics.items():
            if metric_name in skip_metric:
                # The metric is in a subset which has already been computed and saved
                continue

            try:
                metric_result = task.aggregation()[metric_name](metric_values)
            except OverflowError:
                logger.warning(f"{task_name}, {metric_name} got an OVERFLOW ERROR when aggregating.")
                metric_result = float("nan")
            except KeyError:
                # The task does not define an aggregation function for this metric, so there is nothing to aggregate
                continue
            if isinstance(metric_result, dict):  # For some corpus-level grouping metrics
                self.metric_aggregated[task_name].update(metric_result)
                skip_metric.extend(list(metric_result.keys()))  # no need to recompute them later
            else:
                self.metric_aggregated[task_name][metric_name] = metric_result
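            # Next, estimate the standard error of the aggregated value via bootstrap resampling.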
            if isinstance(metric_result, dict) or bootstrap_iters == 0:
                # We skip stderr for corpus metrics that return dicts, or when bootstrap_iters is 0
                stderr = None
            else:
                aggregation = task.aggregation()[metric_name]
                stderr = get_stderr_function(aggregation=aggregation, number_experiments=bootstrap_iters)

            if stderr is not None and len(metric_values) > 1:
                try:
                    self.metric_aggregated[task_name][f"{metric_name}_stderr"] = stderr(metric_values)
                except OverflowError:
                    # Store NaN rather than dropping the key, so the overflow stays visible in the results
                    self.metric_aggregated[task_name][f"{metric_name}_stderr"] = float("nan")
                    logger.warning(f"{task_name}, {metric_name} got an OVERFLOW ERROR when computing stderr.")

    # We group subtasks which belong to the same parent task (e.g. MMLU) to compute an average over them,
    # and we also compute an average of each metric across all tasks
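    # Grouping key format (illustrative example, not from the original file): subtask keys such as
    # "suite|mmlu:abstract_algebra|5" and "suite|mmlu:anatomy|5" would both be collected under the
    # parent key "suite|mmlu:_average|5".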
    grouped_tasks = collections.defaultdict(list)
    suite_average = {}
    suite_nb = {}

    # Build aggregation
    for k, metrics in self.metric_aggregated.items():
        if "|" in k:
            suite, task, fewshot = k.split("|")
            grouped_tasks[f"{suite}|{task.split(':')[0]}:_average|{fewshot}"].append(k)

        for metric, value in metrics.items():
            suite_average[metric] = suite_average.get(metric, 0) + value
            suite_nb[metric] = suite_nb.get(metric, 0) + 1

    # Compute average for sub groups
    for average_task, list_of_subtasks in grouped_tasks.items():
        if len(list_of_subtasks) > 1:
            metrics = list(self.metric_aggregated[list_of_subtasks[0]].keys())
            self.metric_aggregated[average_task] = {
                metric: sum(self.metric_aggregated[k][metric] for k in list_of_subtasks) / len(list_of_subtasks)
                for metric in metrics
            }

    # Compute average for all
    for metric, value in suite_average.items():
        suite_average[metric] = value / suite_nb[metric]

    self.metric_aggregated["all"] = suite_average