benchmarking/repo_driver.py (399 lines of code) (raw):

#!/usr/bin/env python ############################################################################## # Copyright 2017-present, Facebook, Inc. # All rights reserved. # # This source code is licensed under the license found in the # LICENSE file in the root directory of this source tree. ############################################################################## from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals import argparse import datetime import json import os import sys import threading import time import traceback from collections import deque from harness import BenchmarkDriver from repos.repos import getRepo from utils.build_program import buildProgramPlatform from utils.custom_logger import getLogger from utils.utilities import ( getDirectory, deepMerge, getString, getRunStatus, setRunStatus, getMeta, ) parser = argparse.ArgumentParser(description="Perform one benchmark run") parser.add_argument( "--ab_testing", action="store_true", help="Enable A/B testing in benchmark." ) parser.add_argument( "--base_commit", help="In A/B testing, this is the control commit that is used to compare against. " + "If not specified, the default is the first commit in the week in UTC timezone. " + "Even if specified, the control is the later of the specified commit and the commit at the start of the week.", ) parser.add_argument( "--branch", default="master", help="The remote repository branch. Defaults to master", ) parser.add_argument( "--commit", default="master", help="The commit this benchmark runs on. It can be a branch. Defaults to master. " + "If it is a commit hash, and program runs on continuous mode, it is the starting " + "commit hash the regression runs on. The regression runs on all commits starting from " "the specified commit.", ) parser.add_argument( "--commit_file", help="The file saves the last commit hash that the regression has finished. " + "If this argument is specified and is valid, the --commit has no use.", ) parser.add_argument("--env", help="environment variables passed to runtime binary") parser.add_argument( "--exec_dir", required=True, help="The executable is saved in the specified directory. " + "If an executable is found for a commit, no re-compilation is performed. " + "Instead, the previous compiled executable is reused.", ) parser.add_argument( "--framework", required=True, choices=["caffe2", "oculus", "generic", "pytorch", "tflite", "glow"], help="Specify the framework to benchmark on.", ) parser.add_argument( "--frameworks_dir", default=None, help="Required. The root directory that all frameworks resides. " "Usually it is the " + os.path.join("specifications", "frameworks") + "directory.", ) parser.add_argument( "--interval", type=int, help="The minimum time interval in seconds between two benchmark runs.", ) parser.add_argument( "--platforms", required=True, help="Specify the platforms to benchmark on, in comma separated list." "Use this flag if the framework" " needs special compilation scripts. The scripts are called build.sh " "saved in " + os.path.join("specifications", "frameworks", "<framework>", "<platforms>") + " directory", ) parser.add_argument( "--regression", action="store_true", help="Indicate whether this run detects regression.", ) parser.add_argument( "--remote_repository", default="origin", help="The remote repository. Defaults to origin", ) parser.add_argument( "--repo", default="git", choices=["git", "hg"], help="Specify the source control repo of the framework.", ) parser.add_argument( "--repo_dir", required=True, help="Required. The base framework repo directory used for benchmark.", ) parser.add_argument( "--same_host", action="store_true", help="Specify whether the build and benchmark run are on the same host. " "If so, the build cannot be done in parallel with the benchmark run.", ) parser.add_argument( "--status_file", help="A file to inform the driver stops running when the content of the file is 0.", ) parser.add_argument( "--step", type=int, default=1, help="Specify the number of commits we want to run the benchmark once under continuous mode.", ) def stopRun(status_file): if status_file and os.path.isfile(status_file): with open(status_file, "r") as file: content = file.read().strip() if content == "0": return True return False def _runIndividual(interval, regression, ab_testing): return not interval and not regression and not ab_testing class ExecutablesBuilder(threading.Thread): def __init__(self, repo, work_queue, queue_lock, **kwargs): threading.Thread.__init__(self) raw_args = kwargs.get("raw_args", None) self.args, self.unknowns = parser.parse_known_args(raw_args) self.repo = repo self.work_queue = work_queue self.queue_lock = queue_lock self.current_commit_hash = None def run(self): try: if self.args.interval: while not stopRun(self.args.status_file): self._buildExecutables() time.sleep(self.args.interval) else: # single run self._buildExecutables() except Exception: setRunStatus(2) getLogger().exception("Error building executable.") def _buildExecutables(self): platforms = self.args.platforms.split(",") while not stopRun(self.args.status_file) and self._pullNewCommits(): for platform in platforms: self._saveOneCommitExecutable(platform) def _saveOneCommitExecutable(self, platform): getLogger().info( "Building executable on {} ".format(platform) + "@ {}".format(self.current_commit_hash) ) same_host = self.args.same_host if same_host: self.queue_lock.acquire() repo_info = self._buildOneCommitExecutable(platform, self.current_commit_hash) if repo_info is None: getLogger().error("Failed to extract repo commands. Skip this commit.") else: if not same_host: self.queue_lock.acquire() self.work_queue.append(repo_info) if self.queue_lock.locked(): self.queue_lock.release() def _buildOneCommitExecutable(self, platform, commit_hash): repo_info = {} repo_info_treatment = self._setupRepoStep(platform, commit_hash) if repo_info_treatment is None: return None repo_info["treatment"] = repo_info_treatment if self.args.ab_testing: # only build control on regression detection # figure out the base commit. It is the first commit in the week control_commit_hash = self._getControlCommit( repo_info_treatment["commit_time"], self.args.base_commit ) repo_info_control = self._setupRepoStep(platform, control_commit_hash) if repo_info_control is None: return None repo_info["control"] = repo_info_control # Pass meta file from build to benchmark meta = getMeta(self.args, platform) if meta: assert "meta" not in self.info, "info field already has a meta field" self.info["meta"] = meta if self.args.regression: repo_info["regression_commits"] = self._getCompareCommits( repo_info_treatment["commit"] ) # use repo_info to pass the value of platform repo_info["platform"] = platform return repo_info def _getCompareCommits(self, latest_commit): commits = self.repo.getPriorCommits(latest_commit, 12) if not commits: return [] commits = commits.split("\n") if commits[-1] == "": commits.pop() res = [] for commit in commits: c = commit.split(":") assert len(c) == 2, "Length is incorrect" res.append({"commit": c[0], "commit_time": int(float(c[1]))}) return res def _pullNewCommits(self): new_commit_hash = None if _runIndividual( self.args.interval, self.args.regression, self.args.ab_testing ): new_commit_hash = self.repo.getCurrentCommitHash() if new_commit_hash is None: getLogger().error("Commit is not specified") return False else: # first get into the correct branch self.repo.checkout(self.args.branch) self.repo.pull(self.args.remote_repository, self.args.branch) if self.current_commit_hash is None: self.current_commit_hash = self._getSavedCommit() if self.current_commit_hash is None: new_commit_hash = self.repo.getCommitHash(self.args.commit) else: new_commit_hash = self.repo.getNextCommitHash( self.current_commit_hash, self.args.step ) if new_commit_hash == self.current_commit_hash: getLogger().info( "Commit %s is already processed, sleeping...", new_commit_hash ) return False self.current_commit_hash = new_commit_hash return True def _getSavedCommit(self): if self.args.commit_file and os.path.isfile(self.args.commit_file): with open(self.args.commit_file, "r") as file: commit_hash = file.read().strip() # verify that the commit exists return self.repo.getCommitHash(commit_hash) else: return None def _setupRepoStep(self, platform, commit): repo_info = {} repo_info["commit"] = self.repo.getCommitHash(commit) repo_info["commit_time"] = self.repo.getCommitTime(repo_info["commit"]) return repo_info if self._buildProgram(platform, repo_info) else None def _buildProgram(self, platform, repo_info): directory = getDirectory(repo_info["commit"], repo_info["commit_time"]) program = self.args.framework + "_benchmark" if os.name == "nt": program = program + ".exe" elif platform.startswith("ios"): program = program + ".ipa" dst = os.path.join( self.args.exec_dir, self.args.framework, platform, directory, program ) repo_info["program"] = dst repo_info["programs"] = {"program": {"location": dst}} filedir = os.path.dirname(dst) if not _runIndividual( self.args.interval, self.args.regression, self.args.ab_testing ) and os.path.isfile(dst): return True else: result = self._buildProgramPlatform(repo_info, dst, platform) for fn in os.listdir(filedir): if fn != program: repo_info["programs"][fn] = {"location": os.path.join(filedir, fn)} return result def _buildProgramPlatform(self, repo_info, dst, platform): self.repo.checkout(repo_info["commit"]) return buildProgramPlatform( dst, self.args.repo_dir, self.args.framework, self.args.frameworks_dir, platform, ) def _getControlCommit(self, reference_time, base_commit): # Get start of week dt = datetime.datetime.utcfromtimestamp(reference_time) monday = dt - datetime.timedelta(days=dt.weekday()) start_of_week = monday.replace(hour=0, minute=0, second=0, microsecond=0) if base_commit: base_commit_time = self.repo.getCommitTime(base_commit) base_commit_datetime = datetime.datetime.utcfromtimestamp(base_commit_time) if base_commit_datetime >= start_of_week: return base_commit # Give more buffer to the date range to avoid the timezone issues start = end = start_of_week repeat = True while repeat: logs_str = self.repo.getCommitsInRange(start, end) if logs_str == "": end = end + datetime.timedelta(hours=1) else: repeat = False logs = logs_str.split("\n") for row in logs: items = row.strip().split(":") assert len(items) == 2, "Repo log format is wrong" commit_hash = items[0].strip() unix_time = int(float(items[1].strip())) unix_datetime = datetime.datetime.utcfromtimestamp(unix_time) if unix_datetime >= start_of_week: return commit_hash raise AssertionError("Cannot find the control commit") return None class RepoDriver(object): def __init__(self, **kwargs): raw_args = kwargs.get("raw_args", None) self.args, self.unknowns = parser.parse_known_args(raw_args) self.repo = getRepo(self.args.repo, self.args.repo_dir) self.queue_lock = threading.Lock() self.work_queue = deque() self.executables_builder = ExecutablesBuilder( self.repo, self.work_queue, self.queue_lock, raw_args=raw_args ) def run(self): getLogger().info( "Start benchmark run @ %s" % datetime.datetime.now().strftime("%Y_%m_%d_%H_%M") ) self.executables_builder.start() self._runBenchmarkSuites() return getRunStatus() def _runBenchmarkSuites(self): # initially sleep 10 seconds in case no need to build the binary time.sleep(10) if self.args.interval: while not stopRun(self.args.status_file): self._runBenchmarkSuitesInQueue() time.sleep(self.args.interval) else: # single run while self.executables_builder.is_alive(): time.sleep(10) self._runBenchmarkSuitesInQueue() def _runBenchmarkSuitesInQueue(self): same_host = self.args.same_host while not stopRun(self.args.status_file) and self.work_queue: # we can do this because this is the only # consumer of the work_queue self.queue_lock.acquire() repo_info = self.work_queue.popleft() if not same_host: self.queue_lock.release() self._runOneBenchmarkSuite(repo_info) if same_host: self.queue_lock.release() def _runOneBenchmarkSuite(self, repo_info): raw_args = self._getRawArgs(repo_info) if not _runIndividual( self.args.interval, self.args.regression, self.args.ab_testing ): # always sleep 10 seconds to make the phone in a more # consistent state time.sleep(10) # cannot use subprocess because it conflicts with requests app = BenchmarkDriver(raw_args=raw_args) app.run() ret = 0 setRunStatus(ret >> 8) if self.args.commit_file and self.args.regression: with open(self.args.commit_file, "w") as file: file.write(repo_info["treatment"]["commit"]) getLogger().info( "One benchmark run {} for ".format("successful" if ret == 0 else "failed") + repo_info["treatment"]["commit"] ) def _getRawArgs(self, repo_info): platform = repo_info["platform"] # Remove it from repo_info to avoid polution, should clean up later del repo_info["platform"] unknowns = self.unknowns # a not so elegant way of merging info construct if "--info" in unknowns: info_idx = unknowns.index("--info") info = json.loads(unknowns[info_idx + 1]) deepMerge(repo_info, info) del unknowns[info_idx + 1] del unknowns[info_idx] info = json.dumps(repo_info) raw_args = [] raw_args.extend( [ "--platform", getString(platform), "--framework", getString(self.args.framework), "--info", info, ] ) raw_args.extend(unknowns) if self.args.env: raw_args.append("--env") env_vars = self.args.env.split() for env_var in env_vars: raw_args.append(env_var) return raw_args if __name__ == "__main__": app = RepoDriver() app.run()