pci_vpd_lib/pci_vpd_lib.py (87 lines of code) (raw):
'''
Copyright (c) Facebook, Inc. and its affiliates.
This source code is licensed under the MIT license found in the
LICENSE file in the root directory of this source tree.
'''
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import io
import os
from collections import OrderedDict
class VPDDataException(Exception):
pass
class VitalProductDataReader:
"""
Class for reading Vital Product Data for PCI devices through sysfs.
It follows the format defined by
PCI Local Bus Specification v2.2 (Appendix I)
http://www.ics.uci.edu/~harris/ics216/pci/PCI_22.pdf
Note: this class avoids touching read/write fields, because this may
be very slow.
"""
fields = OrderedDict()
identifier_string = None
def __init__(self, vpd_path):
if os.path.exists(vpd_path):
with io.FileIO(vpd_path, 'rb') as inf:
self._read_vpd(inf)
def _combine_checksum(self, a, buf):
for x in buf:
a = (a + x) & 0xFF
return a
# Read resource data header, return its tag, size and header checksum.
def _read_resource_dt_header(self, f):
checksum = 0
buf = bytearray(f.read(1))
# not enough data, return END tag
if len(buf) < 1:
return (0xF, 0, 0)
checksum = self._combine_checksum(checksum, buf)
# handle small resource data type
if (buf[0] & 0x80) == 0:
# small resource data type header just one byte with mask
# 0b0XXXXYYY, where XXXX is a tag id and YYY is length
tag = (buf[0] & 0b1111000) >> 3
length = buf[0] & 0b111
return (tag, length, checksum)
else:
# large resource data type header consists of three bytes
# first one is 0b1XXXXXXX, where XXXXXXX is a tag id and
# next two bytes are length with first of the two being least
# significant
tag = buf[0] & 0x7F
# read the length
buf = bytearray(f.read(2))
# handle not enough data
if len(buf) < 2:
return (0xF, 0, 0)
checksum = self._combine_checksum(checksum, buf)
return (tag, buf[0] + buf[1] * 256, checksum)
# For compatibility between py2.7 and py3, due to different FileIO.read
# return type.
def _value_to_str(self, data):
if type(data) is str:
return data
elif type(data) is bytes:
return data.decode('ascii', errors='replace')
def _process_vpd_list(self, data):
# Each item header is 3 bytes long. 2 bytes for the key and 1
# byte for the length of the data
while len(data) >= 3:
key = data[0:2].decode('ascii')
length = bytearray(data[2:3])[0]
# Check if we have enough data
if len(data) < 3 + length:
return
# RV is a checksum and reserved bytes. It doesn't provide any
# useful information and is used just for checksum, skip it.
if key != 'RV':
value = self._value_to_str(data[3:3 + length]).strip()
self.fields[key] = value
data = data[3 + length:]
def _read_vpd(self, f):
tag = 0
checksum = 0
while tag != 0xF:
(tag, l, header_sum) = self._read_resource_dt_header(f)
if tag == 0x02:
# 0x02 is an identifier string tag
value = f.read(l)
if len(value) < l:
raise VPDDataException('VPD data is truncated!')
checksum =\
self._combine_checksum(checksum, bytearray([header_sum]))
checksum =\
self._combine_checksum(checksum, bytearray(value))
value = self._value_to_str(value).strip()
self.identifier_string = value
elif tag == 0x10:
# This is a VPD-R tag (i.e. read only data), read the data and
# parse the fields.
data = f.read(l)
if len(data) < l:
raise VPDDataException('VPD data is truncated!')
checksum =\
self._combine_checksum(checksum, bytearray([header_sum]))
checksum =\
self._combine_checksum(checksum, bytearray(data))
if checksum != 0:
raise VPDDataException('VPD-R checksum failed!')
self._process_vpd_list(data)
elif tag == 0x11:
# This is a VPD-W tag (i.e. read/write data, skip it).
f.seek(l, io.SEEK_CUR)
elif tag != 0xF:
raise VPDDataException('Unknown VPD tag {}!'.format(tag))