community-content/pytorch_efficient_training/resnet_ddp.py (179 lines of code) (raw):

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Train resnet on multiple GPUs with DDP."""

import argparse
import os
import time

from PIL import Image
import torch
from torch import nn
import torch.distributed as dist
import torch.multiprocessing as mp
import torchmetrics
import torchvision
from torchvision.models import resnet50


class ImageFolder(torchvision.datasets.ImageFolder):
    """Class for loading imagenet.

    Samples come from a text file with one ``<image_path> <label>`` pair per
    line rather than from a directory tree.
    """

    def __init__(self, image_list_file, transform=None, target_transform=None):
        """Build the sample list from ``image_list_file``.

        The parent constructor is intentionally not called; only the
        attributes assigned below are set on the instance.

        Args:
            image_list_file: Path to the text file listing the samples.
            transform: Optional callable stored as ``self.transform``.
            target_transform: Optional callable stored as
                ``self.target_transform``.
        """
        self.samples = self._make_dataset(image_list_file)
        self.loader = self._loader
        self.imgs = self.samples
        self.targets = [s[1] for s in self.samples]
        self.transform = transform
        self.target_transform = target_transform

    def _make_dataset(self, image_list_file):
        """Parse the list file into ``(image_path, label)`` tuples.

        Each non-blank line is split on single spaces; the first field is the
        image path and the second is converted to an ``int`` label.

        Args:
            image_list_file: Path to the text file listing the samples.

        Returns:
            A list of ``(str, int)`` tuples in file order.

        Raises:
            IndexError: If a non-blank line has fewer than two fields.
            ValueError: If the second field is not an integer.
        """
        items = []
        with open(image_list_file, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    # Tolerate blank lines (e.g. a trailing newline at EOF)
                    # instead of failing with IndexError.
                    continue
                item = line.split(' ')
                items.append((item[0], int(item[1])))
        return items

    def _loader(self, image_path):
        """Open ``image_path`` and return it as an RGB PIL image."""
        with open(image_path, 'rb') as f:
            img = Image.open(f)
            # convert() is called while the file is still open.
            img = img.convert('RGB')
        return img


def train(model, device, dataloader, optimizer):
    """Run one pass over ``dataloader``, updating ``model`` on every batch.

    Args:
        model: Module to train; switched to train mode.
        device: Device the batches are moved to.
        dataloader: Iterable yielding ``(image, target)`` batches.
        optimizer: Optimizer stepped once per batch.

    Returns:
        The cross-entropy loss of the last batch, or ``None`` when the
        dataloader yields no batches.
    """
    model.train()
    # Defined up front so an empty dataloader returns None instead of raising
    # UnboundLocalError at the return statement.
    loss = None
    for image, target in dataloader:
        image = image.to(device, non_blocking=True)
        target = target.to(device, non_blocking=True)
        pred = model(image)  # pred.shape (N, C), target.shape (N)
        loss = nn.functional.cross_entropy(pred, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    return loss


def evaluate(model, device, dataloader, metric):
    """Run ``model`` over ``dataloader`` without gradients and score it.

    Args:
        model: Module to evaluate; switched to eval mode.
        device: Device the batches are moved to.
        dataloader: Iterable yielding ``(image, target)`` batches.
        metric: Object exposing ``update(pred, target)``, ``compute()`` and
            ``reset()``.

    Returns:
        The value of ``metric.compute()``; the metric is reset before
        returning.
    """
    model.eval()
    with torch.no_grad():
        for image, target in dataloader:
            image = image.to(device, non_blocking=True)
            target = target.to(device, non_blocking=True)
            pred = model(image)
            metric.update(pred, target)
    accuracy = metric.compute()
    metric.reset()
    return accuracy


def worker(gpu, args):
    """Run training and evaluation.

    Args:
        gpu: Index of this process; used as the distributed rank, the CUDA
            device index and the sampler rank.
        args: Parsed arguments from ``create_args`` with ``device`` added by
            ``main``.
    """
    # Init process group.
    print(f'Initiating process {gpu}')
    dist.init_process_group(
        backend='nccl',
        init_method='env://',
        world_size=args.gpus,
        rank=gpu)

    # Create model.
    model = resnet50(weights=None)
    torch.cuda.set_device(gpu)
    model.to(args.device)
    model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
    model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])

    # Create train dataloader.
    train_dataset = ImageFolder(
        image_list_file=args.train_data_path,
        transform=torchvision.transforms.Compose([
            torchvision.transforms.RandomResizedCrop(224),
            torchvision.transforms.RandomHorizontalFlip(),
            torchvision.transforms.ToTensor(),
            torchvision.transforms.Normalize(
                mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]))
    train_sampler = torch.utils.data.distributed.DistributedSampler(
        train_dataset, num_replicas=args.gpus, rank=gpu)
    train_dataloader = torch.utils.data.DataLoader(
        dataset=train_dataset,
        batch_size=args.train_batch_size,
        shuffle=False,  # The sampler is passed explicitly below.
        num_workers=args.dataloader_num_workers,
        pin_memory=True,
        sampler=train_sampler)
    if gpu == 0:
        print(f'Train dataloader | samples: {len(train_dataloader.dataset)}, '
              f'num workers: {train_dataloader.num_workers}, '
              f'global batch size: {args.train_batch_size * args.gpus}, '
              f'batches/epoch: {len(train_dataloader)}')

    # Create eval dataloader.
    eval_dataset = ImageFolder(
        image_list_file=args.eval_data_path,
        transform=torchvision.transforms.Compose([
            torchvision.transforms.Resize(256),
            torchvision.transforms.CenterCrop(224),
            torchvision.transforms.ToTensor(),
            torchvision.transforms.Normalize(
                mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]))
    eval_sampler = torch.utils.data.distributed.DistributedSampler(
        eval_dataset, num_replicas=args.gpus, rank=gpu)
    eval_dataloader = torch.utils.data.DataLoader(
        dataset=eval_dataset,
        batch_size=args.eval_batch_size,
        shuffle=False,
        num_workers=args.dataloader_num_workers,
        pin_memory=True,
        drop_last=True,
        sampler=eval_sampler)
    if gpu == 0:
        print(f'Eval dataloader | samples: {len(eval_dataloader.dataset)}, '
              f'num workers: {eval_dataloader.num_workers}, '
              f'batch size: {args.eval_batch_size}, '
              f'batches/epoch: {len(eval_dataloader)}')

    # Optimizer.
    optimizer = torch.optim.SGD(model.parameters(), 0.1)

    # Main loop.
    metric = torchmetrics.classification.Accuracy(top_k=1).to(args.device)
    for epoch in range(1, args.epochs + 1):
        if gpu == 0:
            print(f'Running epoch {epoch}')
        train_sampler.set_epoch(epoch)

        start = time.time()
        train(model, args.device, train_dataloader, optimizer)
        end = time.time()
        if gpu == 0:
            print(f'Training finished in {(end - start):>0.3f} seconds')

        start = time.time()
        # The returned accuracy is not used; only the elapsed time is printed.
        evaluate(model, args.device, eval_dataloader, metric)
        end = time.time()
        if gpu == 0:
            print(f'Evaluation finished in {(end - start):>0.3f} seconds')

    if gpu == 0:
        print('Done')

    # Tear down the process group created at the top of this function.
    dist.destroy_process_group()


def create_args():
    """Create main args.

    Returns:
        The ``argparse.Namespace`` parsed from the command line.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--gpus', default=4, type=int,
        help='number of gpus to use')
    parser.add_argument(
        '--epochs', default=1, type=int,
        help='number of total epochs to run')
    parser.add_argument(
        '--dataloader_num_workers', default=2, type=int,
        help='number of workders for dataloader')
    parser.add_argument(
        '--train_data_path', default='', type=str,
        help='path to training data')
    parser.add_argument(
        '--train_batch_size', default=32, type=int,
        help='batch size for training per gpu')
    parser.add_argument(
        '--eval_data_path', default='', type=str,
        help='path to evaluation data')
    parser.add_argument(
        '--eval_batch_size', default=32, type=int,
        help='batch size for evaluation per gpu')
    args = parser.parse_args()
    return args


def main():
    """Parse arguments and spawn one ``worker`` process per GPU."""
    args = create_args()

    # Rendezvous address read by the 'env://' init method used in worker().
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '8888'

    args.device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f'Launch job on {args.gpus} GPUs with DDP')
    mp.spawn(worker, nprocs=args.gpus, args=(args,))


if __name__ == '__main__':
    main()