platform/broadcom/sonic-platform-modules-dell/s6100/sonic_platform/sfp.py (158 lines of code) (raw):

#!/usr/bin/env python ############################################################################# # DELLEMC # # Module contains an implementation of SONiC Platform Base API and # provides the platform information # ############################################################################# try: import os import syslog import time from sonic_platform_base.sonic_xcvr.sfp_optoe_base import SfpOptoeBase except ImportError as e: raise ImportError(str(e) + "- required module not found") QSFP_INFO_OFFSET = 128 class Sfp(SfpOptoeBase): """ DELLEMC Platform-specific Sfp class """ def __init__(self, index, sfp_type, eeprom_path, sfp_control, sfp_ctrl_idx): SfpOptoeBase.__init__(self) self.sfp_type = sfp_type self.index = index + 1 self.eeprom_path = eeprom_path self.sfp_control = sfp_control self.sfp_ctrl_idx = sfp_ctrl_idx def get_eeprom_path(self): return self.eeprom_path def get_name(self): return "QSFP+ or later" def get_presence(self): """ Retrieves the presence of the sfp """ if self.index > 64: return False presence_ctrl = self.sfp_control + 'qsfp_modprs' try: reg_file = open(presence_ctrl) reg_hex = reg_file.readline().rstrip() # content is a string containing the hex # representation of the register reg_value = int(reg_hex, 16) # Mask off the bit corresponding to our port if (self.sfp_ctrl_idx > 15): index = self.sfp_ctrl_idx % 16 else: index = self.sfp_ctrl_idx # Mask off the bit corresponding to our port mask = (1 << index) # ModPrsL is active low if ((reg_value & mask) == 0): return True except (IOError, ValueError) as err: syslog.syslog(syslog.LOG_ERR, str(err)) return False def get_reset_status(self): """ Retrieves the reset status of SFP """ if self.index > 64: return False reset_status = None reset_ctrl = self.sfp_control + 'qsfp_reset' try: reg_file = open(reset_ctrl, "r+") reg_hex = reg_file.readline().rstrip() # content is a string containing the hex # representation of the register reg_value = int(reg_hex, 16) # Mask off the bit corresponding to our port if (self.sfp_ctrl_idx > 15): index = self.sfp_ctrl_idx % 16 else: index = self.sfp_ctrl_idx mask = (1 << index) if ((reg_value & mask) == 0): reset_status = True else: reset_status = False except (IOError, ValueError) as err: syslog.syslog(syslog.LOG_ERR, str(err)) return False return reset_status def get_lpmode(self): """ Retrieves the lpmode (low power mode) status of this SFP """ if self.index > 64: return False lpmode_ctrl = self.sfp_control + 'qsfp_lpmode' try: reg_file = open(lpmode_ctrl, "r+") reg_hex = reg_file.readline().rstrip() # content is a string containing the hex # representation of the register reg_value = int(reg_hex, 16) # Mask off the bit corresponding to our port if (self.sfp_ctrl_idx > 15): index = self.sfp_ctrl_idx % 16 else: index = self.sfp_ctrl_idx mask = (1 << index) if ((reg_value & mask) == 0): lpmode_state = False else: lpmode_state = True except (IOError, ValueError) as err: syslog.syslog(syslog.LOG_ERR, str(err)) return False return lpmode_state def reset(self): """ Reset SFP and return all user module settings to their default srate. """ if self.index > 64: return False reset_ctrl = self.sfp_control + 'qsfp_reset' try: # Open reset_ctrl in both read & write mode reg_file = open(reset_ctrl, "r+") reg_hex = reg_file.readline().rstrip() reg_value = int(reg_hex, 16) # Mask off the bit corresponding to our port if (self.sfp_ctrl_idx > 15): index = self.sfp_ctrl_idx % 16 else: index = self.sfp_ctrl_idx # Mask off the bit corresponding to our port mask = (1 << index) # ResetL is active low reg_value = (reg_value & ~mask) # Convert our register value back to a # hex string and write back reg_file.seek(0) reg_file.write(hex(reg_value)) reg_file.close() # Sleep 1 second to allow it to settle time.sleep(1) # Flip the bit back high and write back to the # register to take port out of reset reg_file = open(reset_ctrl, "w") reg_value = reg_value | mask reg_file.seek(0) reg_file.write(hex(reg_value)) reg_file.close() except (IOError, ValueError) as err: syslog.syslog(syslog.LOG_ERR, str(err)) return False return True def set_lpmode(self, lpmode): """ Sets the lpmode (low power mode) of SFP """ if self.index > 64: return False lpmode_ctrl = self.sfp_control + 'qsfp_lpmode' try: reg_file = open(lpmode_ctrl, "r+") reg_hex = reg_file.readline().rstrip() # content is a string containing the hex # representation of the register reg_value = int(reg_hex, 16) # Mask off the bit corresponding to our port if (self.sfp_ctrl_idx > 15): index = self.sfp_ctrl_idx % 16 else: index = self.sfp_ctrl_idx mask = (1 << index) # LPMode is active high; set or clear the bit accordingly if lpmode is True: reg_value = (reg_value | mask) else: reg_value = (reg_value & ~mask) # Convert our register value back to a hex string and write back content = hex(reg_value) reg_file.seek(0) reg_file.write(content) reg_file.close() except (IOError, ValueError) as err: syslog.syslog(syslog.LOG_ERR, str(err)) return False return True def get_status(self): """ Retrieves the operational status of the device """ reset = self.get_reset_status() if reset: status = False else: status = True return status def get_position_in_parent(self): """ Retrieves 1-based relative physical position in parent device. Returns: integer: The 1-based relative physical position in parent device or -1 if cannot determine the position """ return self.index def is_replaceable(self): """ Indicate whether this device is replaceable. Returns: bool: True if it is replaceable. """ return True def get_error_description(self): """ Retrives the error descriptions of the SFP module Returns: String that represents the current error descriptions of vendor specific errors In case there are multiple errors, they should be joined by '|', like: "Bad EEPROM|Unsupported cable" """ if not self.get_presence(): return self.SFP_STATUS_UNPLUGGED else: if not os.path.isfile(self.eeprom_path): return "EEPROM driver is not attached" try: with open(self.eeprom_path, mode="rb", buffering=0) as eeprom: eeprom.seek(QSFP_INFO_OFFSET) eeprom.read(1) except OSError as e: return "EEPROM read failed ({})".format(e.strerror) return self.SFP_STATUS_OK