perfkitbenchmarker/linux_benchmarks/mlperf_multiworkers_benchmark.py

# Copyright 2019 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Run MLPerf benchmarks on multiple workers."""

import posixpath

from absl import flags

from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
from perfkitbenchmarker import errors
from perfkitbenchmarker import hpc_util
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_benchmarks import mlperf_benchmark
from perfkitbenchmarker.linux_packages import nvidia_driver
from perfkitbenchmarker.linux_packages import tensorflow
from perfkitbenchmarker.providers.gcp import gcs
from perfkitbenchmarker.providers.gcp import util

FLAGS = flags.FLAGS
HOSTFILE = 'HOSTFILE'
PORT = '4242'
DGXSYSTEM = 'DGXA100_multinode'
CONFIG = f'config_{DGXSYSTEM}.sh'
AWS_EFA_NCCL_BASEAMI_PIPELINE_URL = (
    'https://github.com/aws-samples/aws-efa-nccl-baseami-pipeline.git'
)
NVIDIA_EFA_DOCKERFILE = (
    'aws-efa-nccl-baseami-pipeline/nvidia-efa-docker_base/Dockerfile*.base'
)
NVIDIA_EFA_DOCKERFILE_COMMIT = '3934b3477993661ae0d29e9f23fbd46b64342301'

BENCHMARK_NAME = 'mlperf_multiworkers'
BENCHMARK_CONFIG = """
mlperf_multiworkers:
  description: Runs MLPerf Benchmark on multiple workers.
  vm_groups:
    default:
      disk_spec: *default_500_gb
      vm_spec:
        GCP:
          machine_type: a2-highgpu-8g
          zone: us-central1-b
          boot_disk_size: 200
        AWS:
          machine_type: p4d.24xlarge
          zone: us-west-2a
          boot_disk_size: 200
        Azure:
          machine_type: Standard_ND96asr_v4
          zone: westus2
          boot_disk_size: 200
          image: microsoft-dsvm:ubuntu-hpc:1804:latest
      vm_count: null
"""

flags.DEFINE_boolean(
    'mlperf_keep_nccl_log', False, 'whether to keep NCCL debug information'
)


def GetConfig(user_config):
  """Load and return benchmark config.

  Args:
    user_config: user supplied configuration (flags and config file)

  Returns:
    loaded benchmark configuration
  """
  config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
  if 'tpu_groups' in config:
    raise errors.Setup.InvalidFlagConfigurationError(
        'Invalid configuration. '
        'The multiworker mlperf benchmark cannot run with TPUs'
    )
  return config


def CheckPrerequisites(_):
  """Verify that the required prerequisites are met.

  Args:
    _: Unused.

  Raises:
    perfkitbenchmarker.errors.Setup.InvalidFlagConfigurationError: On invalid
      flag configuration.
  """
  if not FLAGS.openmpi_enable_shared:
    raise errors.Setup.InvalidFlagConfigurationError(
        'The flag openmpi_enable_shared must be True '
        'in order to run with multiple workers.'
    )
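

# For illustration (hypothetical flag values): running with
# --mlperf_bucket=my-bucket under run uri "abc123" makes
# _UpdateBenchmarkSpecWithFlags below set
#   benchmark_spec.model_dir = 'gs://my-bucket/pkb-abc123'
# while both bucket and model_dir stay None when --mlperf_bucket is unset.

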
def _UpdateBenchmarkSpecWithFlags(benchmark_spec):
  """Update the benchmark_spec with supplied command line flags.

  Args:
    benchmark_spec: benchmark specification to update
  """
  gpus_per_vm = nvidia_driver.QueryNumberOfGpus(benchmark_spec.vms[0])
  num_vms = len(benchmark_spec.vms)
  total_num_gpus = gpus_per_vm * num_vms

  benchmark_spec.gpus_per_vm = gpus_per_vm
  benchmark_spec.num_vms = num_vms
  benchmark_spec.total_num_gpus = total_num_gpus
  benchmark_spec.zones = FLAGS.zone

  # pylint: disable=protected-access
  mlperf_benchmark._UpdateBenchmarkSpecWithFlags(benchmark_spec)
  # pylint: enable=protected-access

  storage_service = gcs.GoogleCloudStorageService()
  benchmark_spec.storage_service = storage_service
  if FLAGS.mlperf_bucket:
    benchmark_spec.bucket = FLAGS.mlperf_bucket
    benchmark_spec.model_dir = 'gs://{bucket}/pkb-{uri}'.format(
        bucket=FLAGS.mlperf_bucket, uri=FLAGS.run_uri
    )
  else:
    benchmark_spec.bucket = None
    benchmark_spec.model_dir = None


def _PrepareWorker(vm):
  """Install and set up cuda + openmpi on the target vm.

  Args:
    vm: The target vm
  """
  vm.Install('cuda_toolkit')
  vm.Install('openmpi')
  vm.AuthenticateVm()


def _PrepareMLPerfBenchmark(benchmark_spec, node_rank):
  """Install and set up MLPerf on the target vm.

  Args:
    benchmark_spec: The benchmark specification
    node_rank: int, The rank of the node for multi-node distributed training
  """
  vm = benchmark_spec.vms[node_rank]
  mlperf_benchmark.PrepareBenchmark(benchmark_spec, vm)


def _PrepareMLPerfRunner(benchmark_spec, node_rank):
  """Install and set up the MLPerf runner on the target vm.

  Args:
    benchmark_spec: The benchmark specification
    node_rank: int, The rank of the node for multi-node distributed training
  """
  vm = benchmark_spec.vms[node_rank]
  mlperf_benchmark.PrepareRunner(benchmark_spec, vm)


def _SedPairsToString(pairs):
  """Convert a list of sed pairs to a string for the sed command.

  Args:
    pairs: a list of pairs, indicating the replacement requests

  Returns:
    a string to supply to the sed command
  """
  sed_str = '; '.join(['s/%s/%s/g' % pair for pair in pairs])
  if pairs:
    sed_str += ';'
  return sed_str


def _DictToString(dictionary):
  """Convert a dictionary to a space separated 'key=value' string.

  Args:
    dictionary: the key-value dictionary to be converted

  Returns:
    a string representing the dictionary
  """
  dict_str = ' '.join(
      ' {key}={value}'.format(key=key, value=value)
      for key, value in sorted(dictionary.items())
  )
  return dict_str


def _GetNcclParam():
  for extra_param in FLAGS.nccl_extra_params:
    param_key, param_value = extra_param.split('=', 1)
    param_value = param_value.replace('/', '\\/').replace('$', '\\$')
    yield rf'export {param_key}={param_value}'


def _GetNcclParams():
  return r'\n'.join(_GetNcclParam())


def _GetChangesForTransformer(
    benchmark_spec,
    vm,
    script_path,
    nvprof_flags,
    config_sed_input,
    run_sed_input,
    run_and_time_sed_input,
):
  """Get changes to config and run scripts for Transformer.

  Also updates run_training.sh on the vm.

  Args:
    benchmark_spec: The benchmark specification.
    vm: The target vm.
    script_path: The location of scripts on vm.
    nvprof_flags: The flags for nvprof.
    config_sed_input: Input list of sed pairs for config_DGXA100_multi.sh.
    run_sed_input: Input list of sed pairs for run.sub.
    run_and_time_sed_input: Input list of sed pairs for run_and_time.sh.

  Returns:
    config_sed_output: Output list of sed pairs for config_DGXA100_multi.sh.
    run_sed_output: Output list of sed pairs for run.sub.
    run_and_time_sed_output: Output list of sed pairs for run_and_time.sh.
  """
  config_sed = config_sed_input
  run_sed = run_sed_input
  run_and_time_sed = run_and_time_sed_input
  per_gpu_batch_size = min(8192, 614400 / benchmark_spec.total_num_gpus)
  run_training_sed = []

  config_sed += [(
      r'MAX_TOKENS=.*',
      r'MAX_TOKENS={per_gpu_batch_size}'.format(
          per_gpu_batch_size=per_gpu_batch_size
      ),
  )]

  if mlperf_benchmark.NVPROF in FLAGS.mlperf_profiler:
    run_training_sed += [(
        r'python',
        r'nvprof {nvprof_flags} python'.format(nvprof_flags=nvprof_flags),
    )]
    run_training_sed += [(
        r'--max-epoch.*',
        r'--max-update {profile_steps} \\\\'.format(
            profile_steps=FLAGS.mlperf_profile_steps
        ),
    )]

  vm.RemoteCommand(
      rf'cd {script_path} && '
      rf'sed "{mlperf_benchmark.SedPairsToString(run_training_sed)}" '
      r'run_training.sh > run_training1.sh && chmod 755 run_training1.sh'
  )

  run_sed += [(
      r'sleep infinity',
      r' bash -c \"\x27 cp \/workspace\/{model}1\/*.sh '
      r'\/workspace\/translation\/ \&\& sleep infinity\x27 \"'.format(
          model=benchmark_spec.benchmark
      ),
  )]

  return config_sed, run_sed, run_and_time_sed


def _GetChangesForSSD(
    benchmark_spec,
    nvprof_flags,
    config_sed_input,
    run_sed_input,
    run_and_time_sed_input,
):
  """Get changes to config and run scripts for SSD.

  Args:
    benchmark_spec: The benchmark specification.
    nvprof_flags: The flags for nvprof.
    config_sed_input: Input list of sed pairs for config_DGXA100_multi.sh.
    run_sed_input: Input list of sed pairs for run.sub.
    run_and_time_sed_input: Input list of sed pairs for run_and_time.sh.

  Returns:
    config_sed_output: Output list of sed pairs for config_DGXA100_multi.sh.
    run_sed_output: Output list of sed pairs for run.sub.
    run_and_time_sed_output: Output list of sed pairs for run_and_time.sh.
  """
  config_sed = config_sed_input
  run_sed = run_sed_input
  run_and_time_sed = run_and_time_sed_input
  per_gpu_train_batch_size = min(24, 1680 / benchmark_spec.total_num_gpus)
  per_gpu_eval_batch_size = 40

  config_sed += [(
      r'--batch-size .*',
      r'--batch-size \"{per_gpu_train_batch_size}\"'.format(
          per_gpu_train_batch_size=per_gpu_train_batch_size
      ),
  )]
  config_sed += [(
      r'--eval-batch-size .*',
      r'--eval-batch-size \"{per_gpu_eval_batch_size}\"'.format(
          per_gpu_eval_batch_size=per_gpu_eval_batch_size
      ),
  )]

  if mlperf_benchmark.NVPROF in FLAGS.mlperf_profiler:
    run_and_time_sed += [(
        r'python',
        r'nvprof {nvprof_flags} python'.format(nvprof_flags=nvprof_flags),
    )]
    run_and_time_sed += [(r'--epochs .*', r'--epochs 1 \\\\')]

  run_sed += [(
      r'sleep infinity',
      r' bash -c \"\x27 cp \/workspace\/{model}1\/*.sh '
      r'\/workspace\/single_stage_detector\/ \&\& '
      r'sleep infinity\x27 \"'.format(model=benchmark_spec.benchmark),
  )]

  return config_sed, run_sed, run_and_time_sed


def _GetChangesForGNMT(
    benchmark_spec,
    nvprof_flags,
    config_sed_input,
    run_sed_input,
    run_and_time_sed_input,
):
  """Get changes to config and run scripts for GNMT.

  Args:
    benchmark_spec: The benchmark specification.
    nvprof_flags: The flags for nvprof.
    config_sed_input: Input list of sed pairs for config_DGXA100_multi.sh.
    run_sed_input: Input list of sed pairs for run.sub.
    run_and_time_sed_input: Input list of sed pairs for run_and_time.sh.

  Returns:
    config_sed_output: Output list of sed pairs for config_DGXA100_multi.sh.
    run_sed_output: Output list of sed pairs for run.sub.
    run_and_time_sed_output: Output list of sed pairs for run_and_time.sh.
  """
  config_sed = config_sed_input
  run_sed = run_sed_input
  run_and_time_sed = run_and_time_sed_input
  per_gpu_train_batch_size = 32
  per_gpu_eval_batch_size = min(16, 3072 / benchmark_spec.total_num_gpus)

  config_sed += [(
      r'TRAIN_BATCH_SIZE=.*',
      r'TRAIN_BATCH_SIZE={per_gpu_train_batch_size}'.format(
          per_gpu_train_batch_size=per_gpu_train_batch_size
      ),
  )]
  config_sed += [(
      r'TEST_BATCH_SIZE=.*',
      r'TEST_BATCH_SIZE={per_gpu_eval_batch_size}'.format(
          per_gpu_eval_batch_size=per_gpu_eval_batch_size
      ),
  )]

  if mlperf_benchmark.NVPROF in FLAGS.mlperf_profiler:
    run_and_time_sed += [(
        r'python',
        r'nvprof {nvprof_flags} python'.format(nvprof_flags=nvprof_flags),
    )]
    run_and_time_sed += [(r'--epochs .*', r'--epochs \"1\" \\\\')]

  run_sed += [(
      r'sleep infinity',
      r' bash -c \"\x27 cp \/workspace\/{model}1\/*.sh '
      r'\/workspace\/rnn_translator\/ \&\& sleep infinity\x27 \"'.format(
          model=benchmark_spec.benchmark
      ),
  )]

  return config_sed, run_sed, run_and_time_sed


def _GetChangesForMask(
    benchmark_spec,
    node_rank,
    script_path,
    nvprof_flags,
    config_sed_input,
    run_sed_input,
    run_and_time_sed_input,
):
  """Get changes to config and run scripts for MaskRCNN.

  Also update train_mlperf.py if nvprof is used.

  Args:
    benchmark_spec: The benchmark specification.
    node_rank: int, The rank of the node for multi-node distributed training
    script_path: The location of scripts on vm.
    nvprof_flags: The flags for nvprof.
    config_sed_input: Input list of sed pairs for config_DGXA100_multi.sh.
    run_sed_input: Input list of sed pairs for run.sub.
    run_and_time_sed_input: Input list of sed pairs for run_and_time.sh.

  Returns:
    config_sed_output: Output list of sed pairs for config_DGXA100_multi.sh.
    run_sed_output: Output list of sed pairs for run.sub.
    run_and_time_sed_output: Output list of sed pairs for run_and_time.sh.
  """
  vm = benchmark_spec.vms[node_rank]
  master_vm = benchmark_spec.vms[0]
  config_sed = config_sed_input
  run_sed = run_sed_input
  run_and_time_sed = run_and_time_sed_input
  nsockets_per_node = vm.CheckLsCpu().socket_count
  ncores_per_socket = vm.CheckLsCpu().cores_per_socket
  nproc_per_node = nvidia_driver.QueryNumberOfGpus(vm)
  nnodes = benchmark_spec.num_vms
  dist_world_size = nproc_per_node * nnodes

  if FLAGS.mlperf_training_version == 'v2.0':
    vm.RemoteCommand(
        'git clone https://github.com/mlcommons/training_results_v1.1.git'
    )
    vm.RemoteCommand(
        'cp training_results_v1.1/NVIDIA/benchmarks/maskrcnn/implementations/pytorch/bind_launch.py'
        ' training_results_v2.0/NVIDIA/benchmarks/maskrcnn/implementations/pytorch/'
    )

  config_sed += [(
      r'WALLTIME_MINUTES=30',
      (
          r'WALLTIME_MINUTES=30\n'
          r'export CONT=mlperf-nvidia:object_detection\n'
          r'export DATADIR=\/data\n'
          r'export PKLDIR=\/data\/coco2017\/pkl_coco\n'
          rf'export MASTER_ADDR={master_vm.internal_ip}\n'
          rf'export MASTER_PORT={PORT}\n'
          rf'export WORLD_SIZE={dist_world_size}\n'
          rf'export RANK={node_rank}\n'
          r'export NEXP=1'
      ),
  )]
  if FLAGS.mlperf_maskrcnn_batch_size:
    config_sed.append(
        (r'BATCHSIZE=.*', rf'BATCHSIZE={FLAGS.mlperf_maskrcnn_batch_size}')
    )

  hyperthreads = '' if FLAGS.mlperf_hyperthreads else "'--no_hyperthreads'"
  run_and_time_sed.append((
      r' CMD=.*',
      r' CMD=( '
      r"'python' "
      r"'-u' "
      r"'-m' "
      r"'bind_launch' "
      f'{hyperthreads} '
      f"'--nnodes={nnodes}' "
      f"'--node_rank={node_rank}' "
      f"'--master_addr={master_vm.internal_ip}' "
      f"'--master_port={PORT}' "
      f"'--nsockets_per_node={nsockets_per_node}' "
      f"'--ncores_per_socket={ncores_per_socket}' "
      f"'--nproc_per_node={nproc_per_node}' "
      ')',
  ))

  if mlperf_benchmark.NVPROF in FLAGS.mlperf_profiler:
    run_and_time_sed += [(
        r'python',
        r'nvprof {nvprof_flags} python'.format(nvprof_flags=nvprof_flags),
    )]
    vm.RemoteCommand(
        r'cd {script_path} && '
        r'cp tools/train_mlperf.py tools/train_mlperf0.py && '
        r'sed "s/min_bbox_map=.*/min_bbox_map=0.01,/g; '
        r' s/min_segm_map=.*/min_segm_map=0.01)/g;" '
        r' tools/train_mlperf0.py > tools/train_mlperf.py'.format(
            script_path=script_path
        )
    )

  run_sed += [(r'SYSLOGGING=1', r'SYSLOGGING=0')]
  run_sed += [(r'.*run_and_time', r'.\/run_and_time')]

  return config_sed, run_sed, run_and_time_sed


def _GetChangesForResnet(
    benchmark_spec,
    node_rank,
    nvprof_flags,
    config_sed_input,
    run_sed_input,
    run_and_time_sed_input,
):
  """Get changes to config and run scripts for Resnet.

  Args:
    benchmark_spec: The benchmark specification.
    node_rank: int, The rank of the node for multi-node distributed training
    nvprof_flags: The flags for nvprof.
    config_sed_input: Input list of sed pairs for config_DGXA100_multi.sh.
    run_sed_input: Input list of sed pairs for run.sub.
    run_and_time_sed_input: Input list of sed pairs for run_and_time.sh.

  Returns:
    config_sed_output: Output list of sed pairs for config_DGXA100_multi.sh.
    run_sed_output: Output list of sed pairs for run.sub.
    run_and_time_sed_output: Output list of sed pairs for run_and_time.sh.
  """
  config_sed = config_sed_input
  run_sed = run_sed_input
  run_and_time_sed = run_and_time_sed_input
  hosts = ','.join(
      f'{vm.internal_ip}:{benchmark_spec.gpus_per_vm}'
      for vm in benchmark_spec.vms
  )
  np = benchmark_spec.gpus_per_vm * benchmark_spec.num_vms

  config_sed.append((
      r'.*config_DGXA100_common\.sh',
      (
          r'export CONT=mlperf-nvidia:image_classification\n'
          r'export DATADIR=\/data\/imagenet\n'
          rf'export DISTRIBUTED=\'horovodrun -H {hosts} -p {PORT} -np'
          rf' {np}\''
      ),
  ))
  if FLAGS.mlperf_resnet_batch_size:
    config_sed.append(
        (r'BATCHSIZE=.*', rf'BATCHSIZE={FLAGS.mlperf_resnet_batch_size}')
    )

  if mlperf_benchmark.NVPROF in FLAGS.mlperf_profiler:
    run_and_time_sed += [(
        r'python',
        r'nvprof {nvprof_flags} python'.format(nvprof_flags=nvprof_flags),
    )]
    run_and_time_sed += [(
        r'num-epochs.*',
        r'num-epochs \"1\"\n'
        r' --epoch-size \"{profile_steps}\"'.format(
            profile_steps=FLAGS.mlperf_profile_steps
        ),
    )]

  run_and_time_sed.append((r'BIND=.*', r'BIND='))

  run_sed += [(
      r'_cont_mounts=(',
      r'_cont_mounts=(\"--volume=\$HOME\/.ssh:\/tmp\/.ssh\" ',
  )]
  run_sed += [(r'.*run_and_time', r'.\/run_and_time')]

  if node_rank == 0:
    run_sed += [(
        r'sleep infinity',
        r'bash -c \"cp -r \/tmp\/.ssh \/root\/.ssh;sleep infinity\"',
    )]
  else:
    run_sed += [(
        r'sleep infinity',
        r'bash -c \"cp -r \/tmp\/.ssh \/root\/.ssh;'
        r'apt update;'
        r'apt-get install -y openssh-server;'
        r'systemctl enable ssh;'
        r'mkdir -p \/run\/sshd;'
        rf'\/usr\/sbin\/sshd -p {PORT};'
        r'sleep infinity\"',
    )]
    run_sed += [(r'.*run_and_time.*', r'hostname')]
    run_sed += [(r'trap.*', r'')]

  return config_sed, run_sed, run_and_time_sed


def _GetChangesForBert(
    benchmark_spec,
    node_rank,
    nvprof_flags,
    config_sed_input,
    run_sed_input,
    run_and_time_sed_input,
):
  """Get changes to config and run scripts for BERT.

  Also update train_mlperf.py if nvprof is used.

  Args:
    benchmark_spec: The benchmark specification.
    node_rank: int, The rank of the node for multi-node distributed training
    nvprof_flags: The flags for nvprof.
    config_sed_input: Input list of sed pairs for config_DGXA100_multi.sh.
    run_sed_input: Input list of sed pairs for run.sub.
    run_and_time_sed_input: Input list of sed pairs for run_and_time.sh.

  Returns:
    config_sed_output: Output list of sed pairs for config_DGXA100_multi.sh.
    run_sed_output: Output list of sed pairs for run.sub.
    run_and_time_sed_output: Output list of sed pairs for run_and_time.sh.
  """
  vm = benchmark_spec.vms[node_rank]
  master_vm = benchmark_spec.vms[0]
  config_sed = config_sed_input
  run_sed = run_sed_input
  run_and_time_sed = run_and_time_sed_input
  nsockets_per_node = vm.CheckLsCpu().socket_count
  ncores_per_socket = vm.CheckLsCpu().cores_per_socket
  nproc_per_node = nvidia_driver.QueryNumberOfGpus(vm)
  nnodes = benchmark_spec.num_vms
  dist_world_size = nproc_per_node * nnodes

  config_sed += [(
      r'.*config_DGXA100_common\.sh',
      r'export CONT=mlperf-nvidia:language_model\n'
      r'export NEXP=1\n'
      rf'export MASTER_ADDR={master_vm.internal_ip}\n'
      rf'export MASTER_PORT={PORT}\n'
      rf'export WORLD_SIZE={dist_world_size}\n'
      rf'export RANK={node_rank}\n',
  )]
  config_sed.append((
      r'DATADIR=.*',
      r'DATADIR=\/data\/bert_data\/hdf5\/training-4320\/hdf5_4320_shards_varlength',
  ))
  config_sed.append((
      r'DATADIR_PHASE2=.*',
      r'DATADIR_PHASE2=\/data\/bert_data\/hdf5\/training-4320\/hdf5_4320_shards_varlength',
  ))
  config_sed.append(
      (r'EVALDIR=.*', r'EVALDIR=\/data\/bert_data\/hdf5\/eval_varlength')
  )
  config_sed.append(
      (r'CHECKPOINTDIR=.*', r'CHECKPOINTDIR=\/data\/bert_data\/phase1')
  )
  config_sed.append((
      r'CHECKPOINTDIR_PHASE1=.*',
      r'CHECKPOINTDIR_PHASE1=\/data\/bert_data\/phase1',
  ))
  if FLAGS.mlperf_bert_batch_size:
    config_sed.append(
        (r'BATCHSIZE=.*', rf'BATCHSIZE={FLAGS.mlperf_bert_batch_size}')
    )

  if mlperf_benchmark.NVPROF in FLAGS.mlperf_profiler:
    run_and_time_sed += [(r'python', rf'nvprof {nvprof_flags} python')]

  if FLAGS.mlperf_training_version == 'v2.0':
    vm.RemoteCommand(
        'git clone https://github.com/mlcommons/training_results_v1.1.git'
    )
    vm.RemoteCommand(
        'cp training_results_v1.1/NVIDIA/benchmarks/bert/implementations/pytorch/bind_pyt.py'
        ' training_results_v2.0/NVIDIA/benchmarks/bert/implementations/pytorch/'
    )

  hyperthreads = '' if FLAGS.mlperf_hyperthreads else "'--no_hyperthreads'"
  run_and_time_sed.append((
      r' CMD=.*',
      r' CMD=( '
      r"'python' "
      r"'-u' "
      r"'-m' "
      r"'bind_pyt' "
      f'{hyperthreads} '
      f"'--nnodes={nnodes}' "
      f"'--node_rank={node_rank}' "
      f"'--master_addr={master_vm.internal_ip}' "
      f"'--master_port={PORT}' "
      f"'--nsockets_per_node={nsockets_per_node}' "
      f"'--ncores_per_socket={ncores_per_socket}' "
      f"'--nproc_per_node={nproc_per_node}' "
      ')',
  ))
  run_sed += [(r'.*run_and_time', r'.\/run_and_time')]

  return config_sed, run_sed, run_and_time_sed


def _UpdateScripts(benchmark_spec, node_rank):
  """Update the running scripts on the target vm.

  Args:
    benchmark_spec: The benchmark specification.
    node_rank: int, The rank of the node for multi-node distributed training
  """
  vm = benchmark_spec.vms[node_rank]
  benchmark = benchmark_spec.benchmark

  # TODO(tohaowu) Change config and script using a patch file.
  # Replacement request pairs for the sed command: each pair ('str_A', 'str_B')
  # requests "replace anything matching str_A with str_B" in a specific file.
  config_sed = []
  config_sed += [(r'DGXSYSTEM=.*', rf'DGXSYSTEM=\"{DGXSYSTEM}\"')]
  config_sed += [(
      r'DGXNNODES=.*',
      r'DGXNNODES={num_vms}'.format(num_vms=benchmark_spec.num_vms),
  )]
  config_sed += [(
      r'DGXNGPU=.*',
      (
          rf'DGXNGPU={benchmark_spec.gpus_per_vm}\nexport'
          rf' CUDA_VISIBLE_DEVICES={",".join([str(i) for i in range(benchmark_spec.gpus_per_vm)])}'
      ),
  )]
  config_sed += [(
      r'DGXNSOCKET=.*',
      r'DGXNSOCKET={nsockets}'.format(nsockets=vm.CheckLsCpu().socket_count),
  )]
  config_sed += [(
      r'DGXSOCKETCORES=.*',
      r'DGXSOCKETCORES={ncores}'.format(
          ncores=vm.CheckLsCpu().cores_per_socket
      ),
  )]

  run_and_time_sed = []
  run_and_time_sed += [(r'run_training.sh', r'run_training1.sh')]
  run_and_time_sed += [(r'DGXSYSTEM=.*', rf'DGXSYSTEM=\"{DGXSYSTEM}\"')]

  if FLAGS.mlperf_keep_nccl_log:
    run_and_time_sed += [(
        r'#\!\/bin\/bash',
        (
            r'#\!\/bin\/bash\n'
            r'export NCCL_DEBUG=INFO\n'
            r'export NCCL_DEBUG_SUBSYS=ALL\n'
            r'export NCCL_DEBUG_FILE=\/results\/%h.%p.nccl'
        ),
    )]

  nccl_exports = _GetNcclParams() if FLAGS.nccl_extra_params else r''
  run_and_time_sed += [(
      r'#!\/bin\/bash',
      r'#!\/bin\/bash\n'
      rf'{nccl_exports}',
  )]

  run_sed = []
  run_sed += [(r'SYSLOGGING=1', r'SYSLOGGING=0')]
  run_sed += [(
      r'env [|] grep SLURM',
      r'export SLURM_NNODES={num_vms}'.format(num_vms=benchmark_spec.num_vms),
  )]
  run_sed += [(
      r'data -v \$LOGDIR',
      r'data -v \$(pwd):\/workspace\/{model}1 -v \$LOGDIR'.format(
          model=benchmark
      ),
  )]
  run_sed += [(
      r'scontrol show hostname',
      r'mpirun -hostfile \$HOME\/{hostfile} -N 1 hostname -I '
      r'\| awk \'{{print \$1}}\' '.format(hostfile=HOSTFILE),
  )]
  run_sed += [(
      r'srun --mem=0 -N 1 -n 1 -w \$hostn',
      r'mpirun -N 1 -n 1 -H \$hostn',
  )]
  run_sed += [(r'sleep 30', r'sleep 60')]
  run_sed += [(r'docker exec -it', r'docker exec -t')]
  run_sed += [(r'run_and_time.sh', r'run_and_time1.sh')]

  if FLAGS.aws_efa or FLAGS.azure_infiniband:
    stdout, _ = vm.RemoteCommand('ls -d /dev/infiniband/*')
    devices = [device.replace('/', '\\/') for device in stdout.split()]
    device_args = ' '.join(f'--device={device}' for device in devices)
    run_sed += [(r'nvidia-docker run', rf'nvidia-docker run {device_args}')]

  if FLAGS.azure_infiniband:
    run_sed.append((
        r'_cont_mounts=(',
        r'_cont_mounts=(\"--volume=\/opt\/microsoft:\/opt\/microsoft\" ',
    ))
    run_sed.append((
        r'^CONT_MOUNTS=\(.*\)$',
        r'CONT_MOUNTS=\"\1 --volume=\/opt\/microsoft:\/opt\/microsoft\"',
    ))

  nvprof_flags = r'-f -o \/results\/%h.%p.nvprof --profile-child-processes'

  script_path = (
      r'$HOME/training_results_{version}/NVIDIA/benchmarks/{model}'
      r'/implementations/{framework}'.format(
          version=FLAGS.mlperf_training_version,
          model='maskrcnn' if mlperf_benchmark.MASK in benchmark else benchmark,
          framework='mxnet'
          if mlperf_benchmark.RESNET in benchmark
          else 'pytorch',
      )
  )

  config_files = [CONFIG]
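
  # Each branch below appends model-specific replacement pairs (for example,
  # _GetChangesForTransformer caps MAX_TOKENS at
  # min(8192, 614400 / total_num_gpus) per GPU, i.e. 8192 for a hypothetical
  # 16-GPU cluster). For illustration, a pair such as
  # (r'DGXNNODES=.*', r'DGXNNODES=2') is serialized by
  # mlperf_benchmark.SedPairsToString into "s/DGXNNODES=.*/DGXNNODES=2/g;"
  # before the scripts are rewritten further down.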
  if mlperf_benchmark.TRANSFORMER in benchmark:
    config_sed, run_sed, run_and_time_sed = _GetChangesForTransformer(
        benchmark_spec,
        vm,
        script_path,
        nvprof_flags,
        config_sed,
        run_sed,
        run_and_time_sed,
    )
  elif mlperf_benchmark.SSD in benchmark:
    config_sed, run_sed, run_and_time_sed = _GetChangesForSSD(
        benchmark_spec, nvprof_flags, config_sed, run_sed, run_and_time_sed
    )
  elif mlperf_benchmark.GNMT in benchmark:
    config_sed, run_sed, run_and_time_sed = _GetChangesForGNMT(
        benchmark_spec, nvprof_flags, config_sed, run_sed, run_and_time_sed
    )
  elif mlperf_benchmark.MASK in benchmark:
    config_sed, run_sed, run_and_time_sed = _GetChangesForMask(
        benchmark_spec,
        node_rank,
        script_path,
        nvprof_flags,
        config_sed,
        run_sed,
        run_and_time_sed,
    )
    config_files = ['config_DGXA100_multi_8x8x2.sh']
  elif mlperf_benchmark.RESNET in benchmark:
    config_sed, run_sed, run_and_time_sed = _GetChangesForResnet(
        benchmark_spec,
        node_rank,
        nvprof_flags,
        config_sed,
        run_sed,
        run_and_time_sed,
    )
    config_files = ['config_DGXA100_common.sh', 'config_DGXA100_multi_8x8x*.sh']
    mlperf_benchmark.UpdateScriptForSmallGpuMem(vm)
  elif mlperf_benchmark.BERT in benchmark:
    config_sed, run_sed, run_and_time_sed = _GetChangesForBert(
        benchmark_spec,
        node_rank,
        nvprof_flags,
        config_sed,
        run_sed,
        run_and_time_sed,
    )
    config_files = ['config_DGXA100_common.sh', 'config_DGXA100_8x8x48x1.sh']

  vm.RemoteCommand(
      f'cd {script_path} && '
      f'sed "{mlperf_benchmark.SedPairsToString(config_sed)}" '
      f'{" ".join(config_files)} > {CONFIG} && '
      f'chmod 755 {CONFIG} '
  )
  vm.RemoteCommand(
      f'cd {script_path} && '
      f'sed "{mlperf_benchmark.SedPairsToString(run_and_time_sed)}" '
      f'run_and_time.sh | sed "2 i source {CONFIG}" > run_and_time1.sh && '
      'chmod 755 run_and_time1.sh '
  )
  vm.RemoteCommand(
      f'cd {script_path} && '
      f'sed "{mlperf_benchmark.SedPairsToString(run_sed)}" run_with_docker.sh '
      f'| sed "2 i source {CONFIG}" > run_with_docker1.sh && '
      'chmod 755 run_with_docker1.sh'
  )

  docker_file = posixpath.join(script_path, 'Dockerfile')
  if FLAGS.nccl_net_plugin:
    vm_util.ReplaceText(
        vm,
        'RUN apt-get update',
        r'RUN echo \"deb https:\/\/packages.cloud.google.com\/apt '
        r'google-fast-socket main\" | '
        r'tee \/etc\/apt\/sources.list.d\/google-fast-socket.list\n'
        r'RUN curl -s -L '
        r'https:\/\/packages.cloud.google.com\/apt\/doc\/apt-key.gpg | '
        r'apt-key add -\n'
        r'RUN rm -f \/opt\/hpcx\/nccl_rdma_sharp_plugin\/lib\/libnccl-net.so\n'
        r'RUN apt-get update',
        docker_file,
    )
    vm_util.ReplaceText(
        vm,
        'apt-get install -y --no-install-recommends',
        'apt-get install -y --no-install-recommends google-fast-socket',
        docker_file,
    )

  if FLAGS.aws_efa:
    vm.RemoteCommand(f'git clone {AWS_EFA_NCCL_BASEAMI_PIPELINE_URL}')
    vm.RemoteCommand(
        'cd aws-efa-nccl-baseami-pipeline && git checkout'
        f' {NVIDIA_EFA_DOCKERFILE_COMMIT}'
    )
    vm.RemoteCommand(f'cat {NVIDIA_EFA_DOCKERFILE} >> {docker_file}')
    vm.RemoteCommand(
        'echo "RUN rm -rf /opt/hpcx/ompi && ln -s /opt/amazon/openmpi'
        f' /opt/hpcx/ompi" >> {docker_file}'
    )
    vm_util.ReplaceText(vm, 'FROM nvcr.*', '', docker_file)
    vm_util.ReplaceText(vm, 'yum-utils.*', '', docker_file)
    vm_util.ReplaceText(
        vm, 'python3-distutils.*', 'python3-distutils', docker_file
    )
    vm_util.ReplaceText(vm, 'cmake', '', docker_file)


def _PrepareBucket(benchmark_spec):
  """Prepare storage bucket for profiling results, if needed.

  Args:
    benchmark_spec: The benchmark specification
  """
  if (
      mlperf_benchmark.NONE in FLAGS.mlperf_profiler
      and not FLAGS.mlperf_keep_nccl_log
  ):
    return

  if FLAGS.cloud != 'GCP':
    return

  location = benchmark_spec.zones[0]
  bucket = benchmark_spec.bucket
  storage_service = benchmark_spec.storage_service
  storage_service.PrepareService(util.GetRegionFromZone(location))
  storage_service.MakeBucket(bucket, raise_on_failure=False)
  storage_service.AclBucket(
      benchmark_spec.gcp_service_account, gcs.WRITER, bucket
  )


def _ClearTmpDirectory(benchmark_spec, node_rank):
  vm = benchmark_spec.vms[node_rank]
  vm.RemoteCommand(
      r'sudo rm -rf {dir}'.format(
          dir=posixpath.join(vm_util.VM_TMP_DIR, benchmark_spec.benchmark)
      )
  )
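

# Prepare() below fans the per-node setup out with background_tasks and then
# writes an MPI hostfile ($HOME/HOSTFILE) via hpc_util.CreateMachineFile. For
# a hypothetical two-VM cluster of 8-GPU machines the file would look roughly
# like:
#   10.0.0.2 slots=8
#   10.0.0.3 slots=8
# (the exact layout depends on hpc_util.CreateMachineFile).

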
def Prepare(benchmark_spec):
  """Install and set up MLPerf on multiple vms.

  Args:
    benchmark_spec: The benchmark specification
  """
  vms = benchmark_spec.vms
  background_tasks.RunThreaded(_PrepareWorker, vms)
  _UpdateBenchmarkSpecWithFlags(benchmark_spec)
  list_params = [
      ((benchmark_spec, node_rank), {}) for node_rank in range(len(vms))
  ]

  _PrepareBucket(benchmark_spec)
  background_tasks.RunThreaded(_ClearTmpDirectory, list_params)
  background_tasks.RunThreaded(_PrepareMLPerfBenchmark, list_params)
  background_tasks.RunThreaded(_UpdateScripts, list_params)
  background_tasks.RunThreaded(_PrepareMLPerfRunner, list_params)
  hpc_util.CreateMachineFile(
      vms, lambda _: benchmark_spec.gpus_per_vm, HOSTFILE
  )
  vms[0].RemoteCommand('sleep 30')


def _CreateMetadataDict(benchmark_spec):
  """Create metadata dict to be used in run results.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    metadata dict
  """
  metadata = {
      'use_tpu': bool(benchmark_spec.tpus),
      'model_dir': benchmark_spec.model_dir,
      'model': benchmark_spec.benchmark,
      'version': FLAGS.mlperf_training_version,
  }
  return metadata


def MakeSamplesFromOutput(metadata, output, model=mlperf_benchmark.RESNET):
  """Create samples containing metrics.

  Args:
    metadata: dict containing all the metadata to report.
    output: string, command output
    model: string, model name

  Example output:
    perfkitbenchmarker/tests/linux_benchmarks/mlperf_benchmark_test.py

  Returns:
    Samples containing training metrics.
  """
  return mlperf_benchmark.MakeSamplesFromOutput(
      metadata, output, use_tpu=False, model=model
  )


def Run(benchmark_spec):
  """Run MLPerf on the cluster.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    A list of sample.Sample objects.
  """
  _UpdateBenchmarkSpecWithFlags(benchmark_spec)
  vms = benchmark_spec.vms
  master_vm = vms[0]
  benchmark = benchmark_spec.benchmark

  env_params = {}
  env_params['SLURM_JOB_ID'] = r'{uri}'.format(uri=FLAGS.run_uri)
  env_params['PULL'] = 0
  env_params['DGXSYSTEM'] = DGXSYSTEM
  env_params['NEXP'] = 1
  env_params['LOGDIR'] = posixpath.join(vm_util.VM_TMP_DIR, benchmark)

  script_path = (
      '$HOME/training_results_{version}/NVIDIA/benchmarks/{model}'
      r'/implementations/{framework}'.format(
          version=FLAGS.mlperf_training_version,
          model='maskrcnn' if mlperf_benchmark.MASK in benchmark else benchmark,
          framework='mxnet'
          if mlperf_benchmark.RESNET in benchmark
          else 'pytorch',
      )
  )

  benchmark_env_params = {
      mlperf_benchmark.TRANSFORMER: {
          'CONT': r'"mlperf-nvidia:translation"',
          'DATADIR': r'/data/wmt/utf8',
      },
      mlperf_benchmark.SSD: {
          'CONT': r'"mlperf-nvidia:single_stage_detector"',
          'DATADIR': '/data',
      },
      mlperf_benchmark.GNMT: {
          'CONT': r'"mlperf-nvidia:rnn_translator"',
          'DATADIR': r'/data/gnmt',
      },
      mlperf_benchmark.MASK: {},
      mlperf_benchmark.RESNET: {},
      mlperf_benchmark.BERT: {},
  }
  env_params.update(benchmark_env_params.get(benchmark, {}))
  if mlperf_benchmark.RESNET in benchmark:
    env_params['SLURM_JOB_NUM_NODES'] = benchmark_spec.num_vms

  env = r''
  if nvidia_driver.CheckNvidiaGpuExists(master_vm):
    env = tensorflow.GetEnvironmentVars(master_vm)

  cmd = (
      f'cd {script_path} && '
      f'{env} {_DictToString(env_params)} '
      f'{FLAGS.nccl_mpi} '
      '--allow-run-as-root '
      '-hostfile $HOME/HOSTFILE '
      '--mca pml ^cm '
      '--mca btl tcp,self '
      '--mca btl_tcp_if_exclude docker0,lo '
      '--bind-to none '
      '-N 1 '
      './run_with_docker1.sh'
  )
  if (
      mlperf_benchmark.NVPROF in FLAGS.mlperf_profiler
      or FLAGS.mlperf_keep_nccl_log
  ):
    cmd += r' && cp /tmp/pkb/cmd* {logdir}'.format(
        logdir=posixpath.join(vm_util.VM_TMP_DIR, benchmark)
    )

  samples = []
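
  # master_vm.RobustRemoteCommand(cmd) below fans out over the hostfile
  # (assuming FLAGS.nccl_mpi resolves to mpirun). With hypothetical values,
  # cmd expands to roughly:
  #   cd $HOME/training_results_v2.0/NVIDIA/benchmarks/bert/implementations/pytorch &&
  #   <env> DGXSYSTEM=DGXA100_multinode LOGDIR=/tmp/pkb/bert NEXP=1 PULL=0 \
  #   SLURM_JOB_ID=<run_uri> mpirun --allow-run-as-root -hostfile $HOME/HOSTFILE \
  #   ... -N 1 ./run_with_docker1.sh
  # i.e. one copy of the patched run_with_docker1.sh starts on each worker VM.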
  metadata = _CreateMetadataDict(benchmark_spec)
  stdout, _ = master_vm.RobustRemoteCommand(cmd)

  if mlperf_benchmark.NONE in FLAGS.mlperf_profiler:
    samples.extend(MakeSamplesFromOutput(metadata, stdout, model=benchmark))

  if (
      mlperf_benchmark.NVPROF in FLAGS.mlperf_profiler
      or FLAGS.mlperf_keep_nccl_log
  ):
    master_vm.RemoteCommand(
        r'mkdir -p /data/aggregated/{model}'.format(model=benchmark)
    )
    master_vm.RemoteCommand(
        r'mpirun -hostfile $HOME/{hostfile} -N 1 scp -r {logdir} '
        r'{master_ip}:/data/aggregated/'.format(
            hostfile=HOSTFILE,
            logdir=posixpath.join(vm_util.VM_TMP_DIR, benchmark),
            master_ip=master_vm.internal_ip,
        )
    )

  return samples


def Cleanup(benchmark_spec):
  """Cleanup MLPerf on the cluster.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  del benchmark_spec  # Unused.