benchmarking/platforms/android/adb.py (176 lines of code) (raw):
#!/usr/bin/env python
##############################################################################
# Copyright 2017-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
##############################################################################
from __future__ import absolute_import, division, print_function, unicode_literals
import os
import re
from platforms.platform_util_base import PlatformUtilBase
from six import string_types
from utils.custom_logger import getLogger
class ADB(PlatformUtilBase):
def __init__(self, device=None, tempdir=None):
super(ADB, self).__init__(device, tempdir)
def run(self, *args, **kwargs):
adb = self._addADB()
return super(ADB, self).run(adb, *args, **kwargs)
def push(self, src, tgt):
# Always remove the old file before pushing the new file
self.deleteFile(tgt)
return self.run("push", src, tgt)
def pull(self, src, tgt):
return self.run("pull", src, tgt)
def logcat(self, *args, timeout=30, retry=1):
# logcat can hang if a device becomes unavailable
return self.run("logcat", *args, timeout=timeout, retry=retry)
def reboot(self):
try:
self.run("reboot")
return True
except Exception:
getLogger().critical(
f"Rebooting failure for device {self.device}.",
exc_info=True,
)
return False
def root(self, silent=False):
return self.restart_adbd(root=True, silent=silent)
def unroot(self, silent=False):
return self.restart_adbd(root=False, silent=silent)
def user_is_root(self):
return self.get_user() == "root"
def get_user(self):
try:
return self.shell("whoami", retry=1, silent=True)[0]
except Exception:
getLogger().exception("whoami failed.")
return None # could fail on unrooted device
def restart_adbd(self, root=False, silent=False):
user = self.get_user()
if user is not None:
try:
if root and user != "root":
if not silent:
getLogger().info("Restarting adbd with root privilege.")
self.run(["root"], retry=1, silent=True)
elif not root and user == "root":
if not silent:
getLogger().info("Restarting adbd with nonroot privilege.")
self.run(["unroot"], retry=1, silent=True)
else:
return True # no-op
# Check if change worked
user = self.get_user()
if not silent:
getLogger().info(f"adbd user is now: {user}.")
return user == "root" if root else user != "root"
except Exception:
err_text = f"Error while restarting adbd with {'non' if not root else ''}root privilege."
if silent:
# still log error but no alert if in silent mode
getLogger().error(err_text, exc_info=True)
else:
getLogger().critical(err_text, exc_info=True)
return False
def deleteFile(self, file):
return self.shell(["rm", "-rf", file])
def shell(self, cmd, **kwargs):
dft = None
if "default" in kwargs:
dft = kwargs.pop("default")
val = self.run("shell", cmd, **kwargs)
if val is None and dft is not None:
val = dft
return val
def su_shell(self, cmd, **kwargs):
su_cmd = ["su", "-c"]
su_cmd.extend(cmd)
return self.shell(su_cmd, **kwargs)
def getprop(self, property: str, **kwargs) -> str:
if "default" not in kwargs:
kwargs["default"] = [""]
result = self.run(["shell", "getprop", property], **kwargs)
if type(result) is not list:
getLogger().error(
f"adb.getprop(\"{property}\") unexpectedly returned {type(result)} '{result}'."
)
return ""
if len(result) == 0:
getLogger().error(f'adb.getprop("{property}") returned an empty list.')
return ""
retval = result[0].strip()
getLogger().info(f"adb.getprop(\"{property}\") returned '{retval}'.")
return retval
def setprop(self, property, value, **kwargs):
self.adb.shell(["setprop", property, value], **kwargs)
def isRootedDevice(self, silent=True) -> bool:
try:
ret = self.shell(
["id", "-u", "2>&1"], retry=1, silent=silent, ignore_status=silent
)
if not silent:
getLogger().info(f"id -u returned '{ret}'.")
if "0" in ret:
return True
ret = self.shell(
["which", "su", "2>&1"], retry=1, silent=silent, ignore_status=silent
)
if not silent:
getLogger().info(f"which su returned '{ret}'.")
is_rooted = (
ret is not None and len(ret) > 0 and ret[0].find("not found") == -1
)
return is_rooted
except Exception:
return False
def setFrequency(self, target):
if not self.isRootedDevice():
getLogger().warning(
f"Cannot set frequency on unrooted device {self.device}."
)
return
cpus = self._getCPUs()
for cpu in cpus:
freq_target = None
if isinstance(target, dict):
if cpu in target:
freq_target = target[cpu]
else:
freq_target = "mid"
elif isinstance(target, string_types):
freq_target = target
else:
raise AssertionError("Unsupported frequency target")
self._setOneCPUFrequency(cpu, freq_target)
def _addADB(self):
adb = ["adb"]
if self.device:
adb.extend(["-s", self.device])
return adb
def _setOneCPUFrequency(self, cpu, freq_target):
directory = os.path.join(*["/sys/devices/system/cpu/", cpu, "/"])
scaling_governor = directory + "cpufreq/scaling_governor"
self.su_shell(['"echo userspace > {}"'.format(scaling_governor)])
set_scaling_governor = self.su_shell(["cat", scaling_governor]).strip()
assert set_scaling_governor == "userspace", getLogger().fatal(
"Cannot set scaling governor to userspace"
)
avail_freq = directory + "cpufreq/scaling_available_frequencies"
freqs = self.su_shell(["cat", avail_freq]).strip().split(" ")
assert len(freqs) > 0, "No available frequencies"
freq = None
if freq_target == "max":
freq = freqs[-1]
elif freq_target == "min":
freq = freqs[0]
elif freq_target == "mid":
freq = freqs[int(len(freqs) / 2)]
else:
assert re.match(r"^\d+$", freq_target), "Frequency target is not integer"
freq = freq_target
minfreq = directory + "cpufreq/scaling_min_freq"
self.su_shell(['"echo {} > {}"'.format(freq, minfreq)])
maxfreq = directory + "cpufreq/scaling_max_freq"
self.su_shell(['"echo {} > {}"'.format(freq, maxfreq)])
curr_speed = directory + "cpufreq/scaling_cur_freq"
set_freq = self.su_shell(["cat", curr_speed]).strip()
assert set_freq == freq, "Unable to set frequency {} for {}".format(
freq_target, cpu
)
getLogger().info("On {}, set {} frequency to {}".format(self.device, cpu, freq))
def _getCPUs(self):
dirs = self.su_shell(["ls", "/sys/devices/system/cpu/"])
dirs = dirs.split("\n")
return [x for x in dirs if re.match(r"^cpu\d+$", x)]