in gke-topology-scheduler/schedule-daemon.py [0:0]
def run_scheduling_loop() -> None:
    """Runs scheduling.

    Parses the command-line options, loads the Kubernetes client
    configuration, and then runs an infinite loop that periodically schedules
    pods with topology-aware scheduling gates.

    The loop ends only when a Kubernetes ApiException is raised inside it; the
    exception is logged and the function returns.
    """
    parser = argparse.ArgumentParser(prog='schedule-workload.py')
    # prefix of the schedule gate
    parser.add_argument('-g', '--gate', default='gke.io/topology-aware-auto-')
    # Interval (in seconds) between scheduling runs. type=float is required:
    # without it a value given on the command line stays a str, and the
    # comparison / subtraction against elapsed time below raises TypeError.
    parser.add_argument('-i', '--interval', type=float, default=1.0)
    # Namespace(s) to ignore.
    # NOTE(review): this option is parsed but never read inside this
    # function -- confirm whether it should be passed to the pod listing.
    parser.add_argument('--ignored-namespace', nargs='*', default=[])
    args = parser.parse_args()

    # Prefer the in-cluster configuration; fall back to the local kubeconfig
    # when the in-cluster configuration cannot be loaded.
    try:
        kubernetes.config.load_incluster_config()
    except kubernetes.config.ConfigException:
        kubernetes.config.load_kube_config()
    v1 = kubernetes.client.CoreV1Api()

    # wait needed during container restart to allow previously scheduled pods
    # to be visible on nodes and occupy resources for their correct estimates
    logging.info('[Cool off] 90sec')
    time.sleep(90.0)

    try:
        last_run_ts = time.time()
        while True:
            # Throttle: wait out whatever is left of the configured interval
            # since the previous run was stamped.
            time_since_prev_run = time.time() - last_run_ts
            if time_since_prev_run < args.interval:
                remaining = args.interval - time_since_prev_run
                logging.info('[Cool off] %ssec', remaining)
                time.sleep(remaining)
            last_run_ts = time.time()

            # Get pods to schedule
            pods: list[kubernetes.client.models.V1Pod] = list_pods(
                v1, 'Pending'
            )
            gates = find_pod_gates(pods, args.gate)
            logging.info(
                'Found %s pending pods and %s gates', len(pods), len(gates)
            )
            if not gates:
                # No pods to be scheduled
                continue

            # sleep for 5 seconds, assuming that all pods within one group
            # would be all visible by then
            logging.info('[Cool off] 5sec')
            time.sleep(5.0)

            for g in gates:
                logging.info('Scheduling pods with gate %s', g)
                schedule_pod_with_gate(v1, g)

            logging.info('[Cool off] 60sec')
            time.sleep(60.0)  # cool off
    except kubernetes.client.rest.ApiException as e:
        # The try wraps the whole loop, so an API error ends the loop and the
        # function returns after logging.
        logging.exception(
            'Exception when listing Kubernetes nodes or pods: %s', e
        )