lm_eval/tasks/__init__.py (471 lines of code) (raw):
import collections
import inspect
import logging
import os
from functools import partial
from typing import Dict, List, Mapping, Optional, Union
from lm_eval import utils
from lm_eval.api.group import ConfigurableGroup, GroupConfig
from lm_eval.api.task import ConfigurableTask, Task
from lm_eval.evaluator_utils import get_subtask_list
# Keys that belong to a group-level config, taken from a default GroupConfig.
# Used to separate group-only settings from overrides passed on to subtasks.
GROUP_ONLY_KEYS = list(GroupConfig().to_dict())
class TaskManager:
    """Index task, group and tag configs found on disk and load them on demand.

    YAML files are discovered recursively under the default ``lm_eval/tasks/``
    directory (unless ``include_defaults`` is false) and under any additional
    ``include_path`` directories.
    """

    def __init__(
        self,
        verbosity="INFO",
        include_path: Optional[Union[str, List]] = None,
        include_defaults: bool = True,
    ) -> None:
        """Build the task index and the sorted name lists derived from it.

        :param verbosity:
            Name of a ``logging`` level (e.g. ``"INFO"``) applied to the
            shared eval logger.
        :param include_path: Union[str, List] = None
            Extra directory (or list of directories) to search for configs.
        :param include_defaults: bool = True
            Whether the directory containing this module is indexed.
        """
        self.verbosity = verbosity
        self.include_path = include_path
        self.logger = utils.eval_logger
        self.logger.setLevel(getattr(logging, f"{verbosity}"))

        self._task_index = self.initialize_tasks(
            include_path=include_path, include_defaults=include_defaults
        )
        self._all_tasks = sorted(self._task_index.keys())

        self._all_groups = sorted(
            x for x in self._all_tasks if self._task_index[x]["type"] == "group"
        )
        self._all_subtasks = sorted(
            x for x in self._all_tasks if self._task_index[x]["type"] == "task"
        )
        self._all_tags = sorted(
            x for x in self._all_tasks if self._task_index[x]["type"] == "tag"
        )

        # Maps a parent group to the subtask names already loaded under it;
        # used to give repeated subtasks a unique "-N" suffix.
        self.task_group_map = collections.defaultdict(list)

    def initialize_tasks(
        self,
        include_path: Optional[Union[str, List]] = None,
        include_defaults: bool = True,
    ):
        """Creates a dictionary of tasks index.

        :param include_path: Union[str, List] = None
            An additional path to be searched for tasks recursively.
            Can provide more than one such path as a list.
        :param include_defaults: bool = True
            If set to false, default tasks (those in lm_eval/tasks/) are not indexed.
        :return
            Dictionary of task names as key and task metadata
        """
        if include_defaults:
            all_paths = [os.path.dirname(os.path.abspath(__file__)) + "/"]
        else:
            all_paths = []

        if include_path is not None:
            if isinstance(include_path, str):
                include_path = [include_path]
            all_paths.extend(include_path)

        task_index = {}
        for task_dir in all_paths:
            tasks = self._get_task_and_group(task_dir)
            # Entries indexed from earlier directories win on name collisions.
            task_index = {**tasks, **task_index}

        return task_index

    @property
    def all_tasks(self):
        """Sorted list of every indexed name, whatever its type."""
        return self._all_tasks

    @property
    def all_groups(self):
        """Sorted list of names indexed with type ``group``."""
        return self._all_groups

    @property
    def all_subtasks(self):
        """Sorted list of names indexed with type ``task``."""
        return self._all_subtasks

    @property
    def all_tags(self):
        """Sorted list of names indexed with type ``tag``."""
        return self._all_tags

    @property
    def task_index(self):
        """The name -> metadata dictionary built by ``initialize_tasks``."""
        return self._task_index

    @staticmethod
    def _resolve_include_paths(yaml_path: str, include) -> List[str]:
        """Return the include path(s) of a config, resolved next to *yaml_path*.

        :param yaml_path: path of the YAML file that declares the include.
        :param include: a single include path or an iterable of them.
        :return
            List of paths joined onto the directory of ``yaml_path``
            (absolute includes are returned unchanged by ``os.path.join``).
        """
        includes = [include] if isinstance(include, str) else list(include)
        base_dir = os.path.dirname(yaml_path)
        return [os.path.join(base_dir, entry) for entry in includes]

    def list_all_tasks(
        self, list_groups=True, list_tags=True, list_subtasks=True
    ) -> str:
        """Render the indexed groups, tags and tasks as Markdown tables.

        :param list_groups: include the group table in the output.
        :param list_tags: include the tag table in the output.
        :param list_subtasks: include the task table in the output.
        :return
            A string starting with a newline, followed by the requested tables.
        """
        from pytablewriter import MarkdownTableWriter

        def sanitize_path(path):
            # don't print full path if we are within the lm_eval/tasks dir !
            # if we aren't though, provide the full path.
            if "lm_eval/tasks/" in path:
                return "lm_eval/tasks/" + path.split("lm_eval/tasks/")[-1]
            else:
                return path

        group_table = MarkdownTableWriter()
        group_table.headers = ["Group", "Config Location"]
        gt_values = []
        for g in self.all_groups:
            path = self.task_index[g]["yaml_path"]
            # -1 marks an entry that has no YAML file of its own.
            path = "---" if path == -1 else sanitize_path(path)
            gt_values.append([g, path])
        group_table.value_matrix = gt_values

        tag_table = MarkdownTableWriter()
        tag_table.headers = ["Tag"]
        tag_table.value_matrix = [[t] for t in self.all_tags]

        subtask_table = MarkdownTableWriter()
        subtask_table.headers = ["Task", "Config Location", "Output Type"]
        st_values = []
        for t in self.all_subtasks:
            path = self.task_index[t]["yaml_path"]

            output_type = ""

            # read the yaml file to determine the output type
            if path != -1:
                config = utils.load_yaml_config(path, mode="simple")
                if "output_type" in config:
                    output_type = config["output_type"]
                elif "include" in config:
                    # if no output type, check if there is an include with an
                    # output type; the first include that declares one is used.
                    for include_path in self._resolve_include_paths(
                        path, config["include"]
                    ):
                        include_config = utils.load_yaml_config(
                            include_path, mode="simple"
                        )
                        if "output_type" in include_config:
                            output_type = include_config["output_type"]
                            break

            path = "---" if path == -1 else sanitize_path(path)
            st_values.append([t, path, output_type])
        subtask_table.value_matrix = st_values

        result = "\n"
        if list_groups:
            result += group_table.dumps() + "\n\n"
        if list_tags:
            result += tag_table.dumps() + "\n\n"
        if list_subtasks:
            result += subtask_table.dumps() + "\n\n"
        return result

    def match_tasks(self, task_list):
        """Match *task_list* patterns against all indexed names."""
        return utils.pattern_match(task_list, self.all_tasks)

    def _name_is_registered(self, name) -> bool:
        """Return True if *name* is present in the index."""
        return name in self.all_tasks

    def _name_has_type(self, name, entry_type: str) -> bool:
        """Return True if *name* is indexed and its ``type`` equals *entry_type*."""
        return (
            self._name_is_registered(name)
            and self.task_index[name]["type"] == entry_type
        )

    def _name_is_task(self, name) -> bool:
        """Return True if *name* is indexed as a regular ``task``."""
        return self._name_has_type(name, "task")

    def _name_is_tag(self, name) -> bool:
        """Return True if *name* is indexed as a ``tag``."""
        return self._name_has_type(name, "tag")

    def _name_is_group(self, name) -> bool:
        """Return True if *name* is indexed as a ``group``."""
        return self._name_has_type(name, "group")

    def _name_is_python_task(self, name) -> bool:
        """Return True if *name* is indexed as a ``python_task``."""
        return self._name_has_type(name, "python_task")

    def _config_is_task(self, config) -> bool:
        """A task config has a string under its ``task`` key."""
        return ("task" in config) and isinstance(config["task"], str)

    def _config_is_group(self, config) -> bool:
        """A group config has a list under its ``task`` key."""
        return ("task" in config) and isinstance(config["task"], list)

    def _config_is_python_task(self, config) -> bool:
        """A python-task config carries a ``class`` key."""
        return "class" in config

    def _get_yaml_path(self, name):
        """Return the indexed ``yaml_path`` for *name* (-1 when it has no file).

        :raises ValueError: if *name* is not in the index.
        """
        if name not in self.task_index:
            raise ValueError(f"'{name}' is not a registered task, group or tag.")
        return self.task_index[name]["yaml_path"]

    def _get_config(self, name):
        """Load the full config for *name*, or ``{}`` when it has no YAML file.

        :raises ValueError: if *name* is not in the index.
        """
        if name not in self.task_index:
            raise ValueError(f"'{name}' is not a registered task, group or tag.")
        yaml_path = self._get_yaml_path(name)
        if yaml_path == -1:
            return {}
        else:
            return utils.load_yaml_config(yaml_path, mode="full")

    def _get_tasklist(self, name):
        """Return the ``task`` entry stored in the index for *name*.

        For tags this is the list of member task names; for groups defined by
        a group config it is -1 (the list is resolved at load time).

        :raises ValueError: if *name* is indexed as a plain task.
        """
        if self._name_is_task(name):
            raise ValueError(
                f"'{name}' is a task and therefore has no list of subtasks."
            )
        return self.task_index[name]["task"]

    def _process_alias(self, config, group=None):
        """Clear ``group_alias`` when *config* belongs to a different group.

        The config is modified in place and also returned.
        """
        # If the group is not the same as the original
        # group which the group alias was intended for,
        # Set the group_alias to None instead.
        if ("group_alias" in config) and ("group" in config) and group is not None:
            if config["group"] != group:
                config["group_alias"] = None
        return config

    def _class_has_config_in_constructor(self, cls):
        """Return True if ``cls.__init__`` accepts a parameter named ``config``."""
        constructor = getattr(cls, "__init__", None)
        return (
            "config" in inspect.signature(constructor).parameters
            if constructor
            else False
        )

    def _load_individual_task_or_group(
        self,
        name_or_config: Optional[Union[str, dict]] = None,
        parent_name: Optional[str] = None,
        update_config: Optional[dict] = None,
    ) -> Mapping:
        """Load one requested entry: a task, a tag, or a (possibly nested) group.

        :param name_or_config:
            A registered name, or a config dict. A dict whose ``task`` value is
            a string describes a task (or overrides a registered name); a dict
            whose ``task`` value is a list describes a group.
            NOTE: for dict input the ``task`` key is popped from the dict.
        :param parent_name:
            The group this entry is being loaded under, if any. Used to
            de-duplicate subtask names within that group.
        :param update_config:
            Overrides merged into the config of the entry being loaded.
        :return
            ``{task_name: task_object}`` for a task, a flat dict of such
            entries for a tag, or ``{group: {...subtasks...}}`` for a group.
        :raises TypeError: if *name_or_config* is neither a str nor a dict.
        """
        if not isinstance(name_or_config, (str, dict)):
            raise TypeError(
                "Expected a task name or config dict but received "
                f"{type(name_or_config)}."
            )

        def _load_task(config, task):
            # Resolve an `include` first; keys in `config` override included ones.
            if "include" in config:
                config = {
                    **utils.load_yaml_config(
                        yaml_path=None,
                        yaml_config={"include": config.pop("include")},
                        mode="full",
                    ),
                    **config,
                }
            if self._config_is_python_task(config):
                if self._class_has_config_in_constructor(config["class"]):
                    task_object = config["class"](config=config)
                else:
                    task_object = config["class"]()
                if isinstance(task_object, ConfigurableTask):
                    # very scuffed: set task name here. TODO: fixme?
                    task_object.config.task = config["task"]
            else:
                task_object = ConfigurableTask(config=config)

            return {task: task_object}

        def _get_group_and_subtask_from_config(config):
            group_name = ConfigurableGroup(config=config)
            subtask_list = []
            for task in group_name.config["task"]:
                # Tags listed inside a group are expanded to their member tasks.
                if isinstance(task, str) and self._name_is_tag(task):
                    subtask_list.extend(self._get_tasklist(task))
                else:
                    subtask_list.append(task)
            return group_name, subtask_list

        def _process_group_config(config, update_config=None):
            # Split a config into group-only keys and the remaining keys,
            # which become overrides for the group's subtasks.
            if update_config is not None:
                config = {**config, **update_config}
            _update_config = {
                k: v for k, v in config.items() if k not in GROUP_ONLY_KEYS
            }
            if not bool(_update_config):
                _update_config = None

            group_config = {k: v for k, v in config.items() if k in GROUP_ONLY_KEYS}
            return group_config, _update_config

        def _load_subtasks(subtasks, **kwargs):
            # ChainMap iterates keys starting from its last mapping, so the
            # list is fed in reverse to keep the resulting dict in list order.
            fn = partial(self._load_individual_task_or_group, **kwargs)
            return dict(collections.ChainMap(*map(fn, reversed(subtasks))))

        if isinstance(name_or_config, str):
            if update_config is not None:
                # Process name_or_config as a dict instead
                name_or_config = {"task": name_or_config, **update_config}
            elif self._name_is_task(name_or_config) or self._name_is_python_task(
                name_or_config
            ):
                task_config = self._get_config(name_or_config)
                return _load_task(task_config, task=name_or_config)
            else:
                subtask_list = self._get_tasklist(name_or_config)
                if subtask_list == -1:
                    # Group defined by its own config: read the subtasks from it.
                    group_config = self._get_config(name_or_config)
                    group_config, update_config = _process_group_config(group_config)
                    group_name, subtask_list = _get_group_and_subtask_from_config(
                        group_config
                    )
                else:
                    if self._name_is_tag(name_or_config):
                        # Tags are flattened: no enclosing group key is produced.
                        return _load_subtasks(subtask_list)
                    else:
                        group_name = ConfigurableGroup(
                            config={"group": name_or_config, "task": subtask_list}
                        )

        if isinstance(name_or_config, dict):
            if self._config_is_task(name_or_config):
                name = name_or_config.pop("task")
                if update_config is not None:
                    name_or_config = {**name_or_config, **update_config}
                # If the name is registered as a group
                if self._name_is_group(name):
                    group_config = self._get_config(name)

                    group_config, update_config = _process_group_config(
                        group_config, name_or_config
                    )
                    group_name, subtask_list = _get_group_and_subtask_from_config(
                        group_config
                    )
                elif self._name_is_tag(name):
                    subtask_list = self._get_tasklist(name)
                    return _load_subtasks(subtask_list, update_config=name_or_config)
                else:
                    if self._name_is_registered(name):
                        base_task_config = self._get_config(name)

                        # Check if this is a duplicate.
                        if parent_name is not None:
                            num_duplicate = len(
                                list(
                                    filter(
                                        lambda x: x.startswith(name),
                                        self.task_group_map[parent_name],
                                    )
                                )
                            )
                            if num_duplicate > 0:
                                name = f"{name}-{num_duplicate}"
                            self.task_group_map[parent_name].append(name)

                        task_config = {
                            **base_task_config,
                            **name_or_config,
                        }
                    else:
                        task_config = name_or_config
                    return _load_task(task_config, task=name)
            else:
                group_config, update_config = _process_group_config(name_or_config)
                group_name, subtask_list = _get_group_and_subtask_from_config(
                    group_config
                )

        # Only group cases reach this point: load every subtask under the group.
        return {
            group_name: _load_subtasks(
                subtask_list, parent_name=group_name, update_config=update_config
            )
        }

    def load_task_or_group(self, task_list: Optional[Union[str, list]] = None) -> dict:
        """Loads a dictionary of task objects from a list

        :param task_list: Union[str, list] = None
            Single string or list of string of task names to be loaded.
            ``None`` is treated as "nothing requested".
        :return
            Dictionary of task objects
        """
        if task_list is None:
            return {}
        if isinstance(task_list, str):
            task_list = [task_list]

        all_loaded_tasks = dict(
            collections.ChainMap(*map(self._load_individual_task_or_group, task_list))
        )
        return all_loaded_tasks

    def load_config(self, config: Dict):
        """Load the task or group described by a config dict."""
        return self._load_individual_task_or_group(config)

    def _get_task_and_group(self, task_dir: str):
        """Creates a dictionary of tasks index with the following metadata,
        - `type`, that can be either `task`, `python_task`, `group` or `tags`.
            `task` refer to regular task configs, `python_task` are special
            yaml files that only consists of `task` and `class` parameters.
            `group` are group configs. `tags` are labels that can be assigned
            to tasks to assist in sorting and calling tasks of certain themes.
        - `yaml_path`, path to the yaml file. If the entry is a `group` that
            was configured through a task config, the yaml_path will be -1
            and all subtasks will be listed in `task` (see below)
        - `task`, reserved for entries with `type` as `group`. This will list
            all subtasks. When a group config is created (as opposed to task
            config having `group` parameter set), this will be set to -1 to
            avoid recursive indexing. The whole list of subtasks will be loaded
            at evaluation.

        :param task_dir: str
            A directory to check for tasks

        :return
            Dictionary of task names as key and task metadata
        """
        # TODO: remove group in next release
        print_info = True
        ignore_dirs = [
            "__pycache__",
            ".ipynb_checkpoints",
        ]
        tasks_and_groups = {}
        for root, dirs, file_list in os.walk(task_dir):
            # Prune in place so os.walk does not descend into ignored dirs.
            dirs[:] = [d for d in dirs if d not in ignore_dirs]
            for f in file_list:
                if not f.endswith(".yaml"):
                    continue
                yaml_path = os.path.join(root, f)
                config = utils.load_yaml_config(yaml_path, mode="simple")
                if self._config_is_python_task(config):
                    # This is a python class config
                    tasks_and_groups[config["task"]] = {
                        "type": "python_task",
                        "yaml_path": yaml_path,
                    }
                elif self._config_is_group(config):
                    # This is a group config. `task` is -1 because the task
                    # list is not needed for indexing; it is loaded when the
                    # group is called. Subtasks declared inline in the group
                    # config are not indexed here.
                    tasks_and_groups[config["group"]] = {
                        "type": "group",
                        "task": -1,
                        "yaml_path": yaml_path,
                    }
                elif self._config_is_task(config):
                    # This is a task config
                    task = config["task"]
                    tasks_and_groups[task] = {
                        "type": "task",
                        "yaml_path": yaml_path,
                    }

                    # TODO: remove group in next release
                    for attr in ["tag", "group"]:
                        if attr not in config:
                            continue
                        if attr == "group" and print_info:
                            self.logger.info(
                                "`group` and `group_alias` keys in tasks' configs will no longer be used in the next release of lm-eval. "
                                "`tag` will be used to allow to call a collection of tasks just like `group`. "
                                "`group` will be removed in order to not cause confusion with the new ConfigurableGroup "
                                "which will be the offical way to create groups with addition of group-wide configuations."
                            )
                            # Only log the deprecation notice once per directory scan.
                            print_info = False

                        attr_list = config[attr]
                        if isinstance(attr_list, str):
                            attr_list = [attr_list]

                        for tag in attr_list:
                            if tag not in tasks_and_groups:
                                tasks_and_groups[tag] = {
                                    "type": "tag",
                                    "task": [task],
                                    "yaml_path": -1,
                                }
                            elif tasks_and_groups[tag]["type"] != "tag":
                                self.logger.info(
                                    f"The tag {tag} is already registered as a group, this tag will not be registered. "
                                    "This may affect tasks you want to call."
                                )
                                # Remaining entries of this attribute are skipped too.
                                break
                            else:
                                tasks_and_groups[tag]["task"].append(task)
                else:
                    self.logger.debug(f"File {f} in {root} could not be loaded")

        return tasks_and_groups
def get_task_name_from_config(task_config: Dict[str, str]) -> str:
    """Derive a task name from a config dict.

    An explicit ``task`` entry is returned as-is. Otherwise the name is built
    from ``dataset_path``, with ``_<dataset_name>`` appended when a
    ``dataset_name`` entry is present.
    """
    if "task" in task_config:
        return task_config["task"]

    # Pick the naming template first, then fill it from the config.
    template = (
        "{dataset_path}_{dataset_name}"
        if "dataset_name" in task_config
        else "{dataset_path}"
    )
    return template.format(**task_config)
def get_task_name_from_object(task_object):
    """Return the reporting name for a task object.

    Objects exposing a ``config`` attribute are named by ``_config["task"]``.
    Anything else falls back to its ``EVAL_HARNESS_NAME`` attribute when
    present, or to its class name.
    """
    if hasattr(task_object, "config"):
        return task_object._config["task"]

    # TODO: scrap this
    # this gives a mechanism for non-registered tasks to have a custom name anyways when reporting
    class_name = type(task_object).__name__
    return getattr(task_object, "EVAL_HARNESS_NAME", class_name)
def _check_duplicates(task_dict: dict) -> List[str]:
    """helper function solely used in validating get_task_dict output.
    Takes the output of lm_eval.evaluator_utils.get_subtask_list and
    returns a list of all leaf subtasks contained within, and errors if any such leaf subtasks are
    "oversubscribed" to several disjoint groups.

    :param task_dict: dict
        Mapping of group (or task) key to the list of its leaf subtask names.
    :return
        Flat list of every leaf subtask name, in iteration order.
    :raises ValueError: if any subtask name appears more than once.
    """
    subtask_names = []
    for members in task_dict.values():
        subtask_names.extend(members)

    # Count once instead of calling list.count() per element.
    occurrences = collections.Counter(subtask_names)
    duplicate_tasks = [name for name, count in occurrences.items() if count > 1]

    if duplicate_tasks:
        duplicate_set = set(duplicate_tasks)
        # locate the potentially problematic groups that seem to 'compete' for constituent subtasks
        competing_groups = [
            group
            for group, members in task_dict.items()
            if duplicate_set.intersection(members)
        ]
        raise ValueError(
            f"Found 1 or more tasks while trying to call get_task_dict() that were members of more than 1 called group: {list(duplicate_tasks)}. Offending groups: {competing_groups}. Please call groups which overlap their constituent tasks in separate evaluation runs."
        )

    return subtask_names
def get_task_dict(
    task_name_list: Union[str, List[Union[str, Dict, Task]]],
    task_manager: Optional[TaskManager] = None,
):
    """Creates a dictionary of task objects from either a name of task, config, or prepared Task object.

    :param task_name_list: List[Union[str, Dict, Task]]
        A single task/group name, or a list whose items are task/group names,
        task config dicts, or already-constructed Task objects.
    :param task_manager: TaskManager = None
        A TaskManager object that stores indexed tasks. If not set,
        task_manager will load one. This should be set by the user
        if there are additional paths that want to be included
        via `include_path`

    :return
        Dictionary of task objects
    :raises TypeError: if the input is not a str or a list of str/dict/Task.
    :raises ValueError: if a name loaded from a string collides with the name
        of a passed Task object, or if a subtask is claimed by several groups.
    """
    task_name_from_string_dict = {}
    task_name_from_config_dict = {}
    task_name_from_object_dict = {}

    if isinstance(task_name_list, str):
        task_name_list = [task_name_list]
    elif isinstance(task_name_list, list):
        if not all(isinstance(task, (str, dict, Task)) for task in task_name_list):
            raise TypeError(
                "Expected all list items to be of types 'str', 'dict', or 'Task', but at least one entry did not match."
            )
    else:
        raise TypeError(
            f"Expected a 'str' or 'list' but received {type(task_name_list)}."
        )

    string_task_name_list = [task for task in task_name_list if isinstance(task, str)]
    others_task_name_list = [
        task for task in task_name_list if not isinstance(task, str)
    ]

    # Both names and config dicts are resolved through the manager, so create
    # the default one whenever either kind is present (Task objects need none).
    needs_manager = bool(string_task_name_list) or any(
        isinstance(task, dict) for task in others_task_name_list
    )
    if task_manager is None and needs_manager:
        task_manager = TaskManager()

    if string_task_name_list:
        task_name_from_string_dict = task_manager.load_task_or_group(
            string_task_name_list
        )

    for task_element in others_task_name_list:
        if isinstance(task_element, dict):
            task_name_from_config_dict = {
                **task_name_from_config_dict,
                **task_manager.load_config(config=task_element),
            }
        elif isinstance(task_element, Task):
            task_name_from_object_dict = {
                **task_name_from_object_dict,
                get_task_name_from_object(task_element): task_element,
            }

    overlapping_names = set(task_name_from_string_dict.keys()).intersection(
        task_name_from_object_dict.keys()
    )
    if overlapping_names:
        raise ValueError(
            "Task names requested by string collide with the names of passed "
            f"Task objects: {sorted(map(str, overlapping_names))}."
        )

    final_task_dict = {
        **task_name_from_string_dict,
        **task_name_from_config_dict,
        **task_name_from_object_dict,
    }

    # behavior can get odd if one tries to invoke several groups that "compete" for the same task.
    # (notably, because one could request several num_fewshot values at once in GroupConfig overrides for the subtask
    # and we'd be unsure which to use and report.)
    # we explicitly check and error in this case.
    _check_duplicates(get_subtask_list(final_task_dict))

    return final_task_dict