# benchmarking/utils/usb_controller.py (68 lines of code) (raw):
##############################################################################
# Copyright 2020-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
##############################################################################
import json
import multiprocessing
import brainstem
from brainstem.result import Result
from utils.custom_logger import getLogger
# Module-level multiprocessing manager, created when this module is imported.
# USBController uses it (manager.dict()) to hold the per-device enabled/disabled
# status in a manager-backed proxy dict rather than a plain dict.
manager = multiprocessing.Manager()
class USBController:
    """Controller for Acroname USB hubs.

    Lets lab devices be connected to and disconnected from the lab by their
    device hash, by enabling or disabling the hub port each device is mapped
    to.

    The controller is loaded from a JSON file with the mapping:
    {
        "hub_serial": {
            "port_number": "device_hash"
        }
    }
    Both ``hub_serial`` and ``port_number`` are converted with ``int()``, so
    they must be integer strings.
    """

    def __init__(self, usb_hub_device_mapping):
        """Load the hub/port mapping and record every mapped device as active.

        No hub command is issued here; devices are only *recorded* as enabled.

        Args:
            usb_hub_device_mapping: Path to the JSON mapping file described in
                the class docstring.
        """
        self.device_map = {}  # map of device hash to (port_number, hub_serial)
        self.active = manager.dict()  # device hash to enable/disable boolean status
        with open(usb_hub_device_mapping) as f:
            mapping = json.load(f)
        getLogger().info("mapping {}".format(mapping))
        for hub_serial, port_device_map in mapping.items():
            for port_number, device_hash in port_device_map.items():
                self.device_map[device_hash] = (int(port_number), hub_serial)
                self.active[device_hash] = True  # default on
        getLogger().info("mapping {}".format(self.device_map))

    def connect(self, device_hash):
        """Enable the hub port mapped to ``device_hash``.

        Args:
            device_hash: Device identifier as it appears in the mapping file.

        Raises:
            Exception: If the device is not in the mapping, the hub cannot be
                reached, or the hub reports an error enabling the port.
        """
        self._set_port_enabled(device_hash, True)

    def disconnect(self, device_hash):
        """Disable the hub port mapped to ``device_hash``.

        Args:
            device_hash: Device identifier as it appears in the mapping file.

        Raises:
            Exception: If the device is not in the mapping, the hub cannot be
                reached, or the hub reports an error disabling the port.
        """
        self._set_port_enabled(device_hash, False)

    def _set_port_enabled(self, device_hash, enabled):
        """Switch the port for ``device_hash`` on or off and record the result.

        Shared implementation of connect() and disconnect(). ``self.active``
        is only updated when the hub reports success.
        """
        try:
            port_number, hub_serial = self.device_map[device_hash]
        except KeyError as err:
            # Keep the generic Exception type callers already catch, but chain
            # the KeyError explicitly so the root cause stays in the traceback.
            raise Exception(
                "Device {} or hub not connected".format(device_hash)
            ) from err

        stem = brainstem.stem.USBHub3p()
        result = stem.discoverAndConnect(brainstem.link.Spec.USB, int(hub_serial))
        if result != Result.NO_ERROR:
            raise Exception(
                "Could not connect to hub {} with error code {}".format(
                    hub_serial, result
                )
            )

        # From here on the link to the hub is open; always release it, even if
        # the port command or the status update fails.
        try:
            if enabled:
                result = stem.usb.setPortEnable(port_number)
            else:
                result = stem.usb.setPortDisable(port_number)
            if result != Result.NO_ERROR:
                raise Exception(
                    "Could not {} port {} with error code {}".format(
                        "enable" if enabled else "disable", port_number, result
                    )
                )
            self.active[device_hash] = enabled
        finally:
            stem.disconnect()