# torchbenchmark/__init__.py
import contextlib
import dataclasses
import gc
import importlib
import io
import os
import pathlib
import subprocess
import sys
import tempfile
import threading
from typing import Any, Callable, Dict, Generator, List, NoReturn, Optional, Tuple
from urllib import request
from components._impl.tasks import base as base_task
from components._impl.workers import subprocess_worker
TORCH_DEPS = ['torch', 'torchvision', 'torchtext']
proxy_suggestion = "Unable to verify https connectivity, " \
"required for setup.\n" \
"Do you need to use a proxy?"
this_dir = pathlib.Path(__file__).parent.absolute()
model_dir = 'models'
install_file = 'install.py'
def _test_https(test_url: str = 'https://github.com', timeout: float = 0.5) -> bool:
try:
request.urlopen(test_url, timeout=timeout)
except OSError:
return False
return True
def _install_deps(model_path: str, verbose: bool = True) -> Tuple[bool, Any, Optional[str]]:
from .util.env_check import get_pkg_versions
run_args = [
[sys.executable, install_file],
]
run_env = os.environ.copy()
run_env["PYTHONPATH"] = this_dir.parent
run_kwargs = {
'cwd': model_path,
'check': True,
'env': run_env,
}
output_buffer = None
_, stdout_fpath = tempfile.mkstemp()
try:
output_buffer = io.FileIO(stdout_fpath, mode="w")
if os.path.exists(os.path.join(model_path, install_file)):
if not verbose:
run_kwargs['stderr'] = subprocess.STDOUT
run_kwargs['stdout'] = output_buffer
versions = get_pkg_versions(TORCH_DEPS)
subprocess.run(*run_args, **run_kwargs) # type: ignore
new_versions = get_pkg_versions(TORCH_DEPS)
if versions != new_versions:
errmsg = f"The torch packages are re-installed after installing the benchmark deps. \
Before: {versions}, after: {new_versions}"
return (False, errmsg, None)
else:
return (True, f"No install.py is found in {model_path}. Skip.", None)
except subprocess.CalledProcessError as e:
return (False, e.output, io.FileIO(stdout_fpath, mode="r").read().decode())
except Exception as e:
return (False, e, io.FileIO(stdout_fpath, mode="r").read().decode())
    finally:
        if output_buffer is not None:
            output_buffer.close()
        os.remove(stdout_fpath)
return (True, None, None)
def _list_model_paths() -> List[str]:
def dir_contains_file(dir, file_name) -> bool:
names = map(lambda x: x.name, filter(lambda x: x.is_file(), dir.iterdir()))
return file_name in names
p = pathlib.Path(__file__).parent.joinpath(model_dir)
    # Only load the model directories that contain an "__init__.py" file
return sorted(str(child.absolute()) for child in p.iterdir() if child.is_dir() and dir_contains_file(child, "__init__.py"))
def setup(models: List[str] = [], verbose: bool = True, continue_on_fail: bool = False) -> bool:
if not _test_https():
print(proxy_suggestion)
sys.exit(-1)
failures = {}
models = list(map(lambda p: p.lower(), models))
model_paths = filter(lambda p: True if not models else os.path.basename(p).lower() in models, _list_model_paths())
for model_path in model_paths:
print(f"running setup for {model_path}...", end="", flush=True)
success, errmsg, stdout_stderr = _install_deps(model_path, verbose=verbose)
if success and errmsg and "No install.py is found" in errmsg:
print("SKIP - No install.py is found")
elif success:
print("OK")
else:
print("FAIL")
try:
errmsg = errmsg.decode()
except Exception:
pass
# If the install was very chatty, we don't want to overwhelm.
# This will not affect verbose mode, which does not catch stdout
# and stderr.
log_lines = (stdout_stderr or "").splitlines(keepends=False)
if len(log_lines) > 40:
log_lines = log_lines[:20] + ["..."] + log_lines[-20:]
stdout_stderr = "\n".join(log_lines)
if stdout_stderr:
errmsg = f"{stdout_stderr}\n\n{errmsg or ''}"
failures[model_path] = errmsg
if not continue_on_fail:
break
for model_path in failures:
print(f"Error for {model_path}:")
print("---------------------------------------------------------------------------")
print(failures[model_path])
print("---------------------------------------------------------------------------")
print()
return len(failures) == 0
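# A minimal sketch of driving `setup` from an install script (illustrative only;
# the model names are hypothetical and the exit-code convention is an assumption,
# not part of this module):
def _example_setup_invocation() -> None:
    # Install dependencies for two specific models, keep going past failures,
    # and convert the overall result into a process exit code.
    all_ok = setup(models=["alexnet", "bert"], verbose=False, continue_on_fail=True)
    sys.exit(0 if all_ok else 1)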
@dataclasses.dataclass(frozen=True)
class ModelDetails:
"""Static description of what a particular TorchBench model supports.
When parameterizing tests, we only want to generate sensible ones.
(e.g. Those where a model can be imported and supports the feature to be
tested or benchmarked.) This requires us to import the model; however many
of the models are EXTREMELY stateful, and even importing them consumes
significant system resources. As a result, we only want one (or a few)
alive at any given time.
Note that affinity cannot be solved by simply calling `torch.set_num_threads`
in the child process; this will cause PyTorch to use all of the cores but
at a much lower efficiency.
This class describes what a particular model does and does not support, so
that we can release the underlying subprocess but retain any pertinent
metadata.
"""
path: str
exists: bool
optimized_for_inference: bool
_diagnostic_msg: str
metadata: Dict[str, Any]
@property
def name(self) -> str:
return os.path.basename(self.path)
class Worker(subprocess_worker.SubprocessWorker):
"""Run subprocess using taskset if CPU affinity is set.
When GOMP_CPU_AFFINITY is set, importing `torch` in the main process has
the very surprising effect of changing the threading behavior in the
subprocess. (See https://github.com/pytorch/pytorch/issues/49971 for
details.) This is a problem, because it means that the worker is not
hermetic and also tends to force the subprocess torch to run in single
threaded mode which drastically skews results.
This can be ameliorated by calling the subprocess using `taskset`, which
allows the subprocess PyTorch to properly bind threads.
"""
@property
def args(self) -> List[str]:
affinity = os.environ.get("GOMP_CPU_AFFINITY", "")
return (
["taskset", "--cpu-list", affinity] if affinity else []
) + super().args
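    # For example (illustrative), with GOMP_CPU_AFFINITY="0-15" the worker is
    # launched as ["taskset", "--cpu-list", "0-15"] + super().args, so the
    # subprocess interpreter is pinned to cores 0-15 before it imports torch;
    # without the variable the command is simply super().args.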
class ModelTask(base_task.TaskBase):
# The worker may (and often does) consume significant system resources.
# In order to ensure that runs do not interfere with each other, we only
# allow a single ModelTask to exist at a time.
_lock = threading.Lock()
def __init__(
self,
model_path: str,
timeout: Optional[float] = None,
) -> None:
gc.collect() # Make sure previous task has a chance to release the lock
assert self._lock.acquire(blocking=False), "Failed to acquire lock."
self._model_path = model_path
self._worker = Worker(timeout=timeout)
self.worker.run("import torch")
self._details: ModelDetails = ModelDetails(
**self._maybe_import_model(
package=__name__,
model_path=model_path,
)
)
def __del__(self) -> None:
self._lock.release()
@property
def worker(self) -> subprocess_worker.SubprocessWorker:
return self._worker
@property
    def model_details(self) -> ModelDetails:
return self._details
# =========================================================================
# == Import Model in the child process ====================================
# =========================================================================
@base_task.run_in_worker(scoped=True)
@staticmethod
def _maybe_import_model(package: str, model_path: str) -> Dict[str, Any]:
import importlib
import os
model_name = os.path.basename(model_path)
diagnostic_msg = ""
try:
module = importlib.import_module(f'.models.{model_name}', package=package)
Model = getattr(module, 'Model', None)
if Model is None:
diagnostic_msg = f"Warning: {module} does not define attribute Model, skip it"
elif not hasattr(Model, 'name'):
Model.name = model_name
except ModuleNotFoundError as e:
Model = None
diagnostic_msg = f"Warning: Could not find dependent module {e.name} for Model {model_name}, skip it"
# Populate global namespace so subsequent calls to worker.run can access `Model`
globals()["Model"] = Model
# This will be used to populate a `ModelDetails` instance in the parent.
return {
"path": model_path,
"exists": Model is not None,
"optimized_for_inference": hasattr(Model, "optimized_for_inference"),
"_diagnostic_msg": diagnostic_msg,
"metadata": {}
}
# =========================================================================
# == Instantiate a concrete `model` instance ==============================
# =========================================================================
@base_task.run_in_worker(scoped=True)
@staticmethod
    def make_model_instance(test: str, device: str, jit: bool, batch_size: Optional[int] = None, extra_args: List[str] = []) -> None:
Model = globals()["Model"]
model = Model(test=test, device=device, jit=jit, batch_size=batch_size, extra_args=extra_args)
import gc
gc.collect()
if device == 'cuda':
torch.cuda.empty_cache()
maybe_sync = torch.cuda.synchronize
else:
maybe_sync = lambda: None
globals().update({
"model": model,
"maybe_sync": maybe_sync,
})
# =========================================================================
# == Get Model attribute in the child process =============================
# =========================================================================
@base_task.run_in_worker(scoped=True)
@staticmethod
def get_model_attribute(attr: str) -> Any:
model = globals()["model"]
if hasattr(model, attr):
return getattr(model, attr)
else:
return None
def gc_collect(self) -> None:
self.worker.run("""
import gc
gc.collect()
""")
def del_model_instance(self):
self.worker.run("""
del model
del maybe_sync
""")
self.gc_collect()
# =========================================================================
# == Forward calls to `model` from parent to worker =======================
# =========================================================================
def set_train(self) -> None:
self.worker.run("model.set_train()")
def invoke(self) -> None:
self.worker.run("""
model.invoke()
maybe_sync()
""")
def set_eval(self) -> None:
self.worker.run("model.set_eval()")
def extract_details_train(self) -> None:
self._details.metadata["train_benchmark"] = self.worker.load_stmt("torch.backends.cudnn.benchmark")
self._details.metadata["train_deterministic"] = self.worker.load_stmt("torch.backends.cudnn.deterministic")
def check_details_train(self, device, md) -> None:
self.extract_details_train()
if device == 'cuda':
assert md["train_benchmark"] == self._details.metadata["train_benchmark"], \
"torch.backends.cudnn.benchmark does not match expect metadata during training."
assert md["train_deterministic"] == self._details.metadata["train_deterministic"], \
"torch.backends.cudnn.deterministic does not match expect metadata during training."
def extract_details_eval(self) -> None:
self._details.metadata["eval_benchmark"] = self.worker.load_stmt("torch.backends.cudnn.benchmark")
self._details.metadata["eval_deterministic"] = self.worker.load_stmt("torch.backends.cudnn.deterministic")
        # FIXME: Models wrap eval in "with torch.no_grad():", so the no_grad state ends once eval() returns.
        # FIXME: "torch.is_grad_enabled()" must therefore be checked inside the actual eval() call, not here.
        # self._details.metadata["eval_nograd"] = not self.worker.load_stmt("torch.is_grad_enabled()")
self._details.metadata["eval_nograd"] = True
def check_details_eval(self, device, md) -> None:
self.extract_details_eval()
if device == 'cuda':
assert md["eval_benchmark"] == self._details.metadata["eval_benchmark"], \
"torch.backends.cudnn.benchmark does not match expect metadata during eval."
assert md["eval_deterministic"] == self._details.metadata["eval_deterministic"], \
"torch.backends.cudnn.deterministic does not match expect metadata during eval."
assert md["eval_nograd"] == self._details.metadata["eval_nograd"], \
"torch.is_grad_enabled does not match expect metadata during eval."
def check_opt_vs_noopt_jit(self) -> None:
self.worker.run("model.check_opt_vs_noopt_jit()")
@base_task.run_in_worker(scoped=True)
@staticmethod
def check_example() -> None:
model = globals()["model"]
module, example_inputs = model.get_module()
if isinstance(example_inputs, dict):
# Huggingface models pass **kwargs as arguments, not *args
module(**example_inputs)
else:
module(*example_inputs)
# If model implements `gen_inputs()` interface, test the first example input it generates
try:
input_iter, _size = model.gen_inputs()
next_inputs = next(input_iter)
for input in next_inputs:
if isinstance(input, dict):
# Huggingface models pass **kwargs as arguments, not *args
module(**input)
else:
module(*input)
except NotImplementedError:
# We allow models that don't implement this interface
pass
@base_task.run_in_worker(scoped=True)
@staticmethod
def check_eval_output() -> None:
instance = globals()["model"]
import torch
assert instance.test == "eval", "We only support checking output of an eval test. Please submit a bug report."
out = instance.invoke()
model_name = getattr(instance, 'name', None)
if not isinstance(out, tuple):
raise RuntimeError(f'Model {model_name} eval test output is not a tuple')
for ind, element in enumerate(out):
if not isinstance(element, torch.Tensor):
raise RuntimeError(f'Model {model_name} eval test output is tuple, but'
f' its {ind}-th element is not a Tensor.')
@base_task.run_in_worker(scoped=True)
@staticmethod
def check_device() -> None:
instance = globals()["model"]
# Check this BenchmarkModel has a device attribute.
current_device = getattr(instance, 'device', None)
if current_device is None:
raise RuntimeError('Missing device in BenchmarkModel.')
model, inputs = instance.get_module()
model_name = getattr(model, 'name', None)
# Check the model tensors are assigned to the expected device.
for t in model.parameters():
model_device = t.device.type
if model_device != current_device:
raise RuntimeError(f'Model {model_name} was not set to the'
f' expected device {current_device},'
f' found device {model_device}.')
# Check the inputs are assigned to the expected device.
def check_inputs(inputs):
if isinstance(inputs, torch.Tensor):
                if inputs.dim() == 0 and current_device == "cuda":
# Zero dim Tensors (Scalars) can be captured by CUDA
# kernels and need not match device.
return
inputs_device = inputs.device.type
if inputs_device != current_device:
raise RuntimeError(f'Model {model_name} inputs were'
f' not set to the expected device'
f' {current_device}, found device'
f' {inputs_device}.')
elif isinstance(inputs, tuple):
# Some inputs are nested inside tuples, such as tacotron2
for i in inputs:
check_inputs(i)
elif isinstance(inputs, dict):
# Huggingface models take inputs as kwargs
for i in inputs.values():
check_inputs(i)
check_inputs(inputs)
# =========================================================================
# == Control `torch` state (in the subprocess) ============================
# =========================================================================
@contextlib.contextmanager
    def no_grad(self, disable_nograd: bool) -> Generator[None, None, None]:
# TODO: deduplicate with `torchbenchmark.util.model.no_grad`
initial_value = self.worker.load_stmt("torch.is_grad_enabled()")
eval_in_nograd = (
not disable_nograd and
self.worker.load_stmt("model.eval_in_nograd()"))
try:
self.worker.run(f"torch.set_grad_enabled({not eval_in_nograd})")
yield
finally:
self.worker.run(f"torch.set_grad_enabled({initial_value})")
@contextlib.contextmanager
def watch_cuda_memory(
self,
skip: bool,
assert_equal: Callable[[int, int], NoReturn],
):
# This context manager is used in testing to ensure we're not leaking
# memory; these tests are generally parameterized by device, so in some
# cases we want this (and the outer check) to simply be a no-op.
if skip:
yield
return
self.gc_collect()
memory_before = self.worker.load_stmt("torch.cuda.memory_allocated()")
yield
self.gc_collect()
assert_equal(
memory_before,
self.worker.load_stmt("torch.cuda.memory_allocated()"),
)
self.worker.run("torch.cuda.empty_cache()")
def list_models_details(workers: int = 1) -> List[ModelDetails]:
return [
ModelTask(model_path).model_details
for model_path in _list_model_paths()
]
def list_models(model_match=None):
models = []
for model_path in _list_model_paths():
model_name = os.path.basename(model_path)
try:
module = importlib.import_module(f'.models.{model_name}', package=__name__)
except ModuleNotFoundError as e:
print(f"Warning: Could not find dependent module {e.name} for Model {model_name}, skip it")
continue
Model = getattr(module, 'Model', None)
if Model is None:
print(f"Warning: {module} does not define attribute Model, skip it")
continue
if not hasattr(Model, 'name'):
Model.name = model_name
# If given model_match, only return full or partial name matches in models.
if model_match is None:
models.append(Model)
else:
if model_match.lower() in Model.name.lower():
models.append(Model)
return models
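# For example (illustrative), `list_models(model_match="resnet")` returns every
# importable Model class whose name contains "resnet" (case-insensitive), while
# `list_models()` returns all importable models.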
def load_model_by_name(model):
models = filter(lambda x: model.lower() == x.lower(),
map(lambda y: os.path.basename(y), _list_model_paths()))
models = list(models)
if not models:
return None
    assert len(models) == 1, f"Found more than one model {models} with the exact name: {model}"
model_name = models[0]
try:
module = importlib.import_module(f'.models.{model_name}', package=__name__)
except ModuleNotFoundError as e:
print(f"Warning: Could not find dependent module {e.name} for Model {model_name}, skip it")
return None
Model = getattr(module, 'Model', None)
if Model is None:
print(f"Warning: {module} does not define attribute Model, skip it")
return None
if not hasattr(Model, 'name'):
Model.name = model_name
return Model
def get_metadata_from_yaml(path):
import yaml
metadata_path = path + "/metadata.yaml"
md = None
if os.path.exists(metadata_path):
with open(metadata_path, 'r') as f:
md = yaml.load(f, Loader=yaml.FullLoader)
return md
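# A sketch of the metadata.yaml layout this loader expects, inferred from the
# keys consumed by `check_details_train` / `check_details_eval` above; a given
# model's metadata file may carry additional fields:
#
#     train_benchmark: false
#     train_deterministic: false
#     eval_benchmark: false
#     eval_deterministic: false
#     eval_nograd: true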
def str_to_bool(input: Any) -> bool:
if not input:
return False
return str(input).lower() in ("1", "yes", "y", "true", "t", "on")
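# Examples: str_to_bool("Yes"), str_to_bool("1"), and str_to_bool("ON") return
# True; str_to_bool(None), str_to_bool(""), and str_to_bool("no") return False.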