# perfkitbenchmarker/linux_packages/omb.py

"""Installs and runs the OSU MPI micro-benchmark.""" import dataclasses import itertools import logging import re import time from typing import Any, Dict, Iterator, List, Pattern, Sequence, Tuple, Union from absl import flags from perfkitbenchmarker import errors from perfkitbenchmarker import flag_util from perfkitbenchmarker import nfs_service from perfkitbenchmarker.linux_packages import intelmpi from perfkitbenchmarker.linux_packages import openmpi FLAGS = flags.FLAGS flags.DEFINE_enum('mpi_vendor', 'intel', ['intel', 'openmpi'], 'MPI provider.') flags.DEFINE_list( 'omb_mpi_env', [], 'Comma separated list of environment variables, e.g. ' '--omb_mpi_env=FI_PROVIDER=tcp,FI_LOG_LEVEL=info', ) flags.DEFINE_list( 'omb_mpi_genv', [], 'Comma separated list of global environment variables, ' 'i.e. environment variables to be applied to all nodes, e.g. ' '--omb_mpi_genv=I_MPI_PIN_PROCESSOR_LIST=0,I_MPI_PIN=1 ' 'When running with Intel MPI, these translate to -genv mpirun options. ' 'When running with OpenMPI, both --omb_mpi_env and --omb_mpi_genv are ' 'treated the same via the -x mpirun option', ) flags.register_validator( 'omb_mpi_env', lambda env_params: all('=' in param for param in env_params), message='--omb_mpi_env values must be in format "key=value" or "key="', ) flags.register_validator( 'omb_mpi_genv', lambda genv_params: all('=' in param for param in genv_params), message='--omb_mpi_genv values must be in format "key=value" or "key="', ) VERSION = '5.7.1' _PKG_NAME = 'osu-micro-benchmarks' _DATA_URL = ( 'http://mvapich.cse.ohio-state.edu/download/mvapich/' f'{_PKG_NAME}-{VERSION}.tgz' ) _SRC_DIR = f'{_PKG_NAME}-{VERSION}' _TARBALL = f'{_PKG_NAME}-{VERSION}.tgz' _RUN_DIR = f'/usr/local/libexec/{_PKG_NAME}/mpi' # Benchmarks that can only be run with a single thread per host _SINGLE_THREADED_BENCHMARKS = frozenset({ 'acc_latency', 'bibw', 'bw', 'cas_latency', 'fop_latency', 'get_bw', 'get_latency', }) PREPROVISIONED_DATA = { _TARBALL: 'cb5ce4e2e68ed012d9952e96ef880a802058c87a1d840a2093b19bddc7faa165' } PACKAGE_DATA_URL = {_TARBALL: _DATA_URL} # Regexs to match on headers in the benchmark's stdout. _HEADERS = [ re.compile(r'^# Window creation: (?P<window_creation>.*)'), re.compile(r'^# Synchronization: (?P<sync>.*)'), re.compile(r'^# Number of Sender threads: (?P<sender_threads>\d+)'), re.compile(r'^# Number of Receiver threads: (?P<receiver_threads>\d+)'), re.compile( r'# \[ pairs: (?P<pairs>\d+) \] ' r'\[ window size: (?P<window_size>\d+) \]' ), ] _LATENCY_RE = re.compile( r'^(?P<size>\d+)\s+' r'(?P<value>[\d\.]+)\s*$', re.X | re.MULTILINE ) # the "overall" value is the metric recorded. Others are put into metadata _COMPUTE_RE = re.compile( r'^(?P<size>^\d+)\s+' r'(?P<value>[\d\.]+)\s+' r'(?P<compute>[\d\.]+)\s+' r'(?P<comm>[\d\.]+)\s+' r'(?P<overlap>[\d\.]+)\s*$', re.X | re.MULTILINE, ) # parse MPI pinning _MPI_STARTUP_PREFIX = r'^\[(?P<unused_cpuid>\d+)\] MPI startup\(\):\s+' _MPI_PIN_RE = re.compile( _MPI_STARTUP_PREFIX + ( r'(?P<rank>\d+)\s+' r'(?P<pid>\d+)\s+' r'(?P<nodename>\S+)\s+' r'.*?(?P<cpuids>[\d,-]+)' ) ) _PKB_NODE_RE = re.compile(r'pkb-(?P<pkbid>.*?)-(?P<nodeindex>\d+)') # parameters to pass into the benchmark _NUMBER_ITERATIONS = flags.DEFINE_integer( 'omb_iterations', None, 'Number of iterations to run in a test.' ) _SYNC_OPTION = flags.DEFINE_string( 'omb_sync_option', None, '--sync-option value to pass in' ) _NUM_SERVER_THREADS = flags.DEFINE_integer( 'omb_server_threads', None, 'Number of server threads to use.' 

# Parameters to pass into the benchmark.
_NUMBER_ITERATIONS = flags.DEFINE_integer(
    'omb_iterations', None, 'Number of iterations to run in a test.'
)
_SYNC_OPTION = flags.DEFINE_string(
    'omb_sync_option', None, '--sync-option value to pass in'
)
_NUM_SERVER_THREADS = flags.DEFINE_integer(
    'omb_server_threads', None, 'Number of server threads to use.'
)
_NUM_RECEIVER_THREADS = flags.DEFINE_integer(
    'omb_receiver_threads', None, 'Number of receiver threads to use.'
)
flag_util.DEFINE_integerlist(
    'omb_mpi_processes',
    flag_util.IntegerList([1, 0]),
    'MPI processes to use per host. 1=One process, 0=only real cores',
)
_MPI_PERHOST = flags.DEFINE_integer('omb_perhost', 1, 'MPI option -perhost.')


@dataclasses.dataclass(frozen=True)
class _RunType:
  """Metadata about a benchmark.

  Attributes:
    columns: The columns in the output.
    value_column: The column that should be used as a sample.Sample value.
    units: The units of the value_column.
    supports_full: Whether this benchmark supports --full.
    long_running: Whether this benchmark takes a long time to run.
  """

  columns: Tuple[str, ...]
  value_column: str
  units: str = 'usec'
  supports_full: bool = True
  long_running: bool = False


# Benchmarks that support --full
_LATENCY = _RunType(
    ('size', 'latency', 'min_latency', 'max_latency', 'iterations'), 'latency'
)
_LATENCY_NOSIZE = _RunType(
    ('latency', 'min_latency', 'max_latency', 'iterations'), 'latency'
)
_COMPUTE = _RunType(
    (
        'size',
        'overall',
        'compute',
        'collection_init',
        'mpi_test',
        'mpi_wait',
        'min_comm',
        'max_comm',
        'pure_comm',
        'overlap',
    ),
    'mpi_wait',
)
_COMPUTE_NOSIZE = _RunType(
    (
        'overall',
        'compute',
        'collection_init',
        'mpi_test',
        'mpi_wait',
        'pure_comm',
        'min_comm',
        'max_comm',
        'overlap',
    ),
    'mpi_wait',
)

# Benchmarks that do not support --full
_LATENCY_SIZE_ONLY = _RunType(('size', 'latency'), 'latency', 'usec', False)
_BANDWIDTH = _RunType(('size', 'bandwidth'), 'bandwidth', 'MB/s', False)
_BANDWIDTH_MESSAGES = _RunType(
    ('size', 'bandwidth', 'messages_per_second'), 'bandwidth', 'MB/s', False
)
# The get_acc_latency and latency_mt benchmarks take a really long time to
# finish.
_LATENCY_LONG_RUNNING = _RunType(
    ('size', 'latency'), 'latency', 'usec', False, True
)

BENCHMARKS: Dict[str, _RunType] = {
    'acc_latency': _LATENCY_SIZE_ONLY,
    'allgather': _LATENCY,
    'allgatherv': _LATENCY,
    'allreduce': _LATENCY,
    'alltoall': _LATENCY,
    'alltoallv': _LATENCY,
    'barrier': _LATENCY_NOSIZE,
    'bcast': _LATENCY,
    'bibw': _BANDWIDTH,
    'bw': _BANDWIDTH,
    'cas_latency': _LATENCY_SIZE_ONLY,
    'fop_latency': _LATENCY_SIZE_ONLY,
    'gather': _LATENCY,
    'gatherv': _LATENCY,
    'get_acc_latency': _LATENCY_LONG_RUNNING,
    'get_bw': _BANDWIDTH,
    'get_latency': _LATENCY_SIZE_ONLY,
    'iallgather': _COMPUTE,
    'iallgatherv': _COMPUTE,
    'iallreduce': _COMPUTE,
    'ialltoall': _COMPUTE,
    'ialltoallv': _COMPUTE,
    'ialltoallw': _COMPUTE,
    'ibarrier': _COMPUTE_NOSIZE,
    'ibcast': _COMPUTE,
    'igather': _COMPUTE,
    'igatherv': _COMPUTE,
    'ireduce': _COMPUTE,
    'iscatter': _COMPUTE,
    'iscatterv': _COMPUTE,
    'latency': _LATENCY_SIZE_ONLY,
    'latency_mp': _LATENCY_SIZE_ONLY,
    'latency_mt': _LATENCY_LONG_RUNNING,
    'mbw_mr': _BANDWIDTH_MESSAGES,
    'multi_lat': _LATENCY_SIZE_ONLY,
    'put_bibw': _BANDWIDTH,
    'put_bw': _BANDWIDTH,
    'put_latency': _LATENCY_SIZE_ONLY,
    'reduce': _LATENCY,
    'reduce_scatter': _LATENCY,
    'scatter': _LATENCY,
    'scatterv': _LATENCY,
}
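
# Illustrative example: BENCHMARKS['bw'] is _BANDWIDTH, whose columns are
# ('size', 'bandwidth'), so an osu_bw stdout row such as "1  0.48" is parsed
# by _ParseBenchmarkData below into {'size': 1, 'bandwidth': 0.48}.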


@dataclasses.dataclass(frozen=True)
class RunResult:
  """The parsed output of a benchmark run.

  Attributes:
    name: The metric name to use in a sample.Sample.
    metadata: Any output of the benchmark that describes the run.
    data: The parsed output data, for example [{'size': 1, 'latency': 10.9}].
    full_cmd: Command used to run the benchmark.
    units: Units of the value_column in the data.
    params: Any parameters passed along to the benchmark.
    mpi_vendor: Name of the MPI vendor.
    mpi_version: Version of the MPI library.
    value_column: The name of the column in the data rows that should be used
      for the sample.Sample value.
    number_processes: The total number of MPI processes used.
    run_time: Time in seconds to run the test.
    pinning: MPI processes pinning.
    perhost: MPI option -perhost.
    mpi_env: Environment variables to set for the mpirun command.
  """

  name: str
  metadata: Dict[str, Any]
  data: List[Dict[str, float]]
  full_cmd: str
  units: str
  params: Dict[str, Union[str, int]]
  mpi_vendor: str
  mpi_version: str
  value_column: str
  number_processes: int
  run_time: int
  pinning: List[str]
  perhost: int
  mpi_env: Dict[str, str]


@dataclasses.dataclass(frozen=True)
class RunRequest:
  test_name: str
  vms: List[Any]  # virtual machines
  # Default of None runs all message sizes.
  message_size: Union[str, int, None] = None


def _InstallForIntelMpi(vm) -> None:
  """Installs the omb package with the Intel MPI library on the VM."""
  vm.Install('intelmpi')
  txt, _ = vm.RemoteCommand(
      f'{intelmpi.SourceMpiVarsCommand(vm)}; which mpicc mpicxx'
  )
  mpicc_path, mpicxx_path = txt.splitlines()
  vm.Install('build_tools')
  vm.InstallPreprovisionedPackageData('omb', [_TARBALL], '.')
  vm.RemoteCommand(f'tar -xvf {_TARBALL}')
  vm.RemoteCommand(
      f'cd {_SRC_DIR}; {intelmpi.SourceMpiVarsCommand(vm)}; '
      f'./configure CC={mpicc_path} CXX={mpicxx_path}; '
      'make; sudo make install'
  )
  _TestInstall([vm])


def _InstallForOpenMpi(vm) -> None:
  """Installs the omb package with the OpenMPI library on the VM."""
  vm.Install('openmpi')
  txt, _ = vm.RemoteCommand('which mpicc mpicxx')
  mpicc_path, mpicxx_path = txt.splitlines()
  vm.Install('build_tools')
  vm.InstallPreprovisionedPackageData('omb', [_TARBALL], '.')
  vm.RemoteCommand(f'tar -xvf {_TARBALL}')
  vm.RemoteCommand(
      f'cd {_SRC_DIR}; ./configure CC={mpicc_path} '
      f'CXX={mpicxx_path}; make; sudo make install'
  )
  _TestInstall([vm])


def Install(vm) -> None:
  """Installs the omb package with the specified MPI lib on the VM."""
  vm.AuthenticateVm()
  if FLAGS.mpi_vendor == 'intel':
    _InstallForIntelMpi(vm)
  else:
    _InstallForOpenMpi(vm)


def PrepareWorkers(vms) -> None:
  """Installs MPI and NFS-shares the benchmark directory across all VMs."""
  if FLAGS.mpi_vendor == 'intel':
    intelmpi.NfsExportIntelDirectory(vms)
  else:
    for vm in vms:
      vm.Install('openmpi')
  nfs_service.NfsExportAndMount(vms, _RUN_DIR)
  _TestInstall(vms)
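
# Typical call sequence (illustrative sketch; the vms come from PKB's
# benchmark_spec and the sample conversion is elided):
#
#   Install(vms[0])       # build the benchmarks on the headnode
#   PrepareWorkers(vms)   # share the install dir and MPI setup with all nodes
#   for result in RunBenchmark(RunRequest('allreduce', vms)):
#     ...                 # turn each RunResult into a sample.Sample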


def RunBenchmark(request: RunRequest) -> Iterator[RunResult]:
  """Yields the RunResult of running the microbenchmark.

  Args:
    request: Run configuration.
  """
  vms = request.vms
  name = request.test_name
  params = {}
  if _NUMBER_ITERATIONS.value:
    params['--iterations'] = _NUMBER_ITERATIONS.value
  if _SYNC_OPTION.value:
    params['--sync-option'] = _SYNC_OPTION.value
  if request.message_size:
    # The -m flag does not work on cas_latency and fop_latency: they always
    # run with size=8. For barrier and ibarrier it does not appear to set a
    # message size.
    if ':' in str(request.message_size):
      params['-m'] = f'{request.message_size}'
    else:
      # Pass in '-m size:size' to only do one size.
      params['-m'] = f'{request.message_size}:{request.message_size}'
  if _NUM_RECEIVER_THREADS.value:
    value = str(_NUM_RECEIVER_THREADS.value)
    if _NUM_SERVER_THREADS.value:
      value += f':{_NUM_SERVER_THREADS.value}'
    # --num_threads errors out with 'Invalid option [-]'
    params['-t'] = value
  for processes_per_host in FLAGS.omb_mpi_processes:
    # Special case: processes_per_host=0 means use all the real cores.
    processes_per_host = processes_per_host or vms[0].NumCpusForBenchmark(True)
    if name in _SINGLE_THREADED_BENCHMARKS and processes_per_host != 1:
      continue
    number_processes = processes_per_host * len(vms)
    try:
      start_time = time.time()
      txt, full_cmd = _RunBenchmark(
          vms[0], name, _MPI_PERHOST.value, number_processes, vms, params
      )
      run_time = time.time() - start_time
    except errors.VirtualMachine.RemoteCommandError:
      logging.exception(
          'Error running %s benchmark with %s MPI processes',
          name,
          number_processes,
      )
      continue
    yield RunResult(
        name=name,
        metadata=_ParseBenchmarkMetadata(txt),
        data=_ParseBenchmarkData(name, txt),
        full_cmd=full_cmd,
        units='MB/s' if 'MB/s' in txt else 'usec',
        params=params,
        mpi_vendor=FLAGS.mpi_vendor,
        mpi_version=_GetMpiVersion(vms[0]),
        value_column=BENCHMARKS[name].value_column,
        number_processes=number_processes,
        run_time=run_time,
        pinning=ParseMpiPinning(txt.splitlines()),
        perhost=_MPI_PERHOST.value,
        mpi_env={
            k: v
            for k, v in [
                envvar.split('=', 1)
                for envvar in FLAGS.omb_mpi_env + FLAGS.omb_mpi_genv
            ]
        },
    )


def _GetMpiVersion(vm) -> str | None:
  """Returns the MPI version for the configured vendor."""
  if FLAGS.mpi_vendor == 'intel':
    return intelmpi.MpirunMpiVersion(vm)
  elif FLAGS.mpi_vendor == 'openmpi':
    return openmpi.GetMpiVersion(vm)


def _RunBenchmarkWithIntelMpi(
    vm,
    name: str,
    perhost: int,
    environment: List[str],
    global_environment: List[str],
    number_processes: int | None = None,
    hosts: List[Any] | None = None,
    options: Dict[str, Any] | None = None,
) -> Tuple[str, str]:
  """Runs the microbenchmark using the Intel MPI library."""
  # Create the mpirun command.
  full_benchmark_path = _PathToBenchmark(vm, name)
  mpirun_cmd = []
  mpirun_cmd.extend(sorted(environment))
  mpirun_cmd.append('mpirun')
  mpirun_cmd.extend(
      f'-genv {variable}' for variable in sorted(global_environment)
  )
  if perhost:
    mpirun_cmd.append(f'-perhost {perhost}')
  if number_processes:
    mpirun_cmd.append(f'-n {number_processes}')
  if hosts:
    host_ips = ','.join([vm.internal_ip for vm in hosts])
    mpirun_cmd.append(f'-hosts {host_ips}')
  mpirun_cmd.append(full_benchmark_path)
  if options:
    for key, value in sorted(options.items()):
      mpirun_cmd.append(f'{key} {value}')
  # _TestInstall runs the "hello" test, which isn't a benchmark.
  if name in BENCHMARKS and BENCHMARKS[name].supports_full:
    mpirun_cmd.append('--full')
  full_cmd = f'{intelmpi.SourceMpiVarsCommand(vm)}; {" ".join(mpirun_cmd)}'
  txt, _ = vm.RobustRemoteCommand(full_cmd)
  return txt, full_cmd


def _RunBenchmarkWithOpenMpi(
    vm,
    name: str,
    perhost: int,
    environment: List[str],
    global_environment: List[str],
    number_processes: int | None = None,
    hosts: List[Any] | None = None,
    options: Dict[str, Any] | None = None,
) -> Tuple[str, str]:
  """Runs the microbenchmark using the OpenMPI library."""
  # Create the mpirun command. OpenMPI has no -genv, so local and global
  # environment variables are treated the same and passed via -x.
  full_env = sorted(environment + global_environment)
  full_benchmark_path = _PathToBenchmark(vm, name)
  mpirun_cmd = [f'{env_var}' for env_var in full_env]
  mpirun_cmd.append('mpirun')
  mpirun_cmd.extend([f'-x {env_var.split("=", 1)[0]}' for env_var in full_env])
  # Useful for verifying process mapping.
  mpirun_cmd.append('-report-bindings')
  mpirun_cmd.append('-display-map')
  if number_processes:
    mpirun_cmd.append(f'-n {number_processes}')
  if perhost:
    mpirun_cmd.append(f'-npernode {perhost}')
    mpirun_cmd.append('--use-hwthread-cpus')
  if hosts:
    host_ips = ','.join(
        [f'{vm.internal_ip}:slots={number_processes}' for vm in hosts]
    )
    mpirun_cmd.append(f'-host {host_ips}')
  mpirun_cmd.append(full_benchmark_path)
  if options:
    for key, value in sorted(options.items()):
      mpirun_cmd.append(f'{key} {value}')
  # _TestInstall runs the "hello" test, which isn't a benchmark.
  if name in BENCHMARKS and BENCHMARKS[name].supports_full:
    mpirun_cmd.append('--full')
  full_cmd = ' '.join(mpirun_cmd)
  txt, _ = vm.RobustRemoteCommand(full_cmd)
  return txt, full_cmd
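
# Illustrative commands the two builders above produce (paths, IPs, and env
# vars are made up; the Intel variant is additionally prefixed with the
# output of intelmpi.SourceMpiVarsCommand):
#   Intel:   FI_PROVIDER=tcp mpirun -genv I_MPI_PIN=1 -perhost 1 -n 2
#            -hosts 10.0.0.2,10.0.0.3 /usr/local/libexec/.../osu_allreduce
#            --full
#   OpenMPI: FI_PROVIDER=tcp mpirun -x FI_PROVIDER -report-bindings
#            -display-map -n 2 -npernode 1 --use-hwthread-cpus
#            -host 10.0.0.2:slots=2,10.0.0.3:slots=2
#            /usr/local/libexec/.../osu_allreduce --full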


def _RunBenchmark(
    vm,
    name: str,
    perhost: int,
    number_processes: int | None = None,
    hosts: List[Any] | None = None,
    options: Dict[str, Any] | None = None,
) -> Tuple[str, str]:
  """Runs the microbenchmark.

  Args:
    vm: The headnode to run on.
    name: The name of the microbenchmark.
    perhost: MPI option -perhost. Use 0 to not set the value.
    number_processes: The number of MPI processes to use.
    hosts: List of BaseLinuxVirtualMachines to use in the cluster.
    options: Optional dict of flags to pass into the benchmark.

  Returns:
    Tuple of the output text of mpirun and the command ran.
  """
  run_impl = _RunBenchmarkWithIntelMpi
  if FLAGS.mpi_vendor == 'openmpi':
    run_impl = _RunBenchmarkWithOpenMpi
  return run_impl(
      vm,
      name,
      perhost,
      FLAGS.omb_mpi_env,
      FLAGS.omb_mpi_genv,
      number_processes,
      hosts,
      options,
  )


def _PathToBenchmark(vm, name: str) -> str:
  """Returns the full path to the benchmark."""
  txt, _ = vm.RemoteCommand(f'ls {_RUN_DIR}/*/osu_{name}')
  return txt.strip()


def _TestInstall(vms):
  """Runs the "hello" test to confirm the installation works."""
  number_processes = len(vms)
  logging.info('Running hello world on %s process(es)', number_processes)
  hosts = vms if number_processes > 1 else None
  txt, _ = _RunBenchmark(vms[0], 'hello', 1, number_processes, hosts=hosts)
  logging.info('Hello world output: %s', txt.splitlines()[-1])


def _ParseBenchmarkMetadata(txt: str) -> Dict[str, str]:
  """Returns the metadata found in the benchmark text output.

  For most benchmarks this is an empty dict. An example of acc_latency's:
  {'sync': 'MPI_Win_flush', 'window_creation': 'MPI_Win_allocate'}

  Args:
    txt: The benchmark's text output.
  """

  def _MatchHeader(line: str) -> Dict[str, str]:
    for header in _HEADERS:
      match = header.match(line)
      if match:
        return match.groupdict()
    return {}

  ret = {}
  for line in txt.splitlines():
    ret.update(_MatchHeader(line))
  return ret


def _LinesAfterMarker(line_re: Pattern[str], txt: str) -> List[str]:
  r"""Returns the text lines after the matching regex.

  _LinesAfterMarker(re.compile('^Hello'), "Line1\nHello\nLine2") == ['Line2']

  Args:
    line_re: Pattern to match the start of the data.
    txt: Text input.
  """

  def StartBlock(line: str) -> bool:
    return not line_re.match(line)

  lines = list(itertools.dropwhile(StartBlock, txt.splitlines()))
  return lines[1:] if lines else []
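
# Illustrative example: for stdout containing the header lines
#   # Window creation: MPI_Win_allocate
#   # Synchronization: MPI_Win_flush
# _ParseBenchmarkMetadata above returns
# {'window_creation': 'MPI_Win_allocate', 'sync': 'MPI_Win_flush'}.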
""" data = [] for line in _LinesAfterMarker(re.compile('# OSU MPI.*'), txt): if not line or line.startswith('#') or 'Overall' in line: continue columns = BENCHMARKS[benchmark_name].columns row_data = [float(item) for item in line.split()] if len(columns) != len(row_data): raise ValueError( f'Expected {len(columns)} columns ({columns}) in the ' f'{benchmark_name} benchmark, received {len(row_data)} ({row_data})' ) row_with_headers = dict(zip(columns, row_data)) for int_column in ('size', 'iterations'): if int_column in row_with_headers: row_with_headers[int_column] = int(row_with_headers[int_column]) data.append(row_with_headers) return data def ParseMpiPinning(lines: Sequence[str]) -> List[str]: """Grabs MPI pinning to CPUs from the log files. Output format is (rank:node id:comma separated CPUs) 0:1:0,24;1:1:1,25;2:1:2,26;3:1:3,27 Rank 0 and 1 are on node 1 and are pinned to CPUs (0,24) and (1,25) Args: lines: Text lines from mpirun output. Returns: Text strings of the MPI pinning """ def _CondenseMpiPinningMultiline(lines: Sequence[str]) -> Sequence[str]: """Returns multi-line MPI pinning info as one line.""" condensed_lines = [] for line in lines: m = _MPI_PIN_RE.search(line) if m: condensed_lines.append(line) else: if condensed_lines and not condensed_lines[-1].endswith('}'): condensed_lines[-1] += line.strip() return condensed_lines ret = [] for line in _CondenseMpiPinningMultiline(lines): row = _MPI_PIN_RE.search(line) if not row: continue nodename = row['nodename'] m2 = _PKB_NODE_RE.match(nodename) if m2: nodename = m2.group('nodeindex') cpuids = ','.join(str(x) for x in _IntRange(row['cpuids'])) ret.append(':'.join((row['rank'], nodename, cpuids))) return ret def _IntRange(mask: str) -> List[int]: """Converts an integer mask into a sorted list of integers. _IntRange('0,1,5-7') == [0, 1, 5, 6, 7] Args: mask: String integer mask as from MPI pinning list Returns: Sorted list of integers from the mask """ ints = [] for raw in mask.split(','): parts = raw.split('-') start = int(parts[0]) end = start if len(parts) == 1 else int(parts[1]) ints.extend(list(range(start, end + 1))) return sorted(ints)