benchmarking/bridge/db.py (160 lines of code) (raw):
#!/usr/bin/env python
##############################################################################
# Copyright 2017-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
##############################################################################
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import json
from bridge.auth import Auth
from utils.custom_logger import getLogger
from utils.utilities import requestsJson
# Timeout value passed as ``timeout=`` to requestsJson for every database
# request issued by DBDriver._requestData.
# NOTE(review): units are whatever requestsJson expects - presumably seconds.
NETWORK_TIMEOUT = 150
class DBDriver(object):
    """Client for the benchmark job database endpoint.

    Each public method builds a parameter dict describing one ``action`` and
    posts it, together with the auth parameters obtained from ``Auth``, to
    ``benchmark_db_entry`` through ``requestsJson``.
    """

    def __init__(
        self, db, app_id, token, table, job_queue, is_test, benchmark_db_entry
    ):
        """Store the request settings and obtain the auth parameters.

        Args:
            db: forwarded unchanged to ``Auth``.
            app_id: forwarded unchanged to ``Auth``.
            token: forwarded unchanged to ``Auth``.
            table: value sent as ``table`` with every request.
            job_queue: value sent as ``job_queue`` by the methods that
                include it.
            is_test: forwarded unchanged to ``Auth``.
            benchmark_db_entry: first positional argument handed to
                ``requestsJson`` (presumably the endpoint URL - confirm).

        Raises:
            AssertionError: if ``benchmark_db_entry`` is the empty string.
        """
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped when Python runs with -O. AssertionError is kept so any
        # existing caller that catches it keeps working. The check runs
        # first so no auth work happens for an unusable configuration.
        if benchmark_db_entry == "":
            raise AssertionError("Database entry cannot be empty")
        self.table = table
        self.job_queue = job_queue
        auth = Auth(db, app_id, token, is_test)
        self.auth_params = auth.get_auth_params()
        self.benchmark_db_entry = benchmark_db_entry

    def submitBenchmarks(self, data, devices, identifier, user, hashes=None):
        """Post an ``add`` action carrying ``data`` serialized as JSON.

        ``hashes`` is only included in the request when it is truthy.
        The response is discarded.
        """
        json_data = json.dumps(data)
        params = {
            "table": self.table,
            "job_queue": self.job_queue,
            "action": "add",
            "identifier": identifier,
            "devices": devices,
            "benchmarks": json_data,
            "user": user,
        }
        if hashes:
            params["hashes"] = hashes
        self._requestData(params)

    def claimBenchmarks(self, server_id, devices, hashes):
        """Post a ``claim`` action and return the decoded entries.

        Returns:
            The response's ``values`` list with every entry's ``benchmarks``
            field parsed from JSON; an empty list when the request failed.
        """
        params = {
            "table": self.table,
            "job_queue": self.job_queue,
            "action": "claim",
            "claimer": server_id,
            "devices": devices,
            "hashes": hashes,
        }
        result_json = self._requestData(params)
        return self._processBenchmarkResults(result_json["values"])

    def updateHeartbeats(self, server_id, hashes):
        """Post a ``heartbeat`` action for ``hashes``; response is discarded.

        NOTE(review): unlike the other actions, no ``job_queue`` is sent
        here - confirm this is intentional.
        """
        params = {
            "table": self.table,
            "action": "heartbeat",
            "claimer": server_id,
            "hashes": hashes,
        }
        self._requestData(params)

    def releaseBenchmarks(self, server_id, ids):
        """Post a ``release`` action for ``ids``; response is discarded."""
        params = {
            "table": self.table,
            "job_queue": self.job_queue,
            "action": "release",
            "claimer": server_id,
            "ids": ids,
        }
        self._requestData(params)

    def runBenchmarks(self, server_id, ids):
        """Post a ``run`` action for ``ids``; response is discarded."""
        params = {
            "table": self.table,
            "job_queue": self.job_queue,
            "action": "run",
            "claimer": server_id,
            "ids": ids,
        }
        self._requestData(params)

    def doneBenchmarks(self, id, status, result, log):
        """Post a ``done`` action with the status, result and log of ``id``.

        The response is discarded.
        """
        params = {
            "table": self.table,
            "job_queue": self.job_queue,
            "action": "done",
            "id": id,
            "status": status,
            "result": result,
            "log": log,
        }
        self._requestData(params)

    def statusBenchmarks(self, identifier):
        """Post a ``status`` action and return the response's ``values``.

        Returns an empty list when the request failed.
        """
        params = {
            "table": self.table,
            "job_queue": self.job_queue,
            "action": "status",
            "identifier": identifier,
        }
        request_json = self._requestData(params)
        return request_json["values"]

    def updateLogBenchmarks(self, id, log):
        """Post an ``update_log`` action for ``id`` with retries disabled.

        Returns:
            The ``status`` field of the result: ``"success"`` when the
            request succeeded, ``"fail"`` otherwise.
        """
        params = {
            "table": self.table,
            "job_queue": self.job_queue,
            "action": "update_log",
            "id": id,
            "log": log,
        }
        request_json = self._requestData(params, retry=False)
        return request_json["status"]

    def killBenchmarks(self, identifier):
        """Post a ``kill`` action for ``identifier``; response is discarded."""
        params = {
            "table": self.table,
            "job_queue": self.job_queue,
            "action": "kill",
            "identifier": identifier,
        }
        self._requestData(params)

    def getBenchmarks(self, ids):
        """Post a ``get`` action and return the response's ``values``.

        Returns an empty list when the request failed.
        """
        params = {
            "table": self.table,
            "job_queue": self.job_queue,
            "action": "get",
            "ids": ids,
        }
        request_json = self._requestData(params)
        return request_json["values"]

    def updateDevices(self, server_id, devices, reset):
        """Post an ``update_devices`` action; response is discarded.

        When ``reset`` is truthy the request also carries ``reset="true"``.
        """
        params = {
            "table": self.table,
            "job_queue": self.job_queue,
            "action": "update_devices",
            "claimer": server_id,
            "devices": devices,
        }
        if reset:
            params["reset"] = "true"
        self._requestData(params)

    def listDevices(self, job_queue):
        """Post a ``list_devices`` action and return the response's ``values``.

        Uses the ``job_queue`` argument rather than ``self.job_queue``.
        Returns an empty list when the request failed.
        """
        params = {
            "table": self.table,
            "job_queue": job_queue,
            "action": "list_devices",
        }
        result_json = self._requestData(params)
        return result_json["values"]

    def _requestData(self, params, retry=True):
        """Send ``params`` plus the auth parameters and return the response.

        Args:
            params: request fields; this dict is not modified.
            retry: forwarded to ``requestsJson``.

        Returns:
            The response dict when its ``status`` is ``"success"``;
            otherwise ``{"status": "fail", "values": []}`` after logging
            the failure.
        """
        # Merge into a copy: the caller's dict stays untouched and the auth
        # values (presumably credentials) are kept out of the failure log.
        payload = dict(params)
        payload.update(self.auth_params)
        result_json = requestsJson(
            self.benchmark_db_entry,
            data=payload,
            timeout=NETWORK_TIMEOUT,
            retry=retry,
        )
        is_mapping = isinstance(result_json, dict)
        if is_mapping and result_json.get("status") == "success":
            return result_json

        getLogger().warning(
            "DB post failed.\tbenchmark_db_entry: {}\t params: {}".format(
                self.benchmark_db_entry, json.dumps(params)
            )
        )
        if is_mapping:
            for key in result_json:
                getLogger().error("{}: {}".format(key, result_json[key]))
        else:
            # A None/list/other response cannot be inspected key by key;
            # report it as-is instead of raising from the logging code.
            getLogger().error("Unexpected DB response: {!r}".format(result_json))
        return {
            "status": "fail",
            "values": [],
        }

    def _processBenchmarkResults(self, result_json):
        """Decode the JSON ``benchmarks`` field of every entry in place.

        Returns the same list object that was passed in.
        """
        for result in result_json:
            result["benchmarks"] = json.loads(result["benchmarks"])
        return result_json