optimum/habana/distributed/distributed_runner.py

# coding=utf-8
# Copyright 2022 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
# Copyright (C) 2020-2021 Habana Labs, Ltd. an Intel Company
###############################################################################

import os
import subprocess
import sys
from pathlib import Path
from typing import List, Union

from optimum.utils import logging


logger = logging.get_logger(__name__)


class DistributedRunner:
    """
    Set up training/inference hardware configurations and run distributed commands.
    """

    def __init__(
        self,
        command_list: List = [],
        world_size: int = 1,
        hostfile: Union[str, Path] = None,
        use_mpi: bool = False,
        use_deepspeed: bool = False,
        master_port: int = 29500,
        use_env: bool = False,
        map_by: str = "socket",
        multi_hls=None,
    ):
        """
        The `DistributedRunner` enables executing a command in a distributed way:
        - on one Gaudi server with MPI, DeepSpeed or `torch.distributed`,
        - on several nodes with DeepSpeed.

        Args:
            command_list (List, optional): The list of commands to execute. Defaults to [].
            world_size (int, optional): The number of devices to use. This is only used for single-node runs. Defaults to 1.
            hostfile (Union[str, Path], optional): The path to the hostfile specifying the IP addresses and the number of devices to use for each node. This is only used for multi-node runs. Defaults to None.
            use_mpi (bool, optional): Whether to use OpenMPI for the communication between devices. Defaults to False.
            use_deepspeed (bool, optional): Whether to use DeepSpeed. Defaults to False.
            master_port (int, optional): The port used by the master process for communication with the workers. Defaults to 29500.
            use_env (bool, optional): Whether to use `--use_env` with `torch.distributed`. Defaults to False.
            map_by (str, optional): The mapping unit used for assigning processes with MPI. Defaults to "socket".
            multi_hls (optional): Deprecated and ignored, will be removed in a future version.
        """

        logging.set_verbosity(logging.INFO)
        logging.enable_default_handler()
        logging.enable_explicit_format()

        self._commands = command_list
        self._world_size = world_size
        self._hostfile = hostfile
        self._map_by = map_by
        self._master_port = master_port
        self._use_env = use_env
        self._interpreter = f"{sys.executable} "

        self._model_env_vars = {}

        # TODO: remove multi_hls
        if multi_hls is not None:
            logger.warning("`multi_hls` is deprecated and will be removed in a future version.")

        if use_deepspeed and use_mpi:
            raise ValueError("`use_mpi` and `use_deepspeed` cannot be both True.")

        if hostfile is not None:
            if isinstance(self._hostfile, str):
                self._hostfile = Path(self._hostfile)
            # Multi-node run
            if use_deepspeed:
                self.create_multi_node_setup()
            else:
                raise ValueError(
                    "A hostfile is specified to perform a multi-node run. This requires DeepSpeed to be enabled with"
                    " `use_deepspeed=True`."
                )
        elif self._world_size > 1:
            # Distributed run
            if use_deepspeed:
                # Single-node multi-card run with DeepSpeed
                self.create_single_node_setup_deepspeed()
            elif use_mpi:
                # Single-node multi-card run with MPI
                self._model_env_vars["MASTER_ADDR"] = "localhost"
                self._model_env_vars["MASTER_PORT"] = self._master_port
                self.create_single_node_setup_mpirun()
            else:
                # Single-node multi-card run with torch.distributed
                self.create_single_node_setup()
        else:
            # Single-card run
            logger.warning(
                "The run will be executed on one device only. Specify `world_size` > 1 or `hostfile` to perform a"
                " distributed run."
            )
            self.create_single_card_setup(use_deepspeed)
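
    # Note (illustrative, not from the original source): `get_peval` below derives
    # the number of physical cores to pin to each MPI rank from the `lscpu` output.
    # For example, on a hypothetical host with 2 sockets and 48 cores per socket
    # running a single-node job with world_size=8, it returns
    # peval = (2 * 48) // 8 = 12, and `setup_config_env_mpirun` then emits
    # "--map-by socket:PE=12". The hardware figures here are assumptions for the
    # sake of the example.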

    def get_peval(self):
        cmd1 = r"lscpu 2>/dev/null | awk '/Socket\(s\)/ { print $2 }'"
        cmd2 = r"lscpu 2>/dev/null | awk '/Core\(s\) per socket/ { print $4 }'"
        with subprocess.Popen(
            cmd1, shell=True, executable="/bin/bash", stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        ) as proc:
            lscpu_output1 = proc.stdout.read()
        with subprocess.Popen(
            cmd2, shell=True, executable="/bin/bash", stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        ) as proc:
            lscpu_output2 = proc.stdout.read()
        sockets = int(lscpu_output1)
        corespsocket = int(lscpu_output2)
        if corespsocket == 1:  # running inside VM?
            logger.warning(f"Cores per socket is {corespsocket}. Running inside a VM?")
            logger.warning("Mapping by slot instead of socket")
            self._map_by = "slot"
        if self._hostfile:
            _hls_list = str(os.getenv("MULTI_HLS_IPS", "")).split(",")
            _world_size = 8
            _per_node_processes = int(_world_size / len(_hls_list))
            peval = (sockets * corespsocket) // _per_node_processes
        else:
            peval = (sockets * corespsocket) // self._world_size
        return peval, sockets, corespsocket

    def setup_config_env_mpirun(self):
        peval, _, _ = self.get_peval()
        return f"--map-by {self._map_by}:PE={peval}"

    def create_single_card_setup(self, use_deepspeed=False):
        """
        Single-card setup.
        """

        if use_deepspeed:
            self._interpreter = f"deepspeed --num_gpus 1 --master_port {self._master_port} "
        else:
            self._interpreter = f"{sys.executable} "

    def create_single_node_setup_mpirun(self):
        """
        Single-node multi-card configuration setup for mpirun.
        """

        mpi_cmd = self.setup_config_env_mpirun()
        self._interpreter = (
            f"mpirun -n {self._world_size} --bind-to core {mpi_cmd} --rank-by core --report-bindings"
            f" --allow-run-as-root {sys.executable} "
        )

    def create_single_node_setup_deepspeed(self):
        """
        Single-node multi-card configuration setup for DeepSpeed.
        """

        self._interpreter = (
            f"deepspeed --num_nodes 1 --num_gpus {self._world_size} --no_local_rank --master_port {self._master_port} "
        )

    def create_single_node_setup(self):
        """
        Single-node multi-card configuration setup.
        """

        use_env_param = "--use_env" if self._use_env else ""

        self._interpreter = (
            f"{sys.executable} -um torch.distributed.run --nproc_per_node={self._world_size} {use_env_param} "
        )

    def create_multi_node_setup(self):
        """
        Multi-node configuration setup for DeepSpeed.
        """

        master_addr = self.process_hostfile()
        self._interpreter = (
            f"deepspeed --hostfile {self._hostfile} --master_addr {master_addr} --no_local_rank"
            f" --master_port {self._master_port} "
        )
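
    # For reference (illustrative values only: world_size=8, master_port=29500,
    # PE=12, Python at /usr/bin/python3), the setup methods above build interpreter
    # prefixes along these lines:
    #   torch.distributed: "/usr/bin/python3 -um torch.distributed.run --nproc_per_node=8 "
    #   DeepSpeed:         "deepspeed --num_nodes 1 --num_gpus 8 --no_local_rank --master_port 29500 "
    #   mpirun:            "mpirun -n 8 --bind-to core --map-by socket:PE=12 --rank-by core
    #                       --report-bindings --allow-run-as-root /usr/bin/python3 "
    # `run` then prepends the prefix to each command in `command_list`.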
""" try: if self._model_env_vars: print("Running with the following model specific env vars: ") for env_name, env_val in [ *self._model_env_vars.items() ]: # iterate key value pairs of self._model_env_vars print(f"{env_name}={env_val}") if "LD_PRELOAD" in str(env_name) and os.environ.get(str(env_name), None): os.environ[str(env_name)] = str(env_val) + ":" + os.environ.get(str(env_name), None) else: os.environ[str(env_name)] = str(env_val) for command in self._commands: command = self._interpreter + command print(f"{self.__class__.__name__} run(): command = {command}") sys.stdout.flush() sys.stderr.flush() with subprocess.Popen(command, shell=True, executable="/bin/bash") as proc: proc.wait() sys.stdout.flush() sys.stderr.flush() if proc.returncode != 0: logger.error(f"{command} exited with status = {proc.returncode}") return proc.returncode if self._model_env_vars: for env_name in [*self._model_env_vars.keys()]: # iterate keys of self._model_env_vars del os.environ[str(env_name)] except Exception as exc: raise RuntimeError(f"Error in {self.__class__.__name__} run()") from exc def process_hostfile(self) -> str: """ Returns the master address to use for multi-node runs with DeepSpeed. Directly inspired from https://github.com/microsoft/DeepSpeed/blob/316c4a43e0802a979951ee17f735daf77ea9780f/deepspeed/autotuning/utils.py#L145. Returns: str: address of the master node. """ if not self._hostfile.is_file(): raise ValueError(f"Unable to find hostfile at {self._hostfile}.") # e.g., worker-0 slots=16 with self._hostfile.open("r") as file: resource_pool = {} master_addr = None for line in file.readlines(): line = line.strip() if line == "": # skip empty lines continue try: hostname, slots = line.split() _, slot_count = slots.split("=") slot_count = int(slot_count) if master_addr is None: master_addr = hostname except ValueError as err: logger.error("Hostfile is not formatted correctly, unable to proceed with training/inference.") raise err if hostname in resource_pool: logger.error("Hostfile contains duplicate hosts, unable to proceed with training/inference.") raise ValueError(f"Host {hostname} is already defined") resource_pool[hostname] = slot_count return master_addr