# best-practices/gke-batch-refarch/compact_placement/workloads/team-a/team-a-compact-job-1.yaml
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
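# Headless Service (clusterIP: None) that gives each Job Pod a stable DNS name of the form
# <job-name>-<index>.team-a-compact-svc-1; torchrun uses the index-0 Pod's name as its
# rendezvous (master) address.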
apiVersion: v1
kind: Service
metadata:
  name: team-a-compact-svc-1
  namespace: team-a
spec:
  clusterIP: None
  selector:
    job-name: team-a-compact-job-1
---
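# ConfigMap carrying the PyTorch DistributedDataParallel training script; the Job below mounts it
# at /script-path and launches it with torchrun.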
apiVersion: v1
kind: ConfigMap
metadata:
  name: team-a-compact-config-1
  namespace: team-a
data:
  main.py: |
    #!/usr/bin/python
    #
    # Copyright 2023 Google LLC
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    import os, sys, logging
    import torch
    import torch.nn as nn
    import torch.optim as optim
    import torch.nn.functional as F
    import torch.distributed as dist
    from torchvision import datasets
    from torchvision import transforms

    # Set up logging
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s:%(lineno)d: %(message)s')
    handler.setFormatter(formatter)
    root.handlers.clear()
    root.addHandler(handler)

    NUM_EPOCHS = 8    # number of training passes over the training dataset
    BATCH_SIZE = 128  # dataset batch size; each batch gets evenly distributed across hosts and local devices per host
    class MLP(nn.Module):
      def __init__(self, num_classes):
        super(MLP, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, num_classes)

      def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output
    # Load dataset and create dataloader
    def load_dataset(rank, world_size):
      dataset = datasets.MNIST(f'./data_{rank}', train=True, download=True,
                               transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                               ]))
      num_classes = len(set(dataset.targets.numpy()))
      mini_batch_size = BATCH_SIZE // world_size
      sampler = torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=world_size, rank=rank)
      dataloader = torch.utils.data.DataLoader(dataset, batch_size=mini_batch_size, sampler=sampler)
      return dataloader, num_classes
    # Average gradients across all ranks
    def average_gradients(model):
      world_size = float(dist.get_world_size())
      for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= world_size

    # Average loss across all ranks
    def average_loss(train_loss, device):
      train_loss_tensor = torch.tensor(train_loss).to(device)
      dist.all_reduce(train_loss_tensor, op=dist.ReduceOp.SUM)
      return train_loss_tensor.item() / dist.get_world_size()
    # Main training loop
    def run():
      local_rank = int(os.environ["LOCAL_RANK"])
      rank = dist.get_rank()
      size = dist.get_world_size()
      logging.info(f"Rank {rank}: starting, world size={size}")
      torch.manual_seed(1234)
      train_set, num_classes = load_dataset(rank, size)
      logging.info(f"Rank {rank}: num_classes: {num_classes}")

      # Set up device and model
      if torch.cuda.is_available():
        device = torch.device(f"cuda:{local_rank}")
        torch.cuda.set_device(local_rank)
        model = MLP(num_classes).to(device)
        model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)
      else:
        device = torch.device("cpu")
        model = MLP(num_classes).to(device)
        model = nn.parallel.DistributedDataParallel(model)

      # Initialize the optimizer
      optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

      # Train the model
      for epoch in range(NUM_EPOCHS):
        logging.info(f"Rank {rank}, epoch={epoch+1}/{NUM_EPOCHS}: starting")
        epoch_loss = 0.0
        for train_batch_num, (data, target) in enumerate(train_set):
          data, target = data.to(device), target.to(device)
          mini_batch_size = len(data)
          optimizer.zero_grad()
          output = model(data)
          loss = F.nll_loss(output, target)
          epoch_loss += loss.item()
          loss.backward()
          average_gradients(model)
          optimizer.step()
          logging.info(f"Rank {rank}: epoch: {epoch+1}/{NUM_EPOCHS}, batch: {train_batch_num+1}/{len(train_set)}, mini-batch size: {mini_batch_size}")
        avg_epoch_loss = average_loss(epoch_loss / len(train_set), device)
        logging.info(f"Rank {rank}: epoch {epoch+1}/{NUM_EPOCHS}, average epoch loss={avg_epoch_loss:.4f}")
      logging.info(f"Rank {rank}: training completed.")
    def _get_backend():
      if torch.cuda.is_available() and torch.cuda.nccl.is_available([torch.rand(2, 2, device='cuda')]):
        return "nccl"
      return "gloo"

    if __name__ == "__main__":
      for key in ["LOCAL_RANK", "RANK", "GROUP_RANK", "ROLE_RANK", "LOCAL_WORLD_SIZE", "WORLD_SIZE", "ROLE_WORLD_SIZE", "MASTER_ADDR", "MASTER_PORT"]:
        value = os.environ[key]
        logging.info(f"env: {key}={value}")
      dist.init_process_group(backend=_get_backend())
      run()
---
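# Indexed Job that runs the training script across 2 Pods (torchrun --nnodes 2, one process and
# one GPU per Pod). The Job is created suspended and is admitted by Kueue through the
# lq-team-a-compact LocalQueue; the tolerations and nodeSelector below steer its Pods onto the
# team-a-1-compact compact-placement Spot node pool.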
apiVersion: batch/v1
kind: Job
metadata:
  namespace: team-a # Job under team-a namespace
  name: team-a-compact-job-1
  labels:
    kueue.x-k8s.io/queue-name: lq-team-a-compact # Point to the LocalQueue for team-a
spec:
  backoffLimit: 15
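  # Do not count Pod disruptions (for example Spot VM preemptions) against the backoffLimit.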
  podFailurePolicy:
    rules:
    - action: Ignore
      onPodConditions:
      - type: DisruptionTarget
  suspend: true # Set to true to allow Kueue to control the Job when it starts
  parallelism: 2
  completions: 2
  completionMode: Indexed
  template:
    spec:
      priorityClassName: compact-priority-preempting
      subdomain: team-a-compact-svc-1
      serviceAccountName: team-a-ksa
      volumes:
      - name: script-volume
        configMap:
          name: team-a-compact-config-1
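      # Tolerate the GPU, Spot VM, and compact-placement node taints so the Pods can land on the
      # team-a-1-compact nodes.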
      tolerations:
      - key: "nvidia.com/gpu"
        operator: "Exists"
        effect: NoSchedule
      - key: placement-group
        operator: "Equal"
        value: "team-a-1-compact"
        effect: "NoSchedule"
      - key: resource-type
        operator: "Equal"
        value: "compact"
        effect: "NoSchedule"
      - key: cloud.google.com/gke-placement-group
        operator: "Equal"
        value: "team-a-1-compact"
        effect: "NoSchedule"
      - key: cloud.google.com/gke-spot
        operator: "Equal"
        value: "true"
        effect: "NoSchedule"
      containers:
      - name: main
        image: $REGION-docker.pkg.dev/$PROJECT_ID/tutorial-installer/pytorch-gpu.1-12:v1
        command: ["bash"]
        args:
        - -c
        - |
          torchrun --node_rank $JOB_COMPLETION_INDEX \
            --nnodes 2 \
            --nproc_per_node 1 \
            --master_addr team-a-compact-job-1-0.team-a-compact-svc-1 \
            --master_port 8080 /script-path/main.py
        ports:
        - containerPort: 8080
        imagePullPolicy: IfNotPresent
        resources:
          requests:
            cpu: "10"
            memory: "60Gi"
          limits:
            cpu: "10"
            memory: "60Gi"
            nvidia.com/gpu: "1"
        volumeMounts:
        - mountPath: /script-path
          name: script-volume
      restartPolicy: Never
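      # Pin the Pods to Spot A100 nodes created in the team-a-1-compact compact placement group.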
      nodeSelector:
        cloud.google.com/gke-spot: "true"
        cloud.google.com/gke-accelerator: nvidia-tesla-a100
        cloud.google.com/gke-placement-group: team-a-1-compact
        placement-group: team-a-1-compact
        resource-type: compact