# Copyright 2024 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""DeepSpeed utils"""
import os
import torch
import deepspeed
from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam
from deepspeed.runtime.zero.partition_parameters import ZeroParamStatus
from transformers import AutoTokenizer

DEFAULT_PAD_TOKEN = "[PAD]"
DEFAULT_EOS_TOKEN = "</s>"
DEFAULT_BOS_TOKEN = "<s>"
DEFAULT_UNK_TOKEN = "<unk>"


def get_tokenizer(pretrain_path, model=None, padding_side="left", use_fast=True):
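    """Load a HuggingFace tokenizer from `pretrain_path` with `trust_remote_code` enabled.

    Sets the padding side and, when the tokenizer defines no pad token, falls back
    to the EOS token; the resulting pad token id is also written to `model.config`
    if a model is passed in.
    """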
tokenizer = AutoTokenizer.from_pretrained(pretrain_path, trust_remote_code=True, use_fast=use_fast)
tokenizer.padding_side = padding_side
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
tokenizer.pad_token_id = tokenizer.eos_token_id
if model is not None:
model.config.pad_token_id = tokenizer.pad_token_id
return tokenizer


def get_eval_ds_config(
offload,
stage=0,
bf16=True,
):
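    """Build a DeepSpeed config for evaluation/inference-only engines.

    `offload=True` places ZeRO-partitioned parameters on CPU; `stage` and `bf16`
    select the ZeRO stage and bf16 mode.
    """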
zero_opt_dict = {
"stage": stage,
"stage3_param_persistence_threshold": "auto",
"offload_param": {
"device": "cpu" if offload else "none",
"pin_memory": True,
},
}
return {
"steps_per_print": 100,
"zero_optimization": zero_opt_dict,
"bf16": {
"enabled": bf16,
},
"gradient_clipping": 1.0,
"prescale_gradients": False,
"wall_clock_breakdown": False,
}


def get_train_ds_config(
offload,
adam_offload=True,
stage=2,
bf16=True,
max_norm=1.0,
zpg=8,
grad_accum_dtype=None,
disable_trace_cache=False,
):
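    """Build a DeepSpeed ZeRO training config.

    `offload` and `adam_offload` move parameters and optimizer states to CPU,
    `zpg` sets the ZeRO++ hierarchical partition (hpZ) group size, and
    `disable_trace_cache` disables ZeRO-3 prefetching and parameter caching by
    zeroing the corresponding limits.
    """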
device = "cpu" if offload else "none"
zero_opt_dict = {
"stage": stage,
"offload_param": {"device": device},
"offload_optimizer": {
"device": "cpu" if adam_offload else "none",
"pin_memory": True,
},
"sub_group_size": "auto",
"stage3_max_live_parameters": "auto",
"stage3_max_reuse_distance": "auto",
"stage3_param_persistence_threshold": "auto",
"stage3_prefetch_bucket_size": "auto",
"reduce_bucket_size": "auto",
# ZeRO++
"zero_hpz_partition_size": zpg,
"zero_quantized_weights": False,
"zero_quantized_gradients": False,
}
if disable_trace_cache:
zero_opt_dict["stage3_prefetch_bucket_size"] = 0
zero_opt_dict["stage3_max_live_parameters"] = 0
zero_opt_dict["stage3_max_reuse_distance"] = 0
return {
"steps_per_print": 100,
"zero_optimization": zero_opt_dict,
"bf16": {
"enabled": bf16,
},
"gradient_clipping": max_norm,
"prescale_gradients": False,
"wall_clock_breakdown": False,
"data_types": {"grad_accum_dtype": grad_accum_dtype if grad_accum_dtype else "fp32"},
}


def get_optimizer_grouped_parameters(
model,
weight_decay,
no_decay_name_list=("bias", "layer_norm.weight", "layernorm.weight", "norm.weight", "ln_f.weight"),
):
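    """Split trainable parameters into two groups: one that uses `weight_decay`
    and one (biases and normalization weights matched by `no_decay_name_list`)
    with weight decay disabled.
    """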
optimizer_grouped_parameters = [
{
"params": [
p
for n, p in model.named_parameters()
if (not any(nd in n for nd in no_decay_name_list) and p.requires_grad)
],
"weight_decay": weight_decay,
},
{
"params": [
p
for n, p in model.named_parameters()
if (any(nd in n for nd in no_decay_name_list) and p.requires_grad)
],
"weight_decay": 0.0,
},
]
return optimizer_grouped_parameters


def create_optimizer(model, adam_offload, **kwargs):
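    """Create an Adam optimizer for `model`: DeepSpeedCPUAdam when optimizer
    states are offloaded to CPU, FusedAdam otherwise. The remaining kwargs
    (e.g. lr, betas, weight_decay) are forwarded to the optimizer constructor.
    """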
# Optimizer
AdamOptimizer = DeepSpeedCPUAdam if adam_offload else FusedAdam
optim_params = get_optimizer_grouped_parameters(model, kwargs["weight_decay"])
optim = AdamOptimizer(optim_params, **kwargs)
return optim


def save_hf_format(model, tokenizer, output_dir, sub_folder=""):
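    """Save the model weights, config, and tokenizer vocabulary in HuggingFace
    format (dropping any LoRA weights) so the output can be loaded via
    `from_pretrained`.
    """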
# used to save huggingface format, so we can use it for hf.from_pretrained
model_to_save = model.module if hasattr(model, 'module') else model
CONFIG_NAME = "config.json"
WEIGHTS_NAME = "pytorch_model.bin"
output_dir = os.path.join(output_dir, sub_folder)
os.makedirs(output_dir, exist_ok=True)
output_model_file = os.path.join(output_dir, WEIGHTS_NAME)
output_config_file = os.path.join(output_dir, CONFIG_NAME)
save_dict = model_to_save.state_dict()
for key in list(save_dict.keys()):
if "lora" in key:
del save_dict[key]
torch.save(save_dict, output_model_file)
model_to_save.config.to_json_file(output_config_file)
tokenizer.save_vocabulary(output_dir)


def _z3_params_to_fetch(param_list):
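    """Return the ZeRO-3 partitioned parameters from `param_list` that are not
    currently materialized on this rank and therefore must be gathered.
    """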
return [
p for p in param_list
if hasattr(p, 'ds_id') and p.ds_status == ZeroParamStatus.NOT_AVAILABLE
]


def save_zero_three_model(model_ema, global_rank, save_dir, zero_stage=0):
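    """Save a model (optionally ZeRO stage 3 partitioned) to `pytorch_model.bin`.

    Without ZeRO-3, rank 0 simply saves the wrapped module's state dict. With
    ZeRO-3, each partitioned parameter is gathered across ranks and copied to
    CPU before rank 0 writes the file; LoRA weights are skipped.
    """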
zero_stage_3 = (zero_stage == 3)
os.makedirs(save_dir, exist_ok=True)
WEIGHTS_NAME = "pytorch_model.bin"
output_model_file = os.path.join(save_dir, WEIGHTS_NAME)
    model_to_save = model_ema.module if hasattr(model_ema, 'module') else model_ema
if not zero_stage_3:
if global_rank == 0:
torch.save(model_to_save.state_dict(), output_model_file)
else:
output_state_dict = {}
for k, v in model_to_save.named_parameters():
if hasattr(v, 'ds_id'):
                with deepspeed.zero.GatheredParameters(_z3_params_to_fetch([v]),
                                                       enabled=zero_stage_3):
v_p = v.data.cpu()
else:
v_p = v.cpu()
if global_rank == 0 and "lora" not in k:
output_state_dict[k] = v_p
if global_rank == 0:
torch.save(output_state_dict, output_model_file)
del output_state_dict