fiosynth_lib/flash_config.py (216 lines of code) (raw):

# Copyright (c) Facebook, Inc. and its affiliates.
#
# AUTHOR = "Darryl Gardner <darryleg@fb.com>"
#
# flash_config.py- Logs flash configuration information in .csv format
# By default, results will be stored in "flashconfig.csv" file.
#
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import csv
import json
import os
import shlex
import subprocess
import sys
from subprocess import PIPE, Popen


def set_attributes():
    """Parse the command-line options of the tool.

    Returns:
        The argparse namespace; its ``filename`` attribute holds the name of
        the .csv file to write (default "flashconfig.csv").
    """
    #
    # Attribute Table Definition
    #
    parser = argparse.ArgumentParser(
        description="flash_config- Logs flash configuration in .csv file."
    )
    parser.add_argument(
        "-f",
        action="store",
        dest="filename",
        type=str,
        help="(Optional) Flash configuration filename (default = flashconfig.csv)",
        default="flashconfig.csv",
    )
    return parser.parse_args()


def _parse_smartctl_info(lines):
    """Convert ``smartctl -i`` output lines (bytes) into a label -> value dict.

    The label is the first two whitespace-separated tokens of a line (for
    example "Device Model:") and the value is the next one or two tokens.
    When a label repeats, the first occurrence wins.
    """
    KEY = 0
    VALUE = 2
    info = {}
    for line in lines:
        bits = line.split()
        if len(bits) <= VALUE:
            # A line with fewer than three tokens cannot be split into a
            # two-token label plus a value; skip it instead of raising.
            continue
        label = b" ".join(bits[KEY:KEY + 2]).decode("utf-8")
        value = b" ".join(bits[VALUE:VALUE + 2]).decode("utf-8")
        info.setdefault(label, value)
    return info


def smartctlToJson(data):
    """Collect ``smartctl -i`` information for every non-boot disk.

    Args:
        data: dict mapping a drive type to a list of device names, as decoded
            from the output of drivesToJson(). Only the "disk" entry is read.

    Returns:
        A JSON string mapping the 1-based position of each disk in
        data["disk"] to a dict of smartctl labels and values, plus a
        "Device Path:" entry. The first disk is never queried. "{}" is
        returned when there is no disk beyond the first one.
    """
    device = "disk"
    smart = {}
    disks = data.get(device, [])
    # Skip boot device (sda): the first listed disk is presumed to be the
    # boot device, so enumeration starts at position 1.
    for index, name in enumerate(disks[1:], start=1):
        device_path = "/dev/%s" % name
        syntax = "smartctl -i /dev/%s | grep :" % name
        lb = subprocess.Popen(syntax, stdout=subprocess.PIPE, shell=True)
        # communicate() drains the pipe and reaps the child process.
        output = lb.communicate()[0]
        entry = {"Device Path:": device_path}
        for label, value in _parse_smartctl_info(output.splitlines()).items():
            # Never let smartctl output override the device path set above.
            entry.setdefault(label, value)
        smart[index] = entry
    return json.dumps(smart)


def drivesToJson():
    """List block devices grouped by type using ``/bin/lsblk -rnb``.

    Returns:
        A JSON string mapping each value found in the TYPE column to the list
        of device names of that type, in lsblk output order.
    """
    # Column positions in the raw lsblk output (presumably the default
    # NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT layout -- verify on target).
    TYPE = 5
    DEVICE = 0
    maxcol = max(TYPE, DEVICE)
    lb = subprocess.Popen(["/bin/lsblk", "-rnb"], stdout=subprocess.PIPE)
    # communicate() drains the pipe and reaps the child process.
    output = lb.communicate()[0]
    drives = {}
    for line in output.splitlines():
        bits = line.split()
        if len(bits) > maxcol:
            drive_type = bits[TYPE].decode("utf-8")
            drive_device = bits[DEVICE].decode("utf-8")
            drives.setdefault(drive_type, []).append(drive_device)
    return json.dumps(drives)


def cmdline(cmd):
    """Run ``cmd`` through the shell and return its standard output as bytes."""
    process = Popen(args=cmd, stdout=PIPE, shell=True)
    return process.communicate()[0]


def new_csv(f):
    """Write the column-header row of the flash configuration file to ``f``.

    On IOError a message is printed, ``f`` is closed and the program exits
    with status 1.
    """
    col_names = [
        "Index",
        "DevicePath",
        "Capacity",
        "ModelNumber",
        "SerialNumber",
        "Firmware",
        "Hostname",
        "KernelVersion",
    ]
    try:
        writer = csv.writer(f)
        writer.writerow(col_names)
    except IOError:
        print("cannot write to ", f)
        f.close()
        sys.exit(1)


def print_nvme_line(f, data, hostname, kernel):
    """Write one CSV row per NVMe device.

    Args:
        f: open, writable text file object.
        data: iterable of dicts providing the "Index", "DevicePath",
            "ModelNumber", "SerialNumber" and "Firmware" keys.
        hostname: host name written in every row.
        kernel: kernel version written in every row.

    The capacity is read from ``lsblk -rnb <device>``; it is left empty when
    lsblk reports nothing for the device. On IOError a message is printed,
    ``f`` is closed and the program exits with status 1.
    """
    CAPACITY_KEY = 3
    writer = csv.writer(f)
    for datum in data:
        device = datum["DevicePath"]
        syntax = "lsblk -rnb %s | grep disk" % device
        # cmdline() returns bytes; decode so the CSV holds plain text rather
        # than a "b'...'" representation under Python 3.
        fields = cmdline(syntax).decode("utf-8").split()
        capacity = fields[CAPACITY_KEY] if len(fields) > CAPACITY_KEY else ""
        row = (
            datum["Index"],
            datum["DevicePath"],
            capacity,
            datum["ModelNumber"],
            datum["SerialNumber"],
            datum["Firmware"],
            hostname,
            kernel,
        )
        try:
            writer.writerow(row)
        except IOError:
            print("cannot write to ", f)
            f.close()
            sys.exit(1)


def print_flash_line(f, data, hostname, kernel):
    """Write the CSV row describing the flash card stored under "card.1".

    Args:
        f: open, writable text file object.
        data: dict whose "card.1" entry provides "pci_address",
            "logical_location", "size", "board_name", "sn" and
            "firmware_version".
        hostname: host name written in the row.
        kernel: kernel version written in the row.

    On IOError a message is printed and the program exits with status 1.
    """
    # Use flash_manager for legacy flash card support.
    card_id = "card.1"
    card = data[card_id]
    row = (
        card["pci_address"],
        card["logical_location"],
        card["size"],
        card["board_name"],
        card["sn"],
        card["firmware_version"],
        hostname,
        kernel,
    )
    try:
        writer = csv.writer(f)
        writer.writerow(row)
    except IOError:
        print("cannot write to ", f)
        sys.exit(1)


def print_smart_line(f, data, hostname, kernel):
    """Write one CSV row per solid-state device found in smartctl data.

    Args:
        f: open, writable text file object.
        data: dict keyed by "1", "2", ... (the decoded output of
            smartctlToJson()); each value maps smartctl labels to values.
        hostname: host name written in every row.
        kernel: kernel version written in every row.

    Only entries whose "Rotation Rate:" equals "Solid State" are written;
    a label missing from an entry yields an empty cell. On IOError a message
    is printed and the program exits with status 1.
    """
    writer = csv.writer(f)
    # Walk the devices themselves: the number of entries in ``data`` bounds
    # the loop, not the number of attributes of the first device.
    for index in range(1, len(data) + 1):
        entry = data.get(str(index))
        if entry is None or entry.get("Rotation Rate:") != "Solid State":
            continue
        row = (
            index,
            entry.get("Device Path:", ""),
            entry.get("User Capacity:", ""),
            entry.get("Device Model:", ""),
            entry.get("Serial Number:", ""),
            entry.get("Firmware Version:", ""),
            hostname,
            kernel,
        )
        try:
            writer.writerow(row)
        except IOError:
            print("cannot write to ", f)
            sys.exit(1)


def print_csv_line(f, data, tool):
    """Write the data rows of the CSV file using the writer matching ``tool``.

    Args:
        f: open, writable text file object.
        data: decoded JSON configuration produced for ``tool``.
        tool: "nvme", "flash_manager" or "smartctl"; any other value only
            prints a message.
    """
    hostname = cmdline("uname -n").decode("utf-8").rstrip()
    kernel = cmdline("uname -r").decode("utf-8").rstrip()
    # nvme tool will work for all NVMe flash devices
    if tool == "nvme":
        print_nvme_line(f, data["Devices"], hostname, kernel)
    elif tool == "flash_manager":
        print_flash_line(f, data, hostname, kernel)
    elif tool == "smartctl":
        print_smart_line(f, data, hostname, kernel)
    else:
        print("tool: %s not found." % tool)


def command_exist(cmd):
    """Tell whether ``cmd`` can be run and prints something on stdout.

    Args:
        cmd: command line whose first word must be "nvme", "flash_manager"
            or "smartctl".

    Returns:
        True when the command starts and writes at least one byte to its
        standard output; False for an empty or unsupported command, when the
        executable cannot be started, or when it prints nothing.
    """
    words = cmd.split()
    if not words or words[0] not in ["nvme", "flash_manager", "smartctl"]:
        return False
    args = shlex.split(cmd)
    # A single run both detects a missing executable (OSError) and captures
    # stdout; the context manager guarantees the devnull handle is closed.
    with open(os.devnull, "wb") as devnull:
        try:
            process = Popen(args, stdout=PIPE, stderr=devnull)
        except OSError:
            print("%s command not installed" % cmd)
            return False
        output = process.communicate()[0]
    return bool(output)


class GetFlashConfig:
    """Detects the available flash tool and exports its report as CSV."""

    def get_json(self):
        """Query the first usable tool for the flash configuration.

        The tools are tried in this order: nvme, flash_manager, smartctl.
        The program exits with status 1 when none of them is usable.

        Returns:
            A (config, tool) tuple: the decoded JSON configuration and the
            name of the tool that produced it.
        """
        if command_exist("nvme list"):
            syntax = "nvme list -o json"
            tool = "nvme"
            args = shlex.split(syntax)
            # Decode explicitly so json.loads always receives text.
            config = subprocess.check_output(args).decode("utf-8")
        elif command_exist("flash_manager status"):
            syntax = "flash_manager status --json"
            tool = "flash_manager"
            args = shlex.split(syntax)
            config = subprocess.check_output(args).decode("utf-8")
        elif command_exist("smartctl -i /dev/sdb"):
            tool = "smartctl"
            devices = drivesToJson()
            data = json.loads(devices)
            config = smartctlToJson(data)
        else:
            print("Flash configuration tool not found.")
            sys.exit(1)
        config_as_json = json.loads(config)
        return config_as_json, tool

    def json_to_csv(self, path, config, csv_file, tool):
        """Write ``config`` to the file ``csv_file`` inside directory ``path``.

        Args:
            path: directory receiving the file.
            config: decoded JSON configuration returned by get_json().
            csv_file: name of the .csv file to create.
            tool: name of the tool that produced ``config``.
        """
        out_file = os.path.join(path, csv_file)
        with open(out_file, "w") as csv_out:
            new_csv(csv_out)
            print_csv_line(csv_out, config, tool)
        print('Flash configuration filename is "%s"' % csv_file)


def main():
    """Entry point: detect the flash tool and write its report to a .csv file."""
    args = set_attributes()
    config = GetFlashConfig()
    config_as_json, tool = config.get_json()
    config.json_to_csv(".", config_as_json, args.filename, tool)


if __name__ == "__main__":
    main()