common/recipes-utils/bcm5396-util/files/bcm5396_py3.py (361 lines of code) (raw):

# # Copyright 2004-present Facebook. All rights reserved. # # This program file is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the # Free Software Foundation; version 2 of the License. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License # for more details. # # You should have received a copy of the GNU General Public License # along with this program in a file named COPYING; if not, write to the # Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, # Boston, MA 02110-1301 USA # import subprocess import time class Bcm5396MDIO: """The class to access BCM5396 through MDIO intf""" MDIO_CMD = "mdio-bb" PHYADDR = 0x1E ACCESS_CTRL_REG = 16 IO_CTRL_REG = 17 STATUS_REG = 18 DATA0_REG = 24 DATA1_REG = 25 DATA2_REG = 26 DATA3_REG = 27 def __init__(self, mdc, mdio): self.mdc = mdc self.mdio = mdio self.page = -1 def __io(self, op, reg, val=0): cmd = "%s -p -c %s -d %s %s %s %s" % ( self.MDIO_CMD, self.mdc, self.mdio, op, str(self.PHYADDR), str(reg), ) if op == "write": cmd += " %s" % val out = ( subprocess.Popen(cmd.split(), stdout=subprocess.PIPE) .communicate()[0] .decode() ) if op == "write": return val # need to parse the result for read rc = 0 for line in out.split("\n"): if not line.startswith("Read:"): continue rc = int(line.split(":")[1], 0) return rc def __read_mdio(self, reg): return self.__io("read", reg) def __write_mdio(self, reg, val): return self.__io("write", reg, val) def __set_page(self, page): if self.page == page: return # Write MII register ACCESS_CTRL_REG: # set bit 0 as "1" to enable MDIO access # set "page number to bit 15:8 val = 0x1 | ((page & 0xFF) << 8) self.__write_mdio(self.ACCESS_CTRL_REG, val) self.page = page def __wait_for_done(self): # Read MII register IO_CTRL_REG: # Check op_code = "00" while self.__read_mdio(self.IO_CTRL_REG) & 0x3: time.sleep(0.010) # 10ms def read(self, page, reg, n_bytes): self.__set_page(page) # Write MII register IO_CTRL_REG: # set "Operation Code as "00" # set "Register Address" to bit 15:8 val = 0x00 | ((reg & 0xFF) << 8) self.__write_mdio(self.IO_CTRL_REG, val) # Write MII register IO_CTRL_REG: # set "Operation Code as "10" # set "Register Address" to bit 15:8 val = 0x2 | ((reg & 0xFF) << 8) self.__write_mdio(self.IO_CTRL_REG, val) self.__wait_for_done() # Read MII register DATA0_REG for bit 15:0 val = int(self.__read_mdio(self.DATA0_REG)) # Read MII register DATA1_REG for bit 31:16 val |= self.__read_mdio(self.DATA1_REG) << 16 # Read MII register DATA2_REG for bit 47:32 val |= self.__read_mdio(self.DATA2_REG) << 32 # Read MII register DATA3_REG for bit 63:48 val |= self.__read_mdio(self.DATA3_REG) << 48 return val def write(self, page, reg, val, n_bytes): self.__set_page(page) # Write MII register DATA0_REG for bit 15:0 self.__write_mdio(self.DATA0_REG, val & 0xFFFF) # Write MII register DATA1_REG for bit 31:16 self.__write_mdio(self.DATA1_REG, (val >> 16) & 0xFFFF) # Write MII register DATA2_REG for bit 47:32 self.__write_mdio(self.DATA2_REG, (val >> 32) & 0xFFFF) # Write MII register DATA3_REG for bit 63:48 self.__write_mdio(self.DATA3_REG, (val >> 48) & 0xFFFF) # Write MII register IO_CTRL_REG: # set "Operation Code as "00" # set "Register Address" to bit 15:8 val = 0x00 | ((reg & 0xFF) << 8) self.__write_mdio(self.IO_CTRL_REG, val) # Write MII register IO_CTRL_REG: # set "Operation Code as "01" # set "Register Address" to bit 15:8 val = 0x1 | ((reg & 0xFF) << 8) self.__write_mdio(self.IO_CTRL_REG, val) self.__wait_for_done() class Bcm5396SPI: """The class to access BCM5396 through SPI interface""" SPI_CMD = "spi-bb" READ_CMD = 0x60 WRITE_CMD = 0x61 SPI_STS_DIO = 0xF0 SPI_STS_REG = 0xFE SPI_STS_REG_RACK = 0x1 << 5 SPI_STS_REG_SPIF = 0x1 << 7 PAGE_REG = 0xFF def __init__(self, cs, clk, mosi, miso): self.cs = cs self.clk = clk self.mosi = mosi self.miso = miso self.page = -1 def __bytes2val(self, values): # LSB first, MSB last pos = 0 result = 0 for byte in values: if type(byte) is str: byte = int(byte, 16) if byte > 255: raise Exception("%s is not a byte in the list %s" % (byte, values)) result |= byte << pos pos += 8 return result def __val2bytes(self, value, n): result = [] for _ in range(n): result.append(value & 0xFF) value >>= 8 if value > 0: raise Exception("Value, %s, is too large for %s bytes" % (value, n)) return result def __io(self, bytes_to_write, to_read=0): # TODO: check parameters cmd = "%s -s %s -S low -c %s -o %s -i %s " % ( self.SPI_CMD, self.cs, self.clk, self.mosi, self.miso, ) if len(bytes_to_write): write_cmd = "-w %s %s " % ( len(bytes_to_write) * 8, " ".join([str(byte) for byte in bytes_to_write]), ) else: write_cmd = "" if to_read: # spi-bb will first return the exact number of bits used for # writing. So, total number of bits to read should also include # the number of bits written. cmd += "-r %s " % str((len(bytes_to_write) + to_read) * 8) cmd += write_cmd rc = 0 out = ( subprocess.Popen(cmd.split(), stdout=subprocess.PIPE) .communicate()[0] .decode() ) if to_read: # need to parse the result for line in out.split("\n"): if not line.startswith("Read"): continue res = line.split(":")[1] rc = self.__bytes2val(res.split()[len(bytes_to_write) :]) break return rc def __set_page(self, page): page &= 0xFF if self.page == page: return self.__io([self.WRITE_CMD, self.PAGE_REG, page]) self.page = page def __read_spi_reg(self, reg): reg &= 0xFF return self.__io([self.READ_CMD, reg], 1) def __read_spi_sts(self): return self.__read_spi_reg(self.SPI_STS_REG) def __read_spi_dio(self): return self.__read_spi_reg(self.SPI_STS_DIO) def read(self, page, reg, n_bytes): """Read a register value from a page.""" if n_bytes > 8: print("TODO to support reading more than 8 bytes") return 0 if page > 0xFF or reg > 0xFF: print("Page and register must be <= 255") return 0 try: self.__set_page(page) self.__io([self.READ_CMD, reg], 1) while True: # check sts sts = self.__read_spi_sts() if sts & self.SPI_STS_REG_RACK: break bytes = [] for _ in range(n_bytes): bytes.append(self.__read_spi_dio()) except Exception as e: print(e) return self.__bytes2val(bytes) def write(self, page, reg, val, n_bytes): """Write a value as n bytes to a register on a page.""" if page > 0xFF or reg > 0xFF: print("Page and register must be <= 255") return bytes = self.__val2bytes(val, n_bytes) if len(bytes) > 8: print("TODO to support writing more than 8 bytes") return bytes = [self.WRITE_CMD, reg] + bytes try: self.__set_page(page) self.__io(bytes) except Exception as e: print(e) class Bcm5396: """The class for BCM5396 Switch""" MDIO_ACCESS = 0 SPI_ACCESS = 1 def __init__(self, access, verbose=False, **kwargs): self.verbose = verbose if access == self.MDIO_ACCESS: self.access = Bcm5396MDIO(**kwargs) else: self.access = Bcm5396SPI(**kwargs) def write(self, page, reg, value, n_bytes): if self.verbose: print("WRITE {:2x} {:2x} {:2x} ".format(page, reg, n_bytes), end="") bytes = "{:2x}".format(value) print([bytes[i : i + 2] for i in range(0, len(bytes), 2)][-n_bytes:]) return self.access.write(page, reg, value, n_bytes) def read(self, page, reg, n_bytes): if self.verbose: print("READ {:2x} {:2x} {:2x} ".format(page, reg, n_bytes), end="") result = self.access.read(page, reg, n_bytes) if self.verbose: bytes = "{:2x}".format(result) print([bytes[i : i + 2] for i in range(0, len(bytes), 2)][-n_bytes:]) return result def __add_remove_vlan(self, add, vid, untag, fwd, spt): VLAN_PAGE = 0x5 CTRL_ADDR = 0x60 CTRL_START_DONE = 0x1 << 7 VID_ADDR = 0x61 ENTRY_ADDR = 0x63 fwd_map = self.__ports2portmap(fwd) untag_map = self.__ports2portmap(untag) # mark it as write and stop the previous action ctrl = 0 self.write(VLAN_PAGE, CTRL_ADDR, ctrl, 1) # write entry if add: entry = 0x1 | ((spt & 0x1F) << 1) | (fwd_map << 6) | (untag_map << 23) else: entry = 0x0 self.write(VLAN_PAGE, ENTRY_ADDR, entry, 8) # write vid as the index self.write(VLAN_PAGE, VID_ADDR, vid & 0xFFF, 2) # start the write ctrl = CTRL_START_DONE self.write(VLAN_PAGE, CTRL_ADDR, ctrl, 1) while True: ctrl = self.read(VLAN_PAGE, CTRL_ADDR, 1) if not (ctrl & CTRL_START_DONE): # done break time.sleep(0.010) # 10ms def add_vlan(self, vid, untag, fwd, spt=0): return self.__add_remove_vlan(True, vid, untag, fwd, spt) def remove_vlan(self, vid): return self.__add_remove_vlan(False, vid, [], [], 0) def get_vlan(self, vid): VLAN_PAGE = 0x5 CTRL_ADDR = 0x60 CTRL_START_DONE = 0x1 << 7 CTRL_READ = 0x1 VID_ADDR = 0x61 ENTRY_ADDR = 0x63 # mark it as read and stop the previous action ctrl = CTRL_READ self.write(VLAN_PAGE, CTRL_ADDR, ctrl, 1) # write the vid as the index self.write(VLAN_PAGE, VID_ADDR, vid & 0xFFF, 2) # start the read ctrl = CTRL_READ | CTRL_START_DONE self.write(VLAN_PAGE, CTRL_ADDR, ctrl, 1) while True: ctrl = self.read(VLAN_PAGE, CTRL_ADDR, 1) if not (ctrl & CTRL_START_DONE): # done break time.sleep(0.010) # 10ms entry = self.read(VLAN_PAGE, ENTRY_ADDR, 8) res = {} res["valid"] = True if entry & 0x1 else False res["spt"] = (entry >> 1) & 0x1F res["fwd"] = self.__portmap2ports((entry >> 6) & 0x1FFFF) res["untag"] = self.__portmap2ports((entry >> 23) & 0x1FFFF) return res def __portmap2ports(self, port_map): return list( set([port if port_map & (0x1 << port) else None for port in range(0, 17)]) - set([None]) ) def __ports2portmap(self, ports): port_map = 0 for port in ports: port_map |= 0x1 << port return port_map & 0x1FFFF def __parse_arl_result(self, vid, result): is_bitset = lambda bit: True if result & (0x1 << bit) else False if not is_bitset(3): return None res = {} # parse vid first res["vid"] = (vid >> 48) & 0xFFF mac_val = vid & 0xFFFFFFFFFFFF mac_list = [] for pos in range(5, -1, -1): mac_list.append("{:02x}".format((mac_val >> (pos * 8)) & 0xFF)) res["mac"] = ":".join(mac_list) if mac_val & (0x1 << 40): res["ports"] = self.__portmap2ports((result >> 6) & 0xFFFF) else: res["ports"] = [(result >> 6) & 0xF] res["static"] = is_bitset(5) res["age"] = is_bitset(4) res["valid"] = is_bitset(3) res["priority"] = result & 0x7 return res def get_all_arls(self): ARL_PAGE = 0x5 SEARCH_CTRL_ADDR = 0x30 SEARCH_CTRL_START_DONE = 0x1 << 7 SEARCH_CTRL_SR_VALID = 0x1 VID0_ADDR = 0x33 RESULT0_ADDR = 0x3B VID1_ADDR = 0x40 RESULT1_ADDR = 0x48 all = [] # write START to search control ctrl = SEARCH_CTRL_START_DONE self.write(ARL_PAGE, SEARCH_CTRL_ADDR, ctrl, 1) while True: ctrl = self.read(ARL_PAGE, SEARCH_CTRL_ADDR, 1) if not (ctrl & SEARCH_CTRL_START_DONE): # Done break if not (ctrl & SEARCH_CTRL_SR_VALID): # result is not ready, sleep and retry time.sleep(0.010) # 10ms continue for vid_addr, result_addr in [ [VID1_ADDR, RESULT1_ADDR], [VID0_ADDR, RESULT0_ADDR], ]: vid = self.read(ARL_PAGE, vid_addr, 8) result = self.read(ARL_PAGE, result_addr, 4) one = self.__parse_arl_result(vid, result) if one: all.append(one) return all def vlan_ctrl(self, enable): VLAN_CTRL_PAGE = 0x34 VLAN_CTRL0_REG = 0x0 VLAN_CTRL0_B_EN_1QVLAN = 0x1 << 7 ctrl = self.read(VLAN_CTRL_PAGE, VLAN_CTRL0_REG, 1) need_write = False if enable: if not ctrl & VLAN_CTRL0_B_EN_1QVLAN: need_write = True ctrl |= VLAN_CTRL0_B_EN_1QVLAN else: if ctrl & VLAN_CTRL0_B_EN_1QVLAN: need_write = True ctrl &= (~VLAN_CTRL0_B_EN_1QVLAN) & 0xFF if need_write: self.write(VLAN_CTRL_PAGE, VLAN_CTRL0_REG, ctrl, 1) def vlan_set_port_default(self, port, vid, pri=0): VLAN_PORT_PAGE = 0x34 VLAN_PORT_REG_BASE = 0x10 if port < 0 or port > 16: raise Exception("Invalid port number %s" % port) if pri < 0 or pri > 7: raise Exception("Invalid priority %s" % pri) if vid < 0 or vid > 0xFFF: raise Exception("Invalid VLAN %s" % vid) reg = VLAN_PORT_REG_BASE + port * 2 ctrl = (pri << 13) | vid self.write(VLAN_PORT_PAGE, reg, ctrl, 2) def vlan_get_port_default(self, port): VLAN_PORT_PAGE = 0x34 VLAN_PORT_REG_BASE = 0x10 if port < 0 or port > 16: raise Exception("Invalid port number %s" % port) reg = VLAN_PORT_REG_BASE + port * 2 val = self.read(VLAN_PORT_PAGE, reg, 2) res = {} res["priority"] = (val >> 13) & 0x7 res["vid"] = val & 0xFFF return res