chatlearn/schedule/resource_manager.py (51 lines of code) (raw):

# Copyright 2024 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""resource manager"""

import os
import time

import ray
import ray.experimental.state.api
from ray.util.placement_group import placement_group

from chatlearn.utils.logger import logger


class ResourceManager:
    """
    Manage hardware resources for each task.

    Per-node GPU/CPU capacity is sampled once from a single Ray node at
    construction time, and the node count is read from the ``WORLD_SIZE``
    environment variable. Every placement group created through this object
    is recorded in ``placement_groups`` so it can be removed later.
    """

    def __init__(self, models):
        """
        Record the models and sample the per-node hardware capacity.

        Args:
            models: iterable of model objects; each must expose a ``name``
                attribute, which is used as the key of ``name2models``.

        Raises:
            KeyError: if ``WORLD_SIZE`` is not set, or the sampled node does
                not report a ``GPU`` / ``CPU`` resource.
            IndexError: if Ray reports no nodes at all.
        """
        self.models = models
        self.name2models = {model.name: model for model in self.models}
        self.model_to_placegroup = {}
        self.placement_groups = []
        resource = self._probe_node_resources()
        self.gpu_per_node = int(resource['GPU'])
        self.cpu_per_node = int(resource['CPU'])
        self.nnode = int(os.environ['WORLD_SIZE'])
        self.total_gpu = self.nnode * self.gpu_per_node

    @staticmethod
    def _probe_node_resources():
        """
        Return the resource dict of a representative node.

        The first alive node that reports a ``GPU`` resource is preferred, so
        that a dead node entry or a CPU-only node listed first does not make
        the lookup fail while usable GPU nodes exist. When no such node is
        found, the first node's resources are returned unchanged, which keeps
        the historical failure mode (``KeyError`` on the missing resource).
        """
        nodes = ray.nodes()
        for node in nodes:
            resources = node.get('Resources') or {}
            if node.get('Alive', True) and 'GPU' in resources:
                return resources
        return nodes[0]['Resources']

    def get_placement_group_state(self, pg):
        """
        Query the current state of a placement group.

        Args:
            pg: placement group handle, as returned by ``placement_group``.

        Returns:
            The ``"state"`` field reported by the Ray state API, or ``None``
            when the query fails for any reason (the failure is logged).
        """
        try:
            return ray.experimental.state.api.get_placement_group(pg.id.hex())["state"]
        except Exception as e:
            # Best-effort lookup: callers poll this in a loop, so a failed
            # query is reported and signalled with None instead of raising.
            logger.warning(f"fail to get placement_group state {e}")
            return None

    def create_placement_group(self, num_gpus, num_cpus=None, strategy="PACK"):
        """
        create resource placement group given model device args

        Args:
            num_gpus: number of GPUs to reserve. Up to one node's worth is
                placed in a single bundle with a proportional share of that
                node's CPUs; larger requests must be a multiple of
                ``gpu_per_node`` and get one full-node bundle per node.
            num_cpus: list of CPU counts, one bundle per entry. Only used
                (and then required) when ``num_gpus`` is not positive.
            strategy: placement strategy forwarded to Ray.

        Returns:
            The created placement group, which is also appended to
            ``placement_groups``.

        Raises:
            AssertionError: if a multi-node GPU request is not a multiple of
                ``gpu_per_node``, or a CPU-only request has no ``num_cpus``
                list.
        """
        if num_gpus > 0:
            if num_gpus <= self.gpu_per_node:
                # Fits on one node: take the matching fraction of its CPUs.
                cpu_count = int(self.cpu_per_node * num_gpus / self.gpu_per_node)
                bundles = [{"GPU": num_gpus, "CPU": cpu_count}]
            else:
                assert num_gpus % self.gpu_per_node == 0
                num_nodes = num_gpus // self.gpu_per_node
                bundles = [
                    {"GPU": self.gpu_per_node, "CPU": self.cpu_per_node}
                    for _ in range(num_nodes)
                ]
        else:
            assert num_cpus is not None and isinstance(num_cpus, list), "num_cpus should be type of list"
            bundles = [{"GPU": 0, "CPU": num_cpu} for num_cpu in num_cpus]

        pg = placement_group(bundles, strategy=strategy)

        # Poll until the group leaves PENDING. A failed state query yields
        # None, which also ends the wait (best effort). Cluster details are
        # logged only on the first pending poll to avoid flooding the log.
        warn_once = True
        while self.get_placement_group_state(pg) == "PENDING":
            if warn_once:
                logger.info(ray.experimental.state.api.list_nodes())
                logger.info(f"waiting for placement group to be created for {num_gpus}GPUs {pg.bundle_specs}")
                warn_once = False
            time.sleep(1)
        self.placement_groups.append(pg)
        return pg

    def remove_placement_groups(self):
        """
        Remove every placement group created through this manager.

        Each group is removed in turn and polled until Ray reports it as
        ``REMOVED``. If the state cannot be queried, the wait for that group
        is abandoned instead of polling forever, matching the best-effort
        handling in ``create_placement_group``.
        """
        for pg in self.placement_groups:
            ray.util.remove_placement_group(pg)
            while True:
                state = self.get_placement_group_state(pg)
                if state == "REMOVED":
                    break
                if state is None:
                    # The query itself failed; looping here would spin
                    # forever if the state API stays unavailable.
                    logger.warning("cannot verify placement group removal, stop waiting")
                    break
                time.sleep(0.5)
        logger.info("Remove placement groups done")