benchmarking/platforms/device_manager.py (304 lines of code) (raw):

#!/usr/bin/env python ############################################################################## # Copyright 2020-present, Facebook, Inc. # All rights reserved. # # This source code is licensed under the license found in the # LICENSE file in the root directory of this source tree. ############################################################################## from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals import datetime import json import time from collections import defaultdict from threading import Thread from typing import Dict from bridge.db import DBDriver from get_connected_devices import GetConnectedDevices from platforms.android.adb import ADB from platforms.platforms import getDeviceList from reboot_device import reboot as reboot_device from utils.custom_logger import getLogger REBOOT_INTERVAL = datetime.timedelta(hours=8) MINIMUM_DM_INTERVAL = 10 DEFAULT_DM_INTERVAL = 10 def getDevicesString(devices): device_list = [ d["kind"] + "|" + d["hash"] + "|" + d["name"] + "|" + d["abi"] + "|" + d["os"] + "|" + ("1" if d["available"] else "0" if d["live"] else "2") for d in devices ] devices_str = ",".join(device_list) return devices_str def valid_dm_interval(arg) -> int: try: value = int(arg) if value < MINIMUM_DM_INTERVAL: raise ValueError() except ValueError: getLogger().warning( "Logging interval must be specified as an integer in seconds >= {}. Using default {}s.".format( MINIMUM_DM_INTERVAL, DEFAULT_DM_INTERVAL ) ) value = DEFAULT_DM_INTERVAL return value class DeviceManager(object): """ Provides devices metadata to the lab instance. For mobile platforms, checks connectivity of devices and performs updates to lab devices and db. """ def __init__(self, args: Dict, db: DBDriver): self.args = args self.db: DBDriver = db self.lab_devices = {} self.online_devices = None self.device_dc_count = defaultdict(int) self.dc_threshold = 3 self._initializeDevices() self.running = True self.failed_device_checks = 0 self.device_monitor_interval = self.args.device_monitor_interval self.device_monitor = Thread(target=self._runDeviceMonitor) self.device_monitor.start() if self.args.usb_hub_device_mapping: from utils.usb_controller import USBController self.usb_controller = USBController(self.args.usb_hub_device_mapping) else: self.usb_controller = None def getLabDevices(self): """Return a reference to the lab's device meta data.""" return self.lab_devices def _runDeviceMonitor(self): while self.running: # if the lab is hosting mobile devices, thread will monitor connectivity of devices. if self.args.platform.startswith( "android" ) or self.args.platform.startswith("ios"): self._checkDevices() self._updateHeartbeats() time.sleep(self.device_monitor_interval) def _checkDevices(self): """Run any device health checks, e.g. connectivity, battery, etc.""" try: online_hashes = getDeviceList(self.args, silent=True) self._handleDCDevices(online_hashes) self._handleNewDevices(online_hashes) self.failed_device_checks = 0 except Exception: getLogger().exception("Error while checking devices.") self.failed_device_checks += 1 # If 3 device checks have failed, critically log failure. if self.failed_device_checks == 3: getLogger().critical( "Persistent error while checking devices.", exc_info=True ) def _handleDCDevices(self, online_hashes): """ If there are devices we expect to be connected to the host, check if they are rebooting or have been put offline by the USBController, else mark the device as unavailable and offline. After dc_threshold times that the device is not seen, remove it completely and critically log. """ for h in online_hashes: if h in self.device_dc_count: device = [d for d in self.online_devices if d["hash"] == h][0] getLogger().info(f"Device {device} has reconnected.") self.device_dc_count.pop(h) self._enableDevice(device) dc_devices = [ device for device in self.online_devices if device["hash"] not in online_hashes ] for dc_device in dc_devices: kind = dc_device["kind"] hash = dc_device["hash"] lab_device = self.lab_devices[kind][hash] usb_disabled = False if self.usb_controller and not self.usb_controller.active.get(hash, True): usb_disabled = True if "rebooting" not in lab_device and not usb_disabled: if hash not in self.device_dc_count: getLogger().error( f"Device {dc_device} is disconnected and has been marked unavailable for benchmarking.", ) self._disableDevice(dc_device) self.device_dc_count[hash] += 1 dc_count = self.device_dc_count[hash] if dc_count < self.dc_threshold: getLogger().error( f"Device {dc_device} has shown as disconnected {dc_count} time(s) ({dc_count * self.device_monitor_interval}s)", ) elif dc_count == self.dc_threshold: getLogger().critical( f"Device {dc_device} has shown as disconnected {dc_count} time(s) ({dc_count * self.device_monitor_interval}s) and is offline.", ) self.online_devices.remove(dc_device) self.device_dc_count.pop(hash) def _handleNewDevices(self, online_hashes): """ Check if there are newly detected devices connected to the host and add them to the device list. """ new_devices = [ h for h in online_hashes if h not in [p["hash"] for p in self.online_devices] ] if new_devices: devices = ",".join(new_devices) devices = self._getDevices(devices) if devices: for d in devices: self._enableDevice(d) if d["hash"] not in [ device["hash"] for device in self.online_devices ]: self.online_devices.append(d) if d["hash"] in self.device_dc_count: self.device_dc_count.pop(d["hash"]) getLogger().info("New device added: {}".format(d)) def _updateHeartbeats(self): """Update device heartbeats for all devices which are marked "live" in lab devices.""" claimer_id = self.args.claimer_id hashes = [] for k in self.lab_devices: for hash in self.lab_devices[k]: if self.lab_devices[k][hash]["live"]: hashes.append(hash) hashes = ",".join(hashes) self.db.updateHeartbeats(claimer_id, hashes) def _getDevices(self, devices=None): """Get list of device meta data for available devices.""" raw_args = [] raw_args.extend(["--platform", self.args.platform]) if self.args.platform_sig: raw_args.append("--platform_sig") raw_args.append(self.args.platform_sig) if devices: raw_args.append("--devices") raw_args.append(devices) elif self.args.devices: raw_args.append("--devices") raw_args.append(self.args.devices) if self.args.hash_platform_mapping: # if the user provides filename, we will load it. raw_args.append("--hash_platform_mapping") raw_args.append(self.args.hash_platform_mapping) if self.args.device_name_mapping: # if the user provides filename, we will load it. raw_args.append("--device_name_mapping") raw_args.append(self.args.device_name_mapping) app = GetConnectedDevices(raw_args=raw_args) devices_json = app.run() assert devices_json, "Devices cannot be empty" devices = json.loads(devices_json.strip()) return devices def _initializeDevices(self): """Create device meta data used by lab instance, and update devices in db.""" self.online_devices = self._getDevices() for k in self.online_devices: kind = k["kind"] hash = k["hash"] name = k["name"] abi = k["abi"] os = k["os"] entry = { "kind": kind, "hash": hash, "name": name, "abi": abi, "os": os, "available": True, "live": True, "start_time": None, "done_time": None, "output_dir": None, "job": None, "adb": ADB(hash, self.args.android_dir), "reboot_time": datetime.datetime.now() - datetime.timedelta(hours=8), "usb_hub": {}, } if kind not in self.lab_devices: self.lab_devices[kind] = {} self.lab_devices[kind][hash] = entry dvs = [ self.lab_devices[k][h] for k in self.lab_devices for h in self.lab_devices[k] ] self.db.updateDevices(self.args.claimer_id, getDevicesString(dvs), True) def _disableDevice(self, device): kind = device["kind"] hash = device["hash"] entry = self.lab_devices[kind][hash] entry["available"] = False entry["live"] = False self.db.updateDevices( self.args.claimer_id, getDevicesString([self.lab_devices[kind][hash]]), False, ) def _enableDevice(self, device): kind = device["kind"] hash = device["hash"] name = device["name"] abi = device["abi"] os = device["os"] entry = { "kind": kind, "hash": hash, "name": name, "abi": abi, "os": os, "available": True, "live": True, "start_time": None, "done_time": None, "output_dir": None, "job": None, "adb": ADB(hash, self.args.android_dir), "reboot_time": datetime.datetime.now() - datetime.timedelta(hours=8), "usb_hub": {}, } if kind not in self.lab_devices: self.lab_devices[kind] = {} self.lab_devices[kind][hash] = entry self.db.updateDevices( self.args.claimer_id, getDevicesString([self.lab_devices[kind][hash]]), False, ) def shutdown(self): self.db.updateDevices(self.args.claimer_id, "", True) self.running = False class CoolDownDevice(Thread): """Used by AsyncRun to cool device down after benchmark. Will reboot the device if required and add rebooting status to device entry.""" def __init__(self, device, args, db, force_reboot): Thread.__init__(self) self.device = device self.args = args self.db = db self.force_reboot = force_reboot def run(self): reboot = self.args.reboot and ( self.force_reboot or self.device["reboot_time"] + REBOOT_INTERVAL < datetime.datetime.now() ) success = True # reboot mobile devices if required if reboot: raw_args = [] raw_args.extend(["--platform", self.args.platform]) raw_args.extend(["--device", self.device["hash"]]) raw_args.extend(["--android_dir", self.args.android_dir]) self.device["rebooting"] = True if reboot_device(raw_args=raw_args): getLogger().info("Device {} was rebooted.".format(self.device)) self.device["reboot_time"] = datetime.datetime.now() else: self.device.pop("rebooting") getLogger().critical(f"Device {self.device} could not be rebooted.") success = False # sleep for device cooldown if self.args.platform.startswith("ios") or self.args.platform.startswith( "android" ): getLogger().info("Sleep 180 seconds") time.sleep(180) else: getLogger().info("Sleep 20 seconds") time.sleep(20) # device should be available again, remove rebooting flag. if "rebooting" in self.device: del self.device["rebooting"] if success: self.device["available"] = True device_str = getDevicesString([self.device]) self.db.updateDevices(self.args.claimer_id, device_str, False) getLogger().info( "Device {}({}) available".format( self.device["kind"], self.device["hash"] ) ) else: self.device["live"] = False getLogger().info("CoolDownDevice lock released")