common/recipes-core/psu-update/files/pyrmd.py (148 lines of code) (raw):

#!/usr/bin/env python3 """ Modbus/rackmond library + Standalone tool to view current power/current readings """ import asyncio import contextlib import struct import time class ModbusException(Exception): ... class ModbusTimeout(ModbusException): ... class ModbusCRCError(ModbusException): ... modbuslog = None logstart = None def log(*args, **kwargs): if modbuslog: t = time.time() - logstart if all(args): pfx = "[{:4.02f}]".format(t) else: pfx = "" print(pfx, *args, file=modbuslog, **kwargs) @contextlib.contextmanager def transcript(): global modbuslog, logstart logstart = time.time() with open("modbus-transcript.log", "w") as f: modbuslog = f yield modbuslog = None # This many modbus requests can be in flight at once MAX_QUEUED = 15 MODBUS_SEM = asyncio.Semaphore(MAX_QUEUED) async def modbuscmd(raw_cmd, expected=0, timeout=0): async with MODBUS_SEM: reader, writer = await asyncio.open_unix_connection("/var/run/rackmond.sock") cmd_header = struct.pack("@HxxHHL", 1, len(raw_cmd), expected, timeout) log("-> {}".format(" ".join("{:02x}".format(b) for b in raw_cmd))) cmd = cmd_header + raw_cmd req_header = struct.pack("@H", len(cmd)) writer.write(req_header + cmd) response = await reader.read() writer.close() rlen, = struct.unpack("@H", response[:2]) if rlen == 0: error, = struct.unpack("@H", response[2:4]) if error == 4: log("<- timeout") log("") raise ModbusTimeout() if error == 5: log("<- [CRC ERROR]") log("") raise ModbusCRCError() raise ModbusException(error) log("<- {}".format(" ".join("{:02x}".format(b) for b in response[2:rlen]))) log("") return response[2:rlen] async def read_register(addr, register, length=1, timeout=0): cmd = struct.pack(">BBHH", addr, 0x3, register, length) data = await modbuscmd(cmd, expected=5 + (2 * length), timeout=timeout) return data[3:] class Register: __slots__ = ["name", "start", "length", "convert", "interval"] def __init__(self, name, start, length=1, convert=None, interval=None): self.name = name self.start = start self.length = length self.convert = convert self.interval = interval def conv_ascii(bs): return bs.decode("ascii").strip("\x00 ") def conv_fixed(n): def convert(bs): return int.from_bytes(bs, byteorder="big") / (2 ** n) return convert REGISTERS = [ Register("model", 0x0, 8, conv_ascii, 120), Register("fw", 0x38, 4, conv_ascii, 120), Register("power", 0x96, convert=conv_fixed(3)), Register("current", 0x8C, convert=conv_fixed(6)), ] class PSU: def __init__(self, addr, rescan=120): self.addr = addr self.readings = {} self.regs = REGISTERS self.uptimes = {} self.rescan = rescan self.recheck = None async def update_register(self, reg): if reg.interval and time.time() < reg.interval + self.uptimes.get(reg.name, 0): return try: bs = await read_register(self.addr, reg.start, reg.length) if reg.convert: self.readings[reg.name] = reg.convert(bs) else: self.readings[reg.name] = bs self.uptimes[reg.name] = time.time() return True except ModbusTimeout: self.readings[reg.name] = None return False async def read(self): now = time.time() if self.recheck and now < self.recheck: return self.recheck = None if all( not ok for ok in await asyncio.gather( *[self.update_register(reg) for reg in self.regs] ) ): # If all reads failed, wait `rescan` seconds to check this PSU # address again now = time.time() self.recheck = now + self.rescan async def modelscan(): addrs = [0xA4, 0xA5, 0xA6, 0xB4, 0xB5, 0xB6] psus = [PSU(a) for a in addrs] cols = [reg.name for reg in REGISTERS] widths = {} print("\x1b[?1049h", end="") # enter altscreen print("\x1b[0;0H\x1b[s\x1b[?25l", end="") # save cursor while True: # Update PSU data await asyncio.gather(*[psu.read() for psu in psus]) print(" ", end="") for col in cols: print("{c:<{w}} ".format(c=col, w=widths.get(col, 0)), end="") print() for psu in psus: print(" {:02x} ".format(psu.addr), end="") for col in cols: v = psu.readings.get(col, None) if not v and col == cols[0] and psu.recheck: v = "t/o.. ({:0.02f}s)".format(psu.recheck - time.time()) v = str(v) if v else "t/o" widths[col] = max(widths.get(col, 0), len(v)) print("{v:<{w}} ".format(v=v, w=widths[col]), end="") print() print("\x1b[u", end="", flush=True) # restore cursor if __name__ == "__main__": loop = asyncio.get_event_loop() try: loop.run_until_complete(modelscan()) except KeyboardInterrupt: pass finally: print("\x1b[?1049l\x1b[?25h", end="", flush=True) # leave altscreen loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()