#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# AUTHOR = 'Darryl Gardner <darryleg@fb.com>'
import argparse
import datetime
import json
import os
import os.path
import shutil
import socket
import subprocess
import sys
from distutils.version import LooseVersion
from random import randint
from subprocess import PIPE, Popen
from . import fio_json_parser
from . import flash_config
from . import health_tools
class Parser:
def __init__(self, jname, cname):
self.json_file = jname
self.json_path = "."
self.all_json = ""
self.csv_path = "."
self.csv_file = cname
self.serverMode = "n"
self.combine_csv_path = ""
def parseLocalResults(args):
fio_json_parser.main(args)
def set_attributes():
#
# Attribute Table Definition
#
parser = argparse.ArgumentParser(
description="FB fio Synthetic Benchmark Suite for storage ver 3.5.48"
)
parser.add_argument(
"-d",
action="store",
dest="device",
type=str,
        help=(
            "(Required) device path for single device target, ALL for all "
            "data devices, or ALLRAID for all mdraid devices"
        ),
required=True,
default="",
)
parser.add_argument(
"-c",
action="store",
dest="factor",
type=float,
help="(Optional) specify capacity in TB (default = <device capacity>)",
default=-1.0,
)
parser.add_argument(
"-w",
action="store",
dest="wklds",
type=str,
        help="(Required) filename for workload suite",
required=True,
default="",
)
parser.add_argument(
"-f",
action="store",
dest="fname",
type=str,
        help="(Required) Results filename",
required=True,
default=".",
)
parser.add_argument(
"-r",
action="store",
dest="dryrun",
type=str,
help="(Optional) Set to y to do dry run (default = n)",
default="n",
)
parser.add_argument(
"-t",
action="store",
dest="health",
type=str,
help="(Optional) Enter Health Monitoring Tool Syntax (default = )",
default="",
)
parser.add_argument(
"-p",
action="store",
dest="prep",
type=str,
help="(Optional) Set to n to skip drive prep, o to prep on first cycle "
"only (default = y)",
default="y",
)
parser.add_argument(
"-n",
action="store",
dest="cycles",
type=int,
        help="(Optional) Specify the number of run cycles (default = 3)",
default=-1,
)
parser.add_argument(
"-g",
action="store",
dest="getflash",
type=str,
help="(Optional) Set to y to enable flash configuration logging "
"(default = n)",
default="n",
)
parser.add_argument(
"-s",
action="append",
dest="servers",
type=str,
help="(Optional) Add a server to the list for client/server mode",
default=[],
)
parser.add_argument(
"-l",
action="store",
dest="server_file",
type=str,
help="(Optional) Path to a text file with a server name on each line",
default="",
)
parser.add_argument(
"-j",
action="store",
dest="job_scale",
type=int,
help="(Optional) Scale by jobs (default = 1 job per drive)",
default=1,
)
parser.add_argument(
"-x",
action="store_true",
dest="exitall",
help="(Optional) Pass --exitall to fio",
)
parser.add_argument(
"-z",
action="store_true",
dest="deterministic",
help="(Optional) Static file and directory names",
)
parser.add_argument(
"-m",
action="store",
dest="misc",
type=str,
        help="(Optional) Set a misc variable in a workload suite (default = )",
default="",
)
parser.add_argument(
"-e",
action="store",
dest="expert",
type=str,
        help="(Optional) Pass this string directly to the fio command line "
        "invocation, attached just before the jobfile argument (default = )",
default="",
)
parser.add_argument(
"-u",
action="store",
dest="user",
type=str,
help="(Optional) The user to login as on the server when running fiosynth in client/server mode (default = root)",
default="root",
)
parser.add_argument(
"-y",
action="store",
dest="tunneling",
type=str,
help="(Optional) Set to y to perform server/client mode fio via SSH tunnels. (default = n)",
default="n",
)
parser.add_argument("-v", action="version", version=parser.description)
args = parser.parse_args()
return args
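# Illustrative invocation (paths and names below are placeholders, not shipped
# defaults), assuming the installed entry point is named fiosynth:
#   fiosynth -d /dev/nvme0n1 -w <workload_suite_file> -f myresults
# -d may also be ALL (all unmounted data devices) or ALLRAID (all mdraid
# devices), as described in the help text above.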
def checkVersion(tool):
cmd = None
if tool == "fio":
minVersion = "2.20"
cmd = "fio -v | sed s/fio-//"
if cmd is not None:
min_version = LooseVersion(minVersion)
v3_version = LooseVersion("3.0")
version = cmdline(cmd).rstrip()
fio_version = LooseVersion(version.decode("utf-8"))
        # Using utf-8 encoding to ensure that the version is reported as a string
if fio_version < min_version:
print("%s older than version %s" % (tool, minVersion))
sys.exit(1)
if fio_version >= v3_version:
return 3
else:
return 2
else:
print("Unknown tool %s, can't check version." % tool)
sys.exit(1)
def cmdline(cmd):
process = Popen(args=cmd, stdout=PIPE, shell=True)
return process.communicate()[0]
def checkMounted(device, dut):
cmd = "grep -c %s /proc/mounts" % device
if not dut.inLocalMode():
dutSsh = getSshProc(dut)
out, err = dutSsh.communicate(cmd)
mount = out.strip()
else:
mount = cmdline(cmd)
if int(mount) > 0:
return True
else:
return False
def getAllDataDrives(data, command, profile, dut):
    # getAllDataDrives creates a list of devices separated by a ':'.
    # An example of the string that is created in this function is
    # '/dev/sdb:/dev/sdc:/dev/sdd:/dev/sde:/dev/sdf:'.
    # If the devices_in_global option is 'N', each device is instead given
    # its own job section on separate lines.
if command == "ALL":
device = "disk"
        # The boot device (e.g. sda) is skipped below because it is mounted
else:
device = getRaidLevel(data)
dev_path = ""
dev_list = set()
for dev in data[device]:
if checkFileExist(dev, dut) is True:
if checkMounted(dev, dut) is False:
if profile["devices_in_global"] == "N":
dev_path += "[%s]\n" % dev
dev_path += "filename=%s\n" % dev
dut.device += "%s:" % dev
else:
if dev not in dev_list:
dev_list.add(dev)
dev_path += "%s:" % dev
return dev_path
def createTempJobTemplate(dut, jobname):
# (Used to be called createTempJobFile, renamed for clarity)
# Creates a temporary job template
dst_file = "tmp.fio"
shutil.copyfile(jobname, dst_file)
with open(dst_file, "a") as tmp_file:
try:
tmp_file.write(dut.dev_list)
except IOError:
            print("cannot write to %s" % dst_file)
sys.exit(1)
return dst_file
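# For example, with devices_in_global == "N" and two unmounted drives, the
# dut.dev_list string appended above would contain per-device job sections:
#   [/dev/sdb]
#   filename=/dev/sdb
#   [/dev/sdc]
#   filename=/dev/sdc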
def getTotalDataCapacity(data):
VAL = 1
data_cap = int(data["disk"][VAL])
return data_cap
# For checking devices, but can also be used for checking files
def checkFileExist(path, dut):
if dut.inLocalMode():
try:
os.stat(path)
except OSError:
print("%s does not exist " % path)
return False
return True
else:
proc = getSshProc(dut)
cmdStr = "stat %s 2> /dev/null | wc -l" % path
out, err = proc.communicate(cmdStr)
result = out.strip()
if int(result) > 0:
return True
else:
print("%s does not exist on server %s " % (path, dut.serverName))
return False
def getNumJobs(data, command, dut):
    # getNumJobs checks the number of devices in the system and
    # subtracts 1 to avoid counting the boot device when using the
    # ALL option. When using the ALLRAID option, it counts the
    # number of unique mdraid devices. The number of jobs is used
    # to scale fio jobfiles across multiple devices.
if command == "ALL":
jobs = len(data["disk"]) - 1
else:
dev_list = set()
raid = getRaidLevel(data)
for dev in data[raid]:
if checkFileExist(dev, dut) is True:
dev_list.add(dev)
jobs = len(dev_list)
return jobs
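# Example of the accounting above: with data["disk"] containing
# ["/dev/sda", "/dev/sdb", "/dev/sdc"] and command == "ALL", getNumJobs
# returns 2, since one entry is assumed to be the boot device.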
def getSshProc(dut):
sshProc = subprocess.Popen(
["ssh", dut.sshUser + "@" + dut.serverName, "/bin/bash"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
universal_newlines=True,
)
return sshProc
def getMultiplier(capacity):
factor = float(capacity) / 1e12
return max(round(factor, 1), 0.1)
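# Worked example: a 1,920,000,000,000 byte (1.92 TB) device yields
# 1.92e12 / 1e12 = 1.92, which rounds to a scale factor of 1.9; anything
# under 0.1 TB is clamped to the 0.1 floor.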
def getJobVars(dut, profile, scale_factor):
vars_static = [
"SIZE",
"TIME",
"RAMPTIME",
"RATE1",
"RATE2",
"BLKSIZE",
"DEPTH1",
"DEPTH2",
"MISC",
]
vars_scaleup = ["RRATE", "DEPTH", "OFFSET2"]
vars_scaledown = ["W1THINK", "W2THINK", "W3THINK", "W4THINK", "W5THINK"]
job_vars = {}
for var in vars_static:
if var in profile:
job_vars[var] = profile[var]
for var in vars_scaleup:
if var in profile:
if var == "OFFSET2":
job_vars["OFFSET2"] = int(
profile["OFFSET2"] * getMultiplier(dut.capacity)
)
else:
job_vars[var] = int(profile[var] * scale_factor)
if job_vars[var] < 1:
job_vars[var] = 1
for var in vars_scaledown:
if var in profile:
job_vars[var] = int(int(profile[var]) / scale_factor)
if job_vars[var] < 1:
job_vars[var] = 1
job_vars["DEV"] = dut.device
job_vars["JOBS"] = dut.numjobs
if "JOBS" in profile:
job_vars["JOBS"] *= profile["JOBS"]
job_vars["OFFSET1"] = dut.offset
job_vars["INCREMENT"] = dut.increment
return job_vars
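# Example of the scaling above (profile values are illustrative): with
# RRATE=100 (a scale-up variable) and scale_factor=1.9, job_vars["RRATE"]
# becomes int(100 * 1.9) = 190; with W1THINK=1000 (a scale-down variable),
# job_vars["W1THINK"] becomes int(1000 / 1.9) = 526. Results are floored at 1.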
def run_fio(p, VAL, dut_list, args, run, rtype):
resultsFileName = "%s/%s_run%d.json" % (FioDUT.fname, p[rtype][VAL]["alias"], run)
exitall_flag = " "
if args.exitall:
exitall_flag = " --exitall "
fioCmd = "fio --output-format=json%s--output=%s " % (
exitall_flag,
resultsFileName,
)
    fioCmd += " " + args.expert.strip() + " "
currDir = os.getcwd()
tmpJobDir = ""
if not dut_list[0].inLocalMode():
tmpJobDir = os.path.join(currDir, "tmpJobFiles")
cmdline("rm -rf %s" % (tmpJobDir))
cmdline("mkdir %s" % (tmpJobDir))
for dut in dut_list:
template = os.path.join(FioDUT.jobfiles, p[rtype][VAL]["template"])
f = dut.factor
jobVars = getJobVars(dut, p[rtype][VAL]["values"], f)
if args.misc != "":
jobVars["MISC"] = args.misc
if p["devices_in_global"] == "N":
template = createTempJobTemplate(dut, template)
if dut.inLocalMode():
for k, v in jobVars.items():
fioCmd = k + "=" + str(v) + " " + fioCmd
fioCmd = fioCmd + template
else:
            with open(template, "r") as template_file:
                tmpJbStr = template_file.read()
for var in jobVars.keys():
tmpJbStr = tmpJbStr.replace("${%s}" % str(var), str(jobVars[var]))
tmpJbFileName = dut.serverName + "tmpJbFile"
tmpJbFilePath = os.path.join(tmpJobDir, tmpJbFileName)
cmdline("touch %s" % tmpJbFilePath)
            with open(tmpJbFilePath, "w") as tmpFile:
                tmpFile.write(tmpJbStr)
if dut.tunnel:
fioCmd = fioCmd + (
" --client=ip6:localhost,%d %s" % (dut.sshTunnelPort, tmpJbFilePath)
)
else:
fioCmd = fioCmd + (
" --client=ip6:%s %s" % (dut.serverName, tmpJbFilePath)
)
if args.dryrun == "n":
cmdline(fioCmd)
else:
print(fioCmd)
return resultsFileName
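# In local mode the assembled command resembles the sketch below (device,
# alias, and variable values are illustrative):
#   DEV=/dev/nvme0n1 JOBS=1 OFFSET1=0 INCREMENT=30 \
#       fio --output-format=json --output=myresults/<alias>_run1.json \
#       /usr/local/fb-FioSynthFlash/jobfiles/<template>
# In client/server mode the variables are instead substituted into a temporary
# copy of the jobfile, and one "--client=ip6:<host> <jobfile>" pair is
# appended per DUT.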
def isPortAvailable(port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.bind(("127.0.0.1", port))
except socket.error:
return False
finally:
sock.close()
return True
class FioDUT:
# FioDUT (Device Under Test) represents a single machine that runs fio jobs
jobfiles = "/usr/local/fb-FioSynthFlash/jobfiles"
wkldsuites = "/usr/local/fb-FioSynthFlash/wkldsuites"
fname = ""
prep = "prep"
csvfname = ""
nextTunnelPort = 8765
def __init__(self, sName="", user=""):
self.factor = 0.0
self.numjobs = 1
self.offset = 0
self.increment = 30
self.dev_list = "" # String, not a list!
self.device = "" # This replaces args.device
self.capacity = 0
self.serverName = sName # If blank string, then local mode
self.sshUser = user
self.sshTunnelPort = 0
self.tunnel = None
def __del__(self):
if isinstance(self.tunnel, Popen):
self.tunnel.terminate()
def inLocalMode(self):
return self.serverName == ""
@classmethod
def getTunnelPort(cls):
port = -1
while cls.nextTunnelPort < 65535:
try:
if not isPortAvailable(cls.nextTunnelPort):
continue
else:
port = cls.nextTunnelPort
break
finally:
cls.nextTunnelPort += 1
return port
def drivesToJson(dut):
drives = {}
TYPE = 5
DEVICE = 0
maxcol = max(TYPE, DEVICE)
output = None
if dut.inLocalMode():
proc = subprocess.Popen(["/bin/lsblk", "-rnbp"], stdout=subprocess.PIPE)
output = proc.stdout.read()
output = output.decode("utf-8")
else:
sshProc = getSshProc(dut)
output, err = sshProc.communicate("/bin/lsblk -rnbp\n")
for line in output.splitlines():
bits = line.split()
if len(bits) > maxcol:
            drive_type = bits[TYPE]
            drive_device = bits[DEVICE]
drives.setdefault(drive_type, [])
drives[drive_type].append(drive_device)
return json.dumps(drives)
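# Sketch of the parsing above: each "lsblk -rnbp" output line is
# whitespace-separated with the device path in column 0 and the type in
# column 5, e.g.:
#   /dev/sda 8:0 0 480103981056 0 disk
# which, once all lines are grouped, produces JSON like
#   {"disk": ["/dev/sda"]}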
def createOffsetFile(dut, dst_file):
if dut.inLocalMode():
        try:
            with open(dst_file, "w") as tmp_file:
                tmp_file.write(str(dut.offset))
        except IOError:
            print("cannot write to %s" % dst_file)
            sys.exit(1)
        return
else:
dutSsh = getSshProc(dut)
dutSsh.communicate('echo "%s" > %s' % (str(dut.offset), dst_file))
return
def readOffsetFile(dut, dst_file):
if checkFileExist(dst_file, dut):
        with open(dst_file, "r") as tmp_file:
            try:
                dut.offset = int(tmp_file.readline().strip())
            except (IOError, ValueError):
                print("cannot read from %s" % dst_file)
                sys.exit(1)
else:
if dut.inLocalMode():
print("offset file (%s) does not exist" % dst_file)
else:
            print(
                "offset file (%s) does not exist on server %s"
                % (dst_file, dut.serverName)
            )
print("Device has not been preconditioned yet")
sys.exit(1)
def getRaidLevel(data):
raid = None
raidLevels = ["raid0", "raid1", "raid5", "raid6"]
for x in data:
for y in raidLevels:
if x.startswith(y):
raid = x
break
if raid:
return raid
else:
print("No mdraid arrays found")
sys.exit(1)
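# Example: lsblk data keyed as {"disk": [...], "raid0": ["/dev/md0"]} makes
# getRaidLevel return "raid0"; with no raid* keys present it exits instead.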
def setDutCapacity(dut, cmd, profile):
if not cmd:
if not dut.inLocalMode():
host = " on %s" % dut.serverName
else:
host = ""
sys.exit("No devices available or device is mounted%s." % host)
if profile["devices_in_global"] != "N":
dut.device = dut.dev_list
if not dut.inLocalMode():
dutSsh = getSshProc(dut)
out, err = dutSsh.communicate(cmd)
dut.capacity = out.strip()
else:
# set capacity to the smallest device under test
capacity = cmdline(cmd)
if (int(capacity) < int(dut.capacity)) or (int(dut.capacity) == 0):
dut.capacity = capacity
def loadDevList(dut_list, args, profile):
prefix = "lsblk -bno SIZE "
suffix = " | head -n 1"
command = args.device
if (command == "ALL") or (command == "ALLRAID"):
for dut in dut_list:
cmd = None
devices = drivesToJson(dut)
data = json.loads(devices)
dut.dev_list = getAllDataDrives(data, command, profile, dut)
devs = None
if command == "ALL":
devs = data["disk"]
else:
devs = data[getRaidLevel(data)]
for dev in devs:
if checkFileExist(dev, dut) and not checkMounted(dev, dut):
cmd = prefix + dev + suffix
setDutCapacity(dut, cmd, profile)
dut.numjobs = getNumJobs(data, command, dut)
else:
devs = command.split(":")
ndevs = len(devs)
for dut in dut_list:
for dev in devs:
cmd = None
if checkFileExist(dev, dut) and not checkMounted(dev, dut):
cmd = prefix + dev + suffix
if profile["devices_in_global"] == "N":
dut.dev_list += "[%s]\n" % dev
dut.dev_list += "filename=%s\n" % dev
setDutCapacity(dut, cmd, profile)
dut.device = command
dut.numjobs = ndevs
def startAoeServer(dut):
sshProc = getSshProc(dut)
ipAddr, err = sshProc.communicate("killall fio -q; hostname -i")
fioSvrCmd = (
"nohup fio --server=ip6:%s " % (ipAddr.rstrip())
+ "> /tmp/fio.log 2> /tmp/fio.err &\n"
)
sshProc = getSshProc(dut)
sshProc.stdin.write(fioSvrCmd)
sshProc.stdin.close()
sshProc.stdout.close()
def startSshTunnel(dut):
dut.sshTunnelPort = FioDUT.getTunnelPort()
if dut.sshTunnelPort <= 0:
print(
"Unable to find an available port for ssh tunneling for host %s."
% dut.serverName
)
sys.exit(1)
cmd = [
"ssh",
"%s@%s" % (dut.sshUser, dut.serverName),
"-N",
"-L",
"%d:%s:8765" % (dut.sshTunnelPort, dut.serverName),
]
dut.tunnel = Popen(cmd, stdout=subprocess.DEVNULL)
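# The tunnel command assembled above has the shape:
#   ssh <user>@<server> -N -L <local_port>:<server>:8765
# forwarding the locally chosen port to fio's default server port (8765) on
# the remote host, so run_fio can use --client=ip6:localhost,<local_port>.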
def clearDriveData(dut_list, dryrun="n"):
cmd = ""
if dut_list[0].inLocalMode():
cmd = "fio --name=trim --filename=%s --rw=trim --bs=1G" % (dut_list[0].device)
else:
cmd = "fio --name=trim --rw=trim --bs=1G "
for dut in dut_list:
if dut.tunnel:
cmd = cmd + "--client=ip6:localhost,%d --filename=%s" % (
dut.sshTunnelPort,
dut.device,
)
else:
cmd = cmd + "--client=ip6:%s --filename=%s" % (
dut.serverName,
dut.device,
)
if dryrun == "n":
cmdline(cmd)
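# The trim pass issues a whole-device TRIM via fio; locally the command is,
# e.g. (device path illustrative):
#   fio --name=trim --filename=/dev/nvme0n1 --rw=trim --bs=1G
# In client/server mode one --client/--filename pair is appended per DUT.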
def getServers(servers, server_file, user):
dut_list = [] # list of machines running tests
if len(servers) == 0 and server_file == "":
dut_list.append(FioDUT())
else:
if len(servers) > 0:
for server in servers:
dut_list.append(FioDUT(sName=server, user=user))
        if server_file != "":
            try:
                with open(server_file, "r") as sf:
                    for server in sf.read().split():
                        server = server.strip()
                        dut_list.append(FioDUT(sName=server, user=user))
            except IOError:
                print("Can't open server file")
return dut_list
def prepServers(dut_list, args, profile):
    # The increment variable is used to set the "offset_increment" fio option
    # for the readhammer workload. Setting this option allows the workload to
    # read from 32 equally spaced regions on the flash device in parallel.
    # The offset_increment is calculated by converting the flash device
    # capacity from bytes to MiB and then dividing by 32, representing 1/32
    # of the total flash device capacity. Offset is in MiB.
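    # Worked example (illustrative capacity): a 1.6 TB (1.6e12 byte) device
    # gives int(1.6e12 / 2**20 / 32) = 47683 MiB between region starts, and
    # the initial offset is a random value in [0, increment].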
for dut in dut_list:
if not dut.inLocalMode():
startAoeServer(dut)
if args.tunneling == "y":
startSshTunnel(dut)
fio_json_parser.tunnel2host[dut.sshTunnelPort] = dut.serverName
if dut.capacity:
dut.increment = int(float(dut.capacity) / 2 ** 20 / 32)
dut.offset = randint(0, dut.increment)
dut.numjobs *= args.job_scale
if args.factor <= 0.0:
if profile["scale_by_capacity"] != "N":
dut.factor = getMultiplier(dut.capacity)
else:
dut.factor = 1.0
else:
dut.factor = args.factor
def runHealthMon(fname, health="", flash=None):
if health != "":
runHealthTool = health_tools.HealthTools()
runHealthTool.logger(health)
if flash == "y":
filename = os.path.join(fname, "flashconfig.csv")
runGetFlashConfig = flash_config.GetFlashConfig()
config_as_json, tool = runGetFlashConfig.get_json()
runGetFlashConfig.json_to_csv(".", config_as_json, filename, tool)
def runTest(dut_list, profile, args, csvFolderPath, rtype, index, rcycle):
jfile = run_fio(profile, index, dut_list, args, rcycle, rtype)
if args.dryrun == "n":
if dut_list[0].inLocalMode(): # Health tools only works locally
runHealthMon(dut_list[0].fname, args.health, args.getflash)
results = Parser(jfile, "%s/%s.csv" % (FioDUT.fname, FioDUT.fname))
parseLocalResults(results)
else:
fio_json_parser.parseServerResults(jfile, csvFolderPath)
else:
print("parse file: %s" % jfile)
def runCycles(dut_list, profile, args, rc, pc, lp, csvFolderPath):
for rcycle in range(1, rc + 1):
if "pre" in profile and pc > 0 and lp > 0:
lp -= 1
for _pcycle in range(1, pc + 1):
for index in range(len(profile["pre"])):
clearDriveData(dut_list, args.dryrun)
runTest(
dut_list, profile, args, csvFolderPath, "pre", index, rcycle
)
for index in range(len(profile["def"])):
runTest(dut_list, profile, args, csvFolderPath, "def", index, rcycle)
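# Example of the cycle bookkeeping above (values illustrative): with rc=3 run
# cycles, pc=2 precondition cycles, and lp=1 ("precondition first cycle
# only"), cycle 1 trims the drives and runs the "pre" workloads twice before
# the "def" workloads; cycles 2 and 3 skip preconditioning because lp has
# dropped to 0.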
def runSuite(args):
dut_list = getServers(args.servers, args.server_file, args.user)
# Use absolute path for workload suite files
wklds = os.path.join(FioDUT.wkldsuites, args.wklds)
profile = fio_json_parser.read_json(wklds)
# dst_file = '/usr/local/fb-FioSynthFlash/offset.txt'
dst_file = "/tmp/offset.txt"
loadDevList(dut_list, args, profile)
prepServers(dut_list, args, profile)
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")
if args.deterministic:
FioDUT.fname = args.fname
else:
FioDUT.fname = "-".join([args.fname, timestamp])
FioDUT.csvfname = "-".join([FioDUT.fname, "compiled_results"])
csvFolderPath = os.path.join(os.getcwd(), FioDUT.csvfname)
print("Results are in directory: %s" % FioDUT.fname)
if not os.path.isdir(FioDUT.fname):
os.mkdir(FioDUT.fname)
if args.cycles == -1:
rc = profile["run_cycles"]
else:
rc = args.cycles
if "precondition_first_cycle_only" not in profile:
profile["precondition_first_cycle_only"] = None
if profile["precondition_first_cycle_only"] == "Y":
lp = 1
else:
lp = 1000
if args.prep == "n":
pc = 0
if dut_list[0].inLocalMode():
for dut in dut_list:
                readOffsetFile(dut, dst_file)  # sets dut.offset in place
elif args.prep == "o":
lp = 1 # The number of loops that will be prepared
pc = profile["precondition_cycles"]
for dut in dut_list:
createOffsetFile(dut, dst_file)
else:
pc = profile["precondition_cycles"]
for dut in dut_list:
createOffsetFile(dut, dst_file)
runCycles(dut_list, profile, args, rc, pc, lp, csvFolderPath)
if dut_list[0].inLocalMode():
runHealthMon(dut_list[0].fname, args.health, args.getflash)
if args.dryrun == "n":
if not dut_list[0].inLocalMode():
fio_json_parser.combineCsv(csvFolderPath, FioDUT.fname, dut_list)
print("Your results are in: %s" % csvFolderPath)
def main():
args = set_attributes()
# checkVersion('fio')
runSuite(args)
if __name__ == "__main__":
main()