cluster-trace-gpu-v2020/simulator/scheduler.py (187 lines of code) (raw):

import utils from utils import print_fn, ALLOC_POLICY_DICT, PREEMPT_POLICY_DICT, _repr_job_concise class Scheduler: def __init__(self, alloc_policy=0, preempt_policy=0, sort_node_policy=0, cluster=None, gpu_type_matching=0, verbose=0): self.cluster = cluster self.alloc_policy = alloc_policy self.preempt_policy = preempt_policy self.sort_node_policy = sort_node_policy self.node_rotate_counter = 0 self.verbose = verbose self.gpu_type_matching = gpu_type_matching # To skip unnecessary self.alloc_job_sort() self.last_time_snapshot = [0, 0, 0, 0] # [idle_gpu, idle_cpu, len(job_list), len(job_to_allocate_cache)] self.cannot_counter = 0 def alloc_job(self, cluster=None): cluster = cluster if cluster is not None else self.cluster job_list = cluster.job_list # Take cluster.job_list # Trying skipping allocation as early as possible if len(job_list) <= 0: return 0 ig, ic = cluster.idl_gpus, cluster.idl_cpus this_time_snapshot = [ig, ic, len(job_list), 0] # 0: no job allocated. if self.last_time_snapshot == this_time_snapshot: # exactly the same if self.verbose: print_fn("[{}] Last time snapshot == this time snapshot: {}. Bypass.".format(self.cluster.cur_time, this_time_snapshot)) return 0 job_min_gpu, job_min_cpu = min(job_list, key=lambda j: j['num_inst'] * j['num_gpu']), min(job_list, key=lambda j: j['num_inst'] * j['num_cpu']) if (ig <= 0 or job_min_gpu['num_inst'] * job_min_gpu['num_gpu'] > ig) and (ic <= 0 or job_min_cpu['num_inst'] * job_min_cpu['num_cpu'] > ic): self.last_time_snapshot = this_time_snapshot return 0 if self.verbose: print_fn("job_min_gpu, job_min_cpu = {:.1f}, {:.1f}".format(job_min_gpu['num_gpu'], job_min_cpu['num_cpu'])) job_to_allocate_cache = [] # Greedy algorithm or Greedy + load balancing if self.alloc_policy in ALLOC_POLICY_DICT.keys(): # Heavy action self.alloc_job_sort(job_list, cluster.job_runn_list) for job_a in job_list: succ_alloc = self.try_allocate_job_to_cluster(job_a, cluster) if succ_alloc == 1: job_to_allocate_cache.append(job_a) elif succ_alloc == -1: break # else, e.g., succ_alloc == 0: pass/continue else: raise KeyError("Uncaptured Allocation Policy Input: %d" % self.alloc_policy) this_time_snapshot[-1] = len(job_to_allocate_cache) # num of jobs allocated self.last_time_snapshot = this_time_snapshot for job_a in job_to_allocate_cache: cluster.job_list.remove(job_a) def alloc_job_sort(self, job_list, job_runn_list=None): if self.alloc_policy == 0: # short_duration_first job_list.sort(key=lambda e: (e['duration'], e['job_id'])) elif self.alloc_policy == 8: # FIFO, remains the original order job_list.sort(key=lambda e: (e['submit_time'], e['job_id'])) elif self.alloc_policy in [1, 2, 4]: # SJF with duration estimation est_feature = {1: 'user_dur', 2: 'group_dur', 4: 'group_gpu_dur'}[self.alloc_policy] job_list.sort(key=lambda e: (e[est_feature], e['job_id'])) else: raise Exception("Unexpected alloc policy: %d" % self.alloc_policy) if self.verbose: for i, j in enumerate(job_list): print_fn("%2d %s" % (i, j)) if i > 20: break def try_allocate_job_to_cluster(self, job_a, cluster): """ job_a: job to allocate cluster: target cluster return: -1: the cluster is full, stop job picking 0: the current job cannot be placed, try next 1: the current job has been successfully deployed, need record. """ ig, ic = cluster.idl_gpus, cluster.idl_cpus if ig <= 0 and ic <= 0: return -1 elif job_a['num_inst'] * job_a['num_gpu'] > ig or job_a['num_inst'] * job_a['num_cpu'] > ic: return 0 else: # with in gpu and cpu limits assigned_node_map = {} assigned_inst_num = 0 sorted_node_list = self.sorted_node_list(cluster.node_list) for nid, node in enumerate(sorted_node_list): # <Node-job label matching> if self.gpu_type_matching == 1: # GPU type perfect match if job_a['gpu_type'] != 'CPU' and job_a['gpu_type'] != node.gpu_type: continue # cannot on this node elif self.gpu_type_matching == 2: # Only V100 cannot compromise if job_a['gpu_type'] == 'V100' and job_a['gpu_type'] != node.gpu_type: continue # cannot on this node # </Node-job label matching> if job_a['num_inst'] == 1: if job_a['num_gpu'] <= node.idl_gpus and job_a['num_cpu'] <= node.idl_cpus: succ_alloc = node.alloc_job(job_a) assert succ_alloc job_a['node'] = node.id print_fn("%sON : N[%d] %s" % (cluster.log_prefix, job_a['node'], job_a)) self.display_node_status(cur_node_id=job_a['node']) return 1 else: # gang-scheduling: all or nothing node_idle_gpus, node_idle_cpus = node.idl_gpus, node.idl_cpus node_inst_num_gpu, node_inst_num_cpu = job_a['num_inst'], job_a['num_inst'] # init. if job_a['num_gpu'] != 0: node_inst_num_gpu = node_idle_gpus // job_a['num_gpu'] if job_a['num_cpu'] != 0: node_inst_num_cpu = node_idle_cpus // job_a['num_cpu'] node_inst_num = min(node_inst_num_gpu, node_inst_num_cpu) if assigned_inst_num + node_inst_num >= job_a['num_inst']: node_inst_num = job_a['num_inst'] - assigned_inst_num assigned_node_map[nid] = node_inst_num assigned_inst_num += node_inst_num break elif node_inst_num > 0: assigned_node_map[nid] = node_inst_num assigned_inst_num += node_inst_num if assigned_inst_num < job_a['num_inst']: print_fn("Cannot allocate all instances (%d/%d) of %s." % (assigned_inst_num, job_a['num_inst'], _repr_job_concise(job_a))) self.cannot_counter += 1 if self.cannot_counter % 100000 == 0: print_fn("[%s] %d rejects. len(job_done_list) = %d. Current job: %s." % (cluster.log_prefix, self.cannot_counter, len(self.cluster.job_history.job_done_list), _repr_job_concise(job_a))) return 0 # No successful allocation, for num_inst=1 and >1 cases else: # Successfully Scheduled. Assigning instances to nodes according to the map inst_id = 0 for nid, num_inst in assigned_node_map.items(): node = sorted_node_list[nid] job_tmp = {'node': -1} for _ in range(num_inst): job_tmp = job_a.copy() job_tmp['inst_id'] = inst_id succ_alloc = node.alloc_job(job_tmp) assert succ_alloc job_tmp['node'] = node.id print_fn("%sON : N[%d] %s Inst[%d]" % (cluster.log_prefix, job_tmp['node'], job_tmp, inst_id)) inst_id += 1 self.display_node_status(cur_node_id=job_tmp['node']) assert inst_id == job_a['num_inst'] return 1 def sorted_node_list(self, node_list): policy = self.sort_node_policy if policy == 0: node_list.sort(key=lambda n: n.id) # by id elif policy == 1: node_list.sort(key=lambda n: n.idl_gpus) # smallest idle gpus first elif policy == 2: node_list.sort(key=lambda n: -n.idl_gpus) # largest idle gpus first elif policy == 3: node_list.sort(key=lambda n: n.util_rate) # lowest avg. util. first else: node_list.sort(key=lambda n: n.id) return node_list def preempt_job(self, cluster=None): cluster = cluster if cluster is not None else self.cluster if all([n.idl_gpus for n in cluster.node_list]) >= 0 and \ all([n.idl_cpus for n in cluster.node_list]) >= 0: return 0 # No resource contention, bypass preemption preempted_job_list = [] if self.preempt_policy in PREEMPT_POLICY_DICT.keys(): # Pre node preemption: self.preempt_job_node(node) for node in cluster.node_list: # As long as the resources are sufficient, no proactive preempt for now. if node.idl_gpus < 0 or node.idl_cpus < 0 or len(preempted_job_list) > 0: print_fn("%sPreempt jobs on %s" % (cluster.log_prefix, node)) preempted_job_list = self.preempt_job_node(node, preempted_job_list) for job in preempted_job_list: print_fn("%sOFF : %s" % (cluster.log_prefix, job)) else: raise NotImplementedError("Preempting job policies not implemented") for job in preempted_job_list: cluster.job_list.append(job) # Update Job job['wasted'] += job['progress'] job['on_time'] = 0 job['progress'] = 0 job['node'] = None def preempt_job_node(self, node, preempted_job_list): # Svc is updated, but the job is not node.update_idl_gpus() node.update_idl_cpus() if self.preempt_policy in PREEMPT_POLICY_DICT.keys(): # Sort node.job_runn_list in place self.preempt_job_sort_node(node=node, preempt_policy=self.preempt_policy) for job_i in preempted_job_list: for job_j in node.job_runn_list: if job_i['job_id'] == job_j['job_id']: # these instances belong to the same job succ = node.release_job(job_i) assert succ is True preempted_job_list.append(job_i) while node.idl_gpus < 0 or node.idl_cpus < 0: job_to_preempt = node.job_runn_list[0] succ = node.release_job(job_to_preempt) assert succ is True preempted_job_list.append(job_to_preempt) else: raise KeyError("Uncaptured Preemption Policy Input: %d" % self.preempt_policy) return preempted_job_list def preempt_job_sort_node(self, node, preempt_policy): if preempt_policy == 1: # small_size_first node.job_runn_list.sort(key=lambda e: (e['size'], e['job_id'])) elif preempt_policy == 2: # large_gang_first node.job_runn_list.sort(key=lambda e: (-e['num_gpu'], e['job_id'])) else: # preempt_policy==0 or others: short_duration_first node.job_runn_list.sort(key=lambda e: (e['duration'], e['job_id'])) def display_node_status(self, cur_node_id): if cur_node_id >= 0: cur_node = self.cluster.node_dict[cur_node_id] print_fn(cur_node)