in cluster-trace-gpu-v2020/simulator/scheduler.py
def try_allocate_job_to_cluster(self, job_a, cluster):
"""
job_a: job to allocate
cluster: target cluster
return:
-1: the cluster is full, stop job picking
0: the current job cannot be placed, try next
1: the current job has been successfully deployed, need record.
"""
ig, ic = cluster.idl_gpus, cluster.idl_cpus
if ig <= 0 and ic <= 0:
return -1
elif job_a['num_inst'] * job_a['num_gpu'] > ig or job_a['num_inst'] * job_a['num_cpu'] > ic:
return 0
    else:  # within the cluster-wide GPU and CPU totals; per-node fit is checked below
assigned_node_map = {}
assigned_inst_num = 0
sorted_node_list = self.sorted_node_list(cluster.node_list)
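        # nodes are visited in the order produced by the scheduling policy (self.sorted_node_list)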
for nid, node in enumerate(sorted_node_list):
# <Node-job label matching>
if self.gpu_type_matching == 1: # GPU type perfect match
if job_a['gpu_type'] != 'CPU' and job_a['gpu_type'] != node.gpu_type:
continue # cannot on this node
elif self.gpu_type_matching == 2: # Only V100 cannot compromise
if job_a['gpu_type'] == 'V100' and job_a['gpu_type'] != node.gpu_type:
continue # cannot on this node
# </Node-job label matching>
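            # any other gpu_type_matching value imposes no GPU-type constraint;
            # CPU-only jobs (gpu_type == 'CPU') are never filtered here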
if job_a['num_inst'] == 1:
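                # single-instance job: place it whole on the first node with enough idle GPUs and CPUs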
if job_a['num_gpu'] <= node.idl_gpus and job_a['num_cpu'] <= node.idl_cpus:
succ_alloc = node.alloc_job(job_a)
assert succ_alloc
job_a['node'] = node.id
print_fn("%sON : N[%d] %s" % (cluster.log_prefix, job_a['node'], job_a))
self.display_node_status(cur_node_id=job_a['node'])
return 1
else: # gang-scheduling: all or nothing
node_idle_gpus, node_idle_cpus = node.idl_gpus, node.idl_cpus
node_inst_num_gpu, node_inst_num_cpu = job_a['num_inst'], job_a['num_inst'] # init.
if job_a['num_gpu'] != 0:
node_inst_num_gpu = node_idle_gpus // job_a['num_gpu']
if job_a['num_cpu'] != 0:
node_inst_num_cpu = node_idle_cpus // job_a['num_cpu']
node_inst_num = min(node_inst_num_gpu, node_inst_num_cpu)
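                # e.g. (illustrative numbers): a node with 6 idle GPUs and 40 idle CPUs can host
                # min(6 // 2, 40 // 8) = min(3, 5) = 3 instances of a job needing 2 GPUs / 8 CPUs each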
if assigned_inst_num + node_inst_num >= job_a['num_inst']:
node_inst_num = job_a['num_inst'] - assigned_inst_num
assigned_node_map[nid] = node_inst_num
assigned_inst_num += node_inst_num
break
elif node_inst_num > 0:
assigned_node_map[nid] = node_inst_num
assigned_inst_num += node_inst_num
if assigned_inst_num < job_a['num_inst']:
print_fn("Cannot allocate all instances (%d/%d) of %s." % (assigned_inst_num, job_a['num_inst'], _repr_job_concise(job_a)))
self.cannot_counter += 1
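            # throttle the reject log: report only once every 100000 failed allocation attempts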
if self.cannot_counter % 100000 == 0:
print_fn("[%s] %d rejects. len(job_done_list) = %d. Current job: %s." % (cluster.log_prefix, self.cannot_counter, len(self.cluster.job_history.job_done_list), _repr_job_concise(job_a)))
            return 0  # no successful allocation (covers both the num_inst == 1 and num_inst > 1 cases)
else:
            # successfully scheduled: assign the instances to nodes according to assigned_node_map
inst_id = 0
for nid, num_inst in assigned_node_map.items():
node = sorted_node_list[nid]
job_tmp = {'node': -1}
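                # stub dict so job_tmp['node'] exists even if the inner loop were skipped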
for _ in range(num_inst):
job_tmp = job_a.copy()
job_tmp['inst_id'] = inst_id
succ_alloc = node.alloc_job(job_tmp)
assert succ_alloc
job_tmp['node'] = node.id
print_fn("%sON : N[%d] %s Inst[%d]" % (cluster.log_prefix, job_tmp['node'], job_tmp, inst_id))
inst_id += 1
self.display_node_status(cur_node_id=job_tmp['node'])
assert inst_id == job_a['num_inst']
return 1
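
# Usage sketch (illustration only, not part of scheduler.py): how a scheduling pass
# might consume the -1/0/1 return codes of try_allocate_job_to_cluster above.
# `scheduler`, `pending_jobs`, and `record_placement` are assumed names.
def example_schedule_pass(scheduler, cluster, pending_jobs, record_placement):
    for job_a in list(pending_jobs):
        ret = scheduler.try_allocate_job_to_cluster(job_a, cluster)
        if ret == -1:  # the cluster has no idle resources left: stop picking jobs
            break
        if ret == 0:   # this job cannot be placed right now: try the next one
            continue
        record_placement(job_a)  # ret == 1: deployed; record it
        pending_jobs.remove(job_a)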