platform/broadcom/sonic-platform-modules-dell/z9332f/sonic_platform/sfp.py (262 lines of code) (raw):
#!/usr/bin/env python
"""
#############################################################################
# DELLEMC Z9332F
#
# Module contains an implementation of SONiC Platform Base API and
# provides the platform information
#
#############################################################################
"""
try:
import os
import time
import mmap
from sonic_platform_base.sonic_xcvr.sfp_optoe_base import SfpOptoeBase
except ImportError as err:
raise ImportError(str(err) + "- required module not found")
# Byte offsets into the module EEPROM that are probed when checking that the
# EEPROM is readable, one per transceiver family.
QSFP_INFO_OFFSET = 0x80
SFP_INFO_OFFSET = 0x00
QSFP_DD_PAGE0 = 0x00

# Module identifier values, spelled the way hex() renders them, grouped by
# the transceiver family they denote.
SFP_TYPE_LIST = ['0x3']          # SFP/SFP+/SFP28 and later
QSFP_TYPE_LIST = [
    '0xc',                       # QSFP
    '0xd',                       # QSFP+ or later
    '0x11',                      # QSFP28 or later
]
QSFP_DD_TYPE_LIST = ['0x18']     # QSFP_DD Type
class Sfp(SfpOptoeBase):
    """
    DELLEMC Platform-specific Sfp class

    Port signals (presence, reset, low power mode, interrupt) are read and
    written as single bytes in the PCI resource file BASE_RES_PATH, while the
    module EEPROM is reached through the sysfs path handed to the constructor.
    """

    BASE_RES_PATH = "/sys/bus/pci/devices/0000:09:00.0/resource0"

    # Port index -> i2c bus number. Ports 1-32 map to the consecutive buses
    # 10-41; ports 33 and 34 map to buses 1 and 2.
    _port_to_i2c_mapping = {port: port + 9 for port in range(1, 33)}
    _port_to_i2c_mapping.update({33: 1, 34: 2})

    # Per-port register layout inside the PCI resource: every port owns a
    # 16-byte window. The byte at 0x4000 carries the reset and lpmode bits,
    # the byte at 0x4004 carries the presence and interrupt bits.
    _PORT_CTRL_REG_BASE = 0x4000
    _PORT_STATUS_REG_BASE = 0x4004
    _PORT_REG_STRIDE = 16

    # Bits of the control byte
    _RESET_L_MASK = (1 << 4)
    _LPMODE_MASK = (1 << 6)

    # EEPROM driver that has to be bound for each transceiver type
    _MEDIA_TO_DRIVER = {
        'SFP': 'optoe2',
        'QSFP': 'optoe1',
        'QSFP_DD': 'optoe3',
    }

    def __init__(self, index, sfp_type, eeprom_path):
        """
        SFP Dunder init

        Args:
            index: 1-based port number, used for the register offset and the
                   i2c bus lookup
            sfp_type: native type of the port ('QSFP_DD' or 'SFP')
            eeprom_path: sysfs path of the module EEPROM
        """
        SfpOptoeBase.__init__(self)
        self.index = index
        self.eeprom_path = eeprom_path
        # port_type is the native port type and sfp_type is the transceiver
        # type. sfp_type starts out as the port type and is refined from the
        # module identifier in set_media_type().
        self.port_type = sfp_type
        self.sfp_type = self.port_type
        self._initialize_media(delay=False)

    def get_eeprom_path(self):
        """
        Returns SFP eeprom path
        """
        return self.eeprom_path

    def get_name(self):
        """
        Returns native transceiver type, chosen from the port index
        (ports below 33 are QSFP-DD, the others SFP)
        """
        if self.index < 33:
            return "QSFP-DD Double Density 8X Pluggable Transceiver"
        return "SFP/SFP+/SFP28"

    @staticmethod
    def pci_mem_read(mem, offset):
        """
        Returns the desired byte in PCI memory space

        Args:
            mem: open mmap object of the PCI resource
            offset: byte offset to read
        """
        mem.seek(offset)
        return mem.read_byte()

    @staticmethod
    def pci_mem_write(mem, offset, data):
        """
        Writes the desired byte in PCI memory space

        Args:
            mem: open mmap object of the PCI resource
            offset: byte offset to write
            data: byte value to store
        """
        mem.seek(offset)
        mem.write_byte(data)

    def pci_set_value(self, resource, val, offset):
        """
        Sets the value in PCI memory space

        Args:
            resource: path of the PCI resource file
            val: byte value to write
            offset: byte offset inside the resource
        Returns:
            The value that was written
        """
        filed = os.open(resource, os.O_RDWR)
        try:
            mem = mmap.mmap(filed, 0)
            try:
                self.pci_mem_write(mem, offset, val)
            finally:
                mem.close()
        finally:
            # Always release the descriptor, also when mapping/writing fails
            os.close(filed)
        return val

    def pci_get_value(self, resource, offset):
        """
        Retrieves the value from PCI memory space

        Args:
            resource: path of the PCI resource file
            offset: byte offset inside the resource
        Returns:
            The byte read at the given offset
        """
        filed = os.open(resource, os.O_RDWR)
        try:
            mem = mmap.mmap(filed, 0)
            try:
                return self.pci_mem_read(mem, offset)
            finally:
                mem.close()
        finally:
            # Always release the descriptor, also when mapping/reading fails
            os.close(filed)

    def _port_reg_offset(self, base):
        """
        Returns the offset of this port's register, given the offset of
        the same register for port 1
        """
        return base + ((self.index - 1) * self._PORT_REG_STRIDE)

    def _initialize_media(self, delay=False):
        """
        Initialize the media type and eeprom driver for SFP

        Args:
            delay: when True, wait one second before probing the module
        """
        if delay:
            time.sleep(1)
        # Drop the cached xcvr API so that it is created again for the
        # module that is plugged in now
        self._xcvr_api = None
        self.get_xcvr_api()
        self.set_media_type()
        self.reinit_sfp_driver()

    def get_presence(self):
        """
        Retrieves the presence of the sfp
        Returns : True if sfp is present and false if it is absent
        """
        present_mask = {'QSFP_DD': (1 << 4), 'SFP': (1 << 0)}
        port_offset = self._port_reg_offset(self._PORT_STATUS_REG_BASE)
        try:
            status = self.pci_get_value(self.BASE_RES_PATH, port_offset)
            reg_value = int(status)
        except ValueError:
            return False
        # ModPrsL is active low
        return (reg_value & present_mask[self.port_type]) == 0

    def get_reset_status(self):
        """
        Retrieves the reset status of SFP
        Returns : True if the port is held in reset. Always False for
        ports that are not QSFP_DD.
        """
        if self.port_type != 'QSFP_DD':
            return False
        port_offset = self._port_reg_offset(self._PORT_CTRL_REG_BASE)
        try:
            status = self.pci_get_value(self.BASE_RES_PATH, port_offset)
            reg_value = int(status)
        except ValueError:
            return False
        # ResetL is active low: a cleared bit means the module is in reset
        return not reg_value & self._RESET_L_MASK

    def get_lpmode(self):
        """
        Retrieves the lpmode(low power mode) of this SFP
        Returns : True if low power mode is enabled. QSFP_DD modules are
        queried through SfpOptoeBase, the others through the port register.
        """
        try:
            if self.sfp_type == 'QSFP_DD':
                return SfpOptoeBase.get_lpmode(self)
            port_offset = self._port_reg_offset(self._PORT_CTRL_REG_BASE)
            status = self.pci_get_value(self.BASE_RES_PATH, port_offset)
            reg_value = int(status)
        except ValueError:
            return False
        return bool(reg_value & self._LPMODE_MASK)

    def reset(self):
        """
        Reset the SFP and returns all user settings to their default state
        Returns : True on success, False for ports that are not QSFP_DD or
        when the register access fails. Blocks for one second.
        """
        if self.port_type != 'QSFP_DD':
            return False
        port_offset = self._port_reg_offset(self._PORT_CTRL_REG_BASE)
        try:
            status = self.pci_get_value(self.BASE_RES_PATH, port_offset)
            reg_value = int(status)
            # ResetL is active low: clear the bit to assert reset
            reg_value = reg_value & ~self._RESET_L_MASK
            self.pci_set_value(self.BASE_RES_PATH, reg_value, port_offset)
            # Sleep 1 second to allow it to settle
            time.sleep(1)
            # Set the bit again to release the module from reset
            reg_value = reg_value | self._RESET_L_MASK
            self.pci_set_value(self.BASE_RES_PATH, reg_value, port_offset)
        except ValueError:
            return False
        return True

    def set_lpmode(self, lpmode):
        """
        Sets the lpmode(low power mode) of this SFP

        Args:
            lpmode: truthy to enable low power mode, falsy to disable it
        Returns : result of SfpOptoeBase.set_lpmode for QSFP_DD modules,
        otherwise True on success and False when the register access fails
        """
        try:
            if self.sfp_type == 'QSFP_DD':
                return SfpOptoeBase.set_lpmode(self, lpmode)
            port_offset = self._port_reg_offset(self._PORT_CTRL_REG_BASE)
            status = self.pci_get_value(self.BASE_RES_PATH, port_offset)
            reg_value = int(status)
            # LPMode is active high; set or clear the bit accordingly
            if lpmode:
                reg_value = reg_value | self._LPMODE_MASK
            else:
                reg_value = reg_value & ~self._LPMODE_MASK
            self.pci_set_value(self.BASE_RES_PATH, reg_value, port_offset)
        except ValueError:
            return False
        return True

    def get_intl_state(self):
        """
        Retrieves the intL (interrupt; active low) pin of this SFP
        Returns : True if the intL bit is set. Also True for ports that are
        not QSFP_DD and when the register access fails.
        """
        if self.port_type != 'QSFP_DD':
            return True
        port_offset = self._port_reg_offset(self._PORT_STATUS_REG_BASE)
        try:
            status = self.pci_get_value(self.BASE_RES_PATH, port_offset)
            reg_value = int(status)
        except ValueError:
            return True
        # NOTE(review): get_presence() tests this same bit of this same
        # register for QSFP_DD ports - confirm the intL bit position.
        intl_mask = (1 << 4)
        return bool(reg_value & intl_mask)

    def set_media_type(self):
        """
        Reads optic eeprom byte to determine media type inserted
        Returns : the resulting sfp_type ('SFP', 'QSFP' or 'QSFP_DD'), or
        the native port type if the identifier is unreadable or unknown
        """
        # Set native port type if EEPROM type is not recognized/readable
        detected_type = self.port_type
        module_id = self._xcvr_api_factory._get_id()
        if module_id is not None:
            id_hex = hex(module_id)
            if id_hex in SFP_TYPE_LIST:
                detected_type = 'SFP'
            elif id_hex in QSFP_TYPE_LIST:
                detected_type = 'QSFP'
            elif id_hex in QSFP_DD_TYPE_LIST:
                detected_type = 'QSFP_DD'
        self.sfp_type = detected_type
        return self.sfp_type

    @staticmethod
    def _attach_eeprom_driver(del_sfp_path, new_sfp_path, driver):
        """
        Removes the device at address 0x50 and creates it again with the
        given driver
        """
        with open(del_sfp_path, 'w') as del_file:
            del_file.write('0x50\n')
        time.sleep(0.2)
        with open(new_sfp_path, 'w') as new_file:
            new_file.write('{0} 0x50\n'.format(driver))
        time.sleep(2)

    def reinit_sfp_driver(self):
        """
        Changes the driver based on media type detected
        Returns : False if the driver name file is missing or a sysfs file
        cannot be accessed, True otherwise (also when nothing had to change)
        """
        i2c_bus = self._port_to_i2c_mapping[self.index]
        del_sfp_path = "/sys/class/i2c-adapter/i2c-{0}/delete_device".format(i2c_bus)
        new_sfp_path = "/sys/class/i2c-adapter/i2c-{0}/new_device".format(i2c_bus)
        driver_path = "/sys/class/i2c-adapter/i2c-{0}/{0}-0050/name".format(i2c_bus)

        if not os.path.isfile(driver_path):
            print(driver_path, "does not exist")
            return False

        try:
            with open(driver_path, 'r') as name_file:
                driver_name = name_file.read()
            driver_name = driver_name.rstrip('\r\n')
            driver_name = driver_name.lstrip(" ")

            # Avoid re-initialization of the QSFP/SFP optic on QSFP/SFP port:
            # rebind only when one of the other optoe drivers is bound.
            wanted_driver = self._MEDIA_TO_DRIVER.get(self.sfp_type)
            if (wanted_driver is not None
                    and driver_name != wanted_driver
                    and driver_name in self._MEDIA_TO_DRIVER.values()):
                self._attach_eeprom_driver(del_sfp_path, new_sfp_path,
                                           wanted_driver)
        except IOError as err:
            print("Error: Unable to open file: %s" %str(err))
            return False
        return True

    def get_position_in_parent(self):
        """
        Retrieves 1-based relative physical position in parent device.
        Returns:
            integer: The 1-based relative physical position in parent
            device or -1 if cannot determine the position
        """
        return self.index

    @staticmethod
    def is_replaceable():
        """
        Indicate whether this device is replaceable.
        Returns:
            bool: True if it is replaceable.
        """
        return True

    def get_error_description(self):
        """
        Retrieves the error descriptions of the SFP module
        Returns:
            String that represents the current error descriptions of vendor specific errors
            In case there are multiple errors, they should be joined by '|',
            like: "Bad EEPROM|Unsupported cable"
        """
        if not self.get_presence():
            return self.SFP_STATUS_UNPLUGGED

        if not os.path.isfile(self.eeprom_path):
            return "EEPROM driver is not attached"

        probe_offsets = {
            'SFP': SFP_INFO_OFFSET,
            'QSFP': QSFP_INFO_OFFSET,
            'QSFP_DD': QSFP_DD_PAGE0,
        }
        # Probe the first EEPROM byte when the transceiver type is not one
        # of the known ones, so that the read check can still be done.
        offset = probe_offsets.get(self.sfp_type, 0)

        try:
            # A one byte read is enough to tell whether the EEPROM responds
            with open(self.eeprom_path, mode="rb", buffering=0) as eeprom:
                eeprom.seek(offset)
                eeprom.read(1)
        except OSError as e:
            return "EEPROM read failed ({})".format(e.strerror)

        return self.SFP_STATUS_OK