in chatlearn/schedule/model_manager.py [0:0]
def place_gpu_models(self, gpu_models, env_list=None):
""" place DistModel to gpu
GPU models: Lis[DistModel]
"""
if not gpu_models:
return
max_gpu = max(m.total_gpu for m in gpu_models)
# create placement groups
placement_group = self.resouce_manager.create_placement_group(max_gpu)
for i, _ in enumerate(placement_group.bundle_specs):
self.placement_groups.append((placement_group, i))
models_str = ','.join([model.name for model in gpu_models])
logger.info(f"create placement_group {placement_group.bundle_specs} for model {models_str} done")
for model in gpu_models:
# TODO: for colocate gpu_per_process > 1, support later
assert model.gpu_per_process == 1
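
    # Pack the models onto the shared pool of max_gpu GPUs: models inside the same pack occupy
    # disjoint GPU ranges, while models from different packs are colocated on the same GPUs.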
    self.model_packs = self.find_model_packing_strategy(gpu_models, max_gpu)

    for model in gpu_models:
        pack = []
        for pack in self.model_packs:
            if model in pack:
                break
        colocate_models = []
        for model2 in gpu_models:
            if model2 is not model and model2 not in pack:
                colocate_models.append(model2)
        model.set_colocate_models(colocate_models)

    def _get_model_replica_from_pack(gpu_index, model_pack):
        # For GPU ranks in [N * model.num_gpu_per_replica, (N + 1) * model.num_gpu_per_replica),
        # this function returns the same (the N-th) replica of the model owning that range.
        gpu_offset = 0
        for model in model_pack:
            if gpu_index < gpu_offset + model.total_gpu:
                # compute the rank of this GPU within the model
                model_rank = gpu_index - gpu_offset
                replica_id = model_rank // model.num_gpu_per_replica
                return model.replicas[replica_id]
            gpu_offset += model.total_gpu
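
    # Worked example (hypothetical sizes): with model_packs = [[policy (4 GPUs, 2 per replica)],
    # [reference (2 GPUs, 2 per replica), reward (2 GPUs, 2 per replica)]] and max_gpu = 4,
    # gpu_index 2 resolves to policy.replicas[1] in the first pack and reward.replicas[0] in the
    # second pack, so those two replicas end up colocated on GPU 2.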

    # 1. list the replicas to be placed on each device
    # 2. if device i hosts N replicas, each Ray actor on it requests num_gpus = 1.0 / N
    # a replica here is a DistActor
    gpu_to_replicas = []
    for i in range(max_gpu):
        colocate_models = []
        for model_pack in self.model_packs:
            replica = _get_model_replica_from_pack(i, model_pack)
            if replica is not None:
                colocate_models.append(replica)
        gpu_to_replicas.append(colocate_models)

    # For each gpu rank, create an actor for every replica placed on it
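    # e.g. if two replicas share GPU i, each of their actors is created with num_gpus = 0.5
    # so that Ray co-schedules them onto the same bundle of the placement group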
    for i, replicas in enumerate(gpu_to_replicas):
        group = i // self.resouce_manager.gpu_per_node
        for replica in replicas:
            num_gpus = 1.0 / len(replicas)
            if isinstance(replica.model, VLLMModuleV2) and replica.vllm_engine is None:
                num_gpus = num_gpus / 2
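                # the other half of the GPU share goes to the worker actor created below, so the
                # engine actor and worker actor together still request 1.0 / len(replicas) of GPU i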
                replica.create_engine_actor(num_gpus, placement_group, group)
                # the engine actor should not be tracked in all_actors, so drop it again
                replica.all_actors.pop()
            replica.create_actor(num_gpus, placement_group, group)

    models_to_revert = self._find_param_recv_models(gpu_models)
    for model in gpu_models:
        if model in models_to_revert: # pylint: disable=simplifiable-if-statement
            # Reverse the GPU placement of parameter-receiving (target) models, so that models
            # sharing parameters do not end up on the same GPU.
            # NCCL limit: NCCL WARN Duplicate GPU detected : rank 1 and rank 0 both on CUDA device
            # TODO: single-GPU tasks still do not work
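            # e.g. if a trainer syncs parameters to a colocated inference model, reversing the
            # inference model's placement pairs trainer rank 0 with inference rank N-1 instead of
            # rank 0, keeping the two ends of a parameter-sync NCCL group on different devices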
            reverse_gpu_placement = True
        else:
            reverse_gpu_placement = False
        if env_list is None:
            for replica in model.replicas:
                replica.set_dist_env(reverse_gpu_placement)
        else:
            env_list.append((model, reverse_gpu_placement))