benchmarks/fp8/torchao/fsdp.py (121 lines of code) (raw):
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This script tests to ensure that `accelerate` performs at the same level as raw `torchao`.
This particular script verifies this for FSDP training.
"""
from functools import partial
import evaluate
import torch
from fp8_utils import get_training_utilities
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import MixedPrecision
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
from torchao.float8 import convert_to_float8_training
from transformers.models.bert import BertLayer
from accelerate import Accelerator
from accelerate import FullyShardedDataParallelPlugin as FSDPPlugin
from accelerate.state import AcceleratorState
from accelerate.utils import AORecipeKwargs, set_seed
MODEL_NAME = "bert-base-cased"
METRIC = evaluate.load("glue", "mrpc")
FSDP_WRAP_POLICY = partial(transformer_auto_wrap_policy, transformer_layer_cls={BertLayer})
def filter_linear_layers(module, fqn, first_layer_name=None, last_layer_name=None):
if isinstance(module, torch.nn.Linear):
if module.in_features % 16 != 0 or module.out_features % 16 != 0:
return False
# For stability reasons, we skip the first and last linear layers
# Otherwise can lead to the model not training or converging properly
if fqn in (first_layer_name, last_layer_name):
return False
return True
def evaluate_model(model, dataloader, metric, accelerator=None):
"Turns model to .eval(), runs dataloader, calculates metric, then turns eval back on"
model.eval()
for step, batch in enumerate(dataloader):
with torch.no_grad():
outputs = model(**batch)
predictions = outputs.logits.argmax(dim=-1)
references = batch["labels"]
if accelerator is not None and accelerator.num_processes > 1:
predictions, references = accelerator.gather_for_metrics((predictions, references))
metric.add_batch(predictions=predictions, references=references)
return metric.compute()
def train_baseline():
set_seed(42)
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = get_training_utilities(MODEL_NAME)
first_linear = None
last_linear = None
for name, module in model.named_modules():
if isinstance(module, torch.nn.Linear):
if first_linear is None:
first_linear = name
last_linear = name
func = partial(filter_linear_layers, first_layer_name=first_linear, last_layer_name=last_linear)
accelerator = Accelerator()
device = accelerator.device
model.to(device)
convert_to_float8_training(model, module_filter_fn=func)
# Convert the model to FSDP
model = FSDP(
model,
use_orig_params=True,
mixed_precision=MixedPrecision(param_dtype=torch.bfloat16, reduce_dtype=torch.float32),
auto_wrap_policy=FSDP_WRAP_POLICY,
)
base_model_results = evaluate_model(model, eval_dataloader, METRIC, accelerator=accelerator)
model.train()
for batch in train_dataloader:
with torch.autocast(device_type="cuda", dtype=torch.bfloat16):
batch = batch.to(device)
outputs = model(**batch)
loss = outputs.loss
loss.backward()
optimizer.step()
optimizer.zero_grad()
lr_scheduler.step()
trained_model_results = evaluate_model(model, eval_dataloader, METRIC, accelerator=accelerator)
assert trained_model_results["accuracy"] > base_model_results["accuracy"], (
f"Accuracy should be higher for the trained model: {trained_model_results['accuracy']} > {base_model_results['accuracy']}"
)
assert trained_model_results["f1"] > base_model_results["f1"], (
f"F1 score should be higher for the trained model: {trained_model_results['f1']} > {base_model_results['f1']}"
)
return base_model_results, trained_model_results
def train_integration():
AcceleratorState()._reset_state(True)
fsdp_plugin = FSDPPlugin(
auto_wrap_policy=FSDP_WRAP_POLICY,
use_orig_params=True,
mixed_precision_policy=MixedPrecision(param_dtype=torch.bfloat16, reduce_dtype=torch.float32),
)
accelerator = Accelerator(mixed_precision="fp8", fsdp_plugin=fsdp_plugin, kwargs_handlers=[AORecipeKwargs()])
set_seed(42)
model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = get_training_utilities(
MODEL_NAME, accelerator=accelerator
)
model, optimizer = accelerator.prepare(model, optimizer)
base_model_results = evaluate_model(model, eval_dataloader, METRIC, accelerator=accelerator)
model.train()
for batch in train_dataloader:
outputs = model(**batch)
loss = outputs.loss
accelerator.backward(loss)
optimizer.step()
optimizer.zero_grad()
lr_scheduler.step()
trained_model_results = evaluate_model(model, eval_dataloader, METRIC, accelerator=accelerator)
assert trained_model_results["accuracy"] > base_model_results["accuracy"], (
f"Accuracy should be higher for the trained model: {trained_model_results['accuracy']} > {base_model_results['accuracy']}"
)
assert trained_model_results["f1"] > base_model_results["f1"], (
f"F1 score should be higher for the trained model: {trained_model_results['f1']} > {base_model_results['f1']}"
)
return base_model_results, trained_model_results
if __name__ == "__main__":
baseline_not_trained, baseline_trained = train_baseline()
accelerator_not_trained, accelerator_trained = train_integration()
assert baseline_not_trained["accuracy"] == accelerator_not_trained["accuracy"], (
f"Accuracy should be the same for the baseline and accelerator: {baseline_not_trained['accuracy']} == {accelerator_not_trained['accuracy']}"
)
assert baseline_not_trained["f1"] == accelerator_not_trained["f1"], (
f"F1 score should be the same for the baseline and accelerator: {baseline_not_trained['f1']} == {accelerator_not_trained['f1']}"
)
assert baseline_trained["accuracy"] == accelerator_trained["accuracy"], (
f"Accuracy should be the same for the baseline and accelerator: {baseline_trained['accuracy']} == {accelerator_trained['accuracy']}"
)
assert baseline_trained["f1"] == accelerator_trained["f1"], (
f"F1 score should be the same for the baseline and accelerator: {baseline_trained['f1']} == {accelerator_trained['f1']}"
)
torch.distributed.destroy_process_group()