benchmarking/remote/screen_reporter.py (120 lines of code) (raw):
#!/usr/bin/env python
##############################################################################
# Copyright 2017-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
##############################################################################
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import json
import os
import time
class ScreenReporter(object):
    """Poll benchmark job statuses and report changes and results on screen.

    The reporter repeatedly asks ``xdb`` for the status of every job that
    belongs to a user identifier, prints a line whenever a job's status
    changes, and, once a job reaches a reported terminal status, prints its
    results and/or logs (or writes the logs to ``log_output_dir``).
    """

    # Statuses after which a job no longer needs to be polled.
    _TERMINAL_STATUSES = ("DONE", "FAILED", "USER_ERROR", "TIMEOUT")
    # Statuses for which results/logs are fetched and displayed.
    _REPORTED_STATUSES = ("DONE", "FAILED", "USER_ERROR")

    def __init__(self, xdb, devices, debug=False, log_output_dir=None):
        """Store the collaborators and output options.

        Args:
            xdb: object exposing ``statusBenchmarks(user_identifier)`` and
                ``getBenchmarks(job_id)``.
            devices: object exposing ``getAbbrs(device_name)``.
            debug: when true, the log of successful (DONE) jobs is shown too.
            log_output_dir: if not ``None``, logs and meta data are written
                to files in this directory instead of being printed.
        """
        self.xdb = xdb
        self.devices = devices
        self.debug = debug
        self.log_output_dir = log_output_dir

    def run(self, user_identifier):
        """Poll once per second until every job of the user is terminal."""
        statuses = {}
        # Sleep only between polls; there is nothing to wait for once
        # _runOnce reports completion.
        while not self._runOnce(user_identifier, statuses):
            time.sleep(1)

    def _runOnce(self, user_identifier, statuses):
        """Fetch the current statuses once and display any changes.

        Args:
            user_identifier: identifier passed to ``xdb.statusBenchmarks``.
            statuses: dict mapping job id to last seen status; updated
                in place.

        Returns:
            True when polling should stop (all known jobs are in a terminal
            status, or no benchmark entry could be found after the retries),
            False otherwise.
        """
        retries = 3
        while retries > 0:
            new_statuses = self.xdb.statusBenchmarks(user_identifier)
            if len(new_statuses) > 0 and "id" in new_statuses[0]:
                break
            time.sleep(1)
            retries -= 1
        else:
            # Loop exhausted without a usable answer: give up polling.
            print(
                f"Could not find benchmark entry for user_identifier {user_identifier}."
            )
            return True

        for s in new_statuses:
            job_id = s["id"]
            # Display a job the first time it is seen and on every change.
            if job_id not in statuses or statuses[job_id] != s["status"]:
                statuses[job_id] = s["status"]
                self._display(s)

        return all(
            status in self._TERMINAL_STATUSES for status in statuses.values()
        )

    def _display(self, s):
        """Print the status change of one job, then its results if any."""
        abbrs = self.devices.getAbbrs(s["device"])
        print(
            "Job status for {}".format(s["device"])
            + (" (" + ",".join(abbrs) + ")" if abbrs else "")
            + " is changed to {}".format(s["status"])
        )
        self._displayResult(s)

    @staticmethod
    def _loadMeta(r):
        """Return the ``["0"]["meta"]`` entry of ``r["result"]``, or ``{}``.

        An empty/missing result or a result without that entry yields an
        empty dict instead of raising, so that logs of failed jobs (which
        may carry no result) can still be printed.
        """
        raw = r["result"]
        if not raw:
            return {}
        try:
            return json.loads(raw)["0"]["meta"]
        except (ValueError, KeyError, TypeError):
            return {}

    def _printLog(self, r):
        """Print or persist the log and meta data of one benchmark record.

        When ``log_output_dir`` is ``None`` the log and meta data are printed
        to stdout; otherwise they are written to ``<device>.txt`` and
        ``<device>_meta.txt`` inside ``log_output_dir``.
        """
        if self.log_output_dir is None:
            for line in (r["log"] or "").split("\n"):
                print(line)
            print("\n Meta data:\n ")
            meta = self._loadMeta(r)
            # Iterating a dict directly yields only its keys; use items()
            # to obtain (key, value) pairs. A sequence of pairs is accepted
            # as-is.
            pairs = meta.items() if isinstance(meta, dict) else meta
            for key, value in pairs:
                print(key, ":", value)
        else:
            try:
                os.makedirs(self.log_output_dir, exist_ok=True)
                # Use device name to create output log files in the directory 'log_output_dir'
                # Write logs
                self._writeToDirectory(self.log_output_dir, r["log"], r["device"])
                # Write meta data
                self._writeToDirectory(
                    self.log_output_dir,
                    json.dumps(json.loads(r["result"])["0"]["meta"]),
                    r["device"],
                    "_meta",
                )
            except Exception as e:
                # Best effort: reporting must continue even if the logs
                # cannot be persisted.
                print("Caught exception: " + str(e))
                print("Could not write to file specified at " + self.log_output_dir)

    def _writeToDirectory(
        self, dir: str, data: str, output_file_name: str, suffix: str = ""
    ):
        """
        Writes the given data in the specified directory with the defined file name.
        Option to add a suffix in the output file name
        """
        output_file_name = f"{dir}/{output_file_name}{suffix}.txt"
        with open(output_file_name, "w") as outfile:
            outfile.write(data)
        print(f"Logs written at {output_file_name}")

    def _displayResult(self, s):
        """Show the results and/or logs of a job in a reported status.

        For DONE jobs the NET latency summary (p50, falling back to mean) is
        printed per identifier, or the raw data when the benchmark metric is
        "generic"; the log is shown only in debug mode. For FAILED and
        USER_ERROR jobs the log is always shown. Other statuses are ignored.

        Raises:
            AssertionError: a "NET latency" entry has neither "p50" nor
                "mean" in its summary.
        """
        if s["status"] not in self._REPORTED_STATUSES:
            return
        output = self.xdb.getBenchmarks(str(s["id"]))
        for r in output:
            if s["status"] != "DONE":
                self._printLog(r)
                continue

            res = json.loads(r["result"]) if r["result"] else []
            benchmarks = json.loads(r["benchmarks"])
            metric = benchmarks["benchmark"]["content"]["tests"][0]["metric"]
            for identifier in res:
                data = res[identifier]
                if "NET latency" in data:
                    summary = data["NET latency"]["summary"]
                    if "p50" in summary:
                        net_delay = summary["p50"]
                    elif "mean" in summary:
                        net_delay = summary["mean"]
                    else:
                        raise AssertionError("Net latency is not specified")
                    print("ID:{}\tNET latency: {}".format(identifier, net_delay))
                elif metric == "generic":
                    if isinstance(data, dict):
                        if "meta" in data:
                            del data["meta"]
                        if not data:
                            # NOTE(review): this ends the whole method, so
                            # remaining identifiers/records and the debug
                            # log are skipped; kept as in the original --
                            # confirm this is intended.
                            return
                    # dump std printout to screen for custom_binary
                    if isinstance(data, list):
                        data = "\n".join(data)
                    print(data)
            if self.debug:
                self._printLog(r)