common/recipes-core/psu-update/files/psu-update-artesyn.py (273 lines of code) (raw):

#!/usr/bin/env python3 """ usage: <address> <pickled Image> [logic | primary | secondary | discharger | all] ... ./artup.by 0xb6 firmware.pkl all """ import argparse import asyncio import json import os import pickle import struct import subprocess import sys from contextlib import contextmanager import pyrmd import srec from pyrmd import modbuscmd, read_register, transcript def auto_int(x): return int(x, 0) parser = argparse.ArgumentParser() parser.add_argument("--addr", type=auto_int, required=True, help="PSU Modbus Address") parser.add_argument( "--statusfile", default=None, help="Write status to JSON file during process" ) parser.add_argument( "--rmfwfile", action="store_true", help="Delete FW file after update completes" ) parser.add_argument( "--transcript", action="store_true", help="Write modbus commands and replies to modbus-transcript.log", ) parser.add_argument("file", help="firmware file") status = {"pid": os.getpid(), "state": "started"} statuspath = None def write_status(): global status if statuspath is None: return tmppath = statuspath + "~" with open(tmppath, "w") as tfh: tfh.write(json.dumps(status)) os.rename(tmppath, statuspath) def status_state(state): global status status["state"] = state write_status() ISP_FLASH_DATA = 0x45 ISP_CTRL_CMD = 0x42 ISP_CONFIG_STATUS = 0x43 ISP_CTRL_ENTER = 0x01 ISP_CTRL_EXIT = 0x00 ISP_CTRL_RESTART = 0x03 ADP_START_UPDATE = 0x05 ADP_UPDATE_BLOCK = 0x06 ADP_FINISH_UPDATE = 0x07 PACKET_SIZE = 64 class Statusline: def __init__(self): self.msg = "" def update(self, msg): ll = len(self.msg) self.msg = msg print(msg, end="", flush=False) print(" " * (ll - len(msg)), end="\r", flush=True) def done(self, msg): self.update(msg) self.msg = "" print() sl = Statusline() def hdlc_crc(data): data = bytearray(data) crc = 0xFFFF for i in data: temp = i & 0xFF temp ^= crc temp &= 0xFF temp ^= temp << 4 temp &= 0xFF crc = ( (temp << 8) & 0xFFFF ^ ((crc >> 8) & 0x00FF) ^ ((temp >> 4) & 0xFFFF) ^ ((temp << 3) & 0xFFFF) ) crc ^= 0xFFFF crc &= 0xFFFF data.append(crc & 0xFF) data.append(((crc >> 8) & 0xFF)) return data def adp_start_update(p): return hdlc_crc(struct.pack("BBBBB", ADP_START_UPDATE, 0x00, p, 0x01, 0x00)) def isp_enter(target): cmd = struct.pack("BBB", ISP_CTRL_CMD, ISP_CTRL_ENTER, target.boot_type) if target.adp: cmd += adp_start_update(target.adp) return cmd def isp_data(target, data, seqbytes): cmd = struct.pack("BB", ISP_FLASH_DATA, len(data) + 2) cmd += seqbytes cmd += data return cmd def isp_exit(target): cmd = struct.pack("BBB", ISP_CTRL_CMD, ISP_CTRL_EXIT, target.boot_type) if target.adp: cmd += hdlc_crc( struct.pack("BBBBB", ADP_FINISH_UPDATE, 0x00, target.adp, 0x01, 0x00) ) return cmd class ArtesynTarget: __slots__ = ["name", "startseq", "begin", "end", "boot_type", "adp"] def __init__(self, name, startseq, begin, end, boot_type, adp=None): self.name = name self.startseq = startseq # Convert wordaddrs to byteaddrs self.begin = begin * 2 self.end = (end + 1) * 2 self.boot_type = boot_type self.adp = adp targets = { t.name: t for t in [ ArtesynTarget("primary", 0x0, 0x00000, 0x05DFF, 0xAA, 0x41), ArtesynTarget("secondary", 0x300, 0x06000, 0x0BDFF, 0xBB, 0x42), ArtesynTarget("discharger", 0x600, 0x0C000, 0x11DFF, 0xDD, 0x43), ArtesynTarget("logic", 0x900, 0x12000, 0x183FF, 0xCC), ] } def hd(bs): return " ".join("{:02x}".format(b) for b in bs) class RequestError(Exception): def __init__(self, *args): newargs = [] for arg in args: if isinstance(arg, bytes): arg = hd(arg) newargs.append(arg) super().__init__(*newargs) async def request(addr, modbus_cmd, *args, **kwargs): cmd = bytearray([addr]) cmd.extend(modbus_cmd) resp = await modbuscmd(cmd, *args, **kwargs) # Strip address return resp[1:] async def get_status(addr: int): bits = await request(addr, struct.pack("B", ISP_CONFIG_STATUS), expected=6) if bits[0] != ISP_CONFIG_STATUS: raise RequestError(bits[1:]) status = int.from_bytes(bits[1:], byteorder="big") flags = [ "mode", "startup_error", "checksum_error", "boundary_error", "packet_size_error", "pass_code_error", ] res = {} for i, flag in enumerate(flags): res[flag] = status >> i & 0x1 == 0 res["mode"] = "ISP" if res["mode"] else "MAP" return res class ArtesynError(Exception): def __init__(self, *args, resp=b""): super().__init__(self, *args) self.resp = resp async def send_target( addr: int, image: srec.Image, target: ArtesynTarget, prev_sent: int, total_data: int ): data = image[target.begin : target.end] print("Flashing {} on {:x}".format(target.name, addr)) n_chunks = len(data) // PACKET_SIZE if len(data) % PACKET_SIZE != 0: print("Packet size does not evenly divide image!") n_chunks += 1 sent = 0 rlen = 0 seq = target.startseq for n in range(n_chunks): chunk = data[n * PACKET_SIZE : (n + 1) * PACKET_SIZE] if len(chunk) < PACKET_SIZE: chunk += b"\xff" * (PACKET_SIZE - len(chunk)) if sent == 0: timeout = 3000 else: timeout = 500 seqbytes = struct.pack(">H", seq) explen = (3 + rlen) if rlen != 0 else 0 resp = await request( addr, isp_data(target, chunk, seqbytes), timeout=timeout, expected=explen ) rlen = len(resp) sent += PACKET_SIZE if resp[0] == 0xC5: # error raise ArtesynError("Error response: {}".format(resp), resp=resp) if resp[0] == 0x45: if resp[4] == 0x00 or resp[4] == 0x0A: seq += 1 else: raise ArtesynError("Block sequence error: {}".format(resp), resp=resp) sl.update( "Flashing {}... {:.2%} \r".format( target.name, (sent + prev_sent) / total_data ) ) status_state( "Flashing {}... {:.2%} \r".format( target.name, (sent + prev_sent) / total_data ) ) status["flash_progress_percent"] = 100 * (sent + prev_sent) / total_data sl.done("Flashed {}".format(target.name)) return sent async def fw_revision(addr): resp = await read_register(addr, 0x38, length=4) return resp.decode("ascii").strip() async def aupd(addr, image, targetnames): print("fw rev... ", end="") try: print(await fw_revision(addr)) except pyrmd.ModbusTimeout: print("timed out.") entered = [] total_data = 0 total_sent = 0 try: for name in targetnames: status_state("flashing target {}".format(name)) target = targets[name] entered.append(name) enter_tries = 3 total_data += target.end - target.begin while True: try: result = await request(addr, isp_enter(target), timeout=3000) break except pyrmd.ModbusCRCError as e: # The first of these sometimes fails, always with a CRC # check failure. # If one succeeds, they all do, though. # TODO: figure out why # I suspect the microcontroller isn't quite completing the # reply before it switches into ISP Mode, and response is # incomplete or garbled somehow. # For now though, a retry or two gets past this point # and things are pretty much fine after that. enter_tries -= 1 if enter_tries > 0: print("failed to enter bootloader, retrying...") continue raise e if target.name == "logic" and result[0] & 0x80 != 0: raise RequestError("Error entering ISP mode", result) print("ISP enter: {}".format(name)) for name in reversed(targetnames): target = targets[name] total_sent += await send_target(addr, image, target, total_sent, total_data) finally: for name in reversed(entered): target = targets[name] sl.done("Exiting update of {}".format(target.name)) status_state("done") await request(addr, isp_exit(target), timeout=3000) await asyncio.sleep(2) print("fw rev... ", end="") try: print(await fw_revision(addr)) except pyrmd.ModbusTimeout: print("timed out.") @contextmanager def suppress_monitoring(): """ contextmanager to pause rackmond monitoring on entry and resume on exit, including exits due to exception """ try: subprocess.check_output(["rackmonctl", "pause"]) yield finally: subprocess.check_output(["rackmonctl", "resume"]) def main(): args = parser.parse_args() addr = args.addr global statuspath statuspath = args.statusfile with open(args.file, "rb") as f: image = pickle.load(f) targetnames = ["logic", "discharger", "secondary", "primary"] loop = asyncio.get_event_loop() with transcript(), suppress_monitoring(): loop.run_until_complete(aupd(addr, image, targetnames)) if __name__ == "__main__": main()