in distributed/rpc/ddp_rpc/main.py [0:0]
def _run_trainer(remote_emb_module, rank):
r"""
Each trainer runs a forward pass which involves an embedding lookup on the
parameter server and running nn.Linear locally. During the backward pass,
DDP is responsible for aggregating the gradients for the dense part
(nn.Linear) and distributed autograd ensures gradients updates are
propagated to the parameter server.
"""
# Setup the model.
model = HybridModel(remote_emb_module, rank)
# Retrieve all model parameters as rrefs for DistributedOptimizer.
# Retrieve parameters for embedding table.
model_parameter_rrefs = model.remote_emb_module.remote_parameters()
# model.fc.parameters() only includes local parameters.
# NOTE: Cannot call model.parameters() here,
# because this will call remote_emb_module.parameters(),
# which supports remote_parameters() but not parameters().
for param in model.fc.parameters():
model_parameter_rrefs.append(RRef(param))
# Setup distributed optimizer
opt = DistributedOptimizer(
optim.SGD,
model_parameter_rrefs,
lr=0.05,
)
criterion = torch.nn.CrossEntropyLoss()
def get_next_batch(rank):
for _ in range(10):
num_indices = random.randint(20, 50)
indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)
# Generate offsets.
offsets = []
start = 0
batch_size = 0
while start < num_indices:
offsets.append(start)
start += random.randint(1, 10)
batch_size += 1
offsets_tensor = torch.LongTensor(offsets)
target = torch.LongTensor(batch_size).random_(8).cuda(rank)
yield indices, offsets_tensor, target
# Train for 100 epochs
for epoch in range(100):
# create distributed autograd context
for indices, offsets, target in get_next_batch(rank):
with dist_autograd.context() as context_id:
output = model(indices, offsets)
loss = criterion(output, target)
# Run distributed backward pass
dist_autograd.backward(context_id, [loss])
# Tun distributed optimizer
opt.step(context_id)
# Not necessary to zero grads as each iteration creates a different
# distributed autograd context which hosts different grads
print("Training done for epoch {}".format(epoch))