def try_allocate_job_to_cluster()

in cluster-trace-gpu-v2020/simulator/scheduler.py


    def try_allocate_job_to_cluster(self, job_a, cluster):
        """
        job_a: job to allocate
        cluster: target cluster

        return:
            -1: the cluster is full; stop picking jobs for this round
             0: the current job cannot be placed; try the next job
             1: the current job has been successfully deployed; the caller should record it
        """
        ig, ic = cluster.idl_gpus, cluster.idl_cpus  # idle GPUs and CPUs in the whole cluster
        if ig <= 0 and ic <= 0:
            return -1  # no idle GPUs and no idle CPUs: the cluster is full
        elif job_a['num_inst'] * job_a['num_gpu'] > ig or job_a['num_inst'] * job_a['num_cpu'] > ic:
            return 0  # aggregate demand of all instances exceeds the idle capacity
        else:  # within the cluster-wide GPU and CPU limits
            assigned_node_map = {}
            assigned_inst_num = 0
            sorted_node_list = self.sorted_node_list(cluster.node_list)
            for nid, node in enumerate(sorted_node_list):
                # <Node-job label matching>
                if self.gpu_type_matching == 1:  # GPU type perfect match
                    if job_a['gpu_type'] != 'CPU' and job_a['gpu_type'] != node.gpu_type:
                        continue  # cannot on this node
                elif self.gpu_type_matching == 2:  # Only V100 cannot compromise
                    if job_a['gpu_type'] == 'V100' and job_a['gpu_type'] != node.gpu_type:
                        continue  # cannot on this node
                # </Node-job label matching>
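                # e.g. under mode 2, a T4 job may land on any node, while a V100 job
                # is only placed on V100 nodes; mode 1 requires an exact type match
                # for every non-CPU job.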

                if job_a['num_inst'] == 1:  # single-instance job: place it on the first node with room
                    if job_a['num_gpu'] <= node.idl_gpus and job_a['num_cpu'] <= node.idl_cpus:
                        succ_alloc = node.alloc_job(job_a)
                        assert succ_alloc
                        job_a['node'] = node.id
                        print_fn("%sON  : N[%d] %s" % (cluster.log_prefix, job_a['node'], job_a))
                        self.display_node_status(cur_node_id=job_a['node'])
                        return 1
                else:  # gang-scheduling: all or nothing
                    node_idle_gpus, node_idle_cpus = node.idl_gpus, node.idl_cpus
                    node_inst_num_gpu, node_inst_num_cpu = job_a['num_inst'], job_a['num_inst']  # default: an unrequested resource does not constrain the count
                    if job_a['num_gpu'] != 0:
                        node_inst_num_gpu = node_idle_gpus // job_a['num_gpu']
                    if job_a['num_cpu'] != 0:
                        node_inst_num_cpu = node_idle_cpus // job_a['num_cpu']
                    node_inst_num = min(node_inst_num_gpu, node_inst_num_cpu)
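                    # Illustrative arithmetic: a node with 6 idle GPUs and 20 idle CPUs
                    # can host min(6 // 2, 20 // 8) = min(3, 2) = 2 instances of a job
                    # that requests 2 GPUs and 8 CPUs per instance.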

                    if assigned_inst_num + node_inst_num >= job_a['num_inst']:
                        # this node covers the remaining instances; cap and stop scanning
                        node_inst_num = job_a['num_inst'] - assigned_inst_num
                        assigned_node_map[nid] = node_inst_num
                        assigned_inst_num += node_inst_num
                        break
                    elif node_inst_num > 0:
                        # reserve whatever fits on this node and keep scanning
                        assigned_node_map[nid] = node_inst_num
                        assigned_inst_num += node_inst_num

            if assigned_inst_num < job_a['num_inst']:
                print_fn("Cannot allocate all instances (%d/%d) of %s." % (assigned_inst_num, job_a['num_inst'], _repr_job_concise(job_a)))
                self.cannot_counter += 1
                if self.cannot_counter % 100000 == 0:
                    print_fn("[%s] %d rejects. len(job_done_list) = %d. Current job: %s." % (cluster.log_prefix, self.cannot_counter, len(self.cluster.job_history.job_done_list), _repr_job_concise(job_a)))
                return 0  # No successful allocation, for num_inst=1 and >1 cases
            else:
                # Successfully scheduled: assign instances to nodes according to the map
                inst_id = 0
                for nid, num_inst in assigned_node_map.items():
                    node = sorted_node_list[nid]
                    job_tmp = {'node': -1}  # placeholder so the status display below always has a node id
                    for _ in range(num_inst):
                        job_tmp = job_a.copy()
                        job_tmp['inst_id'] = inst_id
                        succ_alloc = node.alloc_job(job_tmp)
                        assert succ_alloc
                        job_tmp['node'] = node.id
                        print_fn("%sON  : N[%d] %s Inst[%d]" % (cluster.log_prefix, job_tmp['node'], job_tmp, inst_id))
                        inst_id += 1
                    self.display_node_status(cur_node_id=job_tmp['node'])
                assert inst_id == job_a['num_inst']
                return 1
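
The three return codes are designed for a pick-and-place loop. Below is a minimal sketch of how a caller might consume them; `scheduler`, `pending_jobs`, and `record_allocation` are hypothetical names used only for illustration, not identifiers from the simulator.

    # Hedged sketch: iterate pending jobs until the cluster reports it is full.
    for job_a in list(pending_jobs):
        ret = scheduler.try_allocate_job_to_cluster(job_a, cluster)
        if ret == -1:     # cluster is full: stop picking jobs this round
            break
        elif ret == 0:    # this job does not fit: try the next one
            continue
        else:             # ret == 1: deployed; record it and drop it from the queue
            record_allocation(job_a)
            pending_jobs.remove(job_a)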