benchmarking/utils/utilities.py

#!/usr/bin/env python

##############################################################################
# Copyright 2017-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
##############################################################################

from __future__ import absolute_import, division, print_function, unicode_literals

import ast
import copy
import datetime
import json
import os
import socket
import sys
import tempfile
import uuid
import zipfile
from time import sleep

import certifi
import pkg_resources
import requests
from six import string_types
from utils.custom_logger import getLogger

# Status codes for benchmark
SUCCESS_FLAG = 0
USER_ERROR_FLAG = 1
HARNESS_ERROR_FLAG = 2
USER_AND_HARNESS_ERROR_FLAG = 3
TIMEOUT_FLAG = 1 << 8
KILLED_FLAG = 1 << 9
# Mask to expose only external status bits
EXTERNAL_STATUS_MASK = 0xFF


class DownloadException(Exception):
    """Raised when an exception occurs while downloading benchmark files."""

    pass


class BenchmarkArgParseException(Exception):
    """Raised when benchmark arguments cannot be parsed or are invalid."""

    pass


def check_is_json(json_str):
    try:
        json.loads(json_str)
        return True
    except ValueError:
        return False


def getBenchmarks(json_input, framework=None):
    if os.path.isfile(json_input):
        with open(json_input, "r") as f:
            content = json.load(f)
    elif check_is_json(json_input):
        content = json.loads(json_input)
    else:
        raise Exception(f"specified benchmark file doesn't exist: {json_input}")

    benchmarks = []
    if "benchmarks" in content:
        path = os.path.abspath(os.path.dirname(json_input))
        for benchmark_file in content["benchmarks"]:
            filename = os.path.join(path, benchmark_file)
            assert os.path.isfile(filename), "Benchmark {} doesn't exist".format(
                filename
            )
            with open(filename, "r") as f:
                cnt = json.load(f)
                if framework and "model" in cnt and "framework" not in cnt["model"]:
                    # do not override the framework specified in the json
                    cnt["model"]["framework"] = framework
                benchmarks.append({"filename": filename, "content": cnt})
    else:
        if framework and "model" in content:
            content["model"]["framework"] = framework
        benchmarks.append({"filename": json_input, "content": content})
    return benchmarks


def getDirectory(commit_hash, commit_time):
    dt = datetime.datetime.utcfromtimestamp(commit_time)
    directory = os.path.join(str(dt.year), str(dt.month), str(dt.day), commit_hash)
    return directory


def getCommand(command):
    exe = command[0]
    args = [x if x.isdigit() else "'" + x + "'" for x in command[1:]]
    cmd = exe + " " + " ".join(args)
    return cmd


def getFilename(name, **kwargs):
    replace_pattern = {" ": "-", "\\": "-", ":": "-", "/": "-"}
    if "replace_pattern" in kwargs:
        replace_pattern = kwargs["replace_pattern"]
    filename = name
    for orig_pattern, repl_pattern in replace_pattern.items():
        filename = filename.replace(orig_pattern, repl_pattern)
    res = "".join(
        [
            c
            for c in filename
            if c.isalpha()
            or c.isdigit()
            or c == "_"
            or c == "."
            or c == "-"
            or c == "/"
        ]
    ).rstrip()
    return res
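

# Illustrative usage of getBenchmarks and getFilename (a sketch, not executed;
# the json path and the model "name" field below are hypothetical). The input
# may be either a path to a benchmark json file or a raw json string, and
# `framework` only fills in content["model"]["framework"] when the json does
# not already specify one.
#
#     benchmarks = getBenchmarks("/path/to/benchmark.json", framework="caffe2")
#     for b in benchmarks:
#         # getFilename sanitizes a display name into a filesystem-safe name
#         print(b["filename"], getFilename(b["content"]["model"]["name"]))
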
or c == "-" or c == "/" ] ).rstrip() return res def getPythonInterpreter(): return sys.executable def deepMerge(tgt, src): if isinstance(src, list): # only handle simple lists tgt.extend(src) elif isinstance(src, dict): for name in src: m = src[name] if name not in tgt: tgt[name] = copy.deepcopy(m) else: deepMerge(tgt[name], m) else: # tgt has already specified a value # src does not override tgt return def deepReplace(root, pattern, replacement): if isinstance(root, list): for idx in range(len(root)): item = root[idx] root[idx] = deepReplace(item, pattern, replacement) elif isinstance(root, dict): for name in root: m = root[name] root[name] = deepReplace(m, pattern, replacement) elif isinstance(root, string_types): return root.replace(pattern, replacement) return root def getString(s): s = str(s) if os.name == "nt": # escape " with \" return '"' + s.replace('"', '\\"') + '"' else: return s def getFAIPEPROOT(): dir_path = os.path.dirname(os.path.realpath(__file__)) root_dir = os.path.join(dir_path, "../../") return os.path.abspath(root_dir) def ca_cert(): """Get valid ca_cert path for requests""" ca_cert_path = os.environ.get("CA_CERT_PATH") if not ca_cert_path or not os.path.exists(ca_cert_path): os.environ["CA_CERT_PATH"] = certifi.where() return os.environ["CA_CERT_PATH"] def requestsData(url, **kwargs): delay = 0 total_delay = 0 timeout = -1 if "timeout" in kwargs: timeout = kwargs["timeout"] retry = kwargs.pop("retry", True) result = None while True: try: """ When we use multiprocessing to call harness from internal, requests.Post(url, **kwargs) will get stuck and neither proceeding ahead nor throwing an error. Instead, we use Session and set trust_env to False to solve the problem. Reference: https://stackoverflow.com/a/39822223 """ with requests.Session() as session: session.trust_env = False # This session object can be reused. # If the CA_CERT file has changed it will not be updated implicitly. session.verify = ca_cert() result = session.post(url, **kwargs) if result.status_code != 200: getLogger().error( "Post request failed, receiving code {}".format(result.status_code) ) else: if delay > 0: getLogger().info("Post request successful") return result except requests.exceptions.SSLError: getLogger().exception("Post SSL verification failed") except requests.ConnectionError: getLogger().exception("Post Connection failed") except requests.exceptions.ReadTimeout: getLogger().exception("Post Readtimeout") except requests.exceptions.ChunkedEncodingError: getLogger().exception("Post ChunkedEncodingError") if not retry: break delay = delay + 1 if delay <= 5 else delay sleep_time = 1 << delay getLogger().info("wait {} seconds. Retrying...".format(sleep_time)) sleep(sleep_time) total_delay += sleep_time if timeout > 0 and total_delay > timeout: break getLogger().error( "Failed to post to {}, retrying after {} seconds...".format(url, total_delay) ) return result def requestsJson(url, **kwargs): try: result = requestsData(url, **kwargs) if result and result.status_code == 200: result_json = result.json() return result_json except ValueError as e: getLogger().error("Cannot decode json {}".format(e.output)) getLogger().error("Failed to retrieve json from {}".format(url)) return {} def parse_kwarg(kwarg_str): key, value = kwarg_str.split("=") try: value = ast.literal_eval("'" + value + "'") except ValueError: getLogger().error("Failed to parse kwarg str: {}".format(kwarg_str)) return key, value def getModelName(model): # given benchmark model entry parse model name, returns string. 
if model["framework"] == "caffe2": model_file_name = model["files"]["predict"]["filename"] elif model.get("files", {}).get("model", {}).get("filename", None): model_file_name = model["files"]["model"]["filename"] elif "name" in model: model_file_name = model["name"] else: model_file_name = "model" model_name = os.path.splitext(model_file_name)[0].replace(" ", "_") return model_name # Run status run_statuses = {} def _getRawRunStatus(key=""): global run_statuses return run_statuses.get(key, 0) def _setRawRunStatus(status, key=""): global run_statuses run_statuses[key] = status def getRunStatus(key=""): return _getRawRunStatus(key) & EXTERNAL_STATUS_MASK def setRunStatus(status, overwrite=False, key=""): if overwrite: _setRawRunStatus(status, key) else: _setRawRunStatus(_getRawRunStatus(key) | status, key) def getRunTimeout(key=""): return _getRawRunStatus(key) & TIMEOUT_FLAG == TIMEOUT_FLAG def setRunTimeout(timedOut=True, key=""): if timedOut: _setRawRunStatus(_getRawRunStatus(key) | TIMEOUT_FLAG, key) else: _setRawRunStatus(_getRawRunStatus(key) & ~TIMEOUT_FLAG, key) def getRunKilled(key=""): return _getRawRunStatus(key) & KILLED_FLAG == KILLED_FLAG def setRunKilled(killed=True, key=""): if killed: _setRawRunStatus(_getRawRunStatus(key) | KILLED_FLAG, key) else: _setRawRunStatus(_getRawRunStatus(key) & ~KILLED_FLAG, key) def getMeta(args, platform): meta = None if not args.frameworks_dir: meta_file = os.path.join( "specifications/frameworks", args.framework, platform, "meta.json" ) if "aibench" in sys.modules and pkg_resources.resource_exists( "aibench", meta_file ): meta = json.loads(pkg_resources.resource_string("aibench", meta_file)) return meta else: # look for files in the old default place old_default = str( os.path.dirname(os.path.realpath(__file__)) + "/../../specifications/frameworks" ) meta_file = os.path.join(old_default, args.framework, platform, "meta.json") else: meta_file = os.path.join( args.frameworks_dir, args.framework, platform, "meta.json" ) if os.path.isfile(meta_file): with open(meta_file, "r") as f: meta = json.load(f) return meta def getMachineId(): ident = socket.getfqdn() if len(ident) == 0 or ident == "localhost": ident = uuid.uuid1().hex return ident adhoc_configs = { "generic": "specifications/models/generic/adhoc.json", "opbench": "specifications/models/generic/adhoc_microbenchmarks.json", } def unpackAdhocFile(configName="generic"): if configName not in adhoc_configs: return "", False fd, path = tempfile.mkstemp(prefix="aibench") with pkg_resources.resource_stream("aibench", adhoc_configs[configName]) as stream: with os.fdopen(fd, "wb") as f: f.write(stream.read()) return path, True def zip_files(input, output: str): """ Archive files or folder for uploading. Input can be file/folder path or list of paths. Folder hierarchy will be preserved at the folder basename level. """ if not isinstance(input, list): input = [input] with zipfile.ZipFile(output, "w") as zf: for path in input: if os.path.isfile(path): zf.write(path, os.path.basename(path)) elif os.path.isdir(path): for directory, _, files in os.walk(path): arcdir = directory[directory.find(os.path.basename(path)) :] zf.write(directory, arcdir) for f in files: fpath = os.path.join(directory, f) arcfpath = os.path.join(arcdir, f) zf.write(fpath, arcfpath) else: raise IOError(f"Could not zip files. {path} is not a valid path.")