rlhf/deepspeed-chat/rm_main.py (319 lines of code) (raw):

#!/usr/bin/env python
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
import argparse
import os
import math
import sys

from torch.utils.tensorboard import SummaryWriter
import torch
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from torch.utils.data.distributed import DistributedSampler

from transformers import (
    AutoTokenizer,
    SchedulerType,
    get_scheduler,
)

import deepspeed
from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam

sys.path.append(
    os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
from utils.model.model_utils import create_critic_model
from utils.data.data_utils import create_prompt_dataset, DataCollatorReward
from utils.utils import print_rank_0, to_device, save_hf_format, set_random_seed, get_all_reduce_mean, get_optimizer_grouped_parameters, save_zero_three_model
from utils.ds_utils import get_train_ds_config
from utils.module.lora import convert_linear_layer_to_lora, convert_lora_to_linear_layer, only_optimize_lora_parameters


def parse_args():
    parser = argparse.ArgumentParser(
        description=
        "Finetune a transformers model on a causal language modeling task")
    parser.add_argument('--data_path',
                        nargs='*',
                        default=['Dahoas/rm-static'],
                        help='Path to the training dataset. Accepted format:'
                        ' 1) a single data path, 2) multiple datasets in the'
                        ' form: dataset1-path dataset2-path ...')
    parser.add_argument('--data_split',
                        type=str,
                        default='6,2,2',
                        help='Comma-separated list of proportions for training'
                        ' phase 1, 2, and 3 data. For example the split `6,2,2`'
                        ' will use 60%% of data for phase 1, 20%% for phase 2'
                        ' and 20%% for phase 3.')
    parser.add_argument(
        '--data_output_path',
        type=str,
        default='/tmp/data_files/',
        help='Where to store the data-related files such as shuffle index.')
    parser.add_argument(
        "--model_name_or_path",
        type=str,
        help=
        "Path to pretrained model or model identifier from huggingface.co/models.",
        required=True,
    )
    parser.add_argument(
        "--num_padding_at_beginning",
        type=int,
        default=1,
        help=
        "OPT model has a fixed number (1) of padding tokens at the beginning of the input. "
        "We did not see this in other models but keep it as an option for now.",
    )
    parser.add_argument(
        "--per_device_train_batch_size",
        type=int,
        default=16,
        help="Batch size (per device) for the training dataloader.",
    )
    parser.add_argument(
        "--per_device_eval_batch_size",
        type=int,
        default=16,
        help="Batch size (per device) for the evaluation dataloader.",
    )
    parser.add_argument(
        "--max_seq_len",
        type=int,
        default=512,
        help="The maximum sequence length.",
    )
    parser.add_argument(
        "--learning_rate",
        type=float,
        default=5e-5,
        help=
        "Initial learning rate (after the potential warmup period) to use.",
    )
    parser.add_argument("--weight_decay",
                        type=float,
                        default=0.,
                        help="Weight decay to use.")
    parser.add_argument("--num_train_epochs",
                        type=int,
                        default=1,
                        help="Total number of training epochs to perform.")
    parser.add_argument(
        "--gradient_accumulation_steps",
        type=int,
        default=1,
        help=
        "Number of update steps to accumulate before performing a backward/update pass.",
    )
    parser.add_argument(
        "--lr_scheduler_type",
        type=SchedulerType,
        default="cosine",
        help="The scheduler type to use.",
        choices=[
            "linear", "cosine", "cosine_with_restarts", "polynomial",
            "constant", "constant_with_warmup"
        ],
    )
    parser.add_argument(
        "--num_warmup_steps",
        type=int,
        default=0,
        help="Number of steps for the warmup in the lr scheduler.")
    parser.add_argument("--output_dir",
                        type=str,
                        default=None,
                        help="Where to store the model.")
    parser.add_argument("--seed",
                        type=int,
                        default=1234,
                        help="A seed for reproducible training.")
    parser.add_argument("--local_rank",
                        type=int,
                        default=-1,
                        help="local_rank for distributed training on gpus")
    parser.add_argument(
        '--gradient_checkpointing',
        action='store_true',
        help='Enable HF gradient checkpointing for Actor model.')
    parser.add_argument('--disable_dropout',
                        action='store_true',
                        help='Disable the dropout of the model.')
    # deepspeed features
    parser.add_argument('--offload',
                        action='store_true',
                        help='Enable ZeRO Offload techniques.')
    parser.add_argument(
        '--zero_stage',
        type=int,
        default=0,
        help='ZeRO optimization stage for Actor model (and clones).')
    ## LoRA for efficient training setting
    parser.add_argument("--lora_dim",
                        type=int,
                        default=0,
                        help="If > 0, use LoRA for efficient training.")
    parser.add_argument("--lora_module_name",
                        type=str,
                        default="decoder.layers.",
                        help="The scope of LoRA.")
    parser.add_argument('--only_optimize_lora',
                        action='store_true',
                        help='Only optimize the LoRA parameters.')
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()

    # Validate settings
    if args.gradient_checkpointing and args.lora_dim > 0:
        assert (
            not args.only_optimize_lora
        ), "--gradient_checkpointing and --only_optimize_lora cannot be enabled at the same time."

    return args


def main():
    args = parse_args()

    if args.local_rank == -1:
        device = torch.device("cuda")
    else:
        torch.cuda.set_device(args.local_rank)
        device = torch.device("cuda", args.local_rank)
        # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
        # torch.distributed.init_process_group(backend='nccl')
        deepspeed.init_distributed()

    args.global_rank = torch.distributed.get_rank()
    tensorboard = SummaryWriter(log_dir=os.path.join(args.output_dir, 'log'))

    assert not args.offload, "zero-offload is not currently supported but coming soon!"
    ds_config = get_train_ds_config(offload=args.offload,
                                    stage=args.zero_stage)
    ds_config[
        'train_micro_batch_size_per_gpu'] = args.per_device_train_batch_size
    # Global batch size = per-device micro batch * world size * gradient accumulation steps.
    ds_config[
        'train_batch_size'] = args.per_device_train_batch_size * torch.distributed.get_world_size(
        ) * args.gradient_accumulation_steps

    # If passed along, set the training seed now.
    set_random_seed(args.seed)
    torch.distributed.barrier()

    tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path,
                                              fast_tokenizer=True)
    # tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = 'right'
    tokenizer.truncation_side = 'right'
    rm_model = create_critic_model(args.model_name_or_path,
                                   tokenizer,
                                   ds_config,
                                   args.num_padding_at_beginning,
                                   disable_dropout=args.disable_dropout)

    if args.lora_dim > 0:
        rm_model = convert_linear_layer_to_lora(rm_model,
                                                args.lora_module_name,
                                                args.lora_dim)
        if args.only_optimize_lora:
            rm_model = only_optimize_lora_parameters(rm_model)

    train_phase = 2
    train_dataset, eval_dataset = create_prompt_dataset(
        args.local_rank, args.data_path, args.data_split,
        args.data_output_path, train_phase, args.seed, tokenizer,
        args.max_seq_len, end_of_conversation_token=tokenizer.eos_token)

    # DataLoaders creation:
    data_collator = DataCollatorReward()
    if args.local_rank == -1:
        train_sampler = RandomSampler(train_dataset)
        eval_sampler = SequentialSampler(eval_dataset)
    else:
        train_sampler = DistributedSampler(train_dataset)
        eval_sampler = DistributedSampler(eval_dataset)
    train_dataloader = DataLoader(train_dataset,
                                  collate_fn=data_collator,
                                  sampler=train_sampler,
                                  batch_size=args.per_device_train_batch_size)
    # Note: this overrides the sampler chosen above, so every rank walks the full
    # evaluation set sequentially instead of sharding it.
    eval_sampler = SequentialSampler(eval_dataset)
    eval_dataloader = DataLoader(eval_dataset,
                                 collate_fn=data_collator,
                                 sampler=eval_sampler,
                                 batch_size=args.per_device_eval_batch_size)

    def evaluation_reward(model, eval_dataloader):
        model.eval()
        correct_predictions = 0
        total_predictions = 0
        scores = 0
        for step, batch in enumerate(eval_dataloader):
            batch = to_device(batch, device)
            with torch.no_grad():
                outputs = model(**batch)

            chosen = outputs["chosen_mean_scores"]
            rejected = outputs["rejected_mean_scores"]
            correct_predictions += (chosen > rejected).sum()
            total_predictions += chosen.shape[0]
            scores += outputs["chosen_mean_scores"].mean().float()
            if step == 99:  # For faster evaluation and debugging
                break
        acc = correct_predictions / total_predictions
        scores = scores / (step + 1)
        try:
            acc = get_all_reduce_mean(acc).item()
            scores = get_all_reduce_mean(scores).item()
        except:
            pass
        return scores, acc

    # Split weights in two groups, one with weight decay and the other not.
    optimizer_grouped_parameters = get_optimizer_grouped_parameters(
        rm_model, args.weight_decay)

    AdamOptimizer = DeepSpeedCPUAdam if args.offload else FusedAdam
    optimizer = AdamOptimizer(optimizer_grouped_parameters,
                              lr=args.learning_rate,
                              betas=(0.9, 0.95))

    num_update_steps_per_epoch = math.ceil(
        len(train_dataloader) / args.gradient_accumulation_steps)
    lr_scheduler = get_scheduler(
        name=args.lr_scheduler_type,
        optimizer=optimizer,
        num_warmup_steps=args.num_warmup_steps,
        num_training_steps=args.num_train_epochs * num_update_steps_per_epoch,
    )

    rm_model, optimizer, _, lr_scheduler = deepspeed.initialize(
        model=rm_model,
        optimizer=optimizer,
        args=args,
        config=ds_config,
        lr_scheduler=lr_scheduler,
        dist_init_required=True)

    if args.gradient_checkpointing:
        rm_model.gradient_checkpointing_enable()

    # Train!
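    # Orientation note (based on the reference DeepSpeed-Chat implementation; the
    # details below live in the imported model/data code, not in this script):
    # DataCollatorReward batches each pair as [all chosen, then all rejected]
    # sequences, and the critic built by create_critic_model returns
    # outputs["loss"], a pairwise ranking loss of roughly the form
    # -log(sigmoid(r_chosen - r_rejected)), plus the mean scores used below.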
print_rank_0("***** Running training *****", args.global_rank) print_rank_0( f"***** Evaluating reward, Epoch {0}/{args.num_train_epochs} *****", args.global_rank) reward_score, acc = evaluation_reward(rm_model, eval_dataloader) print_rank_0( f"chosen_last_scores (higher is better) : {reward_score}, acc (higher is better) : {acc}", args.global_rank) _global_step = 1 for epoch in range(args.num_train_epochs): print_rank_0( f"Beginning of Epoch {epoch+1}/{args.num_train_epochs}, Total Micro Batches {len(train_dataloader)}", args.global_rank) rm_model.train() mean_loss = 0 for step, batch in enumerate(train_dataloader): batch = to_device(batch, device) outputs = rm_model(**batch, use_cache=False) loss = outputs["loss"] if args.global_rank <= 0: tensorboard.add_scalar(tag='train/loss', scalar_value=loss, global_step=_global_step) rm_model.backward(loss) rm_model.step() _global_step += 1 mean_loss += loss.item() print_rank_0( f"Epoch {epoch+1}/{args.num_train_epochs} with loss {mean_loss/(step+1)}", args.global_rank) # Evaluate reward_loss on the validation set. print_rank_0( f"***** Evaluating reward, Epoch {epoch+1}/{args.num_train_epochs} *****", args.global_rank) reward_score, acc = evaluation_reward(rm_model, eval_dataloader) if args.global_rank <= 0: tensorboard.add_scalar(tag='eval/reward_score', scalar_value=reward_score, global_step=_global_step) tensorboard.add_scalar(tag='eval/acc', scalar_value=acc, global_step=_global_step) print_rank_0( f"chosen_last_scores (higher is better) : {reward_score}, acc (higher is better) : {acc}", args.global_rank) rm_model.tput_timer.update_epoch_count() if args.output_dir is not None: print_rank_0('saving model ...', args.global_rank) rm_model = convert_lora_to_linear_layer(rm_model) if args.global_rank == 0: save_hf_format(rm_model, tokenizer, args) if args.zero_stage == 3: # for zero stage 3, each gpu only has a part of the model, so we need to save the model on each gpu by using DS-Engine save_zero_three_model(rm_model, args.global_rank, args.output_dir, zero_stage=args.zero_stage) if __name__ == "__main__": main()