in sagemaker/22_accelerate_sagemaker_examples/src/seq2seq/run_seq2seq_no_trainer.py [0:0]
def _save_epoch_checkpoint(accelerator, args, model, epoch, completed_steps):
    """Write a checkpoint for ``epoch`` below ``args.output_dir``.

    When a DeepSpeed plugin is active the work is delegated to
    ``checkpoint_model`` (``epoch`` is passed as both its second and fourth
    argument, exactly as the training loop has always called it). Otherwise all
    processes synchronise and the main process alone saves the model state dict
    to ``<output_dir>/<epoch>/model.pt``.
    """
    if accelerator.state.deepspeed_plugin is not None:
        checkpoint_model(args.output_dir, epoch, model, epoch, completed_steps)
        return

    accelerator.wait_for_everyone()
    if accelerator.is_main_process:
        ckpt_path = os.path.join(args.output_dir, str(epoch))
        os.makedirs(ckpt_path, exist_ok=True)
        accelerator.save(accelerator.get_state_dict(model), os.path.join(ckpt_path, "model.pt"))


def main():
    """Train and evaluate a seq2seq model with Accelerate (optionally DeepSpeed).

    The function parses the command line, prepares the output directory/Hub
    repository, loads and tokenizes the dataset, builds the model, optimizer and
    scheduler, runs the training loop with per-epoch checkpointing and
    evaluation, optionally reloads the best checkpoint, and finally saves the
    model, the tokenizer and an ``all_results.json`` file to ``args.output_dir``.

    Raises:
        ValueError: if no tokenizer source is given, if the model config has no
            ``decoder_start_token_id``, or if a requested text/summary column is
            not present in the dataset.
        AssertionError: if the metric of the reloaded best checkpoint differs
            from the best metric recorded during training.
    """
    args = parse_args()

    # Initialize the accelerator. We will let the accelerator handle device placement for us in this example.
    # If we're using tracking, we also need to initialize it here and it will by default pick up all supported
    # trackers in the environment.
    accelerator = (
        Accelerator(log_with=args.report_to, logging_dir=args.output_dir) if args.with_tracking else Accelerator()
    )
    if args.source_prefix is None and args.model_name_or_path in [
        "t5-small",
        "t5-base",
        "t5-large",
        "t5-3b",
        "t5-11b",
    ]:
        logger.warning(
            "You're running a t5 model but didn't provide a source prefix, which is the expected, e.g. with "
            "`--source_prefix 'summarize: ' `"
        )

    # Make one log on every process with the configuration for debugging.
    logging.basicConfig(
        format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
        datefmt="%m/%d/%Y %H:%M:%S",
        level=logging.INFO,
    )
    logger.info(accelerator.state, main_process_only=False)
    if accelerator.is_local_main_process:
        datasets.utils.logging.set_verbosity_warning()
        transformers.utils.logging.set_verbosity_info()
    else:
        datasets.utils.logging.set_verbosity_error()
        transformers.utils.logging.set_verbosity_error()

    # If passed along, set the training seed now.
    if args.seed is not None:
        set_seed(args.seed)

    # Handle the repository creation
    if accelerator.is_main_process:
        if args.push_to_hub:
            if args.hub_model_id is None:
                repo_name = get_full_repo_name(Path(args.output_dir).name, token=args.hub_token)
            else:
                repo_name = args.hub_model_id
            repo = Repository(args.output_dir, clone_from=repo_name)

            # Keep intermediate checkpoints out of the Hub repository. The file is opened in
            # append mode so that entries already present (e.g. from the cloned repository) are
            # preserved, and each pattern is only added when it is not listed yet.
            with open(os.path.join(args.output_dir, ".gitignore"), "a+") as gitignore:
                gitignore.seek(0)
                existing = gitignore.read()
                ignored = set(existing.splitlines())
                if existing and not existing.endswith("\n"):
                    gitignore.write("\n")
                for pattern in ("step_*", "epoch_*"):
                    if pattern not in ignored:
                        gitignore.write(pattern + "\n")
        elif args.output_dir is not None:
            os.makedirs(args.output_dir, exist_ok=True)
    accelerator.wait_for_everyone()

    # Get the datasets: you can either provide your own CSV/JSON/TXT training and evaluation files (see below)
    # or just provide the name of one of the public datasets available on the hub at
    # https://huggingface.co/datasets/ (the dataset will be downloaded automatically from the datasets Hub).
    #
    # For CSV/JSON files, this script will use the column called 'text' or the first column if no column called
    # 'text' is found. You can easily tweak this behavior (see below).
    #
    # In distributed training, the load_dataset function guarantees that only one local process can concurrently
    # download the dataset.
    if args.dataset_name is not None:
        # Downloading and loading a dataset from the hub.
        raw_datasets = load_dataset(args.dataset_name, args.dataset_config_name)
        # Optionally truncate the splits to the first n_train / n_val examples.
        if args.n_train > 0:
            raw_datasets["train"] = datasets.Dataset.from_dict(raw_datasets["train"][: args.n_train])
        if args.n_val > 0:
            raw_datasets["validation"] = datasets.Dataset.from_dict(raw_datasets["validation"][: args.n_val])
    else:
        data_files = {}
        if args.train_file is not None:
            data_files["train"] = args.train_file
        if args.validation_file is not None:
            data_files["validation"] = args.validation_file
        # The loader type is taken from the training file's extension.
        extension = args.train_file.split(".")[-1]
        raw_datasets = load_dataset(extension, data_files=data_files)
    # See more about loading any type of standard or custom dataset (from files, python dict, pandas DataFrame,
    # etc) at https://huggingface.co/docs/datasets/loading_datasets.html.

    # Load pretrained model and tokenizer
    #
    # In distributed training, the .from_pretrained methods guarantee that only one local process can concurrently
    # download model & vocab.
    if args.config_name:
        config = AutoConfig.from_pretrained(args.config_name)
    elif args.model_name_or_path:
        config = AutoConfig.from_pretrained(args.model_name_or_path)
    else:
        config = CONFIG_MAPPING[args.model_type]()
        logger.warning("You are instantiating a new config instance from scratch.")

    if args.tokenizer_name:
        tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_name, use_fast=not args.use_slow_tokenizer)
    elif args.model_name_or_path:
        tokenizer = AutoTokenizer.from_pretrained(args.model_name_or_path, use_fast=not args.use_slow_tokenizer)
    else:
        raise ValueError(
            "You are instantiating a new tokenizer from scratch. This is not supported by this script."
            "You can do it from another script, save it, and load it from here, using --tokenizer_name."
        )

    if args.model_name_or_path:
        model = AutoModelForSeq2SeqLM.from_pretrained(
            args.model_name_or_path,
            from_tf=bool(".ckpt" in args.model_name_or_path),
            config=config,
        )
    else:
        logger.info("Training new model from scratch")
        model = AutoModelForSeq2SeqLM.from_config(config)

    model.resize_token_embeddings(len(tokenizer))
    if model.config.decoder_start_token_id is None:
        raise ValueError("Make sure that `config.decoder_start_token_id` is correctly defined")

    prefix = args.source_prefix if args.source_prefix is not None else ""

    # Preprocessing the datasets.
    # First we tokenize all the texts.
    column_names = raw_datasets["train"].column_names

    # Get the column names for input/target: default to the first two dataset columns.
    if args.text_column is None:
        text_column = column_names[0]
    else:
        text_column = args.text_column
        if text_column not in column_names:
            raise ValueError(
                f"--text_column' value '{args.text_column}' needs to be one of: {', '.join(column_names)}"
            )
    if args.summary_column is None:
        summary_column = column_names[1]
    else:
        summary_column = args.summary_column
        if summary_column not in column_names:
            raise ValueError(
                f"--summary_column' value '{args.summary_column}' needs to be one of: {', '.join(column_names)}"
            )

    # Temporarily set max_target_length for training.
    max_target_length = args.max_target_length
    padding = "max_length" if args.pad_to_max_length else False

    # `model_name_or_path` may be None when training from scratch, so guard the substring test.
    is_t5_checkpoint = bool(args.model_name_or_path) and "t5" in args.model_name_or_path

    def preprocess_function(examples):
        """Tokenize a batch of source/target texts and attach the target ids as ``labels``."""
        inputs = examples[text_column]
        targets = examples[summary_column]
        inputs = [prefix + inp for inp in inputs]
        model_inputs = tokenizer(inputs, max_length=args.max_source_length, padding=padding, truncation=True)

        # Setup the tokenizer for targets
        if is_t5_checkpoint:
            with tokenizer.as_target_tokenizer():
                labels = tokenizer(targets, max_length=max_target_length, padding=padding, truncation=True)
        else:
            labels = tokenizer(targets, max_length=max_target_length, padding=padding, truncation=True)

        # If we are padding here, replace all tokenizer.pad_token_id in the labels by -100 when we want to ignore
        # padding in the loss.
        if padding == "max_length" and args.ignore_pad_token_for_loss:
            labels["input_ids"] = [
                [(l if l != tokenizer.pad_token_id else -100) for l in label] for label in labels["input_ids"]
            ]

        model_inputs["labels"] = labels["input_ids"]
        return model_inputs

    with accelerator.main_process_first():
        processed_datasets = raw_datasets.map(
            preprocess_function,
            batched=True,
            num_proc=args.preprocessing_num_workers,
            remove_columns=column_names,
            load_from_cache_file=not args.overwrite_cache,
            desc="Running tokenizer on dataset",
        )

    train_dataset = processed_datasets["train"]
    eval_dataset = processed_datasets["validation"]

    # Log one random sample from the training set:
    for index in random.sample(range(len(train_dataset)), 1):
        logger.info(f"Sample {index} of the training set: {train_dataset[index]}.")

    label_pad_token_id = -100 if args.ignore_pad_token_for_loss else tokenizer.pad_token_id
    data_collator = DataCollatorForSeq2Seq(
        tokenizer,
        model=model,
        label_pad_token_id=label_pad_token_id,
        pad_to_multiple_of=8 if accelerator.use_fp16 else None,
    )

    train_dataloader = DataLoader(
        train_dataset, shuffle=True, collate_fn=data_collator, batch_size=args.per_device_train_batch_size
    )
    eval_dataloader = DataLoader(eval_dataset, collate_fn=data_collator, batch_size=args.per_device_eval_batch_size)

    # Optimizer
    # Split weights in two groups, one with weight decay and the other not.
    no_decay = ["bias", "LayerNorm.weight"]
    optimizer_grouped_parameters = [
        {
            "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
            "weight_decay": args.weight_decay,
        },
        {
            "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
            "weight_decay": 0.0,
        },
    ]

    # Creates Dummy Optimizer if `optimizer` was specified in the DeepSpeed config file, else creates Adam Optimizer
    optimizer_cls = (
        torch.optim.Adam
        if accelerator.state.deepspeed_plugin is None
        or "optimizer" not in accelerator.state.deepspeed_plugin.deepspeed_config
        else DummyOptim
    )
    optimizer = optimizer_cls(optimizer_grouped_parameters, lr=args.learning_rate)

    # Get gradient accumulation steps from the DeepSpeed config if available
    if accelerator.state.deepspeed_plugin is not None:
        args.gradient_accumulation_steps = accelerator.state.deepspeed_plugin.deepspeed_config[
            "gradient_accumulation_steps"
        ]

    # Scheduler and math around the number of training steps.
    num_update_steps_per_epoch = math.ceil(len(train_dataloader) / args.gradient_accumulation_steps)
    if args.max_train_steps is None:
        args.max_train_steps = args.num_train_epochs * num_update_steps_per_epoch
    else:
        args.num_train_epochs = math.ceil(args.max_train_steps / num_update_steps_per_epoch)

    # Creates Dummy Scheduler if `scheduler` was specified in the DeepSpeed config file, else creates the
    # `args.lr_scheduler_type` Scheduler
    if (
        accelerator.state.deepspeed_plugin is None
        or "scheduler" not in accelerator.state.deepspeed_plugin.deepspeed_config
    ):
        lr_scheduler = get_scheduler(
            name=args.lr_scheduler_type,
            optimizer=optimizer,
            num_warmup_steps=args.num_warmup_steps,
            num_training_steps=args.max_train_steps,
        )
    else:
        lr_scheduler = DummyScheduler(
            optimizer, total_num_steps=args.max_train_steps, warmup_num_steps=args.num_warmup_steps
        )

    # Prepare everything with our `accelerator`.
    model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(
        model, optimizer, train_dataloader, eval_dataloader, lr_scheduler
    )

    # We need to recalculate our total training steps as the size of the training dataloader may have changed.
    num_update_steps_per_epoch = math.ceil(len(train_dataloader) / args.gradient_accumulation_steps)
    args.max_train_steps = args.num_train_epochs * num_update_steps_per_epoch

    # Figure out how often we should save the Accelerator states: a digit string becomes an int step
    # interval, any other string is kept as-is, and non-string values disable step checkpointing.
    if hasattr(args.checkpointing_steps, "isdigit"):
        checkpointing_steps = args.checkpointing_steps
        if args.checkpointing_steps.isdigit():
            checkpointing_steps = int(args.checkpointing_steps)
    else:
        checkpointing_steps = None

    # We need to initialize the trackers we use, and also store our configuration.
    # We initialize the trackers only on main process because `accelerator.log`
    # only logs on main process and we don't want empty logs/runs on other processes.
    if args.with_tracking:
        if accelerator.is_main_process:
            # Work on a copy: `vars(args)` is the namespace's own dict, and the tracker-friendly
            # rewrite below should not alter `args` itself.
            experiment_config = dict(vars(args))
            # TensorBoard cannot log Enums, need the raw value
            experiment_config["lr_scheduler_type"] = experiment_config["lr_scheduler_type"].value
            accelerator.init_trackers(args.report_name, experiment_config)

    # Metric
    metric = load_metric("sacrebleu")

    # Train!
    total_batch_size = args.per_device_train_batch_size * accelerator.num_processes * args.gradient_accumulation_steps

    logger.info("***** Running training *****")
    logger.info(f" Num examples = {len(train_dataset)}")
    logger.info(f" Num Epochs = {args.num_train_epochs}")
    logger.info(f" Instantaneous batch size per device = {args.per_device_train_batch_size}")
    logger.info(f" Total train batch size (w. parallel, distributed & accumulation) = {total_batch_size}")
    logger.info(f" Gradient Accumulation steps = {args.gradient_accumulation_steps}")
    logger.info(f" Total optimization steps = {args.max_train_steps}")
    # Only show the progress bar once on each machine.
    progress_bar = tqdm(range(args.max_train_steps), disable=not accelerator.is_local_main_process)
    completed_steps = 0
    starting_epoch = 0
    best_metric = None
    best_metric_checkpoint = None

    # Potentially load in the weights and states from a previous save
    if args.resume_from_checkpoint:
        # Loads the DeepSpeed checkpoint from the specified path
        _, last_global_step = load_training_checkpoint(
            model,
            args.resume_from_checkpoint,
            **{"load_optimizer_states": True, "load_lr_scheduler_states": True},
        )
        accelerator.print(f"Resumed from checkpoint: {args.resume_from_checkpoint}")
        # Split the restored global step into whole epochs plus a batch offset within the epoch.
        resume_step = last_global_step
        starting_epoch = resume_step // len(train_dataloader)
        resume_step -= starting_epoch * len(train_dataloader)

    # The running loss is needed both for tracker logging and for the periodic `logging_steps`
    # report, so accumulate it whenever either of them is enabled.
    track_loss = args.with_tracking or isinstance(args.logging_steps, int)

    for epoch in range(starting_epoch, args.num_train_epochs):
        start_time = time()
        model.train()
        if track_loss:
            total_loss = 0
            # Number of batches whose loss has been added to `total_loss` in this epoch.
            batches_seen = 0
        for step, batch in enumerate(train_dataloader):
            # We need to skip steps until we reach the resumed step
            if args.resume_from_checkpoint and epoch == starting_epoch:
                if resume_step is not None and step < resume_step:
                    completed_steps += 1
                    continue

            # Build the decoder inputs by shifting the labels one position to the right.
            # Position 0 and every ignored (-100) label position are filled with token id 0.
            decoder_input_ids = batch["labels"].new_zeros(batch["labels"].shape)
            decoder_input_ids[..., 1:] = batch["labels"][..., :-1].clone()
            decoder_input_ids[..., 0] = 0
            decoder_input_ids.masked_fill_(decoder_input_ids == -100, 0)
            batch["decoder_input_ids"] = decoder_input_ids

            outputs = model(**batch)
            loss = outputs.loss
            # We keep track of the loss at each epoch
            if track_loss:
                total_loss += loss.detach().float()
                batches_seen += 1
            loss = loss / args.gradient_accumulation_steps
            accelerator.backward(loss)
            if step % args.gradient_accumulation_steps == 0 or step == len(train_dataloader) - 1:
                optimizer.step()
                lr_scheduler.step()
                optimizer.zero_grad()
                progress_bar.update(1)
                completed_steps += 1

            if isinstance(args.logging_steps, int):
                if completed_steps % args.logging_steps == 0:
                    # Average over the batches actually accumulated this epoch; this count is
                    # always >= 1 here, unlike `completed_steps % len(train_dataloader)`.
                    steps_this_epoch = batches_seen
                    train_loss = total_loss.item() / steps_this_epoch
                    train_perplexity = math.exp(train_loss)
                    accelerator.log(
                        {
                            "train_loss": train_loss,
                            "train_perplexity": train_perplexity,
                            "epoch": epoch,
                            "step": completed_steps,
                            "steps_this_epoch": steps_this_epoch,
                        },
                        step=completed_steps,
                    )
                    logger.info(
                        f"Epoch: {epoch}, Step: {completed_steps}, Loss: {train_loss}, Perplexity: {train_perplexity}"
                    )

            if isinstance(checkpointing_steps, int):
                if completed_steps % checkpointing_steps == 0:
                    # Save the checkpoint to the specified path
                    _save_epoch_checkpoint(accelerator, args, model, epoch, completed_steps)

            if completed_steps >= args.max_train_steps:
                break

        end_time = time()
        logger.info(f"Epoch {epoch} training took {end_time-start_time} seconds")

        # Save the end-of-epoch checkpoint to the specified path
        _save_epoch_checkpoint(accelerator, args, model, epoch, completed_steps)

        start_time = time()
        bleu_score = evaluate(args, model, metric, tokenizer, eval_dataloader, accelerator, config.max_length)
        end_time = time()
        logger.info(f"Epoch {epoch} evaluation took {end_time-start_time} seconds")

        result = {}
        if args.with_tracking:
            result["bleu_score"] = bleu_score
            result["train_loss"] = total_loss.item() / len(train_dataloader)
            result["train_perplexity"] = math.exp(result["train_loss"])
            result["epoch"] = epoch
            result["step"] = completed_steps
            accelerator.log(result, step=completed_steps)

        # Tracks the best checkpoint and best metric
        if (best_metric is None or best_metric < bleu_score) and args.load_best_model:
            best_metric = bleu_score
            best_metric_checkpoint = os.path.join(args.output_dir, str(epoch))
            accelerator.print(f"New best metric: {best_metric} at epoch {epoch}")
            accelerator.print(f"best_metric_checkpoint: {best_metric_checkpoint}")

    # Loads the best checkpoint after the training is finished. Nothing is reloaded when no
    # epoch recorded a best checkpoint (e.g. the epoch loop did not run).
    if args.load_best_model and best_metric_checkpoint is not None:
        if accelerator.state.deepspeed_plugin is not None:
            # The checkpoint directory's parent is the load path and its last component the tag.
            checkpoint_root, checkpoint_tag = os.path.split(best_metric_checkpoint)
            _, last_global_step = load_training_checkpoint(
                model,
                checkpoint_root,
                tag=checkpoint_tag,
                **{"load_optimizer_states": True, "load_lr_scheduler_states": True},
            )
        else:
            map_location = {"cuda:0": "cuda:{}".format(accelerator.local_process_index)}
            model.load_state_dict(
                torch.load(os.path.join(best_metric_checkpoint, "model.pt"), map_location=map_location)
            )

    # Evaluates using the final (best, when reloaded) model
    bleu_score = evaluate(args, model, metric, tokenizer, eval_dataloader, accelerator, config.max_length)
    logger.info(f"Best model metrics: bleu_score: {bleu_score}")
    # `best_metric` is only recorded when `--load_best_model` is set, so the consistency check
    # is meaningful only in that case.
    if best_metric is not None and bleu_score != best_metric:
        raise AssertionError(
            f"Best metric {best_metric} does not match the metric {bleu_score} of the loaded best model."
        )

    if args.output_dir is not None:
        accelerator.wait_for_everyone()
        unwrapped_model = accelerator.unwrap_model(model)

        # Saves the whole/unpartitioned fp16 model when in ZeRO Stage-3 to the output directory if
        # `stage3_gather_16bit_weights_on_model_save` is True in DeepSpeed Config file or
        # `zero3_save_16bit_model` is True in DeepSpeed Plugin.
        # For Zero Stages 1 and 2, models are saved as usual in the output directory.
        # The model name saved is `pytorch_model.bin`
        unwrapped_model.save_pretrained(
            args.output_dir,
            is_main_process=accelerator.is_main_process,
            save_function=accelerator.save,
            state_dict=accelerator.get_state_dict(model),
        )
        if accelerator.is_main_process:
            tokenizer.save_pretrained(args.output_dir)
            if args.push_to_hub:
                repo.push_to_hub(commit_message="End of training", auto_lfs_prune=True)

        with open(os.path.join(args.output_dir, "all_results.json"), "w") as f:
            json.dump({"eval_bleu": bleu_score}, f)