in src/accelerate/accelerator.py
def save_state(self, output_dir: str = None, safe_serialization: bool = True, **save_model_func_kwargs):
    """
    Saves the current states of the model, optimizer, scaler, RNG generators, and registered objects to a folder.

    If a `ProjectConfiguration` was passed to the `Accelerator` object with `automatic_checkpoint_naming` enabled,
    checkpoints will be saved to `self.project_dir/checkpoints`. If the number of current saves is greater than
    `total_limit`, the oldest saves are deleted until the limit is respected. Each checkpoint is saved in a
    separate folder named `checkpoint_<iteration>`.

    Otherwise, checkpoints are saved directly to `output_dir`.

    <Tip>

    Should only be used when you want to save a checkpoint during training and restore the state in the same
    environment.

    </Tip>

    Args:
        output_dir (`str` or `os.PathLike`):
            The name of the folder to save all relevant weights and states.
        safe_serialization (`bool`, *optional*, defaults to `True`):
            Whether to save the model using `safetensors` or the traditional PyTorch way (that uses `pickle`).
        save_model_func_kwargs (`dict`, *optional*):
            Additional keyword arguments for saving the model that are passed to the underlying save function,
            such as optional arguments for DeepSpeed's `save_checkpoint` function.

    Example:

    ```python
    >>> from accelerate import Accelerator

    >>> accelerator = Accelerator()
    >>> model, optimizer, lr_scheduler = ...
    >>> model, optimizer, lr_scheduler = accelerator.prepare(model, optimizer, lr_scheduler)
    >>> accelerator.save_state(output_dir="my_checkpoint")
    ```
    """
    if self.project_configuration.automatic_checkpoint_naming:
        output_dir = os.path.join(self.project_dir, "checkpoints")
    os.makedirs(output_dir, exist_ok=True)
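    # With automatic naming, prune the oldest `checkpoint_<n>` folders on the main
    # process once saving one more would exceed `total_limit`.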
    if self.project_configuration.automatic_checkpoint_naming:
        folders = [os.path.join(output_dir, folder) for folder in os.listdir(output_dir)]
        if (
            self.project_configuration.total_limit is not None
            and (len(folders) + 1 > self.project_configuration.total_limit)
            and self.is_main_process
        ):

            def _inner(folder):
                return list(map(int, re.findall(r"[\/]?([0-9]+)(?=[^\/]*$)", folder)))[0]

            folders.sort(key=_inner)
            logger.warning(
                f"Deleting {len(folders) + 1 - self.project_configuration.total_limit} checkpoints to make room for new checkpoint."
            )
            for folder in folders[: len(folders) + 1 - self.project_configuration.total_limit]:
                shutil.rmtree(folder)
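        # The new checkpoint gets its own `checkpoint_<save_iteration>` folder;
        # refuse to silently overwrite an existing one.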
        output_dir = os.path.join(output_dir, f"checkpoint_{self.save_iteration}")
        if os.path.exists(output_dir):
            raise ValueError(
                f"Checkpoint directory {output_dir} ({self.save_iteration}) already exists. Please manually override `self.save_iteration` with what iteration to start with."
            )
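    # Make sure every process reaches this point before the checkpoint folder is created.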
    self.wait_for_everyone()
    os.makedirs(output_dir, exist_ok=True)
    logger.info(f"Saving current state to {output_dir}")
    if self.distributed_type == DistributedType.XLA:
        # Finish running the previous step before checkpointing
        xm.mark_step()
    # Save the models taking care of FSDP and DeepSpeed nuances
    weights = []
    for i, model in enumerate(self._models):
        if self.distributed_type == DistributedType.FSDP:
            logger.info("Saving FSDP model")
            save_fsdp_model(self.state.fsdp_plugin, self, model, output_dir, i)
            logger.info(f"FSDP Model saved to output dir {output_dir}")
        elif self.distributed_type == DistributedType.DEEPSPEED:
            logger.info("Saving DeepSpeed Model and Optimizer")
            ckpt_id = f"{MODEL_NAME}" if i == 0 else f"{MODEL_NAME}_{i}"
            model.save_checkpoint(output_dir, ckpt_id, **save_model_func_kwargs)
            logger.info(f"DeepSpeed Model and Optimizer saved to output dir {os.path.join(output_dir, ckpt_id)}")
        elif self.distributed_type == DistributedType.MEGATRON_LM:
            logger.info("Saving Megatron-LM Model, Optimizer and Scheduler")
            model.save_checkpoint(output_dir)
            logger.info(f"Megatron-LM Model, Optimizer and Scheduler saved to output dir {output_dir}")
        else:
            weights.append(self.get_state_dict(model, unwrap=False))
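    # For the plain (non-sharded) case, the gathered state dicts collected in `weights`
    # are written to disk later by `save_accelerator_state`.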
    # Save the optimizers taking care of FSDP and DeepSpeed nuances
    optimizers = []
    if self.distributed_type == DistributedType.FSDP:
        for i, opt in enumerate(self._optimizers):
            logger.info("Saving FSDP Optimizer")
            save_fsdp_optimizer(self.state.fsdp_plugin, self, opt, self._models[i], output_dir, i)
            logger.info(f"FSDP Optimizer saved to output dir {output_dir}")
    elif self.distributed_type not in [DistributedType.DEEPSPEED, DistributedType.MEGATRON_LM]:
        optimizers = self._optimizers
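    # DeepSpeed and Megatron-LM already bundled optimizer state into the checkpoints
    # saved above, so nothing extra is collected for them here.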
    # Save the lr schedulers taking care of DeepSpeed nuances
    schedulers = []
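    # Schedulers driven by the DeepSpeed engine (`DeepSpeedSchedulerWrapper`) are skipped
    # below, since their state is part of the DeepSpeed checkpoint saved above.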
    if self.distributed_type == DistributedType.DEEPSPEED:
        for i, scheduler in enumerate(self._schedulers):
            if isinstance(scheduler, DeepSpeedSchedulerWrapper):
                continue
            schedulers.append(scheduler)
    elif self.distributed_type not in [DistributedType.MEGATRON_LM]:
        schedulers = self._schedulers
    # Save the samplers of the dataloaders
    dataloaders = self._dataloaders

    # Call model saving hooks that might have been registered with
    # accelerator.register_save_state_pre_hook
    for hook in self._save_model_state_pre_hook.values():
        hook(self._models, weights, output_dir)
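    # Persist the collected weights, optimizer/scheduler/sampler states, the gradient
    # scaler (if any), and per-process RNG states.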
    save_location = save_accelerator_state(
        output_dir,
        weights,
        optimizers,
        schedulers,
        dataloaders,
        self.state.process_index,
        self.step,
        self.scaler,
        save_on_each_node=self.project_configuration.save_on_each_node,
        safe_serialization=safe_serialization,
    )
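    # Objects registered via `register_for_checkpointing` are saved one file per object.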
    for i, obj in enumerate(self._custom_objects):
        save_custom_state(obj, output_dir, i, save_on_each_node=self.project_configuration.save_on_each_node)
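    # Advance the iteration counter so the next automatic checkpoint gets a fresh folder name.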
    self.project_configuration.iteration += 1
    return save_location
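
For context, here is a minimal sketch of how `save_state` is typically driven from a training loop with automatic checkpoint naming and pruning. The `ProjectConfiguration` options mirror the behaviour described in the docstring above; the model, optimizer, scheduler, dataloader, and `num_epochs` are placeholders for your own objects, and the exact keyword names (e.g. `project_config`) should be checked against the `accelerate` version in use.

```python
from accelerate import Accelerator
from accelerate.utils import ProjectConfiguration

# Checkpoints land in "runs/my_run/checkpoints/checkpoint_<n>"; only the 3 newest are kept.
project_config = ProjectConfiguration(
    project_dir="runs/my_run",
    automatic_checkpoint_naming=True,
    total_limit=3,
)
accelerator = Accelerator(project_config=project_config)

model, optimizer, scheduler, dataloader = ...  # placeholders for your own objects
model, optimizer, dataloader, scheduler = accelerator.prepare(model, optimizer, dataloader, scheduler)

for epoch in range(num_epochs):  # num_epochs is a placeholder
    for batch in dataloader:
        ...
    # `output_dir` can be omitted: automatic checkpoint naming picks the folder.
    accelerator.save_state()

# Later, restore everything that `save_state` wrote for a given checkpoint:
accelerator.load_state("runs/my_run/checkpoints/checkpoint_0")
```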