in gpudirect-tcpxo/topology-scheduler/schedule-daemon.py [0:0]
def run_scheduling_loop():
    """Runs the scheduling loop of the topology-aware scheduler daemon.

    Repeatedly lists pods across all non-ignored namespaces, finds
    schedule gates matching the configured prefix, and schedules the
    gated pods group by group. Loops forever until a Kubernetes
    ApiException is raised while listing nodes or pods.

    Command-line flags:
        -g/--gate: prefix of the schedule gate to look for.
        -i/--interval: minimum interval (seconds) between scheduling passes.
        --ignored-namespace: namespaces to exclude from the pod search.
    """
    parser = argparse.ArgumentParser(prog='schedule-workload.py')
    parser.add_argument(
        '-g', '--gate',
        default='gke.io/topology-aware-auto-',
        help='prefix of the schedule gate')
    parser.add_argument(
        '-i', '--interval',
        # BUG FIX: without type=float, a CLI-supplied value is a str and the
        # comparison/arithmetic against time.time() below raises TypeError.
        type=float,
        default=1.0,
        help='minimum interval (in seconds) between scheduling passes')
    parser.add_argument(
        '--ignored-namespace',
        nargs='*',
        default=[],
        help='namespaces to exclude when searching for pods')
    args = parser.parse_args()

    # Prefer in-cluster config (normal daemon deployment); fall back to the
    # local kubeconfig so the daemon can also be run outside the cluster.
    try:
        kubernetes.config.load_incluster_config()
    except kubernetes.config.ConfigException:
        kubernetes.config.load_kube_config()
    v1 = kubernetes.client.CoreV1Api()

    def list_pods():
        """Returns all pods from every namespace not in --ignored-namespace."""
        # The namespace filter is recomputed on every call (not cached)
        # because namespaces can be created and deleted while we run.
        namespaces = v1.list_namespace().items
        filtered_namespace_names = [
            n.metadata.name
            for n in namespaces
            if n.metadata.name not in args.ignored_namespace
        ]
        pods = []
        for name in filtered_namespace_names:
            pods += v1.list_namespaced_pod(name).items
        return pods

    try:
        t0 = time.time()
        while True:
            # Throttle: ensure consecutive passes are at least
            # args.interval seconds apart.
            elapsed = time.time() - t0
            if elapsed < args.interval:
                time.sleep(args.interval - elapsed)
            t0 = time.time()
            pods = list_pods()
            gates = find_pod_gates(pods, args.gate)
            print(f"Found {len(pods)} pods and {len(gates)} gates")
            if len(gates) == 0:
                # No pods to be scheduled
                continue
            # Sleep for a few seconds, assuming that all pods within one
            # group will be visible by then.
            time.sleep(5.0)
            for g in gates:
                print(f"scheduling pods with gate {g}")
                # Query the pods again after the sleep, in case not all
                # gated pods were returned by the previous query.
                pods = list_pods()
                schedule_pod_with_gate(v1, pods, g)
    except ApiException as e:
        print(f'Exception when listing Kubernetes nodes or pods: {e}')