common/recipes-rest/rest-api/files/redfish_chassis_helper.py (272 lines of code) (raw):
import json
import os
import time
import typing as t
from json.decoder import JSONDecodeError
from shutil import which
import aggregate_sensor as libag
import pal
import sdr
from common_utils import async_exec
SensorDetails = t.NamedTuple(
"SensorDetails",
[
("sensor_name", str),
("sensor_number", int),
("fru_name", str),
("reading", float),
# Not referencing sdr.ThreshSensor as sdr is unavailable on unit test env
# only for platforms that support libpal
("sensor_thresh", t.Optional["sdr.ThreshSensor"]),
("sensor_unit", t.Optional[str]),
# Not referencing pal.SensorHistory as pal is unavailable on unit test env
# only for platforms that support libpal
("sensor_history", t.Optional["pal.SensorHistory"]),
],
)
FRUID_UTIL_PATH = "/usr/local/bin/fruid-util"
WEUTIL_PATH = "/usr/bin/weutil"
""" sensor type constants as defined in sensors.py """
LIB_SENSOR_IN = 0
LIB_SENSOR_FAN = 1
LIB_SENSOR_TEMPERATURE = 2
LIB_SENSOR_POWER = 3
LIB_SENSOR_CURR = 5
sensor_unit_dict = {
LIB_SENSOR_FAN: "RPM",
LIB_SENSOR_TEMPERATURE: "C",
LIB_SENSOR_POWER: "Watts",
LIB_SENSOR_IN: "Amps",
LIB_SENSOR_CURR: "Volts",
}
SAD_SENSOR = -99999 # default reading for values not found.
def get_pal_sensor(fru_name: str, fru_id: int, sensor_id: int) -> SensorDetails:
sensor_unit = sdr.sdr_get_sensor_units(fru_id, sensor_id)
try:
reading = pal.sensor_read(fru_id, sensor_id)
reading = reading
start_time = int(time.time()) - 60
sensor_history = pal.sensor_read_history(fru_id, sensor_id, start_time)
except pal.LibPalError:
reading = SAD_SENSOR # default value when we can't get reading
sensor_history = None
try:
sensor_thresh = sdr.sdr_get_sensor_thresh(fru_id, sensor_id)
except sdr.LibSdrError:
sensor_thresh = None
try:
sensor_name = sdr.sdr_get_sensor_name(fru_id, sensor_id)
except sdr.LibSdrError:
sensor_name = str(sensor_id)
sensor_name = fru_name + "/" + fru_name + "/" + sensor_name
if sensor_unit == "%":
sensor_unit = "Percent" # DMTF accepts this unit as text
sensor_details = SensorDetails(
sensor_name=sensor_name,
sensor_number=sensor_id,
fru_name=fru_name,
reading=reading,
sensor_thresh=sensor_thresh,
sensor_unit=sensor_unit,
sensor_history=sensor_history,
)
return sensor_details
def get_sensor_details_using_libpal(
fru_name: str, desired_sensor_units: t.List[str]
) -> t.List[SensorDetails]:
"""Returns sensor details for platforms that support libpal.
Those are compute and newer fboss platforms"""
fru_name_map = pal.pal_fru_name_map()
fru_id = fru_name_map[fru_name]
sensor_details_list = [] # type: t.List[SensorDetails]
if not pal.pal_is_fru_prsnt(fru_id): # Check if the fru is present
return sensor_details_list
sensor_ids_list = pal.pal_get_fru_sensor_list(fru_id)
for sensor_id in sensor_ids_list:
sensor_details = get_pal_sensor(fru_name, fru_id, sensor_id)
if sensor_details.sensor_unit in desired_sensor_units:
sensor_details_list.append(sensor_details)
return sensor_details_list
def get_sensor_details_using_libpal_helper(
sensor_units: t.List[str], fru_name: t.Optional[str] = None
) -> t.List[SensorDetails]:
all_sensor_details = []
if fru_name is None:
fru_name_list = get_single_sled_frus()
for fru in fru_name_list:
all_sensor_details += get_sensor_details_using_libpal(fru, sensor_units)
else:
all_sensor_details = get_sensor_details_using_libpal(fru_name, sensor_units)
return all_sensor_details
def get_older_fboss_sensor_details_filtered(
fru_name: str, desired_sensor_type: t.List[int]
) -> t.List[SensorDetails]:
"""Returns sensor details using sensors.py for older fboss
platforms that don't support libpal i.e. yamp, wedge, wedge100"""
desired_sensor_units = [sensor_unit_dict[typeid] for typeid in desired_sensor_type]
all_sensors = get_older_fboss_sensor_details(fru_name)
sensor_details_list = []
for sensor_candidate in all_sensors:
if sensor_candidate.sensor_unit in desired_sensor_units:
sensor_details_list.append(sensor_candidate)
return sensor_details_list
def get_older_fboss_sensor_details(fru_name: str) -> t.List[SensorDetails]:
sensor_details_list = []
import sensors
sensors.init()
for chip in sensors.ChipIterator():
for sensor in sensors.FeatureIterator(chip):
chip_name = sensors.chip_snprintf_name(chip)
adapter_name = sensors.get_adapter_name(chip.bus)
sensor_tag = sensor.name.decode("utf-8")
reading_key = sensor_tag + "_input"
ucr_key = sensor_tag + "_max"
sensor_label = sensors.get_label(chip, sensor)
sensor_name = chip_name + "/" + adapter_name + "/" + sensor_label
sfs = list(sensors.SubFeatureIterator(chip, sensor))
sf_keys = []
sf_vals = []
subfeatures_dict = {}
# retrieving keys and values for subfeatures
for sf in sfs:
sf_keys.append(sf.name.decode("utf-8"))
try:
sf_vals.append(sensors.get_value(chip, sf.number))
except sensors.ErrorAccessRead:
# in case of an error, we set default SAD_SENSOR
subfeatures_dict[reading_key] = SAD_SENSOR
# for sensors that don't have upper threshold as a feature
if ucr_key not in sf_keys:
subfeatures_dict[ucr_key] = 0 # default upper threshold val
for key, val in zip(sf_keys, sf_vals):
subfeatures_dict[key] = val
sensor_details = SensorDetails(
sensor_name=sensor_name,
sensor_number=0, # default bc sensors.py doesn't provide sensor id
fru_name=fru_name,
reading=subfeatures_dict[reading_key],
sensor_thresh=sdr.ThreshSensor(
ucr_thresh=subfeatures_dict[ucr_key],
unc_thresh=0,
unr_thresh=0,
lcr_thresh=0,
lnc_thresh=0,
lnr_thresh=0,
),
sensor_unit=sensor_unit_dict[sensor.type],
sensor_history=None, # bc sensors.py doesn't provide sensor history
)
sensor_details_list.append(sensor_details)
sensors.cleanup()
return sensor_details_list
def get_aggregate_sensors() -> t.List[SensorDetails]:
"""This method helps us get aggregate sensors like
SYSTEM_AIRFLOW that we can't get using libpal and sensors.py"""
libag.aggregate_sensor_init()
as_size = libag.aggregate_sensor_count()
ag_sensors_list = [] # type: t.List[SensorDetails]
for sensor_id in range(as_size):
sensor_details = get_aggregate_sensor(sensor_id)
if sensor_details:
ag_sensors_list.append(sensor_details)
return ag_sensors_list
def get_aggregate_sensor(sensor_id: int) -> t.Optional[SensorDetails]:
try:
sensor_name = libag.aggregate_sensor_name(sensor_id)
except libag.LibAggregateError:
# If we can't get the sensor_name, we don't
# populate this sensor. Move to next iteration
return None
try:
reading = libag.aggregate_sensor_read(sensor_id)
except libag.LibAggregateError:
reading = SAD_SENSOR # so we know its unhealthy sensor
sensor_details = SensorDetails(
sensor_name="Chassis/Chassis/" + sensor_name,
sensor_number=sensor_id, # set default to 0
fru_name="Chassis",
reading=reading,
sensor_thresh=None, # set to default
sensor_unit=None, # set to default
sensor_history=None, # set to default
)
return sensor_details
FruInfo = t.NamedTuple(
"FruInfo",
[
("fru_name", str),
("manufacturer", t.Optional[str]),
("serial_number", t.Optional[int]),
("model", t.Optional[str]),
],
)
# as a workaround for lru_cache because its not compatible with async func
__CACHE_FRU_INFO = {} # type: t.Dict[str, FruInfo]
async def get_fru_info_helper(fru_name: t.Optional[str] = None) -> t.List[FruInfo]:
frus_info_list = []
if is_libpal_supported(): # for compute and new fboss platforms
if fru_name is None: # for single sled platforms
fru_name_list = get_single_sled_frus()
for fru in fru_name_list:
fru_info = await get_fru_info(fru)
frus_info_list.append(fru_info)
else: # for multisled platforms
fru_info = await get_fru_info(fru_name)
frus_info_list.append(fru_info)
else: # for older fboss platforms
fru_info = await get_fru_info("BMC")
frus_info_list.append(fru_info)
return frus_info_list
async def get_fru_info(fru_name: str) -> FruInfo: # noqa: C901
if fru_name in __CACHE_FRU_INFO:
return __CACHE_FRU_INFO[fru_name]
fru_details_dict = {}
product_name_key = "Product Name"
if is_fruid_util_available(): # then its a compute platform
# update keys as per fruid util output
product_manufacturer_key = "Product Manufacturer"
product_serial_key = "Product Serial"
cmd = [FRUID_UTIL_PATH, fru_name, "--json"]
_, fru_details, _ = await async_exec(cmd)
try:
fru_details_list = json.loads(fru_details)
if fru_details_list: # if its not empty
fru_details_dict = fru_details_list[0]
except JSONDecodeError:
pass
elif is_we_util_available(): # then its an fboss platform
cmd = [WEUTIL_PATH]
_, fru_details, _ = await async_exec(cmd)
s_data = fru_details.split(os.linesep)
for line in s_data:
key_pair = line.split(":")
if len(key_pair) > 1:
fru_details_dict[key_pair[0]] = key_pair[1]
# update keys as per weutil output
product_manufacturer_key = "System Manufacturer"
product_serial_key = "Product Serial Number"
else:
raise NotImplementedError("Can't get fru info for fru : {}".format(fru_name))
try:
fru_info = FruInfo(
fru_name,
fru_details_dict[product_manufacturer_key].strip(),
fru_details_dict[product_serial_key].strip(),
fru_details_dict[product_name_key].strip(),
)
__CACHE_FRU_INFO[fru_name] = fru_info
return fru_info
except KeyError:
return FruInfo(
fru_name=fru_name, manufacturer=None, serial_number=None, model=None
)
def is_fruid_util_available() -> bool:
return which(FRUID_UTIL_PATH) is not None
def is_we_util_available() -> bool:
return which(WEUTIL_PATH) is not None
def get_single_sled_frus() -> t.List[str]:
fru_name_map = pal.pal_fru_name_map()
fru_list = []
for fru_name, fruid in fru_name_map.items():
if (
pal.FruCapability.FRU_CAPABILITY_HAS_DEVICE
not in pal.pal_get_fru_capability(fruid)
and "exp" not in fru_name
):
fru_list.append(fru_name)
return fru_list
def is_libpal_supported() -> bool:
"""platforms that don't support libpal for sensor related data"""
UNSUPPORTED_PLATFORM_BUILDNAMES = ["yamp", "wedge", "wedge100"]
return pal.pal_get_platform_name() not in UNSUPPORTED_PLATFORM_BUILDNAMES
def _get_accelerator_list() -> t.List[str]:
accelerators = []
fru_name_map = pal.pal_fru_name_map()
for fru_name, fruid in fru_name_map.items():
fru_capabilities = pal.pal_get_fru_capability(fruid)
if (
pal.FruCapability.FRU_CAPABILITY_HAS_DEVICE in fru_capabilities
and pal.FruCapability.FRU_CAPABILITY_SERVER not in fru_capabilities
):
accelerators.append(fru_name)
return accelerators