benchmarking/run_remote.py (739 lines of code) (raw):

#!/usr/bin/env python ############################################################################## # Copyright 2017-present, Facebook, Inc. # All rights reserved. # # This source code is licensed under the license found in the # LICENSE file in the root directory of this source tree. ############################################################################## from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals import argparse import glob import json import os import re import shutil import subprocess import tempfile import threading from collections import defaultdict from getpass import getuser from random import randint import pkg_resources from bridge.db import DBDriver from remote.devices import Devices from remote.file_handler import FileHandler from remote.print_result_url import PrintResultURL from remote.screen_reporter import ScreenReporter from tabulate import tabulate from utils.build_program import buildProgramPlatform, buildUsingBuck from utils.custom_logger import getLogger, setLoggerLevel from utils.utilities import getBenchmarks, getMeta, parse_kwarg, unpackAdhocFile parser = argparse.ArgumentParser(description="Run the benchmark remotely") parser.add_argument( "--app_id", help="The app id you use to upload/download your file for everstore " "and access the job queue", ) parser.add_argument( "--async_submit", action="store_true", help="Return once the job has been submitted to db. No need to wait till " "finish so that you can submit mutiple jobs in async way.", ) parser.add_argument( "-b", "--benchmark_file", help="Specify the json file for the benchmark or a number of benchmarks", ) parser.add_argument( "--benchmark_db", help="The database that will store benchmark infos" ) parser.add_argument("--benchmark_db_entry", help="The entry point of server's database") parser.add_argument( "--benchmark_table", help="The table that will store benchmark infos" ) parser.add_argument( "-c", "--custom_binary", help="Specify the custom binary that you want to run." ) parser.add_argument( "--cache_config", required=True, help="The config file to specify the cached uploaded files. If the files " "are already uploaded in the recent past, do not upload again.", ) parser.add_argument( "--hashes", default=None, help="Specify the exact devices to run remotely by hashes. Have to use " "together with --remote and --devices", ) parser.add_argument( "--debug", action="store_true", help="Debug mode to retain all the running binaries and models.", ) parser.add_argument( "--log_output_dir", default=None, help="Directory where the benchmark logs are written to. If not specified, the logs are outputted to the terminal", ) parser.add_argument( "--devices", help="Specify the devices to benchmark on, in comma separated list." ) parser.add_argument( "--devices_config", default=None, help="The config file in absolute path to map abbreviations to full names", ) parser.add_argument("--env", help="environment variables passed to runtime binary") parser.add_argument( "--fetch_result", action="store_true", help="Fetch the result of already submitted jobs, use together with " "--user_identifier", ) parser.add_argument( "--fetch_status", action="store_true", help="Fetch the status of already submitted jobs, use together with " "--user_identifier", ) parser.add_argument( "--kill", action="store_true", help="Kill submitted jobs, use together with " "--user_identifier", ) parser.add_argument( "--file_storage", help="The storage engine for uploading and downloading files" ) parser.add_argument( "--force_submit", action="store_true", help="Force to submit the run." ) parser.add_argument( "--framework", choices=["caffe2", "generic", "oculus", "pytorch", "tflite", "glow"], help="Specify the framework to benchmark on.", ) parser.add_argument( "--frameworks_dir", default=None, help="The root directory that all frameworks resides. " "Usually it is the specifications/frameworks directory. " "If not provide, we will try to find it from the binary.", ) parser.add_argument( "--info", help="The json serialized options describing the control and treatment." ) parser.add_argument( "--job_queue", default="aibench_interactive", help="Specify the db job queue that the benchmark is sent to", ) parser.add_argument( "--list_devices", action="store_true", help="List the devices associated to the job queue", ) parser.add_argument( "--list_job_queues", action="store_true", help="List the job queues that have available devices", ) parser.add_argument( "--logger_level", default="info", choices=["info", "warning", "error"], help="Specify the logger level", ) parser.add_argument( "--platform", help="Specify the platform to benchmark on." "Use this flag if the framework" " needs special compilation scripts. The scripts are called build.sh " "saved in specifications/frameworks/<framework>/<platforms> directory", ) parser.add_argument( "--pre_built_binary", help="Specify the pre_built_binary to bypass the building process.", ) parser.add_argument( "--force_profile", action="store_true", help="Enable profiling regardless of the setting in the benchmark.", ) parser.add_argument( "--query_num_devices", help="Return the counter of user specified device name under different condition", ) parser.add_argument( "--repo_dir", help="Required. The base framework repo directory used for benchmark." ) parser.add_argument( "--result_db", help="The database that will store benchmark results" ) parser.add_argument( "--root_model_dir", help="The root model directory if the meta data of the model uses " "relative directory, i.e. the location field starts with //", ) parser.add_argument( "--screen_reporter", action="store_true", help="Display the summary of the benchmark result on screen.", ) parser.add_argument("--server_addr", help="The lab's server address") parser.add_argument( "--string_map", help="The json serialized arguments passed into treatment for remote run.", ) parser.add_argument( "--test", action="store_true", help="Indicate whether this is a test run. Test runs use a different database.", ) parser.add_argument( "--token", help="The token you use to upload/download your file for everstore " "and access the job queue", ) parser.add_argument( "--urlPrefix", help="URL Prefix if you want to find your result from the URL." ) parser.add_argument( "--user_identifier", help="The identifier user pass in to differentiate different benchmark runs.", ) parser.add_argument( "--user_string", help="The user_string pass in to differentiate different regression benchmark runs.", ) parser.add_argument( "--adhoc", nargs="?", const="generic", default=None, help="Use the adhoc template file", ) parser.add_argument( "--buck_target", default="", help="The buck command to build the custom binary" ) class BuildProgram(threading.Thread): def __init__(self, args, file_handler, tempdir, filenames, prebuilt_binary=None): threading.Thread.__init__(self) self.tempdir = tempdir self.args = args self.file_handler = file_handler self.filenames = filenames self.prebuilt_binary = prebuilt_binary def run(self): self._buildProgram(self.tempdir) def _buildProgram(self, tempdir): # build binary platform = self.args.platform program = tempdir + "/program" if os.name == "nt": program = program + ".exe" elif platform.startswith("ios"): program = program + ".ipa" if self.prebuilt_binary: program = self.prebuilt_binary elif self.args.buck_target: print("Building program with buck...") success = buildUsingBuck(program, self.args.platform, self.args.buck_target) if not success: return else: print("Building program...") success = buildProgramPlatform( program, self.args.repo_dir, self.args.framework, self.args.frameworks_dir, self.args.platform, ) if not success: return # upload all files under the fname directory filedir = os.path.dirname(program) allfiles = [] if os.path.exists(filedir): if self.prebuilt_binary: allfiles = [program] else: allfiles = [ os.path.join(filedir, f) for f in os.listdir(filedir) if os.path.isfile(os.path.join(filedir, f)) ] for fn in allfiles: filename, _ = self.file_handler.uploadFile(fn, None, None, False) getLogger().info("program: {}".format(filename)) self.filenames[os.path.basename(fn)] = filename # main program needs to be in self.filenames["program"] = self.filenames[os.path.basename(program)] else: self.filenames["program"] = program class RunRemote(object): def __init__(self, raw_args=None): self.args, self.unknowns = parser.parse_known_args(raw_args) self._updateArgs(self.args) setLoggerLevel(self.args.logger_level) if not self.args.benchmark_db_entry: assert ( self.args.server_addr is not None ), "Either server_addr or benchmark_db_entry must be specified" while self.args.server_addr[-1] == "/": self.args.server_addr = self.args.server_addr[:-1] self.args.benchmark_db_entry = self.args.server_addr + "/benchmark/" self.db = DBDriver( self.args.benchmark_db, self.args.app_id, self.args.token, self.args.benchmark_table, self.args.job_queue, self.args.test, self.args.benchmark_db_entry, ) self.url_printer = PrintResultURL(self.args) self.file_handler = FileHandler(self.args) self.devices = Devices(self.args.devices_config) # Hard code scuba table self.scuba_dataset = "caffe2_benchmarking" self.info = None self.temprdir = "" def run(self): if self.args.list_devices: devices = self.db.listDevices(self.args.job_queue) self._listDevices() return devices if self.args.list_job_queues: self._printJobQueues() return if self.args.fetch_status or self.args.fetch_result: result = self._fetchResult() return result if self.args.kill: self._killJob() return if self.args.query_num_devices: return self._queryNumDevices(self.args.query_num_devices) assert self.args.benchmark_file, "--benchmark_file (-b) must be specified" assert self.args.devices, "--devices must be specified" assert self.args.framework, "--framework must be specified" assert self.args.platform, "--platform must be specified" assert self.args.repo_dir, "--repo_dir must be specified" assert ( (self.args.info is not None) and (self.args.custom_binary is None) and (self.args.pre_built_binary is None) ) or ( self.args.info is None ), "--info cannot co-exist with --custom_binary and --pre_built_binary" list_job_queues = self._listJobQueues() if not self.args.force_submit: self._checkDevices(self.args.devices, self.args.hashes) assert ( self.args.job_queue != "*" and self.args.job_queue in list_job_queues ), "--job_queue must be choosen from " + " ".join(list_job_queues) self.tempdir = tempfile.mkdtemp(prefix="aibench") program_filenames = {} if self.args.info: self.info = json.loads(self.args.info) else: self.info = {"treatment": {"programs": {}}} if self.args.string_map: self.info["treatment"]["string_map"] = str(self.args.string_map) assert ("treatment" in self.info) and ("programs" in self.info["treatment"]), ( 'In --info, field treatment must exist. In info["treatment"] ' "program field must exist (may be None)" ) binary = ( self.info["treatment"]["programs"]["program"]["location"] if ( "programs" in self.info["treatment"] and "program" in self.info["treatment"]["programs"] ) else self.args.custom_binary if self.args.custom_binary else self.args.pre_built_binary ) t = BuildProgram( self.args, self.file_handler, self.tempdir, program_filenames, binary ) t.start() benchmarks = getBenchmarks(self.args.benchmark_file, self.args.framework) self._updateBenchmarksWithArgs(benchmarks, self.args) for benchmark in benchmarks: self._uploadOneBenchmark(benchmark) if self.args.debug: for test in benchmark["content"]["tests"]: test["log_output"] = True if self.args.env: env = {} env_vars = self.args.env.split() for env_var in env_vars: k, v = parse_kwarg(env_var) env[k] = v for test in benchmark["content"]["tests"]: cmd_env = {} cmd_env.update(env) if "env" in test: cmd_env.update(test["env"]) test["env"] = cmd_env t.join() assert ( "program" in program_filenames ), "program does not exist. Build may be failed." for fn in program_filenames: self.info["treatment"]["programs"][fn] = {"location": program_filenames[fn]} # Pass meta file from build to benchmark meta = getMeta(self.args, self.args.platform) if meta: assert "meta" not in self.info, "info field already has a meta field" self.info["meta"] = meta new_devices = self.devices.getFullNames(self.args.devices) user_identifier = ( int(self.args.user_identifier) if self.args.user_identifier else randint(1, 1000000000000000) ) user = getuser() if not self.args.user_string else self.args.user_string hashes = self.args.hashes for benchmark in benchmarks: data = { "benchmark": benchmark, "info": self.info, } self.db.submitBenchmarks(data, new_devices, user_identifier, user, hashes) if self.args.async_submit: print("Job submitted.") self._printRunDetailsURL(user_identifier) return self.url_printer.printURL(self.scuba_dataset, user_identifier, benchmarks) if not self.args.debug: shutil.rmtree(self.tempdir, True) if self.args.screen_reporter: self._screenReporter(user_identifier) self._printRunDetailsURL(user_identifier) # Clean up try: rm_list = glob.glob("/tmp/aibench*") rm_list.extend(glob.iglob("/tmp/aibench*")) for f in rm_list: if os.path.isdir(f): shutil.rmtree(f, True) if os.path.isfile(f): os.remove(f) except Exception: pass def _updateBenchmarksWithArgs(self, benchmarks, args): for benchmark in benchmarks: content = benchmark["content"] tests = [] if "tests" in content: tests = content["tests"] for test in tests: if args.force_profile: if "profiler" not in test: test["profiler"] = {} test["profiler"]["enabled"] = True def _uploadOneBenchmark(self, benchmark): filename = benchmark["filename"] one_benchmark = benchmark["content"] # TODO refactor the code to collect all files to upload del_paths = [] if "model" in one_benchmark: if "files" in one_benchmark["model"]: for field in one_benchmark["model"]["files"]: value = one_benchmark["model"]["files"][field] assert ( "location" in value ), "location field is missing in benchmark " "{}".format(filename) ref_path = ["files", field] if self._uploadFile(value, filename, benchmark, ref_path): del_paths.append(ref_path) if "libraries" in one_benchmark["model"]: for value in one_benchmark["model"]["libraries"]: assert ( "location" in value ), "location field is missing in benchmark " "{}".format(filename) self._uploadFile(value, filename, benchmark) for del_path in del_paths: self._del_from_benchmark(benchmark["content"]["model"], del_path) # upload test file assert ( "tests" in one_benchmark ), "tests field is missing in benchmark {}".format(filename) tests = one_benchmark["tests"] for test in tests: if "input_files" in test: self._uploadTestFiles(test["input_files"], filename) # ignore the outputs for non accuracy metrics if "output_files" in test and test["metric"] == "error": self._uploadTestFiles(test["output_files"], filename) def _uploadTestFiles(self, files, basefilename): if isinstance(files, list): for i in range(len(files)): f = files[i] self._uploadFile(f, basefilename) elif isinstance(files, dict): for f in files: value = files[f] if isinstance(value, list): for i in range(len(value)): v = value[i] self._uploadFile(v, basefilename) else: self._uploadFile(value, basefilename) def _uploadFile( self, f, basefilename, benchmark=None, ref_path=None, cache_file=True ): if "location" not in f: return location = f["location"] if "md5" not in f: raise Exception("No md5sum provided for {}".format(f["filename"])) md5 = f["md5"] """ For the file from repo, there is special handling we need to fetch both control and treatment , and also move the file from benchmark to info Note: Support the file in model first """ if location.startswith("//repo"): assert ( ref_path is not None ), "repo is not yet \ supported for {}".format( location ) for side in self.info: if side == "extra": continue value = self.info[side] commit_hash = "master" if "commit" in value: commit_hash = value["commit"] or "master" tgt_file = self._downloadRepoFile(location, self.tempdir, commit_hash) f["location"], f["md5"] = self.file_handler.uploadFile( tgt_file, md5, basefilename, cache_file ) # add to info assert len(ref_path), "ref_path must be a path to target file" value["programs"][".".join(ref_path)] = {"location": f["location"]} # remove from benchmark assert ( benchmark is not None ), "benchmark must be passed into _uploadFile" return True else: f["location"], f["md5"] = self.file_handler.uploadFile( location, md5, basefilename, cache_file ) return False def _downloadRepoFile(self, location, tgt_dir, commit_hash): """ location: //repo/fbsource/fbcode/aibench/...../a.py """ raw_scm_query = pkg_resources.resource_string( "aibench", "benchmarking/bin/scm_query.par" ) query_exe = os.path.join(tgt_dir, "scm_query.par") with open(query_exe, "wb") as f: f.write(raw_scm_query) cmd = ["chmod", "+x", os.path.join(tgt_dir, "scm_query.par")] subprocess.check_output(cmd) dirs = location[2:].split("/") tgt_file = os.path.join(tgt_dir, dirs[-1]) cmd = [ query_exe, "--repo", dirs[1], "--file_path", "/".join(dirs[2:]), "--target_file", tgt_file, "--commit_hash", commit_hash, ] getLogger().info("Downloading {}".format(location)) subprocess.check_output(cmd) os.remove(query_exe) return tgt_file def _del_from_benchmark(self, benchmark, ref_path): tgt = benchmark for item in ref_path[:-1]: tgt = tgt[item] tgt.pop(ref_path[-1]) def _listDevices(self, flag=True): devices = self.db.listDevices(self.args.job_queue) headers = ["Device", "Status", "Abbrs", "Hash"] rows = [] for device in devices: abbrs = self.devices.getAbbrs(device["device"]) abbrs = ",".join(abbrs) if abbrs else "" hash = device["hash"] row = [device["device"], device["status"], abbrs, hash] rows.append(row) rows.sort() if flag: table = tabulate(rows, headers=headers, tablefmt="orgtbl") print("\n{}\n".format(table)) return rows def _checkDevices(self, specified_devices, hashes=None): rows = self._listDevices(flag=False) specifiedDevices = set(specified_devices.split(",")) specifiedHashes = None if hashes: hashes = hashes.split(",") devices = specified_devices.split(",") if len(hashes) != len(devices): raise Exception("You need to provide same number of hashes and devices") specifiedHashes = {} for i, hash in enumerate(hashes): specifiedHashes[hash] = devices[i] devices = {} devicesIn = True for row in rows: abbrs = row[-2].split(",") if row[-2] else [] if row[-1] not in devices: devices[row[-1]] = {row[0]}.union(set(abbrs)) else: devices[row[-1]].union({row[0]}.union(set(abbrs))) if specifiedHashes: for specifiedHash in specifiedHashes: if ( specifiedHash not in devices or specifiedHashes[specifiedHash] not in devices[specifiedHash] ): devicesIn = False else: allDevices = set() for v in devices.values(): allDevices = allDevices.union(v) devicesIn = not specifiedDevices.difference(allDevices) if not devicesIn: errMessages = " ".join( [ "Devices", specified_devices, "is not available in the job_queue", self.args.job_queue, ] ) if hashes: errMessages = " ".join( [ "Devices", specified_devices, "with hashes", ",".join(hashes), "is not available in the job_queue", self.args.job_queue, ] ) raise Exception(errMessages) def _queryNumDevices(self, device_name): deviceCounter = defaultdict(int) for device in self.db.listDevices(self.args.job_queue): abbrs = self.devices.getAbbrs(device["device"]) if device["device"] == device_name or device_name in (abbrs or []): deviceCounter[device["status"]] += 1 return deviceCounter def _listJobQueues(self): devices = self.db.listDevices(job_queue="*") list_job_queues = sorted({device["job_queue"] for device in devices}) return list_job_queues def _printJobQueues(self): list_job_queues = self._listJobQueues() for jobQueue in list_job_queues: print(jobQueue) def _printRunDetailsURL(self, user_identifier): if self.args.urlPrefix: print( "You can find more info via {}{}".format( self.args.urlPrefix, user_identifier ) ) def _screenReporter(self, user_identifier): reporter = ScreenReporter( self.db, self.devices, self.args.debug, self.args.log_output_dir ) reporter.run(user_identifier) def _fetchResult(self): user_identifier = self.args.user_identifier assert user_identifier, ( "User identifier must be specified for " "fetching the status and/or result of the previously run benchmarks" ) statuses = self.db.statusBenchmarks(user_identifier) result = None if self.args.fetch_status: result = json.dumps(statuses) elif self.args.fetch_result: ids = ",".join([str(status["id"]) for status in statuses]) output = self.db.getBenchmarks(ids) self._mobilelabResult(output) result = json.dumps(output) return result def _killJob(self): user_identifier = self.args.user_identifier assert user_identifier, ( "User identifier must be specified for " "killing submitted jobs." ) statuses = self.db.statusBenchmarks(user_identifier) result = json.dumps(statuses) status = json.loads(result)[-1]["status"] if status in ["RUNNING", "QUEUE"]: self.db.killBenchmarks(user_identifier) getLogger().info("The job has been killed") else: getLogger().info( "The job cannot be killed since its status is {}".format(status) ) def _mobilelabResult(self, output): # always get the last result for item in output: raw_result = item["result"] if raw_result is None: continue result = json.loads(raw_result) mobilelab_result = {"treatment": {}, "control": {}} for k in result: # k is identifier v = result[k] for kk in v: vv = v[kk] # update values if only summary exists if "values" not in vv or len(vv["values"]) == 0: if "summary" in vv: if "mean" in vv["summary"]: vv["values"] = [vv["summary"]["mean"]] elif "p50" in vv["summary"]: vv["values"] = [vv["summary"]["p50"]] if "control_summary" in vv: if "mean" in vv["control_summary"]: vv["control_values"] = [vv["control_summary"]["mean"]] elif "p50" in vv["control_summary"]: vv["control_values"] = [vv["control_summary"]["p50"]] # check values again if "values" not in vv or len(vv["values"]) == 0: continue assert vv["type"], "type is missing in {}".format(kk) assert vv["metric"], "metric is missing in {}".format(kk) if vv["metric"] == "flops": continue unit = vv["unit"] if "unit" in vv else "null" self._mobilelabAddField( mobilelab_result["treatment"], k, vv["type"], vv["metric"], vv["values"], unit, ) if "control_values" in vv: self._mobilelabAddField( mobilelab_result["control"], k, vv["type"], vv["metric"], vv["control_values"], unit, ) item["mobilelab_result"] = mobilelab_result def _mobilelabAddField(self, output, identifier, type, metric, values, unit): key = "{}__{}__{}".format(identifier, type, metric) key = re.sub(r"\W+", "_", key) assert key not in output, "duplicate key {}".format(key) output[key] = { "values": values, "metric": metric, "type": type, "unit": unit, } def _updateArgs(self, args): # Remove later when adhoc is moved to seperated infrastructure if args.adhoc is not None: adhoc_file, success = unpackAdhocFile(args.adhoc) if success: args.benchmark_file = adhoc_file else: getLogger().error( "Could not find specified adhoc config: {}".format(args.adhoc) ) if __name__ == "__main__": raw_args = None app = RunRemote(raw_args=raw_args) app.run()