common/recipes-rest/rest-api/files/node_sensors.py (199 lines of code) (raw):
#!/usr/bin/env python
#
# Copyright 2015-present Facebook. All Rights Reserved.
#
# This program file is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program in a file named COPYING; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301 USA
#
import re
from asyncio import TimeoutError
from typing import Any, Dict, List, Optional
import common_utils
from node import node
async def sensor_util_history_clear(fru="all", sensor_id="", sensor_name=""):
cmd = ["/usr/local/bin/sensor-util", fru, "--history-clear"]
if sensor_id != "":
cmd += [sensor_id]
if sensor_name != "":
cmd_util = ["/usr/local/bin/sensor-util", fru]
try:
retcode, stdout, stderr = await common_utils.async_exec(
cmd_util, shell=False
)
out = stdout.decode().splitlines()
rx = re.compile(
r"(\S+[\S\s]*\S+)\s+\(0x([a-fA-F\d]+)\)\s+:\s+(-?\d+\.\d+)\s+(\S+)\s+\|\s+\((\S+)\)(.*)$"
)
rx_na = re.compile(
r"(\S+[\S\s]*\S+)\s+\(0x([a-fA-F\d]+)\)\s+:\s+(\S+)\s+\|\s+\((\S+)\)"
)
for line in out:
m_val = rx.match(line)
m_na = rx_na.match(line)
if m_val:
s_name = m_val.group(1)
s_id = m_val.group(2)
s_val = m_val.group(3)
s_unit = m_val.group(4)
s_status = m_val.group(5)
elif m_na:
s_name = m_na.group(1)
s_id = m_na.group(2)
s_val = m_na.group(3) # noqa: F841
s_unit = "na" # noqa: F841
s_status = m_na.group(4) # noqa: F841
else:
continue
if sensor_name != "" and sensor_name.lower() == s_name.lower():
snr_num = "0x" + str(s_id)
cmd += [snr_num]
except TimeoutError:
print("TimeoutException received")
except Exception:
print("Exception received")
try:
retcode, stdout, stderr = await common_utils.async_exec(cmd, shell=True)
if retcode == 0:
return {"result": "success"}
else:
return {"result": "failure"}
except Exception:
return {"result": "failure"}
async def sensor_util( # noqa: C901
fru="all",
sensor_name="",
sensor_id="",
period="60",
display: Optional[List[Any]] = None,
):
cmd = ["/usr/local/bin/sensor-util", fru]
if sensor_id != "":
cmd += [sensor_id]
if "thresholds" in display:
cmd += ["--threshold"]
if "history" in display:
return {}
elif "history" in display:
cmd += ["--history", period]
sensors = []
if sensor_id != "":
sensor_id_val = int(sensor_id, base=16)
else:
sensor_id_val = 0
try:
retcode, stdout, stderr = await common_utils.async_exec(cmd, shell=False)
out = stdout.splitlines()
sensors = {}
if "history" in display:
# MB_CONN_P12V_INA230_PWR (0x9B) min = NA, average = NA, max = NA
# SYSTEM_AIRFLOW (0x0) min = 15.48, average = 15.77, max = 15.86
rx = re.compile(
r"(\S+[\S\s]*\S+)\s+\(0x([a-fA-F\d]+)\)\s+min = ([^,]+), average = ([^,]+), max = (\S+)"
)
else:
# SYSTEM_AIRFLOW (0x0) : 15.78 CFM | (ok)
# SYSTEM_AIRFLOW (0x0) : 15.62 CFM | (ok) | UCR: NA | UNC: NA | UNR: NA | LCR: NA | LNC: NA | LNR: NA
rx = re.compile(
r"(\S+[\S\s]*\S+)\s+\(0x([a-fA-F\d]+)\)\s+:\s+(-?\d+\.\d+)\s+(\S+)?\s+\|\s+\((\S+)\)(.*)$"
)
# MB_CONN_P12V_INA230_PWR (0x9B) : NA | (na)
# MB_CONN_P12V_INA230_PWR (0x9B) : NA | (na)
rx_na = re.compile(
r"(\S+[\S\s]*\S+)\s+\(0x([a-fA-F\d]+)\)\s+:\s+(\S+)\s+\|\s+\((\S+)\)"
)
for line in out:
if "history" in display:
m = rx.match(line)
if m:
s_name = m.group(1)
s_id = m.group(2)
s_min = m.group(3)
s_avg = m.group(4)
s_max = m.group(5)
snr = {"min": s_min, "avg": s_avg, "max": s_max}
if "id" in display:
snr["id"] = s_id
else:
continue
else:
m_val = rx.match(line)
m_na = rx_na.match(line)
s_thresholds = {}
if m_val:
s_name = m_val.group(1)
s_id = m_val.group(2)
s_val = m_val.group(3)
s_unit = m_val.group(4) if m_val.group(4) != None else ""
s_status = m_val.group(5)
if "thresholds" in display:
thres_str = m_val.group(6).strip()
threshold_arr = thres_str.split("|")
for t in threshold_arr:
a = t.split(": ")
if len(a) != 2:
continue
key = a[0].strip()
val = a[1].strip()
if val.lower() != "na":
s_thresholds[key] = val
elif m_na:
s_name = m_na.group(1)
s_id = m_na.group(2)
s_val = m_na.group(3)
s_unit = "na"
s_status = m_na.group(4)
else:
continue
snr = {"value": s_val}
if "units" in display:
snr["units"] = s_unit
if "id" in display:
snr["id"] = s_id
if "status" in display:
snr["status"] = s_status
if "thresholds" in display and len(s_thresholds) > 0:
snr["thresholds"] = s_thresholds
processing_id = int(s_id, 16)
if (
(sensor_id == "" and sensor_name == "")
or (sensor_name != "" and sensor_name.lower() == s_name.lower())
or (sensor_id != "" and sensor_id_val == processing_id)
):
if s_name in sensors:
if isinstance(sensors[s_name], list):
sensors[s_name].append(snr)
else:
sensors[s_name] = [sensors[s_name], snr]
else:
sensors[s_name] = snr
except TimeoutError:
print("TimeoutException received")
except Exception as e:
print("Exception received")
print(e)
return sensors
class sensorsNode(node):
def __init__(self, name, info=None, actions=None):
self.name = name
if info == None:
self.info = {}
else:
self.info = info
if actions == None:
self.actions = []
else:
self.actions = actions
async def getInformation(self, param: Optional[Dict[Any, Any]] = None):
snr_name = ""
snr_id = ""
period = "60"
display = []
if "display" in param:
display = param["display"].split(",")
if "name" in param:
snr_name = param["name"]
if "id" in param:
snr_id = param["id"]
if "history-period" in param:
period = param["history-period"]
return await sensor_util(self.name, snr_name, snr_id, period, display)
async def doAction(self, info, param: Optional[Dict[Any, Any]] = None):
snr_name = ""
snr = ""
if "name" in param:
snr_name = param["name"]
if "id" in param:
snr = param["id"]
if not self.actions:
return {"result": "failure"}
return await sensor_util_history_clear(self.name, snr, snr_name)
def get_node_sensors(name):
actions = ["history-clear"]
return sensorsNode(name, actions=actions)