# Ported from pytorch example:
# https://github.com/pytorch/examples/blob/master/dcgan/main.py
import torch
import torch.nn as nn
import torch.optim as optim
from pathlib import Path
from ...util.model import BenchmarkModel
from torchbenchmark.tasks import COMPUTER_VISION
class DCGAN:
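    """Plain container for the DCGAN hyperparameters used by this benchmark."""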
def __init__(self, bench):
# Spatial size of training images. All images will be resized to this
# size using a transformer.
self.image_size = 64
# Number of channels in the training images. For color images this is 3
self.nc = 3
# Size of z latent vector (i.e. size of generator input)
self.nz = 100
# Size of feature maps in generator
self.ngf = 64
# Size of feature maps in discriminator
self.ndf = 64
# Number of training epochs
self.num_epochs = 5
# Learning rate for optimizers
self.lr = 0.0002
# Beta1 hyperparam for Adam optimizers
self.beta1 = 0.5
# Number of GPUs available. Use 0 for CPU mode.
self.ngpu = 1
self.device = bench.device
# custom weights initialization called on netG and netD
def weights_init(m):
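    """Per the DCGAN paper: conv weights ~ N(0, 0.02); BatchNorm weights ~ N(1, 0.02), bias 0."""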
classname = m.__class__.__name__
if classname.find('Conv') != -1:
nn.init.normal_(m.weight.data, 0.0, 0.02)
elif classname.find('BatchNorm') != -1:
nn.init.normal_(m.weight.data, 1.0, 0.02)
nn.init.constant_(m.bias.data, 0)
class Generator(nn.Module):
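    """Maps a latent vector z of shape (N, nz, 1, 1) to a (N, nc, 64, 64) image
    via a stack of strided transposed convolutions."""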
def __init__(self, dcgan):
super(Generator, self).__init__()
self.ngpu = dcgan.ngpu
self.main = nn.Sequential(
# input is Z, going into a convolution
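            # ConvTranspose2d output size: (in - 1) * stride - 2 * padding + kernel,
            # so this layer maps 1x1 -> 4x4, and each stride-2 layer below doubles it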
            nn.ConvTranspose2d(dcgan.nz, dcgan.ngf * 8, 4, 1, 0, bias=False),
nn.BatchNorm2d(dcgan.ngf * 8),
nn.ReLU(True),
# state size. (dcgan.ngf*8) x 4 x 4
nn.ConvTranspose2d(dcgan.ngf * 8, dcgan.ngf * 4, 4, 2, 1, bias=False),
nn.BatchNorm2d(dcgan.ngf * 4),
nn.ReLU(True),
# state size. (dcgan.ngf*4) x 8 x 8
            nn.ConvTranspose2d(dcgan.ngf * 4, dcgan.ngf * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(dcgan.ngf * 2),
            nn.ReLU(True),
            # state size. (dcgan.ngf*2) x 16 x 16
            nn.ConvTranspose2d(dcgan.ngf * 2, dcgan.ngf, 4, 2, 1, bias=False),
            nn.BatchNorm2d(dcgan.ngf),
            nn.ReLU(True),
            # state size. (dcgan.ngf) x 32 x 32
            nn.ConvTranspose2d(dcgan.ngf, dcgan.nc, 4, 2, 1, bias=False),
nn.Tanh()
# state size. (dcgan.nc) x 64 x 64
)
self.jt = None
self.jitshape = None
self.debug_print = False
def forward(self, input):
if self.debug_print:
print(input.shape)
return self.main(input)
class Discriminator(nn.Module):
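    """Binary classifier: maps a (N, nc, 64, 64) image to a (N, 1, 1, 1)
    probability that the input is real, via strided convolutions."""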
    def __init__(self, dcgan):
        ngpu = dcgan.ngpu
        nc = dcgan.nc
        ndf = dcgan.ndf
super(Discriminator, self).__init__()
self.ngpu = ngpu
self.main = nn.Sequential(
# input is (nc) x 64 x 64
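            # Conv2d output size: floor((in + 2 * padding - kernel) / stride) + 1,
            # so each stride-2 layer halves the spatial size: 64 -> 32 -> ... -> 4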
nn.Conv2d(nc, ndf, 4, 2, 1, bias=False),
nn.LeakyReLU(0.2, inplace=True),
# state size. (ndf) x 32 x 32
nn.Conv2d(ndf, ndf * 2, 4, 2, 1, bias=False),
nn.BatchNorm2d(ndf * 2),
nn.LeakyReLU(0.2, inplace=True),
# state size. (ndf*2) x 16 x 16
nn.Conv2d(ndf * 2, ndf * 4, 4, 2, 1, bias=False),
nn.BatchNorm2d(ndf * 4),
nn.LeakyReLU(0.2, inplace=True),
# state size. (ndf*4) x 8 x 8
nn.Conv2d(ndf * 4, ndf * 8, 4, 2, 1, bias=False),
nn.BatchNorm2d(ndf * 8),
nn.LeakyReLU(0.2, inplace=True),
# state size. (ndf*8) x 4 x 4
nn.Conv2d(ndf * 8, 1, 4, 1, 0, bias=False),
nn.Sigmoid()
)
self.jt = None
self.jitshape = None
def forward(self, input):
return self.main(input)
class Model(BenchmarkModel):
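    """TorchBench wrapper around DCGAN. ``train`` runs the standard alternating
    discriminator/generator updates; ``eval`` runs only the discriminator."""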
task = COMPUTER_VISION.GENERATION
DEFAULT_TRAIN_BSIZE = 32
DEFAULT_EVAL_BSIZE = 256
def __init__(self, test, device, jit=False, batch_size=None, extra_args=[]):
super().__init__(test=test, device=device, jit=jit, batch_size=batch_size, extra_args=extra_args)
self.debug_print = False
self.root = str(Path(__file__).parent)
self.dcgan = DCGAN(self)
dcgan = self.dcgan
device = dcgan.device
ngpu = dcgan.ngpu
nz = dcgan.nz
lr = dcgan.lr
beta1 = dcgan.beta1
# Create the generator
self.netG = Generator(dcgan).to(device)
# Handle multi-gpu if desired
if (dcgan.device == 'cuda') and (ngpu > 1):
self.netG = nn.DataParallel(self.netG, list(range(ngpu)))
        # Apply the weights_init function to randomly initialize all weights
        # to mean=0, stdev=0.02.
self.netG.apply(weights_init)
if self.debug_print:
# Print the model
print(self.netG)
# Create the Discriminator
netD = Discriminator(dcgan).to(device)
# Handle multi-gpu if desired
if (dcgan.device == 'cuda') and (ngpu > 1):
            netD = nn.DataParallel(netD, list(range(ngpu)))
        # Apply the weights_init function to randomly initialize all weights
        # to mean=0, stdev=0.02.
netD.apply(weights_init)
if self.debug_print:
# Print the model
print(netD)
# Initialize BCELoss function
self.criterion = nn.BCELoss()
# Create batch of latent vectors that we will use to visualize
# the progression of the generator
self.fixed_noise = torch.randn(64, nz, 1, 1, device=device)
# Establish convention for real and fake labels during training
self.real_label = 1.
self.fake_label = 0.
        # Random values as a surrogate for a batch of photos
        self.example_inputs = torch.randn(self.batch_size, 3, 64, 64, device=self.device)
self.model = netD
if test == "train":
# Setup Adam optimizers for both G and D
self.optimizerD = optim.Adam(netD.parameters(), lr=lr, betas=(beta1, 0.999))
self.optimizerG = optim.Adam(self.netG.parameters(), lr=lr, betas=(beta1, 0.999))
elif test == "eval":
# inference would just run descriminator so thats what we'll do too.
self.inference_just_descriminator = True
if False == self.inference_just_descriminator:
self.eval_noise = torch.randn(self.batch_size, nz, 1, 1, device=self.device)
def jit_callback(self):
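        """Replace the benchmarked modules with torch.jit.trace'd versions."""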
assert self.jit, "Calling JIT callback without specifying the JIT option."
        self.model = torch.jit.trace(self.model, (self.example_inputs,))
        if self.test == "eval" and not self.inference_just_discriminator:
            self.netG = torch.jit.trace(self.netG, (self.eval_noise,))
def get_module(self):
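        """Return the benchmarked module (the discriminator) and its example inputs."""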
        return self.model, (self.example_inputs,)
def eval(self, niter=1):
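        """Run ``niter`` discriminator forward passes and return the last flattened output."""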
        for _ in range(niter):
            if not self.inference_just_discriminator:
                # Generate a fake image batch with G
                self.eval_fake = self.netG(self.eval_noise)
            # Forward pass of the benchmark batch through D
            output = self.model(self.example_inputs).view(-1)
        return (output, )
def train(self, niter=1):
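        """Run the standard DCGAN loop for num_epochs epochs of ``niter`` batches each:
        one discriminator update on real+fake batches, then one generator update."""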
# Training Loop
        dcgan = self.dcgan
        device = dcgan.device
        num_epochs = dcgan.num_epochs
        num_train_batch = niter
        nz = dcgan.nz
netD = self.model
netG = self.netG
criterion = self.criterion
optimizerD = self.optimizerD
optimizerG = self.optimizerG
real_label = self.real_label
fake_label = self.fake_label
        benchmark_pic = self.example_inputs
# For each epoch
for epoch in range(num_epochs):
for i in range(num_train_batch):
############################
# (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
###########################
## Train with all-real batch
netD.zero_grad()
# Format batch
real_cpu = benchmark_pic
b_size = real_cpu.size(0)
label = torch.full((b_size,), real_label, dtype=torch.float, device=device)
# Forward pass real batch through D
output = netD(real_cpu).view(-1)
# Calculate loss on all-real batch
errD_real = criterion(output, label)
# Calculate gradients for D in backward pass
errD_real.backward()
D_x = output.mean().item()
## Train with all-fake batch
# Generate batch of latent vectors
noise = torch.randn(b_size, nz, 1, 1, device=device)
# Generate fake image batch with G
fake = netG(noise)
label.fill_(fake_label)
# Classify all fake batch with D
output = netD(fake.detach()).view(-1)
# Calculate D's loss on the all-fake batch
errD_fake = criterion(output, label)
# Calculate the gradients for this batch, accumulated (summed) with previous gradients
errD_fake.backward()
D_G_z1 = output.mean().item()
# Compute error of D as sum over the fake and the real batches
errD = errD_real + errD_fake
# Update D
optimizerD.step()
############################
# (2) Update G network: maximize log(D(G(z)))
###########################
netG.zero_grad()
label.fill_(real_label) # fake labels are real for generator cost
# Since we just updated D, perform another forward pass of all-fake batch through D
output = netD(fake).view(-1)
# Calculate G's loss based on this output
errG = criterion(output, label)
# Calculate gradients for G
errG.backward()
D_G_z2 = output.mean().item()
# Update G
optimizerG.step()
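
# Illustrative usage sketch (the torchbenchmark harness normally drives this):
#   m = Model(test="eval", device="cpu")
#   module, example_inputs = m.get_module()
#   scores = module(*example_inputs)  # (batch_size, 1, 1, 1) discriminator outputs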