chatlearn/models/deepspeed/deepspeed_utils.py (159 lines of code) (raw):

# Copyright 2024 Alibaba Group Holding Limited. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """DeepSpeed utils""" import os import torch import deepspeed from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam from deepspeed.runtime.zero.partition_parameters import ZeroParamStatus from transformers import AutoTokenizer DEFAULT_PAD_TOKEN = "[PAD]" DEFAULT_EOS_TOKEN = "</s>" DEFAULT_BOS_TOKEN = "<s>" DEFAULT_UNK_TOKEN = "<unk>" def get_tokenizer(pretrain_path, model=None, padding_side="left", use_fast=True): tokenizer = AutoTokenizer.from_pretrained(pretrain_path, trust_remote_code=True, use_fast=use_fast) tokenizer.padding_side = padding_side if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token tokenizer.pad_token_id = tokenizer.eos_token_id if model is not None: model.config.pad_token_id = tokenizer.pad_token_id return tokenizer def get_eval_ds_config( offload, stage=0, bf16=True, ): zero_opt_dict = { "stage": stage, "stage3_param_persistence_threshold": "auto", "offload_param": { "device": "cpu" if offload else "none", "pin_memory": True, }, } return { "steps_per_print": 100, "zero_optimization": zero_opt_dict, "bf16": { "enabled": bf16, }, "gradient_clipping": 1.0, "prescale_gradients": False, "wall_clock_breakdown": False, } def get_train_ds_config( offload, adam_offload=True, stage=2, bf16=True, max_norm=1.0, zpg=8, grad_accum_dtype=None, disable_trace_cache=False, ): device = "cpu" if offload else "none" zero_opt_dict = { "stage": stage, "offload_param": {"device": device}, "offload_optimizer": { "device": "cpu" if adam_offload else "none", "pin_memory": True, }, "sub_group_size": "auto", "stage3_max_live_parameters": "auto", "stage3_max_reuse_distance": "auto", "stage3_param_persistence_threshold": "auto", "stage3_prefetch_bucket_size": "auto", "reduce_bucket_size": "auto", # ZeRO++ "zero_hpz_partition_size": zpg, "zero_quantized_weights": False, "zero_quantized_gradients": False, } if disable_trace_cache: zero_opt_dict["stage3_prefetch_bucket_size"] = 0 zero_opt_dict["stage3_max_live_parameters"] = 0 zero_opt_dict["stage3_max_reuse_distance"] = 0 return { "steps_per_print": 100, "zero_optimization": zero_opt_dict, "bf16": { "enabled": bf16, }, "gradient_clipping": max_norm, "prescale_gradients": False, "wall_clock_breakdown": False, "data_types": {"grad_accum_dtype": grad_accum_dtype if grad_accum_dtype else "fp32"}, } def get_optimizer_grouped_parameters( model, weight_decay, no_decay_name_list=("bias", "layer_norm.weight", "layernorm.weight", "norm.weight", "ln_f.weight"), ): optimizer_grouped_parameters = [ { "params": [ p for n, p in model.named_parameters() if (not any(nd in n for nd in no_decay_name_list) and p.requires_grad) ], "weight_decay": weight_decay, }, { "params": [ p for n, p in model.named_parameters() if (any(nd in n for nd in no_decay_name_list) and p.requires_grad) ], "weight_decay": 0.0, }, ] return optimizer_grouped_parameters def create_optimizer(model, adam_offload, **kwargs): # Optimizer AdamOptimizer = DeepSpeedCPUAdam if adam_offload else FusedAdam optim_params = get_optimizer_grouped_parameters(model, kwargs["weight_decay"]) optim = AdamOptimizer(optim_params, **kwargs) return optim def save_hf_format(model, tokenizer, output_dir, sub_folder=""): # used to save huggingface format, so we can use it for hf.from_pretrained model_to_save = model.module if hasattr(model, 'module') else model CONFIG_NAME = "config.json" WEIGHTS_NAME = "pytorch_model.bin" output_dir = os.path.join(output_dir, sub_folder) os.makedirs(output_dir, exist_ok=True) output_model_file = os.path.join(output_dir, WEIGHTS_NAME) output_config_file = os.path.join(output_dir, CONFIG_NAME) save_dict = model_to_save.state_dict() for key in list(save_dict.keys()): if "lora" in key: del save_dict[key] torch.save(save_dict, output_model_file) model_to_save.config.to_json_file(output_config_file) tokenizer.save_vocabulary(output_dir) def _z3_params_to_fetch(param_list): return [ p for p in param_list if hasattr(p, 'ds_id') and p.ds_status == ZeroParamStatus.NOT_AVAILABLE ] def save_zero_three_model(model_ema, global_rank, save_dir, zero_stage=0): zero_stage_3 = (zero_stage == 3) os.makedirs(save_dir, exist_ok=True) WEIGHTS_NAME = "pytorch_model.bin" output_model_file = os.path.join(save_dir, WEIGHTS_NAME) model_to_save = model_ema.module if hasattr(model_ema, 'module') else model_ema if not zero_stage_3: if global_rank == 0: torch.save(model_to_save.state_dict(), output_model_file) else: output_state_dict = {} for k, v in model_to_save.named_parameters(): if hasattr(v, 'ds_id'): with deepspeed.zero.GatheredParameters(_z3_params_to_fetch([v ]), enabled=zero_stage_3): v_p = v.data.cpu() else: v_p = v.cpu() if global_rank == 0 and "lora" not in k: output_state_dict[k] = v_p if global_rank == 0: torch.save(output_state_dict, output_model_file) del output_state_dict