bisection.py (481 lines of code) (raw):
"""bisection.py
Runs bisection to determine PRs that cause performance change.
It assumes that the pytorch, torchbench, torchtext and torchvision repositories provided are all clean with the latest code.
By default, the torchvision and torchtext package version will be fixed to the latest commit on the pytorch commit date.
Usage:
python bisection.py --work-dir <WORK-DIR> \
--pytorch-src <PYTORCH_SRC_DIR> \
--torchbench-src <TORCHBENCH_SRC_DIR> \
--config <BISECT_CONFIG> --output <OUTPUT_FILE_PATH>
"""
import os
import json
import shutil
import yaml
import argparse
import typing
from tabulate import tabulate
import re
import subprocess
from datetime import datetime
from typing import Optional, List, Dict, Tuple
from torchbenchmark.util import gitutils
# Expected origin URLs; the prep() steps only warn when a checkout's origin differs.
TORCH_GITREPO = "https://github.com/pytorch/pytorch.git"
TORCHBENCH_GITREPO = "https://github.com/pytorch/benchmark.git"
# Dependency package name -> (path of its local git checkout, branch to follow).
# Insertion order is kept: update_repos() iterates the values in this order.
TORCHBENCH_DEPS = {
    "torchtext": (os.path.expandvars("${HOME}/text"), "main"),
    "torchvision": (os.path.expandvars("${HOME}/vision"), "main"),
}
def exist_dir_path(string):
    """Validate that *string* names an existing directory (used as an argparse ``type=``).

    Returns:
        The input, unchanged, when it is a directory.

    Raises:
        NotADirectoryError: when the path is not an existing directory.
    """
    if not os.path.isdir(string):
        raise NotADirectoryError(string)
    return string
def targets_to_bmfilter(targets: List[str], models: List[str]) -> str:
    """Translate test names into a TorchBench filter expression.

    For example, ["test_eval[yolov3-cpu-eager]", "test_train[yolov3-gpu-eager]"]
    -> "((eval and yolov3 and cpu and eager) or (train and yolov3 and gpu and eager))"

    Args:
        targets: Full test names (``test_<train|eval>[<model>-<device>-<mode>]``)
            and/or bare model names. ``None`` or empty means "no explicit targets".
        models: Known model names; a target that is not a full test name must be
            one of these.

    Returns:
        The filter string. If there are no targets, run everything except slomo.

    Raises:
        SystemExit: (code 1) when a target is neither a test name nor a known model.
    """
    if not targets:
        return "(not slomo)"
    # Raw string: "\[" is an invalid escape in a normal string literal.
    # Compiled once, outside the loop.
    test_name_re = re.compile(r"test_(train|eval)\[([a-zA-Z0-9_]+)-([a-z]+)-([a-z]+)\]")
    bmfilter_names = []
    for test in targets:
        m = test_name_re.match(test)
        if m:
            partial_name = " and ".join(m.groups())
        elif test in models:
            partial_name = test
        else:
            print(f"Cannot recognize the TorchBench filter: {test}. Exit.")
            # Same exception exit(1) raises, without depending on the `site` builtin.
            raise SystemExit(1)
        bmfilter_names.append(f"({partial_name})")
    return "(" + " or ".join(bmfilter_names) + ")"
def find_latest_json_file(result_dir: str):
    """Find the latest non-empty json file in the directory.

    "Latest" means first in reverse-sorted file-name order; zero-byte files
    are skipped.

    Returns:
        The path of the chosen file, or an empty string (after printing a
        notice) when the directory holds no non-empty ``.json`` file.
    """
    candidates = sorted(
        (name for name in os.listdir(result_dir) if name.endswith(".json")),
        reverse=True,
    )
    for name in candidates:
        json_path = os.path.join(result_dir, name)
        # Return the first non-empty json file
        if os.path.exists(json_path) and os.stat(json_path).st_size:
            return json_path
    print(f"Can't find non-empty json files in path: {result_dir}")
    return ""
def get_delta_str(reference: float, current: float) -> str:
    """Format the change from *reference* to *current* as a signed percentage.

    The difference is expressed as a percentage of *current* (not of
    *reference*). A trailing "*" marks changes of 5% or more in either direction.
    """
    delta_num = (current - reference) / current * 100
    marker = "*" if abs(delta_num) >= 5 else ""
    return f"{delta_num:+3f}%{marker}"
def get_means(data):
    """Map each benchmark name in *data* to its mean.

    *data* is a parsed result object with a "benchmarks" list whose entries
    carry "name" and "stats" -> "mean". A later entry with a repeated name
    overwrites the earlier one.
    """
    return {entry["name"]: entry["stats"]["mean"] for entry in data["benchmarks"]}
def analyze_abtest_result_dir(result_dir: str):
    """Render a table of benchmark means for every run found under *result_dir*.

    Each immediate sub-directory is treated as one run, and its latest
    non-empty json file supplies the numbers. Rows are the benchmarks of the
    first run; each run contributes one "Run <sub-directory name>" column.

    When exactly two runs are found, the result is treated as an A/B test and
    a single "Delta" column (second run relative to the first) is appended.

    Returns:
        The table as a string, formatted by ``tabulate``.

    Raises:
        AssertionError: if no sub-directory contains a non-empty json file.
    """
    dirs = [os.path.join(result_dir, name) for name in os.listdir(result_dir)
            if os.path.isdir(os.path.join(result_dir, name))]
    # find_latest_json_file returns "" for runs without results; filter(len) drops those.
    json_files = list(filter(len, map(find_latest_json_file, dirs)))
    assert json_files, f"Don't find benchmark result files in {result_dir}."
    # If there are only two json files, we believe it is an abtest, so print delta of the mean
    delta = len(json_files) == 2
    with open(json_files[0], "r") as fp:
        first_means = get_means(json.load(fp))
    # Header row first, then one row per benchmark of the first run.
    out = [['Benchmark']]
    out.extend([key] for key in first_means)
    reference = None
    for index, json_file in enumerate(json_files):
        with open(json_file, "r") as fp:
            means = get_means(json.load(fp))
        out[0].append(f"Run {os.path.basename(os.path.dirname(json_file))}")
        if delta and index == 0:
            reference = means
        show_delta = delta and index == 1
        if show_delta:
            # Add the header once, not once per benchmark row.
            out[0].append("Delta")
        # Rows are addressed by position, so every run is expected to list its
        # benchmarks in the same order as the first run.
        for key_index, key in enumerate(means):
            out[key_index + 1].append(means[key])
            if show_delta:
                out[key_index + 1].append(get_delta_str(reference[key], means[key]))
    return tabulate(out, headers='firstrow')
class Commit:
    """One commit under test: its SHA, its commit time, and its benchmark digest."""
    sha: str
    ctime: str
    # Starts as None; filled in once a digest has been computed for the commit.
    digest: Dict[str, float]

    def __init__(self, sha, ctime):
        self.sha, self.ctime = sha, ctime
        self.digest = None

    def __str__(self):
        """A commit prints as its SHA."""
        return self.sha
class TorchSource:
    """A local pytorch checkout and the steps to build it (plus deps) at a given commit."""
    srcpath: str
    build_lazy: bool
    commits: "List[Commit]"
    # Map from commit SHA to index in commits
    commit_dict: Dict[str, int]

    def __init__(self, srcpath: str, build_lazy: bool):
        self.srcpath = srcpath
        self.build_lazy = build_lazy
        self.commits = []
        self.commit_dict = dict()

    def prep(self) -> bool:
        """Update all repositories and uninstall any previously installed packages."""
        repo_origin_url = gitutils.get_git_origin(self.srcpath)
        if repo_origin_url != TORCH_GITREPO:
            # A non-standard origin is only reported, not rejected.
            print(f"WARNING: Unmatched repo origin url: {repo_origin_url} with standard {TORCH_GITREPO}")
        self.update_repos()
        # Clean up the existing packages
        self.cleanup()
        return True

    def update_repos(self):
        """Update pytorch, torchtext, and torchvision repo.

        Raises:
            AssertionError: if any repository fails to update.
        """
        repos = [(self.srcpath, "master")]
        repos.extend(TORCHBENCH_DEPS.values())
        for (repo, branch) in repos:
            gitutils.clean_git_repo(repo)
            # Explicit check instead of `assert <call>`: the update must still run under `python -O`.
            if not gitutils.update_git_repo(repo, branch):
                raise AssertionError(f"Failed to update {branch} branch of repository {repo}.")

    def init_commits(self, start: str, end: str, abtest: bool) -> bool:
        """Get all commits between start and end, save them in self.commits.

        In abtest mode only the two endpoints are used. Returns False (after
        printing a message) when fewer than two commits are available.
        """
        if not abtest:
            commits = gitutils.get_git_commits(self.srcpath, start, end)
        else:
            commits = [start, end]
        if not commits or len(commits) < 2:
            print(f"Failed to retrieve commits from {start} to {end} in {self.srcpath}.")
            return False
        for count, commit in enumerate(commits):
            ctime = gitutils.get_git_commit_date(self.srcpath, commit)
            self.commits.append(Commit(sha=commit, ctime=ctime))
            self.commit_dict[commit] = count
        return True

    def get_mid_commit(self, left: "Commit", right: "Commit") -> "Optional[Commit]":
        """Return the commit halfway between *left* and *right*, or None if they are adjacent."""
        left_index = self.commit_dict[left.sha]
        right_index = self.commit_dict[right.sha]
        if right_index == left_index + 1:
            return None
        return self.commits[(left_index + right_index) // 2]

    def setup_build_env(self, env) -> Dict[str, str]:
        """Set the pytorch build switches on *env* (modified in place) and return it.

        Raises:
            KeyError: if *env* has no ``CONDA_PREFIX`` entry.
        """
        env["USE_CUDA"] = "1"
        env["BUILD_CAFFE2_OPS"] = "0"
        # Do not build the test
        env["BUILD_TEST"] = "0"
        env["USE_MKLDNN"] = "1"
        env["USE_MKL"] = "1"
        env["USE_CUDNN"] = "1"
        env["CMAKE_PREFIX_PATH"] = env["CONDA_PREFIX"]
        return env

    def checkout_deps(self, cdate: datetime):
        """Checkout the last commit of dependencies on date.

        Raises:
            AssertionError: if no commit is found for *cdate* or the checkout fails.
        """
        for pkg, (pkg_path, branch) in TORCHBENCH_DEPS.items():
            gitutils.checkout_git_branch(pkg_path, branch)
            dep_commit = gitutils.get_git_commit_on_date(pkg_path, cdate)
            if not dep_commit:
                raise AssertionError(f"Failed to find the commit on {cdate} of {pkg}")
            print(f"Checking out {pkg} commit {dep_commit} ...", end="", flush=True)
            # Explicit check instead of `assert <call>`: the checkout must still run under `python -O`.
            if not gitutils.checkout_git_commit(pkg_path, dep_commit):
                raise AssertionError(f"Failed to checkout commit {dep_commit} of {pkg}")
            print("done.")

    def build_install_deps(self, build_env):
        """Install dependencies such as torchtext and torchvision."""
        # Build torchvision
        print("Building torchvision ...", end="", flush=True)
        command = "python setup.py install"
        subprocess.check_call(command, cwd=TORCHBENCH_DEPS["torchvision"][0], env=build_env, shell=True)
        print("done")
        # Build torchtext
        print("Building torchtext ...", end="", flush=True)
        command = "python setup.py clean install"
        subprocess.check_call(command, cwd=TORCHBENCH_DEPS["torchtext"][0], env=build_env, shell=True)
        print("done")

    def _build_lazy_tensor(self, commit: "Commit", build_env: Dict[str, str]):
        """Patch and install lazy_tensor_core from the pytorch tree; no-op unless build_lazy is set."""
        if not self.build_lazy:
            return
        print(f"Building pytorch lazy tensor on {commit.sha} ...", end="", flush=True)
        lazy_tensor_path = os.path.join(self.srcpath, "lazy_tensor_core")
        command = "./scripts/apply_patches.sh"
        subprocess.check_call(command, cwd=lazy_tensor_path, env=build_env, shell=True)
        command = "python setup.py install"
        subprocess.check_call(command, cwd=lazy_tensor_path, env=build_env, shell=True)
        print("done")

    def build(self, commit: "Commit"):
        """Check out *commit*, pin the deps to its date, then build and install everything.

        Raises:
            AssertionError: if a git checkout fails.
            subprocess.CalledProcessError: if a build command fails (the pytorch
                build is retried once from a clean build directory first).
        """
        # checkout pytorch commit
        print(f"Checking out pytorch commit {commit.sha} ...", end="", flush=True)
        # Checked like the dependency checkouts, so a failed checkout cannot
        # silently build (and benchmark) the wrong commit.
        if not gitutils.checkout_git_commit(self.srcpath, commit.sha):
            raise AssertionError(f"Failed to checkout pytorch commit {commit.sha}")
        print("done.")
        # checkout pytorch deps commit
        # Only the date part (text before the first space) of the commit time is used.
        ctime = datetime.strptime(commit.ctime.split(" ")[0], "%Y-%m-%d")
        self.checkout_deps(ctime)
        # setup environment variables
        build_env = self.setup_build_env(os.environ.copy())
        # build pytorch
        print(f"Building pytorch commit {commit.sha} ...", end="", flush=True)
        # Check if version.py exists, if it does, remove it.
        # This is to force pytorch update the version.py file upon incremental compilation
        version_py_path = os.path.join(self.srcpath, "torch/version.py")
        if os.path.exists(version_py_path):
            os.remove(version_py_path)
        command = "python setup.py install"
        try:
            subprocess.check_call(command, cwd=self.srcpath, env=build_env, shell=True)
            # Run from $HOME rather than the source tree to test the installed package.
            command_testbuild = "python -c 'import torch'"
            subprocess.check_call(command_testbuild, cwd=os.environ["HOME"], env=build_env, shell=True)
        except subprocess.CalledProcessError:
            # Remove the build directory, then try build it again
            build_path = os.path.join(self.srcpath, "build")
            if os.path.exists(build_path):
                shutil.rmtree(build_path)
            subprocess.check_call(command, cwd=self.srcpath, env=build_env, shell=True)
        print("done")
        # build pytorch lazy tensor if needed
        self._build_lazy_tensor(commit, build_env)
        self.build_install_deps(build_env)

    def cleanup(self):
        """Uninstall torch, torchtext and torchvision from the current environment."""
        packages = ["torch", "torchtext", "torchvision"]
        CLEANUP_ROUND = 5
        # "|| true" keeps a failing pip uninstall from raising.
        command = "pip uninstall -y " + " ".join(packages) + " || true"
        # Clean up multiple times to make sure the packages are all uninstalled
        for _ in range(CLEANUP_ROUND):
            subprocess.check_call(command, shell=True)
        print("done")
class TorchBench:
    """Runs the TorchBench suite against a built pytorch commit and digests the results."""
    srcpath: str  # path to pytorch/benchmark source code
    branch: str  # set by prep()
    timelimit: int  # timeout limit in minutes
    workdir: str
    models: List[str]
    first_time: bool
    torch_src: "TorchSource"

    def __init__(self, srcpath: str,
                 torch_src: "TorchSource",
                 timelimit: int,
                 workdir: str):
        self.srcpath = srcpath
        self.torch_src = torch_src
        self.timelimit = timelimit
        self.workdir = workdir
        self.first_time = True
        self.models = list()

    def prep(self) -> bool:
        """Record the current branch and the list of available models."""
        # Verify the code in srcpath is pytorch/benchmark
        repo_origin_url = gitutils.get_git_origin(self.srcpath)
        if repo_origin_url != TORCHBENCH_GITREPO:
            # A non-standard origin is only reported, not rejected.
            print(f"WARNING: Unmatched repo origin url: {repo_origin_url} with standard {TORCHBENCH_GITREPO}")
        # get the name of current branch
        self.branch = gitutils.get_current_branch(self.srcpath)
        # get list of models
        models_dir = os.path.join(self.srcpath, "torchbenchmark", "models")
        self.models = [model for model in os.listdir(models_dir)
                       if os.path.isdir(os.path.join(models_dir, model))]
        return True

    def _install_benchmark(self):
        "Install and build TorchBench dependencies"
        command = ["python", "install.py"]
        subprocess.check_call(command, cwd=self.srcpath, shell=False)

    def run_benchmark(self, commit: "Commit", targets: List[str]) -> str:
        """Run the benchmark for *commit* and return its result directory.

        Results go to ``<workdir>/<sha>``, which is emptied first if it exists.
        On timeout a message is printed and the directory is still returned.
        """
        output_dir = os.path.join(self.workdir, commit.sha)
        # If the directory already exists, clear its contents
        if os.path.exists(output_dir):
            assert os.path.isdir(output_dir), f"Must specify output directory: {output_dir}"
            for entry in os.listdir(output_dir):
                entry_path = os.path.join(output_dir, entry)
                # os.remove cannot delete directories; remove those recursively.
                if os.path.isdir(entry_path) and not os.path.islink(entry_path):
                    shutil.rmtree(entry_path)
                else:
                    os.remove(entry_path)
        else:
            os.mkdir(output_dir)
        bmfilter = targets_to_bmfilter(targets, self.models)
        # If the first time to run benchmark, install the dependencies first
        if self.first_time:
            self._install_benchmark()
            self.first_time = False
        print(f"Running TorchBench for commit: {commit.sha}, filter {bmfilter} ...", end="", flush=True)
        command = f"""bash .github/scripts/run.sh "{output_dir}" "{bmfilter}" 2>&1 | tee {output_dir}/benchmark.log"""
        try:
            # timelimit is in minutes; subprocess expects seconds.
            subprocess.check_call(command, cwd=self.srcpath, shell=True, timeout=self.timelimit * 60)
        except subprocess.TimeoutExpired:
            print(f"Benchmark timeout for {commit.sha}. Result will be None.")
            return output_dir
        print("done.")
        return output_dir

    def gen_digest(self, result_dir: str, targets: List[str]) -> Dict[str, float]:
        """Build the digest (test name -> mean) for the targets from a result directory.

        NOTE: a non-None *targets* list is modified in place: every entry that
        is a model name is replaced by the names of all recorded tests that
        contain it. TorchBenchBisection.run() passes the same list on to
        regression(), which looks the digest up by these expanded names, so
        the in-place update must be kept.

        Returns:
            The digest, or an empty dict when the directory has no json file
            or the json file is empty.

        Raises:
            AssertionError: if a target has no result in the json file.
        """
        filelist = [f for f in os.listdir(result_dir) if f.endswith(".json")]
        out = dict()
        if not filelist:
            print(f"Empty directory or json file in {result_dir}. Return empty digest.")
            return out
        # Use the first json as the benchmark data file
        data_file = os.path.join(result_dir, filelist[0])
        if not os.stat(data_file).st_size:
            print(f"Empty json file {filelist[0]} in {result_dir}. Return empty digest.")
            return out
        with open(data_file, "r") as df:
            data = json.load(df)
        benchmark_names = [each["name"] for each in data["benchmarks"]]
        # Fill in targets if it is None
        if targets is None:
            targets = list(benchmark_names)
        # Iterate over a snapshot because targets is modified inside the loop.
        for model in [t for t in targets if t in self.models]:
            targets.remove(model)
            targets.extend(name for name in benchmark_names if model in name)
        for each in data["benchmarks"]:
            if each["name"] in targets:
                out[each["name"]] = each["stats"]["mean"]
        # Make sure all target tests are available
        for target in targets:
            # Membership test: a mean of 0.0 is still a valid result.
            assert target in out, f"Don't find benchmark result of {target} in {filelist[0]}."
        return out

    def get_digest(self, commit: "Commit", targets: List[str], debug: bool) -> Dict[str, float]:
        """Return the digest of *commit*, building and benchmarking it if needed.

        The digest is cached on ``commit.digest``. In debug mode an existing
        non-empty result json under ``<workdir>/<sha>`` is used directly.
        """
        # digest is cached
        if commit.digest is not None:
            return commit.digest
        # if debug mode, skip the build and benchmark run
        if debug:
            result_dir = os.path.join(self.workdir, commit.sha)
            if os.path.isdir(result_dir):
                filelist = [f for f in os.listdir(result_dir) if f.endswith(".json")]
                if filelist and os.stat(os.path.join(result_dir, filelist[0])).st_size:
                    commit.digest = self.gen_digest(result_dir, targets)
                    return commit.digest
        # Build pytorch and its dependencies
        self.torch_src.build(commit)
        # Run benchmark
        result_dir = self.run_benchmark(commit, targets)
        commit.digest = self.gen_digest(result_dir, targets)
        print(f"Cleaning up packages from commit {commit.sha} ...", end="", flush=True)
        self.torch_src.cleanup()
        return commit.digest
class TorchBenchBisection:
    """Bisects a pytorch commit range to find the commit pairs where benchmark means change."""
    workdir: str
    start: str
    end: str
    threshold: float
    direction: str
    targets: List[str]
    # left commit, right commit, targets to test
    bisectq: "List[Tuple[Commit, Commit, List[str]]]"
    result: "List[Tuple[Commit, Commit]]"
    torch_src: "TorchSource"
    bench: "TorchBench"
    output_json: str
    debug: bool
    abtest: bool

    def __init__(self,
                 workdir: str,
                 torch_src: str,
                 bench_src: str,
                 start: str,
                 end: str,
                 threshold: float,
                 direction: str,
                 timeout: int,
                 targets: List[str],
                 output_json: str,
                 build_lazy: bool = False,
                 debug: bool = False):
        self.workdir = workdir
        self.start = start
        self.end = end
        self.threshold = threshold
        self.direction = direction
        self.targets = targets
        self.bisectq = list()
        self.result = list()
        self.torch_src = TorchSource(srcpath=torch_src, build_lazy=build_lazy)
        self.bench = TorchBench(srcpath=bench_src,
                                torch_src=self.torch_src,
                                timelimit=timeout,
                                workdir=self.workdir)
        self.output_json = output_json
        self.debug = debug
        # Special treatment for abtest
        # The (100.0, "decrease") pair selects abtest mode: only the two end commits are compared.
        self.abtest = self.threshold == 100.0 and self.direction == "decrease"

    def regression(self, left: "Commit", right: "Commit", targets: List[str]) -> List[str]:
        """Return the targets that satisfy the regression rule <threshold, direction>.

        Left: older commit; right: newer commit. Both must already have a
        digest; an empty digest means the benchmark timed out for that commit.

        Raises:
            AssertionError: if either commit has no digest yet.
        """
        # If uncalculated, commit.digest will be None
        # Compare with None explicitly: an empty digest (timeout) is valid and is handled below.
        assert left.digest is not None, f"Commit {left.sha} must have a digest"
        assert right.digest is not None, f"Commit {right.sha} must have a digest"
        out = []
        for target in targets:
            # digest could be empty if benchmark timeout
            left_mean = left.digest[target] if len(left.digest) else 0
            right_mean = right.digest[target] if len(right.digest) else 0
            low, high = min(left_mean, right_mean), max(left_mean, right_mean)
            if not high:
                # If both timeout, diff is zero percent
                diff = 0
            elif not low:
                # If either left or right timeout, diff is 100.
                diff = 100
            else:
                # Otherwise use the min mean value to calculate diff.
                diff = abs(left_mean - right_mean) / low * 100
            print(f"Target {target}: left commit {left.sha} mean {left_mean} vs. right commit {right.sha} mean {right_mean}. Diff: {diff}.")
            if diff >= self.threshold:
                if self.direction == "increase" and left_mean < right_mean:
                    # Time increase == performance regression
                    out.append(target)
                elif self.direction == "decrease" and left_mean > right_mean:
                    # Time decrease == performance optimization
                    out.append(target)
                elif self.direction == "both":
                    out.append(target)
        return out

    def prep(self) -> bool:
        """Prepare the repositories and queue the full commit range for bisection."""
        if not self.torch_src.prep():
            return False
        if not self.torch_src.init_commits(self.start, self.end, self.abtest):
            return False
        if not self.bench.prep():
            return False
        left_commit = self.torch_src.commits[0]
        right_commit = self.torch_src.commits[-1]
        self.bisectq.append((left_commit, right_commit, self.targets))
        return True

    def run(self):
        """Process the bisection queue until it is empty.

        Ranges whose endpoints differ by at least the threshold are split at
        the middle commit; adjacent commit pairs that still differ are stored
        in self.result.
        """
        while self.bisectq:
            (left, right, targets) = self.bisectq.pop(0)
            self.bench.get_digest(left, targets, self.debug)
            self.bench.get_digest(right, targets, self.debug)
            # Without explicit targets, compare every test recorded by whichever commit has results.
            if targets is None and len(left.digest):
                targets = list(left.digest.keys())
            if targets is None and len(right.digest):
                targets = list(right.digest.keys())
            if targets is None:
                # Both commits timed out and no targets were given: nothing to compare.
                targets = []
            updated_targets = self.regression(left, right, targets)
            if not updated_targets:
                continue
            mid = self.torch_src.get_mid_commit(left, right)
            if mid is None:
                self.result.append((left, right))
            else:
                self.bisectq.append((left, mid, updated_targets))
                self.bisectq.append((mid, right, updated_targets))

    def output(self):
        """Write the bisection settings and the detected commit pairs to output_json as JSON."""
        json_obj = dict()
        json_obj["start"] = self.start
        json_obj["end"] = self.end
        json_obj["threshold"] = self.threshold
        json_obj["timeout"] = self.bench.timelimit
        json_obj["torchbench_branch"] = self.bench.branch
        json_obj["result"] = []
        for (first, second) in self.result:
            r = dict()
            r["commit1"] = first.sha
            r["commit1_time"] = first.ctime
            # An empty digest is reported as the string "timeout".
            r["commit1_digest"] = first.digest if len(first.digest) else "timeout"
            r["commit2"] = second.sha
            r["commit2_time"] = second.ctime
            r["commit2_digest"] = second.digest if len(second.digest) else "timeout"
            json_obj["result"].append(r)
        with open(self.output_json, 'w') as outfile:
            json.dump(json_obj, outfile, indent=2)

    def output_abtest_result(self):
        """Write the A/B comparison table for workdir to output_json and print it."""
        abtest_result = analyze_abtest_result_dir(self.workdir)
        with open(self.output_json, 'w') as outfile:
            outfile.write(abtest_result)
        print(abtest_result)
# Script entry point: parse the command line, then either summarize an existing
# result directory (--analyze-result) or run a bisection described by --config.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--work-dir",
                        help="bisection working directory",
                        type=exist_dir_path)
    parser.add_argument("--pytorch-src",
                        help="the directory of pytorch source code git repository",
                        type=exist_dir_path)
    parser.add_argument("--torchbench-src",
                        help="the directory of torchbench source code git repository",
                        type=exist_dir_path)
    parser.add_argument("--config",
                        help="the bisection configuration in YAML format")
    parser.add_argument("--output",
                        help="the output json file")
    parser.add_argument("--analyze-result",
                        help="specify the output result directory to analyze")
    # by default, do not build lazy tensor
    parser.add_argument("--build-lazy",
                        action='store_true',
                        help="build lazy tensor feature in PyTorch")
    # by default, debug mode is disabled
    parser.add_argument("--debug",
                        help="run in debug mode, if the result json exists, use it directly",
                        action='store_true')
    args = parser.parse_args()
    # If this is to print the overview of a test result, don't need to run the actual execution
    if args.analyze_result:
        print(analyze_abtest_result_dir(args.analyze_result))
        raise SystemExit(0)
    if not args.config:
        # Without this check open(None) fails with an unhelpful TypeError.
        parser.error("--config is required unless --analyze-result is given")
    with open(args.config, "r") as f:
        # NOTE(review): yaml.full_load is not meant for untrusted input; consider
        # yaml.safe_load if the config can come from outside the team.
        bisect_config = yaml.full_load(f)
    # sanity checks
    # Explicit checks rather than assert, so validation still runs under `python -O`.
    required_keys = [
        ("start", "Illegal bisection config, must specify start commit SHA."),
        ("end", "Illegal bisection config, must specify end commit SHA."),
        ("threshold", "Illegal bisection config, must specify threshold."),
        ("direction", "Illegal bisection config, must specify direction."),
        ("timeout", "Illegal bisection config, must specify timeout."),
    ]
    for key, message in required_keys:
        if key not in bisect_config:
            raise SystemExit(message)
    valid_directions = ["increase", "decrease", "both"]
    if bisect_config["direction"] not in valid_directions:
        raise SystemExit("We only support increase, decrease, or both directions")
    # "tests" is optional; None means no explicit targets.
    targets = bisect_config.get("tests")
    bisection = TorchBenchBisection(workdir=args.work_dir,
                                    torch_src=args.pytorch_src,
                                    bench_src=args.torchbench_src,
                                    start=bisect_config["start"],
                                    end=bisect_config["end"],
                                    threshold=bisect_config["threshold"],
                                    direction=bisect_config["direction"],
                                    timeout=bisect_config["timeout"],
                                    targets=targets,
                                    output_json=args.output,
                                    build_lazy=args.build_lazy,
                                    debug=args.debug)
    # prep() does real work (repo updates, commit discovery); it must not sit
    # inside an assert, which `python -O` would skip entirely.
    if not bisection.prep():
        raise SystemExit("The working condition of bisection is not satisfied.")
    print("Preparation steps ok. Commit to bisect: " + " ".join([str(x) for x in bisection.torch_src.commits]))
    bisection.run()
    if bisection.abtest:
        bisection.output_abtest_result()
    else:
        bisection.output()