benchmarking/frameworks/framework_base.py (534 lines of code) (raw):
#!/usr/bin/env python
##############################################################################
# Copyright 2017-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
##############################################################################
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import abc
import ast
import json
import os
import random
import re
import shutil
from copy import deepcopy
from bridge.file_storage.upload_files.file_uploader import FileUploader
from data_converters.data_converters import getConverters
from platforms.platforms import getHostPlatform
from six import string_types
from utils import software_power
from utils.custom_logger import getLogger
from utils.utilities import (
deepMerge,
deepReplace,
getFAIPEPROOT,
getString,
getModelName,
)
class FrameworkBase(object):
def __init__(self, args):
self.converters = getConverters()
self.tmpdir = None
self.host_platform = None
self.args = args
@abc.abstractmethod
def getName(self):
return "Error"
@abc.abstractmethod
def runBenchmark(self, info, benchmark, platform):
model = benchmark["model"]
tests = benchmark["tests"]
assert len(tests) == 1, (
"At this point, only one test should "
+ "exist in one benchmark. However, benchmark "
+ "{} doesn't.".format(benchmark["name"])
)
test = tests[0]
index = test["INDEX"] if "INDEX" in test else 0
first_iteration = index == 0
last_iteration = ("repeat" not in model) or (
"repeat" in model and index == model["repeat"] - 1
)
if self.host_platform is None:
self.host_platform = getHostPlatform(self.tempdir, self.args)
program_files = {
name: info["programs"][name]["location"] for name in info["programs"]
}
program_path = (
os.path.dirname(program_files["program"])
if "program" in program_files
else None
)
stringmap_from_info = info["string_map"] if "string_map" in info else None
self._replaceStringMap(benchmark, platform, program_path, stringmap_from_info)
# better to be before target program files separation.
# this way, in ios, the platform may not be copied to the target.
platform.preprocess(programs=program_files, benchmark=benchmark)
tgt_program_files, host_program_files = self._separatePrograms(
program_files, test.get("commands")
)
tgt_program_files = platform.copyFilesToPlatform(
tgt_program_files, copy_files=first_iteration
)
programs = {}
deepMerge(programs, host_program_files)
deepMerge(programs, tgt_program_files)
model_files = {
name: model["files"][name]["location"] for name in model["files"]
}
if "converter" in model:
converter = model["converter"]
assert "name" in converter, "converter field must have a name"
assert converter["name"] in self.converters, "Unknown converter {}".format(
converter
)
else:
converter = None
output = {}
# inject default parameters into test
if "iter" not in test:
test["iter"] = -1
# overall preprocess
if "preprocess" in model and first_iteration:
commands = model["preprocess"]["commands"]
self._runCommands(
output,
commands,
self.host_platform,
programs,
model,
None,
model_files,
None,
None,
None,
None,
-1,
converter,
)
input_files = (
{
name: test["input_files"][name]["location"]
for name in test["input_files"]
}
if "input_files" in test
else None
)
test_files = (
{name: test["files"][name]["location"] for name in test["files"]}
if "files" in test
else {}
)
# Let's handle preprocess comamnd first,
# since we will copy all files into host
if "preprocess" in test:
# simple thing first, let's assume preprocess is self contained
# check the program to executable
if (
"files" in test["preprocess"]
and "program" in test["preprocess"]["files"]
):
host_program_path = test["preprocess"]["files"]["program"]["location"]
os.chmod(host_program_path, 0o777)
# will deprecate in the future
if "files" in test["preprocess"]:
preprocess_files = {
name: test["preprocess"]["files"][name]["location"]
for name in test["preprocess"]["files"]
}
deepMerge(test_files, preprocess_files)
if "commands" in test["preprocess"]:
commands = test["preprocess"]["commands"]
elif "command" in test["preprocess"]:
commands = [test["preprocess"]["command"]]
self._runCommands(
output,
commands,
self.host_platform,
programs,
model,
test,
model_files,
input_files,
None,
None,
test_files,
-1,
converter,
)
tgt_input_files = (
platform.copyFilesToPlatform(input_files) if input_files else None
)
shared_libs = None
if "shared_libs" in info:
shared_libs = platform.copyFilesToPlatform(
info["shared_libs"], copy_files=first_iteration
)
tgt_model_files = platform.copyFilesToPlatform(
model_files, copy_files=first_iteration
)
tgt_result_files = None
if "output_files" in test:
tgt_result_files = {
name: test["output_files"][name]["location"]
for name in test["output_files"]
}
total_num = test["iter"]
if "platform_args" in test:
platform_args = test["platform_args"]
elif "platform_args" in model:
platform_args = model["platform_args"]
else:
platform_args = {}
if "timeout" in model:
platform_args["timeout"] = model["timeout"]
if "timeout" in test:
platform_args["timeout"] = test["timeout"]
program = programs["program"] if "program" in programs else ""
if test["metric"] == "power":
platform_args["power"] = True
method = test.get("method")
platform_args["method"] = method
if method == "software":
power_util = software_power.PowerUtil(
platform, test.get("collection_time", 300)
)
else:
# FIXME "Monsoon" was unimportable
from utils.monsoon_power import collectPowerData
# in power metric, the output is ignored
total_num = 0
platform.killProgram(program)
if test.get("env", False):
platform_args["env"] = test["env"]
if platform.getType() == "host":
# Fix the number of threads
if not platform_args.get("env", False):
platform_args["env"] = {}
MKL_NUM_THREADS = test.get("MKL_NUM_THREADS", 1)
OMP_NUM_THREADS = test.get("OMP_NUM_THREADS", 1)
if MKL_NUM_THREADS > 0:
platform_args["env"]["MKL_NUM_THREADS"] = MKL_NUM_THREADS
if OMP_NUM_THREADS > 0:
platform_args["env"]["OMP_NUM_THREADS"] = OMP_NUM_THREADS
# Randomly select one cpu core from logic cpu #4 to #13.
cpu_core = test.get("cpu-list", random.randint(5, 14))
if isinstance(test["commands"], list) and cpu_core > 0:
test["commands"][-1] = " ".join(
["taskset", "--cpu-list", str(cpu_core), test["commands"][-1]]
)
self._runCommands(
output,
test["commands"],
platform,
programs,
model,
test,
tgt_model_files,
tgt_input_files,
tgt_result_files,
shared_libs,
test_files,
total_num,
converter,
platform_args=platform_args,
main_command=True,
)
if test["metric"] == "power":
if test.get("method") == "software":
output = power_util.collect()
else:
collection_time = (
test["collection_time"] if "collection_time" in test else 180
)
voltage = float(test["voltage"]) if "voltage" in test else 4.0
output = collectPowerData(
platform.platform_hash,
collection_time,
voltage,
test["iter"],
self.args.monsoon_map,
)
platform.waitForDevice(20)
# kill the process if exists
platform.killProgram(program)
# remove the files before copying out the output files
# this will save some time in ios platform, since in ios
# all files are copied back to the host system
if len(output) > 0:
if input_files is not None:
platform.delFilesFromPlatform(tgt_input_files)
if last_iteration:
platform.delFilesFromPlatform(tgt_model_files)
platform.delFilesFromPlatform(tgt_program_files)
if shared_libs is not None:
platform.delFilesFromPlatform(shared_libs)
output_files = None
if "output_files" in test:
target_dir = os.path.join(self.tempdir, "output")
shutil.rmtree(target_dir, True)
os.makedirs(target_dir)
output_files = platform.moveFilesFromPlatform(tgt_result_files, target_dir)
platform.postprocess()
if "postprocess" in test:
if (
"files" in test["postprocess"]
and "program" in test["preprocess"]["files"]
):
host_program_path = test["postprocess"]["files"]["program"]["location"]
os.chmod(host_program_path, 0o777)
# will deprecate in the future
if "files" in test["postprocess"]:
postprocess_files = {
name: test["postprocess"]["files"][name]["location"]
for name in test["postprocess"]["files"]
}
deepMerge(test_files, postprocess_files)
commands = None
if "commands" in test["postprocess"]:
commands = test["postprocess"]["commands"]
elif "command" in test["postprocess"]:
commands = [test["postprocess"]["command"]]
self._runCommands(
output,
commands,
self.host_platform,
programs,
model,
test,
model_files,
input_files,
output_files,
None,
test_files,
-1,
converter,
)
if "postprocess" in model and last_iteration:
commands = model["postprocess"]["commands"]
self._runCommands(
output,
commands,
self.host_platform,
programs,
model,
test,
model_files,
None,
None,
None,
None,
-1,
converter,
)
# after everything is done, some of the output files may
# contain metrics that can be processed. Those files have
# field converter, and specify which convert to use to
# convert the metrics
if output_files:
to_upload = {}
for filename in output_files:
file = output_files[filename]
output_file_spec = test["output_files"][filename]
# if files should be uploaded, upload and add location to meta data.
if output_file_spec.get("upload", False):
to_upload.update({filename: file})
# if output_file can be converted for data, convert and merge output.
converter = output_file_spec.get("converter")
if not converter:
continue
assert "name" in converter, "converter field must have a name"
assert (
converter["name"] in self.converters
), "Unknown converter {}".format(converter["name"])
converter_class = self.converters[converter["name"]]
args = converter.get("args")
with open(file, "r") as f:
content = f.read()
convert = converter_class()
results, _ = convert.collect(content, args)
one_output = convert.convert(results)
deepMerge(output, one_output)
if to_upload:
output_file_uploader = FileUploader("output_files").get_uploader()
output_file_meta = {}
for filename, file in to_upload.items():
try:
getLogger().info(f"Uploading {filename} ({file}) to manifold")
url = output_file_uploader.upload_file(file)
output_file_meta.update({filename: url})
getLogger().info(f"{file} uploaded to {url}")
except Exception:
getLogger().exception(
f"Could not upload output file {file}. Skipping."
)
if output_file_meta:
output["meta"].update({"output_files": output_file_meta})
return output, output_files
@abc.abstractmethod
def composeRunCommand(
self,
commands,
platform,
programs,
model,
test,
model_files,
input_files,
output_files,
shared_libs,
test_files=None,
main_command=False,
):
if commands is None or not isinstance(commands, list):
return None
files = input_files.copy() if input_files is not None else {}
files.update(output_files if output_files is not None else {})
files.update(test_files if test_files is not None else {})
extra_arguments = " " + model["command_args"] if "command_args" in model else ""
string_map = json.loads(self.args.string_map) if self.args.string_map else {}
composed_commands = []
for command in commands:
more_args = extra_arguments if "{program}" in command else ""
command = self._getReplacedCommand(
command, files, model, test, programs, model_files
)
command += more_args
# extra args only applied for main_command
if main_command and len(commands) == 1 and "pep_extra_args" in string_map:
command += " " + string_map["pep_extra_args"]
composed_commands.append(command)
return composed_commands
def _getReplacedCommand(self, command, files, model, test, programs, model_files):
pattern = re.compile(r"\{([\w|\.]+)\}")
repeat = True
while repeat:
repeat = False
results = []
for m in pattern.finditer(command):
results.append(
{"start": m.start(), "end": m.end(), "content": m.group(1)}
)
results.reverse()
for res in results:
replace = self._getMatchedString(test, res["content"], files)
if replace is None:
# TODO: handle shared libraries
replace = self._getMatchedString(model, res["content"], model_files)
if replace is None:
replace = self._getMatchedString(programs, res["content"])
if replace:
command = command[: res["start"]] + replace + command[res["end"] :]
repeat = True
return command
def _getMatchedString(self, root, match, files=None):
if not isinstance(root, dict):
return None
if match in root:
return getString(root[match])
# split on .
fields = match.split(".")
found = True
entry = root
for i in range(len(fields)):
field = fields[i]
if field not in entry:
found = False
break
entry = entry[field]
if not found:
return None
if "location" in entry:
# is a file field
if files and fields[-1] in files:
return getString(files[fields[-1]])
assert isinstance(entry, string_types), (
"Output {}".format(entry) + " is not string type"
)
return getString(entry)
@abc.abstractmethod
def runOnPlatform(self, total_num, cmd, platform, platform_args, converter):
raise NotImplementedError("Child class need to implement runOnPlatform")
def _runCommands(
self,
output,
commands,
platform,
programs,
model,
test,
model_files,
input_files,
output_files,
shared_libs,
test_files,
total_num,
converter,
platform_args=None,
main_command=False,
):
if platform_args is None:
platform_args = {}
if test and test.get("log_output", False):
platform_args["log_output"] = True
if self.args.timeout > 0 and "timeout" not in platform_args:
platform_args["timeout"] = self.args.timeout
cmds = self.composeRunCommand(
commands,
platform,
programs,
model,
test,
model_files,
input_files,
output_files,
shared_libs,
test_files,
main_command,
)
profiling_enabled = False
profiling_args = {}
if "profiler" in test:
profiling_enabled = test["profiler"].get("enabled", False)
if profiling_enabled:
# test[] is potentially raw user input so we need to ensure
# ensure all fields are populated so we don't have to check elsewhere
profiling_args = deepcopy(test["profiler"])
default_profiler = (
"perfetto"
if "cpu" not in profiling_args.get("types", ["cpu"])
else "simpleperf"
)
profiler = profiling_args.setdefault("profiler", default_profiler)
default_type = "memory" if profiler == "perfetto" else "cpu"
profiling_args.setdefault("types", [default_type])
profiling_args.setdefault("options", {})
platform_args["model_name"] = getModelName(model)
for idx, cmd in enumerate(cmds):
# note that we only enable profiling for the last command
# of the main commands.
platform_args["profiling_args"] = (
profiling_args
if (profiling_enabled and main_command and idx == len(cmds) - 1)
else {"enabled": False}
)
one_output = self.runOnPlatform(
total_num, cmd, platform, platform_args, converter
)
deepMerge(output, one_output)
@abc.abstractmethod
def verifyBenchmarkFile(self, benchmark, filename, is_post):
return None
def rewriteBenchmarkTests(self, benchmark, filename):
pass
def _separatePrograms(self, program_files, commands):
if commands is None or not isinstance(commands, list):
return program_files, {}
tgt_program_files = {}
for command in commands:
for name in program_files:
if "{" + name + "}" in command:
tgt_program_files[name] = program_files[name]
host_program_files = {
name: program_files[name]
for name in program_files
if name not in tgt_program_files
}
return tgt_program_files, host_program_files
def _createHostDir(self):
hostdir = os.path.join(self.tempdir, "host")
i = 0
while os.path.exists(hostdir):
hostdir = os.path.join(self.tempdir, "host" + str(i))
i = i + 1
os.makedirs(hostdir, 0o777)
return hostdir
def _replaceStringMap(self, root, platform, program_path, stringmap_from_info):
try:
# backward compatible
string_map = (
json.loads(self.args.string_map) if self.args.string_map else {}
)
info_string_map = (
json.loads(stringmap_from_info) if stringmap_from_info else {}
)
except Exception:
string_map = (
ast.literal_eval(self.args.string_map) if self.args.string_map else {}
)
info_string_map = stringmap_from_info if stringmap_from_info else {}
deepMerge(string_map, info_string_map)
string_map["TGTDIR"] = platform.getOutputDir()
string_map["HOSTDIR"] = self._createHostDir()
string_map["FAIPEPROOT"] = getFAIPEPROOT()
string_map["DEVICEHASH"] = platform.platform_hash
if program_path:
string_map["BUILDDIR"] = program_path
for name in string_map:
value = string_map[name]
deepReplace(root, "{" + name + "}", value)