in rlhf/deepspeed-chat/rm_main.py [0:0]
def main():
    args = parse_args()
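
    # Device / distributed setup: local_rank == -1 means a plain single-GPU run;
    # otherwise pin this process to its assigned GPU and let DeepSpeed initialize the backend.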
    if args.local_rank == -1:
        device = torch.device("cuda")
    else:
        torch.cuda.set_device(args.local_rank)
        device = torch.device("cuda", args.local_rank)
        # Initializes the distributed backend which will take care of synchronizing nodes/GPUs
        # torch.distributed.init_process_group(backend='nccl')
        deepspeed.init_distributed()

    args.global_rank = torch.distributed.get_rank()
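
    # TensorBoard writer for training/eval metrics, written under <output_dir>/log
    # (created on every rank; only rank 0 logs scalars below).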
    tensorboard = SummaryWriter(log_dir=os.path.join(args.output_dir, 'log'))

    assert not args.offload, "zero-offload is not currently supported but coming soon!"
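
    # Build the DeepSpeed training config and keep its batch-size fields in sync with the
    # CLI arguments: train_batch_size = micro_batch_size * world_size * grad_accum_steps.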
    ds_config = get_train_ds_config(offload=args.offload,
                                    stage=args.zero_stage)
    ds_config['train_micro_batch_size_per_gpu'] = args.per_device_train_batch_size
    ds_config['train_batch_size'] = (args.per_device_train_batch_size *
                                     torch.distributed.get_world_size() *
                                     args.gradient_accumulation_steps)

    # If passed along, set the training seed now.
    set_random_seed(args.seed)
    torch.distributed.barrier()
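
    # Load the tokenizer (right-side padding/truncation) and build the reward (critic) model.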
    tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path,
                                              fast_tokenizer=True)
    # tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = 'right'
    tokenizer.truncation_side = 'right'
    rm_model = create_critic_model(args.model_name_or_path,
                                   tokenizer,
                                   ds_config,
                                   args.num_padding_at_beginning,
                                   disable_dropout=args.disable_dropout)
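
    # Optionally swap linear layers for LoRA adapters and, if requested, train only the LoRA weights.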
    if args.lora_dim > 0:
        rm_model = convert_linear_layer_to_lora(rm_model,
                                                args.lora_module_name,
                                                args.lora_dim)
        if args.only_optimize_lora:
            rm_model = only_optimize_lora_parameters(rm_model)
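
    # Phase-2 (reward-model) data: paired chosen/rejected responses, collated by DataCollatorReward.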
    train_phase = 2
    train_dataset, eval_dataset = create_prompt_dataset(
        args.local_rank, args.data_path, args.data_split,
        args.data_output_path, train_phase, args.seed, tokenizer,
        args.max_seq_len, end_of_conversation_token=tokenizer.eos_token)

    # DataLoaders creation:
    data_collator = DataCollatorReward()
    if args.local_rank == -1:
        train_sampler = RandomSampler(train_dataset)
        eval_sampler = SequentialSampler(eval_dataset)
    else:
        train_sampler = DistributedSampler(train_dataset)
        eval_sampler = DistributedSampler(eval_dataset)
    train_dataloader = DataLoader(train_dataset,
                                  collate_fn=data_collator,
                                  sampler=train_sampler,
                                  batch_size=args.per_device_train_batch_size)
    eval_dataloader = DataLoader(eval_dataset,
                                 collate_fn=data_collator,
                                 sampler=eval_sampler,
                                 batch_size=args.per_device_eval_batch_size)
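
    # Evaluation helper: accuracy is the fraction of pairs where the chosen response
    # outscores the rejected one; also returns the mean score of the chosen responses.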
    def evaluation_reward(model, eval_dataloader):
        model.eval()
        correct_predictions = 0
        total_predictions = 0
        scores = 0
        for step, batch in enumerate(eval_dataloader):
            batch = to_device(batch, device)
            with torch.no_grad():
                outputs = model(**batch)
            chosen = outputs["chosen_mean_scores"]
            rejected = outputs["rejected_mean_scores"]
            correct_predictions += (chosen > rejected).sum()
            total_predictions += chosen.shape[0]
            scores += outputs["chosen_mean_scores"].mean().float()
            if step == 99:  # For faster evaluation and debugging
                break
        acc = correct_predictions / total_predictions
        scores = scores / (step + 1)
        try:
            acc = get_all_reduce_mean(acc).item()
            scores = get_all_reduce_mean(scores).item()
        except Exception:
            # If the all-reduce is unavailable (e.g. a non-distributed run), keep the local values.
            pass
        return scores, acc
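
    # Optimizer setup: parameters are split into a weight-decay group and a no-decay group;
    # DeepSpeedCPUAdam is used only when ZeRO-Offload is enabled, FusedAdam otherwise.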

    # Split weights in two groups, one with weight decay and the other not.
    optimizer_grouped_parameters = get_optimizer_grouped_parameters(
        rm_model, args.weight_decay)

    AdamOptimizer = DeepSpeedCPUAdam if args.offload else FusedAdam
    optimizer = AdamOptimizer(optimizer_grouped_parameters,
                              lr=args.learning_rate,
                              betas=(0.9, 0.95))
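
    # Scheduler horizon is measured in optimizer updates: micro-batches per epoch divided
    # by gradient_accumulation_steps, times the number of epochs.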
    num_update_steps_per_epoch = math.ceil(
        len(train_dataloader) / args.gradient_accumulation_steps)
    lr_scheduler = get_scheduler(
        name=args.lr_scheduler_type,
        optimizer=optimizer,
        num_warmup_steps=args.num_warmup_steps,
        num_training_steps=args.num_train_epochs * num_update_steps_per_epoch,
    )
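
    # Wrap model, optimizer and scheduler into a DeepSpeed engine using the config above.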
    rm_model, optimizer, _, lr_scheduler = deepspeed.initialize(
        model=rm_model,
        optimizer=optimizer,
        args=args,
        config=ds_config,
        lr_scheduler=lr_scheduler,
        dist_init_required=True)

    if args.gradient_checkpointing:
        rm_model.gradient_checkpointing_enable()

    # Train!
    print_rank_0("***** Running training *****", args.global_rank)
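
    # Evaluate once before training to establish a baseline reward score and accuracy.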
    print_rank_0(
        f"***** Evaluating reward, Epoch {0}/{args.num_train_epochs} *****",
        args.global_rank)
    reward_score, acc = evaluation_reward(rm_model, eval_dataloader)
    print_rank_0(
        f"chosen_mean_scores (higher is better) : {reward_score}, acc (higher is better) : {acc}",
        args.global_rank)
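
    # Main training loop: forward/backward/step through the DeepSpeed engine,
    # with rank 0 logging the per-step loss to TensorBoard.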
    _global_step = 1
    for epoch in range(args.num_train_epochs):
        print_rank_0(
            f"Beginning of Epoch {epoch+1}/{args.num_train_epochs}, Total Micro Batches {len(train_dataloader)}",
            args.global_rank)
        rm_model.train()
        mean_loss = 0
        for step, batch in enumerate(train_dataloader):
            batch = to_device(batch, device)
            outputs = rm_model(**batch, use_cache=False)
            loss = outputs["loss"]
            if args.global_rank <= 0:
                tensorboard.add_scalar(tag='train/loss',
                                       scalar_value=loss,
                                       global_step=_global_step)
            rm_model.backward(loss)
            rm_model.step()
            _global_step += 1
            mean_loss += loss.item()
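
        # End of epoch: report the average training loss, then re-evaluate on the validation set.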
        print_rank_0(
            f"Epoch {epoch+1}/{args.num_train_epochs} with loss {mean_loss/(step+1)}",
            args.global_rank)
        # Evaluate the reward model on the validation set.
        print_rank_0(
            f"***** Evaluating reward, Epoch {epoch+1}/{args.num_train_epochs} *****",
            args.global_rank)
        reward_score, acc = evaluation_reward(rm_model, eval_dataloader)
        if args.global_rank <= 0:
            tensorboard.add_scalar(tag='eval/reward_score',
                                   scalar_value=reward_score,
                                   global_step=_global_step)
            tensorboard.add_scalar(tag='eval/acc',
                                   scalar_value=acc,
                                   global_step=_global_step)
        print_rank_0(
            f"chosen_mean_scores (higher is better) : {reward_score}, acc (higher is better) : {acc}",
            args.global_rank)
        rm_model.tput_timer.update_epoch_count()
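
    # Save the final model: fold any LoRA adapters back into plain linear layers, then
    # export an HF-format checkpoint (rank 0), with special handling for ZeRO stage 3.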

    if args.output_dir is not None:
        print_rank_0('saving model ...', args.global_rank)
        rm_model = convert_lora_to_linear_layer(rm_model)
        if args.global_rank == 0:
            save_hf_format(rm_model, tokenizer, args)
        if args.zero_stage == 3:
            # For ZeRO stage 3 each GPU holds only a shard of the parameters, so the
            # model has to be saved through the DeepSpeed engine on every rank.
            save_zero_three_model(rm_model,
                                  args.global_rank,
                                  args.output_dir,
                                  zero_stage=args.zero_stage)