gpudirect-tcpxo/topology-scheduler/schedule-daemon.py (349 lines of code) (raw):
#!/usr/bin/env python
# Copyright 2024 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
from itertools import groupby
import time
import kubernetes
import kubernetes.client
from kubernetes.client.rest import ApiException
from kubernetes.utils.quantity import parse_quantity
def split_pods_based_on_jobs(pods):
    """Splits pending pods into groups based on jobs.

    Pods are grouped by their 'job_name' entry no matter where they appear in
    `pods`. A dict is used instead of itertools.groupby because groupby only
    merges *adjacent* items, which would split one job into several groups
    whenever the input is not already ordered by job name.

    Args:
      pods: iterable of pod-info mappings that support .get('job_name').

    Returns:
      A list with one list of pods per distinct job name. Groups appear in the
      order their job name is first seen, and pods keep their input order
      inside each group.
    """
    pods_by_job = {}
    for pod in pods:
        pods_by_job.setdefault(pod.get('job_name'), []).append(pod)
    return list(pods_by_job.values())
def sort_jobs_by_time(job):
    """Returns the sort key of a job: the creation time of its first pod.

    Args:
      job: non-empty sequence of pod-info mappings belonging to one job.

    Returns:
      The 'creation_time' entry of the first pod (None when it is absent).
    """
    # Only the first pod is consulted; pods of one job are expected to share
    # the same creation time.
    first_pod = job[0]
    return first_pod.get('creation_time')
def pod_sorting_key(pod):
    """Returns key to be used for sorting pods.

    When the pod carries an explicit index, that index (as an int) is the key.
    Otherwise, given that a number is often suffixed to the name for
    multi-node deployments, the key is a (prefix, number) tuple so that
    "xxx-pod2" sorts before "xxx-pod10". Names without a numeric suffix get
    number 0.

    Args:
      pod: mapping with an 'index' entry (may be None) and a 'name' entry.

    Returns:
      int(pod['index']) when the index is set, else a (prefix, number) tuple.
    """
    if pod['index'] is not None:
        return int(pod['index'])

    name = pod['name']
    # Walk backwards over the trailing digits. The lower bound keeps the scan
    # inside the string, so an all-digit or empty name no longer raises
    # IndexError.
    split_at = len(name)
    while split_at > 0 and name[split_at - 1].isdigit():
        split_at -= 1

    suffix = name[split_at:]
    idx = int(suffix) if suffix else 0
    return (name[:split_at], idx)
def node_topology_distance(node1, node2):
    """Returns a distance between two nodes based on their topology keys.

    The keys produced by node_topology_key() are compared level by level. The
    distance is 1000000 when the first level differs and is divided by 100 for
    every further level that matches (10000.0, 100.0, 1.0), reaching 0 when
    all levels are equal.

    Args:
      node1: node-info mapping accepted by node_topology_key().
      node2: node-info mapping accepted by node_topology_key().

    Returns:
      A number; smaller means the nodes share more topology levels.
    """
    node1_key = node_topology_key(node1)
    node2_key = node_topology_key(node2)
    result = 1000000
    # A node lacking topology labels has an empty key. Treat such a pair as
    # maximally distant instead of indexing past the shorter key (IndexError)
    # or reporting a distance of 0.
    if len(node1_key) != len(node2_key):
        return result
    for level1, level2 in zip(node1_key, node2_key):
        if level1 != level2:
            return result
        result /= 100
    return 0
def node_topology_key(node):
    """Builds a key to be used to sort nodes.

    Args:
      node: node-info mapping whose 'node_labels' entry maps label names to
        values.

    Returns:
      A (placement-group, cluster, rack, host) tuple of label values, or an
      empty tuple when any of the four labels is missing.
    """
    labels = node['node_labels']
    topology_labels = (
        'cloud.google.com/gke-placement-group',
        'topology.gke.io/cluster',
        'topology.gke.io/rack',
        'topology.gke.io/host',
    )
    if not all(label in labels for label in topology_labels):
        return ()
    return tuple(labels[label] for label in topology_labels)
def get_pod_used_resources(pod):
    """Gets the resources used by this pod.

    Sums the resource requests of every container whose status is not
    terminated. Containers and container statuses are paired positionally.

    Args:
      pod: pod object exposing .status.container_statuses and
        .spec.containers.

    Returns:
      A (cpu, memory, gpu) tuple; all zeros when the pod has no status or no
      container statuses.
    """
    cpu_total = 0
    memory_total = 0
    gpu_total = 0

    status = pod.status
    if status is None or status.container_statuses is None:
        return cpu_total, memory_total, gpu_total

    for container, container_status in zip(
        pod.spec.containers, status.container_statuses
    ):
        # Terminated containers no longer hold their requested resources.
        if container_status.state.terminated is None:
            requested = container.resources.requests or {}
            cpu_total += parse_quantity(requested.get('cpu', 0))
            memory_total += parse_quantity(requested.get('memory', 0))
            gpu_total += int(requested.get('nvidia.com/gpu', 0))

    return cpu_total, memory_total, gpu_total
def get_pods_taint_toleration(pods):
    """Gets the taint tolerations of the pods.

    For simplicity, the pods are assumed to be homogeneous, i.e. they all
    carry the same tolerations; this is checked with an assertion.

    Args:
      pods: iterable of pod-info mappings whose 'spec' entry exposes
        .tolerations.

    Returns:
      The shared tolerations, or an empty list when there are none.
    """
    shared = None
    for pod in pods:
        current = pod['spec'].tolerations
        if shared is not None:
            assert shared == current
            continue
        shared = current
    return [] if shared is None else shared
def find_schedulable_nodes(nodes, pods, tolerated_taints):
    """Finds nodes that can be scheduled.

    A node is kept when it carries the GKE placement-group label, every one of
    its taints is tolerated, and it does not report a Ready condition with a
    status other than "True". For each kept node the free resources are the
    allocatable amounts minus what the pods already bound to it use.

    Args:
      nodes: node objects (metadata.name/labels, spec.taints,
        status.conditions/allocatable).
      pods: pod objects; those whose spec.node_name matches a node count
        against that node's resources.
      tolerated_taints: tolerations (key/operator/value) of the pods to
        place, or None.

    Returns:
      Dict mapping node name to a dict with keys 'name', 'cpu', 'memory',
      'gpu' (free amounts) and 'node_labels'.
    """
    nodes_info = {}
    tolerated_taint_dict = {t.key: t for t in tolerated_taints or []}

    for node in nodes:
        node_name = node.metadata.name
        # Labels may be absent altogether; treat that like "no topology".
        node_labels = node.metadata.labels or {}

        if 'cloud.google.com/gke-placement-group' not in node_labels:
            print(
                f'Skipping node {node_name} because it does not have topology'
                ' metadata'
            )
            continue

        skip_node = False
        # check node taints
        for t in node.spec.taints or []:
            tol = tolerated_taint_dict.get(t.key)
            if tol is None:
                print(f'Skipping node {node_name} because it is tainted with key {t.key}')
                skip_node = True
                break
            # An unset operator defaults to "Equal" in Kubernetes, and an
            # unset value is equivalent to the empty string.
            if tol.operator in (None, '', 'Equal') and (tol.value or '') != (t.value or ''):
                print(f'Skipping node {node_name} because it is tainted with key {t.key} with value {t.value}')
                skip_node = True
                break
        if skip_node:
            continue

        # check node status: skip only this node, keep examining the others
        conditions = node.status.conditions or []
        if any(c.type == "Ready" and c.status != "True" for c in conditions):
            print(f'Skipping node {node_name} because it is NotReady')
            continue

        allocatable = node.status.allocatable
        used_cpu = 0
        used_memory = 0
        used_gpu = 0

        for pod in pods:
            if pod.spec.node_name == node_name:
                cpu, mem, gpu = get_pod_used_resources(pod)
                used_cpu += cpu
                used_memory += mem
                used_gpu += gpu

        free_cpu = parse_quantity(allocatable['cpu']) - used_cpu
        free_memory = parse_quantity(allocatable['memory']) - used_memory
        free_gpu = int(allocatable.get('nvidia.com/gpu', 0)) - used_gpu

        node_info = {
            'name': node_name,
            'cpu': free_cpu,
            'memory': free_memory,
            'gpu': free_gpu,
            'node_labels': node_labels,
        }
        nodes_info[node_name] = node_info

        print(
            f'Node: {node_name}, CPU: {free_cpu}, Memory: {free_memory}, GPU:'
            f' {free_gpu}, Topology: {node_topology_key(node_info)}'
        )

    return nodes_info
def find_pod_gates(pods, prefix):
    """Finds the scheduling gates whose name starts with the prefix.

    Args:
      pods: pod objects exposing .spec.scheduling_gates (may be None or
        empty).
      prefix: gate-name prefix to look for.

    Returns:
      The set of distinct matching gate names across all pods.
    """
    return {
        gate.name
        for pod in pods
        for gate in (pod.spec.scheduling_gates or ())
        if gate.name.startswith(prefix)
    }
def find_schedulable_pods(pods, gate_name):
    """Finds pods that can be scheduled.

    A pod qualifies when one of its scheduling gates is named exactly
    `gate_name`. For each such pod the container resource requests are summed
    and job metadata (completion index, job name, creation time) is read from
    the pod metadata; missing pieces are reported on stdout and left as None.

    Args:
      pods: pod objects (metadata and spec).
      gate_name: exact scheduling-gate name to match.

    Returns:
      Dict keyed by pod name. Each value holds 'name', 'namespace', 'index',
      'cpu', 'memory', 'gpu', 'node_selector', 'spec', 'metadata', 'job_name'
      and 'creation_time'.
    """
    index_label = 'batch.kubernetes.io/job-completion-index'
    job_label = 'job-name'
    schedulable = {}

    for pod in pods:
        gates = pod.spec.scheduling_gates
        if not gates:
            continue

        for gate in gates:
            if gate.name != gate_name:
                continue

            metadata = pod.metadata
            name = metadata.name
            namespace = metadata.namespace

            # Job metadata lives in labels; fall back to None when absent.
            index = None
            job = None
            labels = metadata.labels
            if labels is None:
                print('No labels on pod to extract job metadata from.')
            else:
                if index_label in labels:
                    index = labels[index_label]
                else:
                    print('Unable to find index in metadata. Can not queue jobs')
                if job_label in labels:
                    job = labels[job_label]
                else:
                    print('Unable to find job_name in metadata. Can not queue jobs')

            created = metadata.creation_timestamp
            if created is None:
                print(
                    'Unable to find creation_time in metadata. Can not queue jobs'
                )

            # Total requests over all containers of the pod.
            cpu = 0
            memory = 0
            gpu = 0
            for container in pod.spec.containers:
                requested = container.resources.requests or {}
                cpu += parse_quantity(requested.get('cpu', 0))
                memory += parse_quantity(requested.get('memory', 0))
                gpu += int(requested.get('nvidia.com/gpu', 0))

            schedulable[name] = {
                'name': name,
                'namespace': namespace,
                'index': index,
                'cpu': cpu,
                'memory': memory,
                'gpu': gpu,
                'node_selector': pod.spec.node_selector,
                'spec': pod.spec,
                'metadata': metadata,
                'job_name': job,
                'creation_time': created,
            }

            print(
                f'Found schedulable pod: {namespace}/{name}, CPU:'
                f' {cpu}, Memory: {memory}, GPU: {gpu}'
                f' Index: {index}'
            )

    return schedulable
def can_schedule(node, pod):
    """Checks if a given pod can be scheduled on a given node.

    Args:
      node: node-info mapping with 'node_labels' and free 'cpu', 'memory',
        'gpu'.
      pod: pod-info mapping with 'node_selector' and requested 'cpu',
        'memory', 'gpu'.

    Returns:
      True when every node-selector entry matches the node labels and the
      node has at least the requested amount of each resource.
    """
    required_labels = pod['node_selector']
    labels = node['node_labels']

    if required_labels:
        for label, wanted in required_labels.items():
            if not (label in labels and labels[label] == wanted):
                return False

    return all(
        node[resource] >= pod[resource]
        for resource in ('cpu', 'memory', 'gpu')
    )
def schedule_pod_on_node(v1, pod_name, pod_namespace, node, gate_name):
    """Schedules a pod on a given node.

    Re-reads the pod and, if it still carries the gate, removes that gate and
    sets a required node affinity on the node's hostname, then replaces the
    pod through the API. Nothing is changed when the gate is no longer
    present. API errors are reported on stdout and not re-raised.

    Args:
      v1: CoreV1Api-like client providing read_namespaced_pod and
        replace_namespaced_pod.
      pod_name: name of the pod.
      pod_namespace: namespace of the pod.
      node: node-info mapping with at least 'name' and 'node_labels'.
      gate_name: name of the scheduling gate to remove.
    """
    try:
        pod = v1.read_namespaced_pod(pod_name, pod_namespace)

        # The gate list can be None once every gate has been removed.
        gates = pod.spec.scheduling_gates or []
        if not any(gate.name == gate_name for gate in gates):
            return

        node_name = node['name']
        # NOTE: this overwrites any affinity already set on the pod.
        pod.spec.affinity = {
            'nodeAffinity': {
                'requiredDuringSchedulingIgnoredDuringExecution': {
                    'nodeSelectorTerms': [{
                        'matchExpressions': [{
                            'key': 'kubernetes.io/hostname',
                            'operator': 'In',
                            'values': [node_name],
                        }]
                    }]
                }
            }
        }
        pod.spec.scheduling_gates = [
            gate for gate in gates if gate.name != gate_name
        ]

        v1.replace_namespaced_pod(pod_name, pod_namespace, pod)

        # print() does not apply %-style arguments, so format explicitly.
        print(
            f'Pod {pod_namespace}/{pod_name} scheduled on {node_name}'
            f' with topology {node_topology_key(node)}'
        )
    except ApiException as e:
        print(f'Exception when removing scheduling gate: {e}')
def calculate_pods_assignment(sorted_nodes, sorted_pods):
    """Calculates the best assignment for pods.

    Searches assignments in which pod i is placed on node index
    assignment[i], with node indexes strictly increasing along the pod list
    (so every pod gets a different node). Each candidate is scored by the sum
    of node_topology_distance() between the nodes of consecutive pods, and
    the first candidate with the smallest score is kept.

    Args:
      sorted_nodes: list of node-info mappings, ordered by topology key.
      sorted_pods: list of pod-info mappings, in the order they should be
        laid out.

    Returns:
      A list of node indexes, one per pod, or an empty list when no
      assignment was found (including when there are no pods).
    """
    # With no pods there is nothing to assign; without this guard the
    # `assignment[-1]` check below would raise IndexError.
    if not sorted_pods:
        return []

    # Start every pod "before" the first node: [-n, ..., -2, -1].
    assignment = [-i for i in reversed(range(1, len(sorted_pods) + 1))]
    best_assignment = []
    # Unbounded sentinel so that a feasible assignment is never rejected just
    # because its total distance is large.
    minimum_distance = float('inf')

    while True:
        all_ok = True
        i = len(assignment) - 1
        # Walk from the last pod to the first, advancing each pod's node
        # index until the pod fits.
        while i >= 0 and all_ok:
            assignment[i] += 1
            if assignment[i] == len(sorted_nodes):
                # Ran past the last node.
                break
            if assignment[i] >= 0 and can_schedule(
                sorted_nodes[assignment[i]], sorted_pods[i]
            ):
                i -= 1
            elif i < len(assignment) - 1 and assignment[i] == assignment[i + 1] - 1:
                # Pod i reached the slot right before pod i+1 without
                # fitting: this round cannot produce a valid assignment.
                all_ok = False
        if assignment[-1] == len(sorted_nodes):
            # The last pod has exhausted all nodes; the search is over.
            break
        if all_ok:
            new_distance = 0
            for i in range(1, len(sorted_pods)):
                new_distance += node_topology_distance(
                    sorted_nodes[assignment[i]], sorted_nodes[assignment[i - 1]]
                )
            if new_distance < minimum_distance:
                best_assignment = assignment.copy()
                minimum_distance = new_distance

    return best_assignment
def schedule_pod_with_gate(v1, pods, gate):
    """Schedules all pods that are held by the given scheduling gate.

    The gated pods are split into jobs, and jobs are handled oldest first.
    For each job the schedulable nodes are computed, an assignment is
    calculated, and each pod is bound to its node. A job without a feasible
    assignment is skipped.

    NOTE: free node resources are recomputed from `pods` for every job; they
    are not reduced by assignments made for earlier jobs in the same call.

    Args:
      v1: CoreV1Api-like client (list_node plus what schedule_pod_on_node
        needs).
      pods: all pod objects currently known.
      gate: exact name of the scheduling gate to process.
    """
    pods_to_schedule = find_schedulable_pods(pods, gate)

    nodes = v1.list_node().items
    print(f'Pods to schedule: {len(pods_to_schedule)}')
    jobs = split_pods_based_on_jobs(pods_to_schedule.values())
    sorted_jobs = sorted(jobs, key=sort_jobs_by_time)
    for job in sorted_jobs:
        job_name = job[0].get('job_name')
        creation_time = job[0].get('creation_time')
        print(f'Attempting to schedule job: {job_name} created: {creation_time}')

        tolerated_taints = get_pods_taint_toleration(job)
        nodes_to_schedule = find_schedulable_nodes(nodes, pods, tolerated_taints)

        sorted_pods = sorted(job, key=pod_sorting_key)
        sorted_nodes = sorted(nodes_to_schedule.values(), key=node_topology_key)

        print(f'Nodes to schedule: {len(nodes_to_schedule)}')

        best_assignment = calculate_pods_assignment(sorted_nodes, sorted_pods)

        if not best_assignment:
            print(
                f'No scheduling for job: {job_name} with gate {gate} has been found.'
                ' Skipping job.'
            )
            continue

        # Report the size of this job, not the number of jobs.
        print(
            f'Assignment found, scheduling {job_name} with'
            f' {len(sorted_pods)} pods.'
        )

        for pod, node_index in zip(sorted_pods, best_assignment):
            schedule_pod_on_node(
                v1, pod['name'], pod['namespace'], sorted_nodes[node_index], gate
            )
def run_scheduling_loop():
    """Runs scheduling.

    Parses the command line, connects to the cluster (in-cluster config
    first, kubeconfig as a fallback) and then loops forever: list the pods,
    collect the scheduling gates starting with the configured prefix, and
    schedule the pods of each gate. The loop ends when a Kubernetes
    ApiException is raised; the error is reported on stdout.
    """
    parser = argparse.ArgumentParser(
        prog='schedule-workload.py')
    parser.add_argument(
        '-g', '--gate',
        default='gke.io/topology-aware-auto-',
        help='prefix of the scheduling gate names to handle')
    # type=float is required: without it a value given on the command line
    # stays a str and the interval arithmetic below raises TypeError.
    parser.add_argument(
        '-i', '--interval',
        type=float,
        default=1.0,
        help='interval (in seconds) between scheduling rounds')
    parser.add_argument(
        '--ignored-namespace',
        nargs='*',
        default=[],
        help='namespaces whose pods are ignored')
    args = parser.parse_args()

    try:
        kubernetes.config.load_incluster_config()
    except kubernetes.config.ConfigException:
        kubernetes.config.load_kube_config()
    v1 = kubernetes.client.CoreV1Api()

    def list_pods():
        """Lists the pods of every namespace that is not ignored."""
        # filtering of namespace is not cached as namespaces could be
        # created and deleted
        namespace_names = [
            n.metadata.name
            for n in v1.list_namespace().items
            if n.metadata.name not in args.ignored_namespace
        ]
        pods = []
        for name in namespace_names:
            pods += v1.list_namespaced_pod(name).items
        return pods

    try:
        t0 = time.time()
        while True:
            # Wait out the remainder of the interval since the last round.
            interval = time.time() - t0
            if interval < args.interval:
                time.sleep(args.interval - interval)
            t0 = time.time()

            pods = list_pods()

            gates = find_pod_gates(pods, args.gate)
            print(f"Found {len(pods)} pods and {len(gates)} gates")

            if len(gates) == 0:
                # No pods to be scheduled
                continue

            # sleep for a few seconds, assuming that all pods within one
            # group would be all visible by then
            time.sleep(5.0)

            for g in gates:
                print(f"scheduling pods with gate {g}")
                # query the pods again after the sleep, just in case not all
                # gated pods are returned from previous query
                pods = list_pods()
                schedule_pod_with_gate(v1, pods, g)

    except ApiException as e:
        print(f'Exception when listing Kubernetes nodes or pods: {e}')
# Start the scheduling loop only when executed as a script, not on import.
if __name__ == '__main__':
    run_scheduling_loop()