in d2go/runner/default_runner.py
def _do_test(self, cfg, model, train_iter=None, model_tag="default"):
    """train_iter: current iteration of the model; None means final iteration."""
    assert len(cfg.DATASETS.TEST)
    assert cfg.OUTPUT_DIR
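
    # train_iter is None for a standalone evaluation; the last training iteration
    # also counts as final, which enables the TTA and expected-results paths below.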
    is_final = (train_iter is None) or (train_iter == cfg.SOLVER.MAX_ITER - 1)

    logger.info(
        f"Running evaluation for model tag {model_tag} at iter {train_iter}..."
    )

    def _get_inference_dir_name(base_dir, inference_type, dataset_name):
        return os.path.join(
            base_dir,
            inference_type,
            model_tag,
            str(train_iter) if train_iter is not None else "final",
            dataset_name,
        )
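    # -> <base_dir>/<inference_type>/<model_tag>/<train_iter or "final">/<dataset_name>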

    add_flop_printing_hook(model, cfg.OUTPUT_DIR)

    results = OrderedDict()
    results[model_tag] = OrderedDict()
    for dataset_name in cfg.DATASETS.TEST:
        # The evaluator will create the output folder; no need to create it here.
        output_folder = _get_inference_dir_name(
            cfg.OUTPUT_DIR, "inference", dataset_name
        )

        # NOTE: create the evaluator after the dataset is loaded, as there might be a dependency.  # noqa
        data_loader = self.build_detection_test_loader(cfg, dataset_name)
        evaluator = self.get_evaluator(
            cfg, dataset_name, output_folder=output_folder
        )
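
        # Wrap in DatasetEvaluators so the visualization evaluator can be appended.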
        if not isinstance(evaluator, DatasetEvaluators):
            evaluator = DatasetEvaluators([evaluator])
        if comm.is_main_process():
            tbx_writer = self.get_tbx_writer(cfg)
            logger.info("Adding visualization evaluator ...")
            mapper = self.get_mapper(cfg, is_train=False)
            vis_eval_type = self.get_visualization_evaluator()
            if vis_eval_type is not None:
                evaluator._evaluators.append(
                    vis_eval_type(
                        cfg,
                        tbx_writer,
                        mapper,
                        dataset_name,
                        train_iter=train_iter,
                        tag_postfix=model_tag,
                    )
                )
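
        # inference_on_dataset runs on every rank; only the main process stores
        # and prints the returned metrics below.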
        results_per_dataset = inference_on_dataset(model, data_loader, evaluator)

        if comm.is_main_process():
            results[model_tag][dataset_name] = results_per_dataset
            if is_final:
                print_csv_format(results_per_dataset)

        if is_final and cfg.TEST.AUG.ENABLED:
            # At the end of training, run an evaluation with test-time augmentation (TTA).
            # Only some R-CNN models are supported.
            output_folder = _get_inference_dir_name(
                cfg.OUTPUT_DIR, "inference_TTA", dataset_name
            )

            logger.info("Running inference with test-time augmentation ...")
            data_loader = self.build_detection_test_loader(
                cfg, dataset_name, mapper=lambda x: x
            )
            evaluator = self.get_evaluator(
                cfg, dataset_name, output_folder=output_folder
            )
            inference_on_dataset(
                GeneralizedRCNNWithTTA(cfg, model), data_loader, evaluator
            )
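
    # After the per-dataset loop, optionally verify the metrics of the single
    # test dataset against cfg.TEST.EXPECTED_RESULTS.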
    if is_final and cfg.TEST.EXPECTED_RESULTS and comm.is_main_process():
        assert len(results) == 1, "Results verification only supports one dataset!"
        verify_results(cfg, results[model_tag][cfg.DATASETS.TEST[0]])

    # write results to tensorboard
    if comm.is_main_process() and results:
        from detectron2.evaluation.testing import flatten_results_dict

        flattened_results = flatten_results_dict(results)
        for k, v in flattened_results.items():
            tbx_writer = self.get_tbx_writer(cfg)
            tbx_writer._writer.add_scalar("eval_{}".format(k), v, train_iter)
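
    # Flush the writer so the eval metrics show up in TensorBoard immediately.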
    if comm.is_main_process():
        tbx_writer = self.get_tbx_writer(cfg)
        tbx_writer._writer.flush()
    return results
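

A hedged sketch of how this method might be invoked for a standalone evaluation. Detectron2GoRunner, get_default_cfg, and build_model are real d2go entry points; the config file name and checkpoint path below are illustrative assumptions:

# Sketch only; the exact runner setup varies by project.
from d2go.runner import Detectron2GoRunner

runner = Detectron2GoRunner()
cfg = runner.get_default_cfg()
cfg.merge_from_file("my_config.yaml")    # assumed config path
cfg.MODEL.WEIGHTS = "model_final.pth"    # assumed checkpoint path
model = runner.build_model(cfg)

# train_iter=None is treated as the final evaluation (TTA and results
# verification become eligible, per the checks above).
results = runner._do_test(cfg, model, train_iter=None, model_tag="default")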