def place_gpu_models()

in chatlearn/schedule/model_manager.py [0:0]
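Creates a placement group sized to the largest model's GPU count, packs colocated models onto the shared GPUs, and launches each replica's Ray actors with a fractional GPU share (optionally deferring distributed-env setup via env_list).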


    def place_gpu_models(self, gpu_models, env_list=None):
        """ place DistModel to gpu
        GPU models: Lis[DistModel]
        """
        if not gpu_models:
            return
        max_gpu = max(m.total_gpu for m in gpu_models)

        # create placement groups
        placement_group = self.resouce_manager.create_placement_group(max_gpu)
        for i, _ in enumerate(placement_group.bundle_specs):
            self.placement_groups.append((placement_group, i))
        models_str = ','.join([model.name for model in gpu_models])
        logger.info(f"create placement_group {placement_group.bundle_specs} for model {models_str} done")
        for model in gpu_models:
            # TODO: support gpu_per_process > 1 for colocated models later
            assert model.gpu_per_process == 1
        self.model_packs = self.find_model_packing_strategy(gpu_models, max_gpu)
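        # each pack lays its models' GPU ranges out back to back within max_gpu, so models in
        # the same pack never share a GPU, while models from different packs are colocated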

        for model in gpu_models:
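            # find the pack that contains this model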
            pack = []
            for pack in self.model_packs:
                if model in pack:
                    break
            colocate_models = []
            for model2 in gpu_models:
                if model2 is not model and model2 not in pack:
                    colocate_models.append(model2)
            model.set_colocate_models(colocate_models)

        def _get_model_replica_from_pack(gpu_index, model_pack):
            # for GPU ranks in [N * model.num_gpu_per_replica, (N + 1) * model.num_gpu_per_replica),
            # this function returns the same replica
            gpu_offset = 0
            for model in model_pack:
                if gpu_index < gpu_offset + model.total_gpu:
                    # compute the model rank
                    model_rank = gpu_index - gpu_offset
                    replica_id = model_rank // model.num_gpu_per_replica
                    return model.replicas[replica_id]
                gpu_offset += model.total_gpu
        # 1. list the model replicas to place on each device
        # 2. if device i hosts N replicas, each Ray actor gets num_gpus = 1.0 / N
        # a replica here is a DistActor
        gpu_to_replicas = []
        for i in range(max_gpu):
            colocate_models = []
            for model_pack in self.model_packs:
                replica = _get_model_replica_from_pack(i, model_pack)
                if replica is not None:
                    colocate_models.append(replica)
            gpu_to_replicas.append(colocate_models)

        # For each gpu rank, create actor for each replica
        for i, replicas in enumerate(gpu_to_replicas):
            group = i // self.resouce_manager.gpu_per_node
            for replica in replicas:
                num_gpus = 1.0 / len(replicas)
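                # a vLLM v2 replica also owns a separate engine actor, so split this rank's
                # GPU share between the engine actor and the worker actor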
                if isinstance(replica.model, VLLMModuleV2) and replica.vllm_engine is None:
                    num_gpus = num_gpus / 2
                    replica.create_engine_actor(num_gpus, placement_group, group)
                    # we do not want to add the engine actor to all_actors
                    replica.all_actors.pop()
                replica.create_actor(num_gpus, placement_group, group)
        models_to_revert = self._find_param_recv_models(gpu_models)
        for model in gpu_models:
            if model in models_to_revert: # pylint: disable=simplifiable-if-statement
                # reverse the GPU placement of the target (parameter-receiving) models so that
                # models sharing parameters do not land on the same GPU
                # NCCL limit: "NCCL WARN Duplicate GPU detected : rank 1 and rank 0 both on CUDA device"
                # TODO: single-GPU tasks still do not work
                reverse_gpu_placement = True
            else:
                reverse_gpu_placement = False
            if env_list is None:
                for replica in model.replicas:
                    replica.set_dist_env(reverse_gpu_placement)
            else:
                env_list.append((model, reverse_gpu_placement))
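
Below is a minimal, self-contained sketch (not part of ChatLearn) of the GPU-to-replica mapping and fractional num_gpus computation performed above. ToyModel and ToyReplica are hypothetical stand-ins for DistModel and its replicas, and the two-pack layout is an assumed example.

    # toy_placement_sketch.py -- hypothetical illustration, not ChatLearn code
    from dataclasses import dataclass, field
    from typing import List, Optional

    @dataclass
    class ToyReplica:
        model_name: str
        replica_id: int

    @dataclass
    class ToyModel:
        name: str
        total_gpu: int
        num_gpu_per_replica: int
        replicas: List[ToyReplica] = field(default_factory=list)

        def __post_init__(self):
            # one replica per num_gpu_per_replica GPUs
            num_replicas = self.total_gpu // self.num_gpu_per_replica
            self.replicas = [ToyReplica(self.name, i) for i in range(num_replicas)]

    def replica_for_gpu(gpu_index: int, model_pack: List[ToyModel]) -> Optional[ToyReplica]:
        # mirrors _get_model_replica_from_pack: models in a pack occupy
        # back-to-back GPU ranges, so walk the offsets to find the owner
        gpu_offset = 0
        for model in model_pack:
            if gpu_index < gpu_offset + model.total_gpu:
                model_rank = gpu_index - gpu_offset
                return model.replicas[model_rank // model.num_gpu_per_replica]
            gpu_offset += model.total_gpu
        return None

    if __name__ == "__main__":
        # two packs colocated on the same 4 GPUs: policy (2 GPUs/replica) and reward (4 GPUs/replica)
        policy = ToyModel("policy", total_gpu=4, num_gpu_per_replica=2)
        reward = ToyModel("reward", total_gpu=4, num_gpu_per_replica=4)
        model_packs = [[policy], [reward]]
        max_gpu = max(m.total_gpu for pack in model_packs for m in pack)
        for gpu in range(max_gpu):
            replicas = [r for pack in model_packs
                        if (r := replica_for_gpu(gpu, pack)) is not None]
            # each colocated Ray actor would be created with num_gpus = 1.0 / len(replicas)
            print(gpu, [(r.model_name, r.replica_id) for r in replicas],
                  "num_gpus per actor =", 1.0 / len(replicas))

Running it prints, for each GPU rank, the colocated replicas and the fractional GPU share each of their actors would request.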