perfkitbenchmarker/linux_benchmarks/fio_benchmark.py (722 lines of code) (raw):

# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Runs fio benchmarks.

Man: http://manpages.ubuntu.com/manpages/natty/man1/fio.1.html
Quick howto: http://www.bluestop.org/fio/HOWTO.txt
"""

import json
import logging
import posixpath
import re
import time
from typing import Any

from absl import flags
import jinja2
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
from perfkitbenchmarker import data
from perfkitbenchmarker import errors
from perfkitbenchmarker import flag_util
from perfkitbenchmarker import sample
from perfkitbenchmarker import units
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_packages import fio

PKB_FIO_LOG_FILE_NAME = 'pkb_fio_avg'
LOCAL_JOB_FILE_SUFFIX = '_fio.job'  # used with vm_util.PrependTempDir()
REMOTE_JOB_FILE_PATH = posixpath.join(vm_util.VM_TMP_DIR, 'fio.job')
DEFAULT_TEMP_FILE_NAME = 'fio-temp-file'
MOUNT_POINT = '/scratch'


# This dictionary maps scenario names to dictionaries of fio settings.
SCENARIOS = {
    'sequential_write': {
        'name': 'sequential_write',
        'rwkind': 'write',
        'blocksize': '512k',
    },
    'sequential_read': {
        'name': 'sequential_read',
        'rwkind': 'read',
        'blocksize': '512k',
    },
    'random_write': {
        'name': 'random_write',
        'rwkind': 'randwrite',
        'blocksize': '4k',
    },
    'random_read': {
        'name': 'random_read',
        'rwkind': 'randread',
        'blocksize': '4k',
    },
    'random_read_write': {
        'name': 'random_read_write',
        'rwkind': 'randrw',
        'blocksize': '4k',
    },
    'sequential_trim': {
        'name': 'sequential_trim',
        'rwkind': 'trim',
        'blocksize': '512k',
    },
    'rand_trim': {'name': 'rand_trim', 'rwkind': 'randtrim', 'blocksize': '4k'},
}

FLAGS = flags.FLAGS

# Modes for --fio_target_mode
AGAINST_FILE_WITH_FILL_MODE = 'against_file_with_fill'
AGAINST_FILE_WITHOUT_FILL_MODE = 'against_file_without_fill'
AGAINST_DEVICE_WITH_FILL_MODE = 'against_device_with_fill'
AGAINST_DEVICE_WITHOUT_FILL_MODE = 'against_device_without_fill'
AGAINST_DEVICE_MODES = frozenset({
    AGAINST_DEVICE_WITH_FILL_MODE,
    AGAINST_DEVICE_WITHOUT_FILL_MODE,
})
FILL_TARGET_MODES = frozenset(
    {AGAINST_DEVICE_WITH_FILL_MODE, AGAINST_FILE_WITH_FILL_MODE}
)


flags.DEFINE_string(
    'fio_jobfile',
    None,
    'Job file that fio will use. If not given, use a job file '
    'bundled with PKB. Cannot use with '
    '--fio_generate_scenarios.',
)
flags.DEFINE_list(
    'fio_generate_scenarios',
    [],
    'Generate a job file with the given scenarios. Special '
    "scenario 'all' generates all scenarios. Available "
    'scenarios are sequential_write, sequential_read, '
    'random_write, and random_read. Cannot use with '
    '--fio_jobfile. You can also specify a scenario in the '
    'format accesspattern_blocksize_operation_workingset '
    'for a custom workload.',
)
flags.DEFINE_bool(
    'fio_use_default_scenarios',
    True,
    'Use the legacy scenario tables defined in fio_benchmark.py '
    'to resolve the scenario name in generate scenarios',
)
flags.DEFINE_enum(
    'fio_target_mode',
    AGAINST_FILE_WITHOUT_FILL_MODE,
    [
        AGAINST_DEVICE_WITH_FILL_MODE,
        AGAINST_DEVICE_WITHOUT_FILL_MODE,
        AGAINST_FILE_WITH_FILL_MODE,
        AGAINST_FILE_WITHOUT_FILL_MODE,
    ],
    'Whether to run against a raw device or a file, and whether to prefill.',
)
flags.DEFINE_string(
    'fio_fill_size',
    '100%',
    'The amount of device to fill in prepare stage. '
    'The valid value can either be an integer, which '
    'represents the number of bytes to fill or a '
    'percentage, which represents the percentage '
    'of the device. A filesystem will be unmounted before '
    'filling and remounted afterwards. Only valid when '
    '--fio_target_mode is against_device_with_fill or '
    'against_file_with_fill.',
)
flags.DEFINE_string(
    'fio_fill_block_size',
    '512k',
    'The block size of the IO request to fill in prepare stage. '
    'A filesystem will be unmounted before '
    'filling and remounted afterwards. Only valid when '
    '--fio_target_mode is against_device_with_fill or '
    'against_file_with_fill.',
)
flag_util.DEFINE_integerlist(
    'fio_io_depths',
    flag_util.IntegerList([1]),
    'IO queue depths to run on. Can specify a single '
    'number, like --fio_io_depths=1, a range, like '
    '--fio_io_depths=1-4, or a list, like '
    '--fio_io_depths=1-4,6-8',
    on_nonincreasing=flag_util.IntegerListParser.WARN,
    module_name=__name__,
)
flag_util.DEFINE_integerlist(
    'fio_num_jobs',
    flag_util.IntegerList([1]),
    'Number of concurrent fio jobs to run.',
    on_nonincreasing=flag_util.IntegerListParser.WARN,
    module_name=__name__,
)
flags.DEFINE_integer(
    'fio_working_set_size',
    None,
    'The size of the working set, in GB. If not given, use '
    'the full size of the device. If using '
    '--fio_generate_scenarios and not running against a raw '
    'device, you must pass --fio_working_set_size.',
    lower_bound=0,
)
flag_util.DEFINE_units(
    'fio_blocksize',
    None,
    'The block size for fio operations. Default is given by '
    'the scenario when using --fio_generate_scenarios. This '
    'flag does not apply when using --fio_jobfile.',
    convertible_to=units.byte,
)
flags.DEFINE_integer(
    'fio_runtime',
    600,
    'The number of seconds to run each fio job for.',
    lower_bound=1,
)
flags.DEFINE_integer(
    'fio_ramptime',
    10,
    'The number of seconds to run the specified workload '
    'before logging any performance numbers',
    lower_bound=0,
)
flags.DEFINE_list(
    'fio_parameters',
    ['randrepeat=0'],
    'Parameters to apply to all PKB generated fio jobs. Each '
    'member of the list should be of the form "param=value".',
)
flags.DEFINE_boolean(
    'fio_lat_log', False, 'Whether to collect a latency log of the fio jobs.'
)
flags.DEFINE_boolean(
    'fio_bw_log', False, 'Whether to collect a bandwidth log of the fio jobs.'
)
flags.DEFINE_boolean(
    'fio_iops_log', False, 'Whether to collect an IOPS log of the fio jobs.'
)
flags.DEFINE_integer(
    'fio_log_avg_msec',
    1000,
    'By default, this will average each log entry in the '
    'fio latency, bandwidth, and iops logs over the specified '
    'period of time in milliseconds. If set to 0, fio will '
    'log an entry for every IO that completes, this can grow '
    'very quickly in size and can cause performance overhead.',
    lower_bound=0,
)
flags.DEFINE_boolean(
    'fio_hist_log', False, 'Whether to collect clat histogram.'
)
flags.DEFINE_integer(
    'fio_log_hist_msec',
    1000,
    'Same as fio_log_avg_msec, but logs entries for '
    'completion latency histograms. If set to 0, histogram '
    'logging is disabled.',
)
flags.DEFINE_integer(
    'fio_command_timeout_sec', None, 'Timeout for fio commands in seconds.'
)
flags.DEFINE_enum(
    'fio_rng',
    'tausworthe64',
    ['tausworthe', 'lfsr', 'tausworthe64'],
    'Which RNG to use for 4k Random IOPS.',
)
flags.DEFINE_enum(
    'fio_ioengine',
    'libaio',
    ['libaio', 'windowsaio'],
    'Defines how the job issues I/O to the file',
)
_FIO_RATE_BANDWIDTH_LIMIT = flags.DEFINE_string(
    'fio_rate_bandwidth_limit',
    None,
    'The bandwidth cap in bytes/sec. For example, using '
    'rate=1m caps bandwidth to 1MiB/sec.',
)
_FIO_INCLUDE_LATENCY_PERCENTILES = flags.DEFINE_boolean(
    'fio_include_latency_percentiles',
    True,
    'Whether to include FIO latency stats.',
)
_DIRECT_IO = flags.DEFINE_boolean(
    'fio_direct',
    True,
    'Whether to use O_DIRECT to bypass OS cache. This is strongly '
    'recommended, but not supported by all files.',
)

# Flags that have no effect when the user supplies --fio_jobfile; used by
# WarnOnBadFlags() to tell the user which of their flags are being ignored.
FLAGS_IGNORED_FOR_CUSTOM_JOBFILE = frozenset({
    'fio_generate_scenarios',
    'fio_io_depths',
    'fio_runtime',
    'fio_ramptime',
    'fio_blocksize',
    'fio_num_jobs',
    'fio_parameters',
})


def AgainstDevice():
  """Check whether we're running against a device or a file.

  Returns:
    True if running against a device, False if running against a file.
  """
  return FLAGS.fio_target_mode in AGAINST_DEVICE_MODES


def FillTarget():
  """Check whether we should pre-fill our target or not.

  Returns:
    True if we should pre-fill our target, False if not.
  """
  return FLAGS.fio_target_mode in FILL_TARGET_MODES


def FillDevice(vm, disk, fill_size, exec_path):
  """Fill the given disk on the given vm up to fill_size.

  Runs a single sequential-write fio job (queue depth 64, direct IO) against
  the disk's device path, using --fio_ioengine and --fio_fill_block_size.

  Args:
    vm: a linux_virtual_machine.BaseLinuxMixin object.
    disk: a disk.BaseDisk attached to the given vm.
    fill_size: amount of device to fill, in fio format.
    exec_path: string path to the fio executable
  """
  command = (
      f'{exec_path} --filename={disk.GetDevicePath()} '
      f'--ioengine={FLAGS.fio_ioengine} --name=fill-device '
      f'--blocksize={FLAGS.fio_fill_block_size} --iodepth=64 --rw=write '
      f'--direct=1 --size={fill_size}'
  )

  vm.RobustRemoteCommand(command)


BENCHMARK_NAME = 'fio'
BENCHMARK_CONFIG = """
fio:
  description: Runs fio in sequential, random, read and write modes.
  vm_groups:
    default:
      vm_spec: *default_dual_core
      disk_spec: *default_500_gb
      vm_count: null
  flags:
    sar: True
"""


# Jinja2 template for generated job files. It is rendered with
# StrictUndefined, so optional per-scenario keys are guarded with
# "is defined". One job section is emitted per (scenario, disk) pair, and
# only the first disk of each scenario carries "stonewall".
JOB_FILE_TEMPLATE = """
[global]
ioengine={{ioengine}}
invalidate=1
direct={{direct}}
runtime={{runtime}}
ramp_time={{ramptime}}
time_based
filename={{filename}}
do_verify=0
verify_fatal=0
group_reporting=1
percentile_list=1:5:10:20:25:30:40:50:60:70:75:80:90:95:99:99.5:99.9:99.95:99.99
{%- for parameter in parameters %}
{{parameter}}
{%- endfor %}
{%- for scenario in scenarios %}
{%- for pair in disks_list %}

[{{scenario['name']}}-io-depth-{{scenario['iodepth']}}-num-jobs-{{scenario['numjobs']}}{%- if scenario['target_raw_device'] == True %}.{{pair['index']}}{%- endif%}]
{%- if pair['index'] == 0 %}
stonewall
{%- endif%}
rw={{scenario['rwkind']}}
{%- if scenario['rwmixread'] is defined %}
rwmixread={{scenario['rwmixread']}}
{%- endif%}
{%- if scenario['rwmixwrite'] is defined %}
rwmixwrite={{scenario['rwmixwrite']}}
{%- endif%}
{%- if scenario['fsync'] is defined %}
fsync={{scenario['fsync']}}
{%- endif%}
{%- if scenario['blocksize'] is defined %}
blocksize={{scenario['blocksize']}}
{%- elif scenario['bssplit'] is defined %}
bssplit={{scenario['bssplit']}}
{%- endif%}
iodepth={{scenario['iodepth']}}
size={{scenario['size']}}
numjobs={{scenario['numjobs']}}
{%- if scenario['rate'] is defined %}
rate={{scenario['rate']}}
{%- endif%}
{%- if pair['disk_filename'] is defined %}
filename={{pair['disk_filename']}}
{%- endif%}
{%- endfor %}
{%- endfor %}
"""

SECONDS_PER_MINUTE = 60

# known rwkind fio parameters
RWKIND_SEQUENTIAL_READ = 'read'
RWKIND_SEQUENTIAL_WRITE = 'write'
RWKIND_RANDOM_READ = 'randread'
RWKIND_RANDOM_WRITE = 'randwrite'
RWKIND_SEQUENTIAL_READ_WRITE = 'rw'  # 'readwrite' is also valid
RWKIND_RANDOM_READ_WRITE = 'randrw'
RWKIND_SEQUENTIAL_TRIM = 'trim'
RWKIND_RANDOM_TRIM = 'randtrim'  # same value the 'rand_trim' scenario uses

# define fragments from scenario_strings
OPERATION_READ = 'read'
OPERATION_WRITE = 'write'
OPERATION_TRIM = 'trim'
OPERATION_READWRITE = 'readwrite'  # mixed read and writes
ALL_OPERATIONS = frozenset(
    [OPERATION_READ, OPERATION_WRITE, OPERATION_TRIM, OPERATION_READWRITE]
)

ACCESS_PATTERN_SEQUENTIAL = 'seq'
ACCESS_PATTERN_RANDOM = 'rand'
ALL_ACCESS_PATTERNS = frozenset(
    [ACCESS_PATTERN_SEQUENTIAL, ACCESS_PATTERN_RANDOM]
)

# map from scenario_string fragments to rwkind fio parameter
MAP_ACCESS_OP_TO_RWKIND = {
    (ACCESS_PATTERN_SEQUENTIAL, OPERATION_READ): RWKIND_SEQUENTIAL_READ,
    (ACCESS_PATTERN_SEQUENTIAL, OPERATION_WRITE): RWKIND_SEQUENTIAL_WRITE,
    (ACCESS_PATTERN_RANDOM, OPERATION_READ): RWKIND_RANDOM_READ,
    (ACCESS_PATTERN_RANDOM, OPERATION_WRITE): RWKIND_RANDOM_WRITE,
    (
        ACCESS_PATTERN_SEQUENTIAL,
        OPERATION_READWRITE,
    ): RWKIND_SEQUENTIAL_READ_WRITE,
    (ACCESS_PATTERN_RANDOM, OPERATION_READWRITE): RWKIND_RANDOM_READ_WRITE,
    # Keys are (access pattern, operation) fragments, so the trim entries are
    # keyed on OPERATION_TRIM rather than on an rwkind constant.
    (ACCESS_PATTERN_SEQUENTIAL, OPERATION_TRIM): RWKIND_SEQUENTIAL_TRIM,
    (ACCESS_PATTERN_RANDOM, OPERATION_TRIM): RWKIND_RANDOM_TRIM,
}

# check for known fields, as the JOB_FILE_TEMPLATE looks for these
# fields explicitly and needs to be updated
FIO_KNOWN_FIELDS_IN_JINJA = [
    'rwmixread',
    'rwmixwrite',
    'fsync',
    'iodepth',  # overrides --fio_io_depths
    'numjobs',  # overrides --fio_num_jobs
]


def _IsBlockSizeASplitSpecification(blocksize_str: str) -> bool:
  """Determines if a blocksize_str looks like a bssplit parameter.

  A bssplit parameter has the format
  blocksize/percent:blocksize/percent:...blocksize/percent
  e.g. 8k/28:12k/23:4k/23:16k/7:20k/2:32k/17

  This is just a heuristic: any string containing ':' is treated as a split
  specification.

  Args:
    blocksize_str: either a blocksize (like 4k) or a bssplit parameter

  Returns:
    True if this is split specification, false if a single block size.
  """
  return ':' in blocksize_str


def GetScenarioFromScenarioString(scenario_string):
  """Extract rwkind,blocksize,size from scenario string.

  Args:
    scenario_string: either a key of SCENARIOS (only honoured when
      --fio_use_default_scenarios is set) or a string of the form
      accesspattern_blocksize_operation_workingset[_key-value...], e.g.
      rand_16k_readwrite_5TB_rwmixread-65.

  Returns:
    A new dict of fio settings for the scenario. It is always a fresh dict,
    so callers may mutate it without affecting SCENARIOS.

  Raises:
    errors.Setup.InvalidFlagConfigurationError: if the string has fewer than
      four '_'-separated fields, names an unknown access pattern or
      operation, combines them in a way that has no rwkind mapping, or
      carries an extra field that is malformed or not listed in
      FIO_KNOWN_FIELDS_IN_JINJA.
  """
  # look for legacy entries in the scenario map first
  result = (
      SCENARIOS.get(scenario_string, None)
      if FLAGS.fio_use_default_scenarios
      else None
  )
  if result:
    # return a copy so that the scenario can be mutated further if needed
    # without modifying the SCENARIO map
    return result.copy()

  # decode the parameters from the scenario name
  # in the format accesspattern_blocksize_operation_workingset
  # example pattern would be:
  #    rand_16k_write_100%
  #    seq_1M_write_100%
  fields = scenario_string.split('_')
  if len(fields) < 4:
    raise errors.Setup.InvalidFlagConfigurationError(
        f'Unexpected Scenario string format: {scenario_string}'
    )
  (access_pattern, blocksize_str, operation, workingset_str) = fields[0:4]

  if access_pattern not in ALL_ACCESS_PATTERNS:
    raise errors.Setup.InvalidFlagConfigurationError(
        f'Unexpected access pattern {access_pattern} '
        f'in scenario {scenario_string}'
    )

  if operation not in ALL_OPERATIONS:
    raise errors.Setup.InvalidFlagConfigurationError(
        f'Unexpected operation {operation} in scenario {scenario_string}'
    )

  access_op = (access_pattern, operation)
  rwkind = MAP_ACCESS_OP_TO_RWKIND.get(access_op, None)
  if not rwkind:
    raise errors.Setup.InvalidFlagConfigurationError(
        f'{access_pattern} and {operation} could not be mapped '
        'to a rwkind fio parameter from '
        f'scenario {scenario_string}'
    )

  # required fields of JOB_FILE_TEMPLATE
  result = {
      'name': scenario_string.replace(',', '__'),
      'rwkind': rwkind,
      'size': workingset_str,
  }

  if _IsBlockSizeASplitSpecification(blocksize_str):
    result['bssplit'] = blocksize_str
  else:
    result['blocksize'] = blocksize_str

  # The first four fields are well defined - after that, we use
  # key value pairs to encode any extra fields we need
  # The format is key-value for any additional fields appended
  # e.g. rand_16k_readwrite_5TB_rwmixread-65
  #          random access pattern
  #          16k block size
  #          readwrite operation
  #          5 TB working set
  #          rwmixread of 65. (so 65% reads and 35% writes)
  for extra_field in fields[4:]:
    key_value = extra_field.split('-')
    if len(key_value) < 2:
      # Report a flag error rather than an IndexError from key_value[1].
      raise errors.Setup.InvalidFlagConfigurationError(
          f'Expected a key-value field but got {extra_field} '
          f'in scenario {scenario_string}'
      )
    key = key_value[0]
    value = key_value[1]
    if key not in FIO_KNOWN_FIELDS_IN_JINJA:
      raise errors.Setup.InvalidFlagConfigurationError(
          'Unrecognized FIO parameter {} out of scenario {}'.format(
              key, scenario_string
          )
      )
    result[key] = value

  return result


def GenerateJobFileString(
    filename: str,
    scenario_strings: list[str],
    io_depths: list[int] | None,
    num_jobs: list[int] | None,
    working_set_size: int | None,
    block_size: Any,
    runtime: int,
    ramptime: int,
    direct: bool,
    parameters: list[str],
    raw_files: list[str],
    require_merge: bool,
) -> str:
  """Make a string with our fio job file.

  Args:
    filename: the file or disk we pre-filled, if any.
    scenario_strings: list of strings with names in SCENARIOS, or custom
      scenario strings understood by GetScenarioFromScenarioString.
    io_depths: iterable of integers. The IO queue depths to test. Only used
      when at least one scenario does not carry both iodepth and numjobs.
    num_jobs: iterable of integers. The number of fio processes to test. Only
      used when at least one scenario does not carry both iodepth and
      numjobs.
    working_set_size: int or None. If int, the size of the working set in GB.
    block_size: Quantity or None. If quantity, the block size to use.
    runtime: int. The number of seconds to run each job.
    ramptime: int. The number of seconds to run the specified workload before
      logging any performance numbers.
    direct: boolean. Whether to use direct IO.
    parameters: list. Other fio parameters to be applied to all jobs.
    raw_files: A list of raw device paths.
    require_merge: Whether the jobs will be merged to be reported later.

  Returns:
    The contents of a fio job file, as a string.
  """
  if 'all' in scenario_strings and FLAGS.fio_use_default_scenarios:
    # Work on copies: the loops below write blocksize/size/name into each
    # scenario, and those writes must not leak into the shared SCENARIOS
    # table (this function may run once per VM, from several threads).
    scenarios = [scenario.copy() for scenario in SCENARIOS.values()]
  else:
    scenarios = [
        GetScenarioFromScenarioString(scenario_string.strip('"'))
        for scenario_string in scenario_strings
    ]

  default_size_string = (
      str(working_set_size) + 'G' if working_set_size else '100%'
  )
  blocksize_override = (
      str(int(block_size.m_as(units.byte))) + 'B' if block_size else None
  )
  should_use_scenario_iodepth_numjobs = True

  for scenario in scenarios:
    # per legacy behavior, the block size parameter
    # overrides what is defined in the scenario string
    if blocksize_override:
      scenario['blocksize'] = blocksize_override

    # per legacy behavior, the size in the scenario_string overrides
    # size defined on the command line
    if 'size' not in scenario:
      scenario['size'] = default_size_string

    if 'iodepth' not in scenario or 'numjobs' not in scenario:
      should_use_scenario_iodepth_numjobs = False

  jinja_scenarios = []
  if should_use_scenario_iodepth_numjobs:
    # All scenarios supply iodepth and numjobs.
    # Remove iodepth and numjobs from scenario name to prevent redundancy.
    for scenario in scenarios:
      scenario['name'] = scenario['name'].replace(
          f'_iodepth-{scenario["iodepth"]}', ''
      )
      scenario['name'] = scenario['name'].replace(
          f'_numjobs-{scenario["numjobs"]}', ''
      )
    jinja_scenarios = scenarios  # scenarios already includes iodepth, numjobs
  else:
    # preserve functionality of creating a cross product of all
    # (scenario X io_depths X num_jobs)
    # which allows a single run to produce all of these metrics
    for scenario in scenarios:
      for num_job in num_jobs:
        for io_depth in io_depths:
          scenario_copy = scenario.copy()
          scenario_copy['iodepth'] = io_depth
          scenario_copy['numjobs'] = num_job
          jinja_scenarios.append(scenario_copy)

  rate_limit = _FIO_RATE_BANDWIDTH_LIMIT.value
  for scenario in jinja_scenarios:
    if rate_limit:
      scenario['rate'] = rate_limit
    scenario['target_raw_device'] = require_merge

  # One job section per raw device when results are merged later; otherwise a
  # single section that relies on the [global] filename.
  if require_merge and raw_files:
    disks_list = [
        {
            'index': index,
            'disk_filename': raw_file,
        }
        for index, raw_file in enumerate(raw_files)
    ]
  else:
    disks_list = [{'index': 0}]

  job_file_template = jinja2.Template(
      JOB_FILE_TEMPLATE, undefined=jinja2.StrictUndefined
  )

  return str(
      job_file_template.render(
          ioengine=FLAGS.fio_ioengine,
          runtime=runtime,
          ramptime=ramptime,
          filename=filename,
          scenarios=jinja_scenarios,
          direct=int(direct),
          parameters=parameters,
          disks_list=disks_list,
      )
  )


FILENAME_PARAM_REGEXP = re.compile(r'filename\s*=.*$', re.MULTILINE)


def ProcessedJobFileString(fio_jobfile_contents, remove_filename):
  """Modify the fio job if requested.

  Args:
    fio_jobfile_contents: the contents of a fio job file.
    remove_filename: bool. If true, remove the filename parameter from the job
      file.

  Returns:
    The job file as a string, possibly without filename parameters.
  """

  if remove_filename:
    return FILENAME_PARAM_REGEXP.sub('', fio_jobfile_contents)
  else:
    return fio_jobfile_contents


def GetOrGenerateJobFileString(
    job_file_path,
    scenario_strings,
    against_device,
    disks,
    io_depths,
    num_jobs,
    working_set_size,
    block_size,
    runtime,
    ramptime,
    direct,
    parameters,
    job_file_contents,
    require_merge,
):
  """Get the contents of the fio job file we're working with.

  This will either read the user's job file, if given, or generate a new one.
  If neither a job file path nor scenarios are given, job_file_contents is
  used instead.

  Args:
    job_file_path: string or None. The path to the user's job file, if
      provided.
    scenario_strings: list of strings or None. The workload scenarios to
      generate.
    against_device: bool. True if testing against a raw device, False if
      testing against a filesystem.
    disks: the disk.BaseDisk object(s) to test against.
    io_depths: iterable of integers. The IO queue depths to test.
    num_jobs: iterable of integers. The number of fio processes to test.
    working_set_size: int or None. If int, the size of the working set in GB.
    block_size: Quantity or None. If Quantity, the block size to use.
    runtime: int. The number of seconds to run each job.
    ramptime: int. The number of seconds to run the specified workload before
      logging any performance numbers.
    direct: boolean. Whether to use direct IO.
    parameters: list. Other fio parameters to apply to all jobs.
    job_file_contents: string contents of fio job.
    require_merge: whether jobs will need to be merged for reporting.

  Returns:
    A string containing a fio job file.
  """

  user_job_file_string = GetFileAsString(job_file_path)

  use_user_jobfile = job_file_path or not scenario_strings

  if use_user_jobfile:
    # When running against a device, filename parameters are stripped from
    # the job file.
    remove_filename = against_device
    return ProcessedJobFileString(
        user_job_file_string or job_file_contents, remove_filename
    )
  else:
    raw_files = []
    if against_device:
      filename = ':'.join([disk.GetDevicePath() for disk in disks])
      for disk in disks:
        if disk.is_striped:
          raw_files += [d.GetDevicePath() for d in disk.disks]
        else:
          raw_files += [disk.GetDevicePath()]
    else:
      # Since we pass --directory to fio, we must use relative file
      # paths or get an error.
      filename = DEFAULT_TEMP_FILE_NAME

    return GenerateJobFileString(
        filename,
        scenario_strings,
        io_depths,
        num_jobs,
        working_set_size,
        block_size,
        runtime,
        ramptime,
        direct,
        parameters,
        raw_files,
        require_merge,
    )


NEED_SIZE_MESSAGE = (
    'You must specify the working set size when using '
    'generated scenarios with a filesystem.'
)


def WarnOnBadFlags():
  """Warn the user if they pass bad flag combinations.

  Raises:
    errors.Benchmarks.PrepareException: if scenarios are generated for a
      filesystem target without --fio_working_set_size.
  """

  if FLAGS.fio_jobfile:
    # Sorted so the warning text is stable from run to run.
    ignored_flags = sorted(
        '--' + flag_name
        for flag_name in FLAGS_IGNORED_FOR_CUSTOM_JOBFILE
        if FLAGS[flag_name].present
    )

    if ignored_flags:
      logging.warning(
          'Fio job file specified. Ignoring options "%s"',
          ', '.join(ignored_flags),
      )

  if (
      FLAGS.fio_jobfile is None
      and FLAGS.fio_generate_scenarios
      and not FLAGS.fio_working_set_size
      and not AgainstDevice()
  ):
    logging.error(NEED_SIZE_MESSAGE)
    raise errors.Benchmarks.PrepareException(NEED_SIZE_MESSAGE)


def GetConfig(user_config):
  """Load the benchmark config, merged with the user's overrides.

  For every target mode other than against_file_without_fill, the disk spec's
  mount_point is cleared for each cloud.

  Args:
    user_config: user supplied configuration, passed to configs.LoadConfig.

  Returns:
    The loaded benchmark configuration.
  """
  config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)

  if FLAGS.fio_target_mode != AGAINST_FILE_WITHOUT_FILL_MODE:
    disk_spec = config['vm_groups']['default']['disk_spec']
    for cloud in disk_spec:
      disk_spec[cloud]['mount_point'] = None

  return config


def GetLogFlags(log_file_base):
  """Gets fio log files.

  Args:
    log_file_base: string. Base file name passed to each --write_*_log option.

  Returns:
    A string of fio command line options for the logs enabled by the
    --fio_*_log flags; empty if none are enabled.
  """
  # --log_avg_msec is only emitted for the latency/bandwidth/IOPS logs; the
  # histogram log has its own interval option, appended below.
  collect_logs = FLAGS.fio_lat_log or FLAGS.fio_bw_log or FLAGS.fio_iops_log
  fio_log_flags = [
      (
          FLAGS.fio_lat_log,
          '--write_lat_log=%(filename)s',
      ),
      (
          FLAGS.fio_bw_log,
          '--write_bw_log=%(filename)s',
      ),
      (
          FLAGS.fio_iops_log,
          '--write_iops_log=%(filename)s',
      ),
      (
          FLAGS.fio_hist_log,
          '--write_hist_log=%(filename)s',
      ),
      (
          collect_logs,
          '--log_avg_msec=%(interval)d',
      ),
  ]
  fio_command_flags = ' '.join([flag for given, flag in fio_log_flags if given])
  if FLAGS.fio_hist_log:
    fio_command_flags = ' '.join(
        [fio_command_flags, '--log_hist_msec=%(hist_interval)d']
    )

  return fio_command_flags % {
      'filename': log_file_base,
      'interval': FLAGS.fio_log_avg_msec,
      'hist_interval': FLAGS.fio_log_hist_msec,
  }


def CheckPrerequisites(benchmark_config):
  """Perform flag checks.

  Args:
    benchmark_config: unused.
  """
  del benchmark_config  # unused
  WarnOnBadFlags()


def Prepare(benchmark_spec):
  """Prepare VM's in benchmark_spec to run FIO.

  Args:
    benchmark_spec: The benchmarks specification.
  """
  exec_path = fio.GetFioExec()
  vms = benchmark_spec.vms
  background_tasks.RunThreaded(lambda vm: PrepareWithExec(vm, exec_path), vms)


def GetFileAsString(file_path):
  """Read a PKB data resource into a string.

  Args:
    file_path: string or None. Resource path resolved with data.ResourcePath.

  Returns:
    The file contents, or None if file_path is empty or None.
  """
  if not file_path:
    return None
  with open(data.ResourcePath(file_path)) as jobfile:
    return jobfile.read()


def PrepareWithExec(vm, exec_path):
  """Prepare the virtual machine to run FIO.

  This includes installing fio, bc, and libaio1 and pre-filling the
  attached disk. We also make sure the job file is always located at the same
  path on the local machine.

  Args:
    vm: The virtual machine to prepare the benchmark on.
    exec_path: string path to the fio executable

  Raises:
    errors.Setup.InvalidSetupError: if several scratch disks are configured
      while the target mode runs against a file.
  """
  logging.info('FIO prepare on %s', vm)
  vm.Install('fio')

  if FillTarget():
    logging.info(
        'Fill devices %s on %s',
        [disk.GetDevicePath() for disk in vm.scratch_disks],
        vm,
    )
    background_tasks.RunThreaded(
        lambda disk: FillDevice(vm, disk, FLAGS.fio_fill_size, exec_path),
        vm.scratch_disks,
    )

  # We only need to format and mount if the target mode is against
  # file with fill because 1) if we're running against the device, we
  # don't want it mounted and 2) if we're running against a file
  # without fill, it was never unmounted (see GetConfig()).
  if len(vm.scratch_disks) > 1:
    if not AgainstDevice():
      raise errors.Setup.InvalidSetupError(
          f'Target mode {FLAGS.fio_target_mode} tests against 1 file, '
          'but multiple scratch disks are configured.'
      )
    return
  disk = vm.scratch_disks[0]
  if FLAGS.fio_target_mode == AGAINST_FILE_WITH_FILL_MODE:
    disk.mount_point = FLAGS.scratch_dir or MOUNT_POINT
    disk_spec = vm.disk_specs[0]
    vm.FormatDisk(disk.GetDevicePath(), disk_spec.disk_type)
    vm.MountDisk(
        disk.GetDevicePath(),
        disk.mount_point,
        disk_spec.disk_type,
        disk.mount_options,
        disk.fstab_options,
    )


def Run(benchmark_spec):
  """Spawn fio on vm(s) and gather results.

  Args:
    benchmark_spec: The benchmark specification; its vms are benchmarked.

  Returns:
    A list of sample.Sample objects.
  """
  vms = benchmark_spec.vms
  return RunFioOnVMs(vms)


def RunFioOnVMs(vms):
  """Spawn fio on vm(s) and gather results.

  Each sample is tagged with metadata['machine_instance'], the index of the
  VM that produced it.

  Args:
    vms: A list of VMs to run FIO on.

  Returns:
    A list of sample.Sample objects.
  """
  fio_exe = fio.GetFioExec()
  default_job_file_contents = GetFileAsString(data.ResourcePath('fio.job'))
  samples = []

  path = REMOTE_JOB_FILE_PATH
  samples_list = background_tasks.RunThreaded(
      lambda vm: RunWithExec(vm, fio_exe, path, default_job_file_contents), vms
  )
  for machine_index, vm_samples in enumerate(samples_list):
    for item in vm_samples:
      item.metadata['machine_instance'] = machine_index
    samples.extend(vm_samples)

  return samples


def RunWithExec(
    vm,
    exec_path,
    remote_job_file_path,
    job_file_contents,
    fio_generate_scenarios=None,
):
  """Spawn fio and gather the results. Used by Windows FIO as well.

  Args:
    vm: vm to run the benchmark on.
    exec_path: string path to the fio executable.
    remote_job_file_path: path, on the vm, to the location of the job file.
    job_file_contents: string contents of the fio job file.
    fio_generate_scenarios: list of strings with scenarios to benchmark.
      Falls back to --fio_generate_scenarios when empty or None.

  Returns:
    A list of sample.Sample objects.
  """
  logging.info('FIO running on %s', vm)

  if not fio_generate_scenarios:
    fio_generate_scenarios = FLAGS.fio_generate_scenarios

  disks = vm.scratch_disks
  require_merge = len(disks) > 1

  job_file_string = GetOrGenerateJobFileString(
      FLAGS.fio_jobfile,
      fio_generate_scenarios,
      AgainstDevice(),
      disks,
      FLAGS.fio_io_depths,
      FLAGS.fio_num_jobs,
      FLAGS.fio_working_set_size,
      FLAGS.fio_blocksize,
      FLAGS.fio_runtime,
      FLAGS.fio_ramptime,
      _DIRECT_IO.value,
      FLAGS.fio_parameters,
      job_file_contents,
      require_merge,
  )
  job_file_path = vm_util.PrependTempDir(vm.name + LOCAL_JOB_FILE_SUFFIX)
  with open(job_file_path, 'w') as job_file:
    job_file.write(job_file_string)
    logging.info('Wrote fio job file at %s', job_file_path)
    logging.info(job_file_string)

  vm.PushFile(job_file_path, remote_job_file_path)

  if AgainstDevice():
    # Only pass --filename on the command line when the job file does not
    # already mention a filename.
    if 'filename' in job_file_string:
      filename_parameter = ''
    else:
      filenames = ':'.join([disk.GetDevicePath() for disk in disks])
      filename_parameter = f'--filename={filenames}'
    fio_command = (
        f'{exec_path} --output-format=json '
        f'--random_generator={FLAGS.fio_rng} '
        f'{filename_parameter} {remote_job_file_path}'
    )
  else:
    # File mode runs inside a single mount point (PrepareWithExec rejects
    # multiple scratch disks in this mode).
    assert len(disks) == 1, 'File target modes require exactly 1 scratch disk.'
    fio_command = (
        f'{exec_path} --output-format=json '
        f'--random_generator={FLAGS.fio_rng} '
        f'--directory={disks[0].mount_point} {remote_job_file_path}'
    )

  collect_logs = any([
      FLAGS.fio_lat_log,
      FLAGS.fio_bw_log,
      FLAGS.fio_iops_log,
      FLAGS.fio_hist_log,
  ])

  log_file_base = ''
  if collect_logs:
    log_file_base = '%s_%s' % (PKB_FIO_LOG_FILE_NAME, str(time.time()))
    fio_command = ' '.join([fio_command, GetLogFlags(log_file_base)])

  # TODO(user): This only gives results at the end of a job run
  #      so the program pauses here with no feedback to the user.
  #      This is a pretty lousy experience.
  logging.info('FIO Results:')

  start_time = time.time()
  stdout, _ = vm.RobustRemoteCommand(
      fio_command, timeout=FLAGS.fio_command_timeout_sec
  )
  end_time = time.time()
  bin_vals = []
  if collect_logs:
    vm.PullFile(vm_util.GetTempDir(), '%s*.log' % log_file_base)
    if FLAGS.fio_hist_log:
      # Count the histogram logs left on the VM; they are numbered from 1.
      num_logs = int(
          vm.RemoteCommand('ls %s_clat_hist.*.log | wc -l' % log_file_base)[0]
      )
      bin_vals += [
          fio.ComputeHistogramBinVals(
              vm, '%s_clat_hist.%s.log' % (log_file_base, idx + 1)
          )
          for idx in range(num_logs)
      ]
  samples = fio.ParseResults(
      job_file_string,
      json.loads(stdout),
      log_file_base=log_file_base,
      bin_vals=bin_vals,
      skip_latency_individual_stats=(
          not _FIO_INCLUDE_LATENCY_PERCENTILES.value
      ),
      require_merge=require_merge,
  )

  # The timing samples reuse the first parsed sample's metadata dict.
  samples.append(
      sample.Sample('start_time', start_time, 'sec', samples[0].metadata)
  )
  samples.append(
      sample.Sample('end_time', end_time, 'sec', samples[0].metadata)
  )

  for item in samples:
    item.metadata['fio_target_mode'] = FLAGS.fio_target_mode
    item.metadata['fio_fill_size'] = FLAGS.fio_fill_size
    item.metadata['fio_fill_block_size'] = FLAGS.fio_fill_block_size
    item.metadata['fio_rng'] = FLAGS.fio_rng

  return samples


def Cleanup(benchmark_spec):
  """Uninstall packages required for fio and remove benchmark files.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  vms = benchmark_spec.vms
  background_tasks.RunThreaded(CleanupVM, vms)


def CleanupVM(vm):
  """Remove the job file, and PKB's temp data file if any, from the vm.

  Args:
    vm: The virtual machine to clean up.
  """
  logging.info('FIO Cleanup up on %s', vm)
  vm.RemoveFile(REMOTE_JOB_FILE_PATH)
  if not AgainstDevice() and not FLAGS.fio_jobfile:
    # If the user supplies their own job file, then they have to clean
    # up after themselves, because we don't know their temp file name.
    vm.RemoveFile(posixpath.join(vm.GetScratchDir(), DEFAULT_TEMP_FILE_NAME))