pai/common/docker_utils.py (131 lines of code) (raw):

# Copyright 2023 Alibaba, Inc. or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import subprocess
import time
from random import randint
from typing import Any, Dict, List, Optional, Union

from .logging import get_logger

logger = get_logger(__name__)


def _run_command(command: List[str], input: Optional[str] = None):
    """Run a command, forwarding its combined stdout/stderr to the logger.

    Args:
        command (List[str]): The command and its arguments.
        input (str, optional): Text written to the process stdin before its
            output is read.

    Returns:
        int: The return code of the process.
    """
    # The pipes are opened in binary mode, where line buffering (bufsize=1)
    # is not supported and only triggers a RuntimeWarning, so the default
    # buffer size is used.
    with subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=False,
    ) as p:
        if input:
            p.stdin.write(input.encode())
        # Always close stdin so a child that reads from it receives EOF
        # instead of blocking while we wait for its output.
        p.stdin.close()

        # Undecodable bytes are replaced rather than raising.
        out = io.TextIOWrapper(p.stdout, newline="", errors="replace")
        for line in out:
            logger.info(line)

    # Leaving the ``with`` block waits for the process, so the return code
    # is available here.
    return p.returncode


class ContainerRun(object):
    """A class that represents a container run on the local machine."""

    CONTAINER_STATUS_RUNNING = "running"
    CONTAINER_STATUS_EXITED = "exited"
    CONTAINER_STATUS_PAUSED = "paused"

    def __init__(self, container, port: Optional[int] = None):
        """Initialize a container run.

        Args:
            container: A docker container object.
            port (int): The host port that container is exposed to.
        """
        self.container = container
        self.port = port

    @property
    def status(self):
        """Reload the container state and return its current status."""
        self.container.reload()
        return self.container.status

    def is_running(self):
        """Return True if container is running, otherwise False."""
        return self.status == self.CONTAINER_STATUS_RUNNING

    def is_terminated(self):
        """Return True if container is terminated, otherwise False."""
        return self.status in [
            self.CONTAINER_STATUS_EXITED,
            self.CONTAINER_STATUS_PAUSED,
        ]

    def is_succeeded(self):
        """Return True if container is succeeded, otherwise False."""
        # ``self.status`` reloads the container, so ``attrs`` is fresh when
        # the exit code is read.
        return (
            self.status == self.CONTAINER_STATUS_EXITED
            and self.container.attrs["State"]["ExitCode"] == 0
        )

    def wait_for_ready(self, interval=5):
        """Wait until container enter running state or terminated state.

        Args:
            interval: Seconds to sleep between two status checks.

        Raises:
            RuntimeError: If the container is exited or paused.
        """
        while True:
            status = self.status
            if status == self.CONTAINER_STATUS_RUNNING:
                break
            elif status in [
                self.CONTAINER_STATUS_EXITED,
                self.CONTAINER_STATUS_PAUSED,
            ]:
                raise RuntimeError(
                    "Container is terminated : id={} status={}".format(
                        self.container.id, self.container.status
                    )
                )
            time.sleep(interval)

    def stop(self):
        """Stop the container if it is running."""
        if self.is_running():
            self.container.stop()

    def start(self):
        """Start the container if it is not running."""
        if not self.is_running():
            self.container.start()

    def delete(self):
        """Stop the container if it is running, then remove it."""
        if self.is_running():
            self.container.stop()
        self.container.remove()

    def watch(self, show_logs: bool = True):
        """Watch container log and wait for container to exit.

        Args:
            show_logs (bool): If True, print the container log while it is
                streamed; otherwise only wait for the container.

        Raises:
            RuntimeError: If the container exit code is not zero.
        """
        if not show_logs:
            self.container.wait()
        else:
            log_iter = self.container.logs(
                stream=True,
                follow=True,
            )
            for log in log_iter:
                # Container output is arbitrary bytes; replace undecodable
                # sequences instead of aborting the watch.
                print(log.decode(errors="replace"))

        self.container.reload()
        exit_code = self.container.attrs["State"]["ExitCode"]
        if exit_code != 0:
            raise RuntimeError(
                "Container run exited failed: exit_code={}".format(exit_code)
            )


def run_container(
    image_uri: str,
    container_name: Optional[str] = None,
    port: Optional[int] = None,
    environment_variables: Optional[Dict[str, str]] = None,
    command: Optional[Union[List[str], str]] = None,
    entry_point: Optional[Union[List[str], str]] = None,
    volumes: Optional[Dict[str, Any]] = None,
    working_dir: Optional[str] = None,
    gpu_count: Optional[int] = None,
    gpu_device_ids: Optional[List[str]] = None,
    gpu_capabilities: Optional[List[List[str]]] = None,
) -> ContainerRun:
    """Run a container in local.

    Args:
        image_uri (str):  A docker image uri.
        container_name (str, optional): Name of the container.
        port (int, optional): The port to expose. When set, it is published
            on a randomly chosen host port, available as ``port`` of the
            returned ContainerRun; otherwise no port is published and that
            attribute is None.
        environment_variables (Dict[str, str], optional): Environment variables to set
            in the container.
        command (Union[List[str], str], optional): Command to run the container.
        entry_point (Union[List[str], str], optional): Entry point to run the container.
        volumes (Dict[str, Any], optional): Volumes to mount in the container.
        working_dir (str, optional): Working directory in the container.
        gpu_count (int, optional): Number of GPU devices to request. Set to -1 to
            request all available devices.
            To use GPU, set either ``gpu_count`` or ``gpu_device_ids``.
        gpu_device_ids (List[str], optional): List of strings for GPU device IDs,
            corresponding to `NVIDIA_VISIBLE_DEVICES` in the NVIDIA Runtime.
            To use GPU, set either ``gpu_count`` or ``gpu_device_ids``.
        gpu_capabilities (List[List[str]], optional): This parameter corresponds to
            `NVIDIA_DRIVER_CAPABILITIES` in the NVIDIA Runtime. The default value is
            ``[["compute", "utility"]]`` if ``gpu_device_ids`` or ``gpu_count`` is set.
            Available capabilities for the NVIDIA driver can be found in
            https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/user-guide.html#driver-capabilities.

    Returns:
        ContainerRun: A ContainerRun object.

    Raises:
        ImportError: If the ``docker`` package is not installed.
    """
    try:
        import docker
    except ImportError:
        raise ImportError("Please install docker first: pip install docker")

    client = docker.from_env()
    # Use a random host port, but only when a container port is published,
    # so the returned ContainerRun never reports a port that is not mapped.
    host_port = randint(49152, 65535) if port else None

    if gpu_count or gpu_device_ids or gpu_capabilities:
        if not gpu_capabilities:
            gpu_capabilities = [["compute", "utility"]]
        device_requests = [
            docker.types.DeviceRequest(
                count=gpu_count,
                device_ids=gpu_device_ids,
                capabilities=gpu_capabilities,
            )
        ]
    else:
        device_requests = []

    container = client.containers.run(
        name=container_name,
        entrypoint=entry_point,
        image=image_uri,
        command=command,
        environment=environment_variables,
        ports={port: host_port} if port else None,
        volumes=volumes,
        working_dir=working_dir,
        detach=True,
        device_requests=device_requests,
    )
    container_run = ContainerRun(
        container=container,
        port=host_port,
    )
    return container_run