in src/accelerate/state.py [0:0]
def __init__(self, cpu: bool = False, **kwargs):
    self.__dict__ = self._shared_state
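    # Borg pattern: every instance shares the one `_shared_state` dict, so the
    # setup below runs only once per process no matter how many times this
    # class is instantiated.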
    if not self.initialized:
        self._cpu = cpu
        self.backend = None
        env_device = os.environ.get("ACCELERATE_TORCH_DEVICE", None)
        self.device = torch.device(env_device) if env_device is not None else None
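        # `ACCELERATE_TORCH_DEVICE` lets users pin an explicit device up front;
        # otherwise `self.device` stays None until `set_device()` resolves it below.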
        self.debug = parse_flag_from_env("ACCELERATE_DEBUG_MODE")
        use_sagemaker_dp = kwargs.pop("_use_sagemaker_dp", None)
        dist_information = None
        if use_sagemaker_dp is None:
            use_sagemaker_dp = (
                os.environ.get("ACCELERATE_USE_SAGEMAKER", "false") == "true"
                and os.environ.get("ACCELERATE_SAGEMAKER_DISTRIBUTED_TYPE") != SageMakerDistributedType.NO
            )
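        # SageMaker data parallelism is only used when explicitly opted into via
        # the environment; the `_use_sagemaker_dp` kwarg short-circuits that detection.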

        # Sets up self.backend + imports
        original_backend = kwargs.pop("backend", None)
        backend, distributed_type = self._prepare_backend(cpu, use_sagemaker_dp, original_backend)
        if original_backend is not None and backend != original_backend:
            raise ValueError(f"Your assigned backend {original_backend} is not available, please use {backend}")
        self.backend = backend
        self.distributed_type = distributed_type
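        # `backend` is the low-level communication backend string (e.g. "nccl",
        # "gloo", "xla"); `distributed_type` is Accelerate's higher-level enum
        # describing the launch topology.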
        use_deepspeed = False
        if not cpu and self.backend != "xla":
            if int(os.environ.get("LOCAL_RANK", -1)) != -1:
                # Deal with spawning deepspeed
                if os.environ.get("ACCELERATE_USE_DEEPSPEED", "false") == "true":
                    if not is_deepspeed_available():
                        raise ImportError(
                            "DeepSpeed is not available => install it using `pip3 install deepspeed` or build it from source"
                        )
                    from deepspeed import comm as dist

                    if not dist.is_initialized():
                        if self.backend == "tccl":
                            local_rank = os.environ.get("LOCAL_RANK", -1)
                            torch.sdaa.set_device(f"sdaa:{local_rank}")
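                        # FSDP with CPU offloading moves parameters to the host,
                        # so collectives need a hybrid backend: NCCL for CUDA
                        # tensors, gloo for the CPU side.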
                        if (
                            self.backend == "nccl"
                            and os.environ.get("ACCELERATE_USE_FSDP", "false") == "true"
                            and os.environ.get("FSDP_OFFLOAD_PARAMS", "false") == "true"
                        ):
                            self.backend = "cuda:nccl,cpu:gloo"
                        dist.init_distributed(dist_backend=self.backend, auto_mpi_discovery=False, **kwargs)
                    # We need to flag `use_deepspeed` as True to override `distributed_type` later
                    use_deepspeed = True
                # Deal with all other backends except XPU and CPU, which get handled specially later
                elif (
                    self.distributed_type not in (DistributedType.MULTI_XPU, DistributedType.MULTI_CPU)
                    and not torch.distributed.is_initialized()
                ):
                    if self.backend == "tccl":
                        local_rank = os.environ.get("LOCAL_RANK", -1)
                        torch.sdaa.set_device(f"sdaa:{local_rank}")
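                    # With no explicit rank/world_size, init_process_group falls
                    # back to the env:// rendezvous (RANK, WORLD_SIZE, MASTER_ADDR,
                    # MASTER_PORT set by the launcher).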
                    torch.distributed.init_process_group(backend=self.backend, **kwargs)

        # XPU and CPU require special env configs to be set
        if self.distributed_type in (DistributedType.MULTI_XPU, DistributedType.MULTI_CPU):
            dist_information = get_cpu_distributed_information()
            os.environ["RANK"] = str(dist_information.rank)
            os.environ["WORLD_SIZE"] = str(dist_information.world_size)
            os.environ["LOCAL_RANK"] = str(dist_information.local_rank)
            os.environ["LOCAL_WORLD_SIZE"] = str(dist_information.local_world_size)
            if not os.environ.get("MASTER_PORT", None):
                os.environ["MASTER_PORT"] = "29500"
            if (
                not os.environ.get("MASTER_ADDR", None)
                and dist_information.local_world_size != dist_information.world_size
                and self.backend != "mpi"
            ):
                raise ValueError(
                    "Tried to launch a multi-node distributed run, but `MASTER_ADDR` was not set; "
                    "please export rank 0's hostname as `MASTER_ADDR`"
                )
            kwargs["rank"] = dist_information.rank
            kwargs["world_size"] = dist_information.world_size
            if (
                self.distributed_type == DistributedType.MULTI_CPU
                and get_int_from_env(["OMP_NUM_THREADS"], 0) == 0
            ):
                import psutil

                num_cpu_threads_per_process = int(
                    psutil.cpu_count(logical=False) / dist_information.local_world_size
                )
                if num_cpu_threads_per_process == 0:
                    num_cpu_threads_per_process = 1
                torch.set_num_threads(num_cpu_threads_per_process)
                warnings.warn(
                    f"OMP_NUM_THREADS/MKL_NUM_THREADS unset, we set it at {num_cpu_threads_per_process} to improve "
                    "out-of-the-box performance."
                )
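                # e.g. 32 physical cores with 4 local processes -> 8 threads each,
                # avoiding core oversubscription when every rank runs its own
                # OpenMP thread pool.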
            if not torch.distributed.is_initialized():
                torch.distributed.init_process_group(backend=self.backend, **kwargs)

        # No backend == no distributed training
        if self.backend is None:
            self.distributed_type = DistributedType.NO
            self.num_processes = 1
            self.process_index = 0
            self.local_process_index = 0
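            # Without a backend there is nothing to synchronize: a single local process.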
elif self.backend == "xla":
# XLA needs device setting first for `set_replication`
self.set_device()
xm.set_replication(self.device, xm.get_xla_supported_devices())
self.num_processes = xr.world_size()
self.process_index = xr.global_ordinal()
if is_torch_xla_available(check_is_tpu=True):
self.local_process_index = xm.get_local_ordinal()
else:
self.local_process_index = int(os.environ.get("LOCAL_RANK", -1))
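            # XLA's global ordinal spans all hosts; the local ordinal (or LOCAL_RANK)
            # identifies the process within its own host.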
        else:
            self.num_processes = torch.distributed.get_world_size()
            self.process_index = torch.distributed.get_rank()
            self.local_process_index = (
                int(os.environ.get("LOCAL_RANK", -1)) if dist_information is None else dist_information.local_rank
            )
            self.set_device()
            # Now we can change to deepspeed
            if use_deepspeed:
                self.distributed_type = DistributedType.DEEPSPEED

        # Set CPU affinity if enabled
        if parse_flag_from_env("ACCELERATE_CPU_AFFINITY", False):
            set_numa_affinity(self.local_process_index)
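        # Pinning each process to the NUMA node nearest its accelerator reduces
        # cross-socket memory traffic.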

        # Check for old RTX 4000 series cards that can't use P2P or IB and are on old drivers
        if self.device.type == "cuda" and not check_cuda_p2p_ib_support():
            if "NCCL_P2P_DISABLE" not in os.environ or "NCCL_IB_DISABLE" not in os.environ:
                raise NotImplementedError(
                    "The RTX 4000 series doesn't support faster communication via P2P or IB. "
                    'Please set `NCCL_P2P_DISABLE="1"` and `NCCL_IB_DISABLE="1"` or use `accelerate launch` which '
                    "will do this automatically."
                )
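        # NCCL tends to hang rather than error when P2P/IB are requested on these
        # consumer cards, so failing fast here is friendlier than a silent stall.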
    # Important: This should be the *only* code outside of the `self.initialized` check!
    self.fork_launched = parse_flag_from_env("FORK_LAUNCHED", 0)
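
# --- Usage sketch (illustrative, not part of state.py) ---
# A minimal sketch of reaching this constructor, assuming a plain
# single-process CPU run (so the `self.backend is None` path above is taken):
#
#     from accelerate import PartialState
#
#     state = PartialState(cpu=True)
#     print(state.device)            # cpu
#     print(state.num_processes)     # 1
#     print(state.process_index)     # 0
#     print(state.distributed_type)  # DistributedType.NO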