in lmms_eval/__main__.py [0:0]
def cli_evaluate_single(args: Union[argparse.Namespace, None] = None) -> tuple:
    """Run one evaluation described by ``args`` and optionally save its results.

    The function configures the ``"lmms-eval"`` logger, resolves the requested
    task names against ``ALL_TASKS``, calls ``evaluator.simple_evaluate`` and,
    when ``args.output_path`` is set, writes ``results.json`` (plus one
    ``<task_name>.json`` per task when ``args.log_samples`` is set) into a
    run-specific sub-directory of the requested output path.

    Args:
        args: Parsed command-line namespace. Although the default is ``None``,
            ``args.verbosity`` is read on the first statements, so a namespace
            must actually be supplied.

    Returns:
        A ``(results, samples)`` tuple. ``samples`` is the ``"samples"`` entry
        popped from ``results`` when ``args.log_samples`` is truthy, otherwise
        ``None``. ``(None, None)`` is returned when
        ``evaluator.simple_evaluate`` returns ``None``.

    Raises:
        AssertionError: If ``args.log_samples`` is set without
            ``args.output_path``.
        SystemExit: When ``args.tasks`` is ``"list"`` or ``"list_with_num"``;
            those modes only log information and then exit.

    Side effects:
        Sets ``TOKENIZERS_PARALLELISM=false`` in ``os.environ`` and replaces
        ``args.output_path`` with the resolved run directory (a ``Path``).
    """
    eval_logger = logging.getLogger("lmms-eval")
    eval_logger.setLevel(getattr(logging, f"{args.verbosity}"))
    eval_logger.info(f"Verbosity set to {args.verbosity}")
    os.environ["TOKENIZERS_PARALLELISM"] = "false"

    initialize_tasks(args.verbosity)

    if args.limit:
        eval_logger.warning(" --limit SHOULD ONLY BE USED FOR TESTING. REAL METRICS SHOULD NOT BE COMPUTED USING LIMIT.")
    if args.include_path is not None:
        eval_logger.info(f"Including path: {args.include_path}")
        include_path(args.include_path)

    if args.tasks is None:
        task_names = ALL_TASKS
    elif args.tasks == "list":
        eval_logger.info("Available Tasks:\n - {}".format("\n - ".join(sorted(ALL_TASKS))))
        sys.exit()
    elif args.tasks == "list_with_num":
        log_message = (
            "\n" + "=" * 70 + "\n" + "\n\tYou are trying to check all the numbers in each task." + "\n\tThis action will download the complete dataset." + "\n\tIf the results are not clear initially, call this again." + "\n\n" + "=" * 70
        )
        eval_logger.info(log_message)
        task_dict = get_task_dict(sorted(ALL_TASKS), model_name="llava")
        for task_obj in task_dict.values():
            # Some entries are (group, task) pairs; only the task part is inspected.
            if isinstance(task_obj, tuple):
                _group, task_obj = task_obj
            # Skip entries without a task object instead of failing on `.config`.
            if task_obj is None:
                continue
            eval_logger.info(f"\nTask : {task_obj.config.task}\n - #num : {len(task_obj.test_docs()) if task_obj.has_test_docs() else len(task_obj.validation_docs())}")
        sys.exit()
    else:
        tasks_list = args.tasks.split(",")
        eval_logger.info(f"Evaluating on {len(tasks_list)} tasks.")
        task_names = utils.pattern_match(tasks_list, ALL_TASKS)
        # we don't want errors if a wildcard ("*") task name was used
        task_missing = [task for task in tasks_list if task not in task_names and "*" not in task]
        if task_missing:
            missing = ", ".join(task_missing)
            eval_logger.error(
                f"Tasks were not found: {missing}. Try `lmms-eval --tasks list` for list of available tasks",
            )

    eval_logger.info(f"Selected Tasks: {task_names}")

    # set datetime before evaluation
    datetime_str = utils.get_datetime_str(timezone=args.timezone)
    if args.output_path:
        # A short hash of the model arguments is part of the run directory name.
        hash_input = f"{args.model_args}".encode("utf-8")
        hash_output = hashlib.sha256(hash_input).hexdigest()[:6]
        run_dir = Path(args.output_path).expanduser().resolve()
        run_dir = run_dir.joinpath(f"{datetime_str}_{args.log_samples_suffix}_{args.model}_model_args_{hash_output}")
        args.output_path = run_dir
    elif args.log_samples:
        # Raised explicitly (not via `assert`) so the check is not removed
        # under `python -O`; the exception type and message are unchanged.
        raise AssertionError("Specify --output_path")

    results = evaluator.simple_evaluate(
        model=args.model,
        model_args=args.model_args,
        tasks=task_names,
        num_fewshot=args.num_fewshot,
        batch_size=args.batch_size,
        device=args.device,
        limit=args.limit,
        check_integrity=args.check_integrity,
        show_task_to_terminal=args.show_task_to_terminal,
        log_samples=args.log_samples,
        gen_kwargs=args.gen_kwargs,
        cli_args=args,
    )

    if results is None:
        return None, None

    # Samples are removed from `results` so they are not part of results.json.
    samples = results.pop("samples") if args.log_samples else None

    dumped = json.dumps(results, indent=4, default=_handle_non_serializable)
    if args.show_config:
        print(dumped)

    if args.output_path:
        args.output_path.mkdir(parents=True, exist_ok=True)
        result_file_path = args.output_path.joinpath("results.json")
        if result_file_path.exists():
            eval_logger.warning(f"Output file {result_file_path} already exists and will be overwritten.")

        # Context managers make sure every file handle is closed after writing.
        with result_file_path.open("w") as result_file:
            result_file.write(dumped)

        if args.log_samples:
            for task_name, config in results["configs"].items():
                filename = args.output_path.joinpath(f"{task_name}.json")
                # Structure the data with 'args', 'model_configs' and 'logs' keys;
                # vars(args) converts the Namespace to a dict.
                data_to_dump = {
                    "args": vars(args),
                    "model_configs": config,
                    "logs": sorted(samples[task_name], key=lambda x: x["doc_id"]),
                }
                samples_dumped = json.dumps(data_to_dump, indent=4, default=_handle_non_serializable)
                with filename.open("w") as samples_file:
                    samples_file.write(samples_dumped)
                eval_logger.info(f"Saved samples to {filename}")

    return results, samples