benchmarking/driver/benchmark_driver.py (373 lines of code) (raw):

#!/usr/bin/env python ############################################################################## # Copyright 2017-present, Facebook, Inc. # All rights reserved. # # This source code is licensed under the license found in the # LICENSE file in the root directory of this source tree. ############################################################################## from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals import copy import gc import os import sys import time import traceback from utils.custom_logger import getLogger from utils.utilities import getCommand, deepMerge, setRunStatus, getRunStatus def runOneBenchmark( info, benchmark, framework, platform, backend, reporters, lock, cooldown=None, user_identifier=None, local_reporter=None, ): assert "treatment" in info, "Treatment is missing in info" getLogger().info("Running {}".format(benchmark["path"])) status = 0 minfo = copy.deepcopy(info["treatment"]) mbenchmark = copy.deepcopy(benchmark) if "shared_libs" in info: minfo["shared_libs"] = info["shared_libs"] try: # invalidate CPU cache [1.0 for _ in range(20 << 20)] gc.collect() data = _runOnePass(minfo, mbenchmark, framework, platform) status = status | getRunStatus() meta = None if "control" in info: cinfo = copy.deepcopy(info["control"]) if "shared_libs" in info: cinfo["shared_libs"] = info["shared_libs"] # cool down between treatment and control if "model" in benchmark and "cooldown" in benchmark["model"]: cooldown = float(benchmark["model"]["cooldown"]) time.sleep(cooldown) # invalidate CPU cache [1.0 for _ in range(20 << 20)] gc.collect() control = _runOnePass(cinfo, benchmark, framework, platform) status = status | getRunStatus() bname = benchmark["model"]["name"] data = _mergeDelayData(data, control, bname) if benchmark["tests"][0]["metric"] != "generic": data = _adjustData(info, data) meta = _retrieveMeta( info, benchmark, platform, framework, backend, user_identifier ) data = _retrieveInfo(info, data) result = {"meta": meta, "data": data} except Exception: # Catch all exceptions so that failure in one test does not # affect other tests getLogger().critical( "Exception caught when running benchmark.", exc_info=True, ) data = None status = 2 setRunStatus(status) # Set result meta and data to default values to that # the reporter will not try to key into a None result = {"meta": {}, "data": []} if data is None or len(data) == 0: _logNoData(benchmark, info, platform.getMangledName()) return status with lock: for reporter in reporters: reporter.report(result) if ( "regression_commits" in info and info["run_type"] == "benchmark" and local_reporter ): from regression_detectors.regression_detectors import checkRegressions checkRegressions( info, platform, framework, benchmark, reporters, result["meta"], local_reporter, ) return status def _logNoData(benchmark, info, name): model_name = "" if "model" in benchmark and "name" in benchmark["model"]: model_name = benchmark["model"]["name"] commit_hash = "" if "commit" in info["treatment"]: commit_hash = info["treatment"]["commit"] getLogger().info( "No data collected for {}".format(model_name) + "on {}. ".format(name) + "The run may be failed for " + "{}".format(commit_hash) ) def _runOnePass(info, benchmark, framework, platform): assert ( len(benchmark["tests"]) == 1 ), "At this moment, only one test exists in the benchmark" to = benchmark["model"]["repeat"] if "repeat" in benchmark["model"] else 1 output = None for idx in range(to): benchmark["tests"][0]["INDEX"] = idx one_output, output_files = framework.runBenchmark(info, benchmark, platform) if output: deepMerge(output, one_output) else: output = copy.deepcopy(one_output) if getRunStatus() != 0: # early exit if there is an error break stats = _getStatisticsSet(benchmark["tests"][0]) data = _processDelayData(output, stats) return data def _processDelayData(input_data, stats): if not isinstance(input_data, dict): return input_data data = {} for k in input_data: d = input_data[k] if d is not None: data[k] = copy.deepcopy(d) if "values" in d: if "summary" not in d: data[k]["summary"] = _getStatistics(d["values"], stats) if "num_runs" not in d: data[k]["num_runs"] = len(data[k]["values"]) return data def _mergeDelayData(treatment_data, control_data, bname): data = copy.deepcopy(treatment_data) # meta is not a metric, so handle is seperatly data["meta"] = _mergeDelayMeta(treatment_data["meta"], control_data["meta"], bname) for k in treatment_data: # meta was already merged, so don't try to merge it again if k == "meta": continue if k not in control_data: getLogger().error( f"Value {k} existed in treatment but not control for benchmark {bname}.", ) continue control_value = control_data[k] treatment_value = treatment_data[k] if "info_string" in treatment_value: assert ( "info_string" in control_value ), "Control value missing info_string field" # If the treatment and control are not the same, # treatment value is used, the control value is lost. treatment_string = treatment_value["info_string"] control_string = control_value["info_string"] if treatment_string != control_string: getLogger().warning( "Treatment value is used, and the control value is lost. " + "The field info_string in control " + "({})".format(control_string) + "is different from the info_string in treatment " + "({})".format(treatment_string) ) if "values" in control_value: data[k]["control_values"] = control_value["values"] if "summary" in control_value: data[k]["control_summary"] = control_value["summary"] assert "summary" in treatment_value, "Summary is missing in treatment" # create diff of delay if "summary" in control_value and "summary" in treatment_value: data[k]["diff_summary"] = _createDiffOfDelay( control_value["summary"], treatment_value["summary"] ) return data def _to_float(token: str): try: return float(token) except ValueError: return None def _percentileArgVal(token) -> float: if len(token) < 2 or token[0] != "p": return None percentile = _to_float(token[1:]) return ( percentile if percentile is not None and percentile >= 0 and percentile <= 100 else None ) def _createDiffOfDelay(csummary, tsummary): # create diff of delay diff_summary = {} for key in tsummary: if tsummary[key] is None: continue arg = _percentileArgVal(key) if arg is not None: if arg == int(arg): reflection = "p" + str(100 - int(arg)) else: reflection = "p" + str(100.0 - arg) if reflection in csummary and csummary[reflection] is not None: diff_summary[key] = round(tsummary[key] - csummary[reflection], 15) elif key in csummary and csummary[key] is not None: diff_summary[key] = round(tsummary[key] - csummary[key], 15) return diff_summary def _mergeDelayMeta(treatment_meta, control_meta, bname): meta = copy.deepcopy(treatment_meta) for k in treatment_meta: if k not in control_meta: getLogger().critical( f"Value {k} existed in treatment but not control for benchmark {bname}." ) continue meta["control_{}".format(k)] = control_meta[k] return meta def _processErrorData(treatment_files, golden_files, stats=None): treatment_outputs = _collectErrorData(treatment_files) golden_outputs = _collectErrorData(golden_files) data = {} for output in treatment_outputs: treatment_values = treatment_outputs[output] assert output in golden_outputs, "Output {} is missing in golden".format(output) golden_values = golden_outputs[output] diff_values = list( map(lambda pair: pair[0] - pair[1], zip(treatment_values, golden_values)) ) diff_values.sort() treatment_values.sort() golden_values.sort() data[output] = { "summary": _getStatistics(treatment_values, stats), "control_summary": _getStatistics(golden_values, stats), "diff_summary": _getStatistics(diff_values, stats), } data[output]["type"] = output data[output]["num_runs"] = len(treatment_values) return data def _collectErrorData(output_files): data = {} for output in output_files: filename = output_files[output] assert os.path.isfile(filename), "File {} doesn't exist".format(filename) with open(filename, "r") as f: content = f.read().splitlines() data[output] = [float(x.strip()) for x in content] return data _default_statistics = ["mean", "p0", "p10", "p50", "p90", "p100", "stdev", "MAD", "cv"] def _getStatisticsSet(test): if test is not None and "statistics" in test: result = test["statistics"] if "p50" not in result: result.append( "p50" ) # always include p50 since it is needed for internal calculations return result else: return _default_statistics def _getStatistics(array, stats=_default_statistics): if len(array) == 0: return {} if "p50" not in stats: stats.append( "p50" ) # always include p50 since it is needed for internal calculations sorted_array = sorted(array) median = _getMedian(sorted_array) mean = _getMean(array) stdev = _getStdev(array, mean) meta_values = { "mean": mean, "p50": median, # special case for even-numbered arrays "stdev": stdev, "MAD": _getMedian(sorted(map(lambda x: abs(x - median), sorted_array))), "cv": stdev / mean if mean != 0 else None, } results = {} for stat in stats: if stat in meta_values: results[stat] = meta_values[stat] else: percentile_arg_value = _percentileArgVal(stat) # parses p0-p100 if percentile_arg_value is None: getLogger().error(f"Unsupported custom statistic '{stat}' ignored.") assert ( percentile_arg_value is not None ), f"Unsupported custom statistic '{stat}'." else: results[stat] = _getPercentile(sorted_array, percentile_arg_value) return results def _getPercentile(sorted_array, percentile: float): length = len(sorted_array) assert ( length > 0 and percentile >= 0 and percentile <= 100 ), f"invalid percentile value '{percentile}'." if percentile == 100: return sorted_array[-1] if percentile == 50: return _getMedian(sorted_array) # linear interpolation: exactly matches np.percentile(sorted_array, percentile, interpolation="linear") k = (length - 1) * percentile / 100.0 floor_index = int(k) ceil_index = int(k + 1.0) # valid only if k is not already an integer value if ( floor_index == k or ceil_index >= length ): # handle the case where k is integer or max return sorted_array[floor_index] weighted_floor_value = sorted_array[floor_index] * (ceil_index - k) weighted_ceil_value = sorted_array[ceil_index] * (k - floor_index) return weighted_floor_value + weighted_ceil_value def _getMean(values): return sum(values) / len(values) def _getStdev(values, mean): sq_diffs = [(x - mean) ** 2 for x in values] return (sum(sq_diffs) / len(values)) ** 0.5 def _getMedian(values): length = len(values) return ( values[length // 2] if (length % 2) == 1 else (values[(length - 1) // 2] + values[length // 2]) / 2 ) def _adjustData(info, data): if "regressed_types" not in info: return data assert ( "run_type" in info and info["run_type"] == "regress" ), "Regressed types only show up in regress run type" for v in data: if v in info["regressed_types"]: data[v]["regressed"] = 1 return data def _retrieveMeta(info, benchmark, platform, framework, backend, user_identifier): assert "treatment" in info, "Treatment is missing in info" meta = {} # common meta["backend"] = backend meta["time"] = time.time() meta["framework"] = framework.getName() meta["platform"] = platform.getName() if platform.platform_hash: meta["platform_hash"] = platform.platform_hash meta["command"] = sys.argv meta["command_str"] = getCommand(sys.argv) if user_identifier: meta["user_identifier"] = user_identifier # model specific if "model" in benchmark: model = benchmark["model"] meta["net_name"] = model["name"] if "group" in benchmark["model"]: meta["group"] = benchmark["model"]["group"] # test specific test = benchmark["tests"][0] meta["metric"] = test["metric"] if "identifier" in test: meta["identifier"] = test["identifier"] else: meta["identifier"] = meta["net_name"] # info specific if "commit" in info["treatment"]: meta["commit"] = info["treatment"]["commit"] meta["commit_time"] = info["treatment"]["commit_time"] if "control" in info: meta["control_commit"] = info["control"]["commit"] meta["control_commit_time"] = info["control"]["commit_time"] if "run_type" in info: meta["run_type"] = info["run_type"] # Local run, user specific information if "user" in info: meta["user"] = info["user"] return meta def _retrieveInfo(info, data): if "treatment" in info: if "meta" not in data: data["meta"] = {} data["meta"]["treatment_diff"] = info["treatment"].get("diff", "") # For early detection, we have treatment version info. data["meta"]["treatment_version"] = info["treatment"].get("version", "") # For post detection, we have treatment commit info. data["meta"]["treatment_commit"] = info["treatment"].get("commit", "") if "control" in info and "diff" in info["control"]: if "meta" not in data: data["meta"] = {} # For control, we should always have commit info. data["meta"]["control_diff"] = info["control"].get("diff", "") data["meta"]["control_commit"] = info["control"].get("commit", "") return data