pai/model/_model.py
# Copyright 2023 Alibaba, Inc. or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import distutils.dir_util
import json
import os.path
import posixpath
import shlex
import shutil
import tempfile
import textwrap
import time
import typing
import warnings
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
import requests
from addict import Dict as AttrDict
from oss2 import ObjectIterator
from ..common import ProviderAlibabaPAI, git_utils
from ..common.consts import INSTANCE_TYPE_LOCAL_GPU, ModelFormat, StoragePathCategory
from ..common.docker_utils import ContainerRun, run_container
from ..common.logging import get_logger
from ..common.oss_utils import OssUriObj, download, is_oss_uri, upload
from ..common.utils import (
generate_repr,
is_local_run_instance_type,
random_str,
to_plain_text,
)
from ..exception import DuplicatedMountException
from ..image import ImageInfo
from ..job._training_job import InstanceSpec, ModelRecipeSpec, UriInput, UserVpcConfig
from ..predictor import AsyncPredictor, LocalPredictor, Predictor, ServiceType
from ..serializers import SerializerBase
from ..session import Session, get_default_session
if typing.TYPE_CHECKING:
from ..estimator import AlgorithmEstimator
from ._model_recipe import ModelRecipe, ModelRecipeType, ModelTrainingRecipe
logger = get_logger(__name__)
# Reserved ports for internal use, do not use them for service
_RESERVED_PORTS = [8080, 9090]
# Default model upstream source
MODEL_TASK_CREATED_BY_QUICKSTART = "QuickStart"
class DefaultServiceConfig(object):
"""Default configuration used in creating prediction service."""
# Listen Port
listen_port = 8000
# Default model path in container
model_path = "/eas/workspace/model/"
# Default user code path in container
code_path = "/ml/usercode/"
class StorageConfigBase(metaclass=ABCMeta):
"""Base Storage Configuration."""
@abstractmethod
def to_dict(self):
pass
class RawStorageConfig(StorageConfigBase):
def __init__(self, config: Dict[str, Any]):
self.config = config
def to_dict(self):
return self.config
class OssStorageConfig(StorageConfigBase):
"""Configuration for OSS Storage."""
def __init__(
self, mount_path: str, oss_path: str, oss_endpoint: Optional[str] = None
) -> None:
"""
Args:
mount_path (str): The target path where the OSS storage will be mounted.
oss_path (str): The source OSS path, must start with `oss://`. e.g. `oss://bucket-name/path/to/data`.
oss_endpoint (Optional[str]): The endpoint address of the OSS bucket. If not provided,
the internal endpoint of the bucket will be used.
"""
self.mount_path = mount_path
self.oss_path = oss_path
self.oss_endpoint = oss_endpoint
def to_dict(self) -> Dict[str, Any]:
d = {
"mount_path": self.mount_path,
"oss": {"path": self.oss_path},
}
if self.oss_endpoint:
d["oss"]["endpoint"] = self.oss_endpoint
return d
class NfsStorageConfig(StorageConfigBase):
"""Configuration for NFS Storage."""
def __init__(
self,
mount_path: str,
nfs_server: str,
nfs_path: str = "/",
read_only: bool = False,
) -> None:
"""
Args:
mount_path (str): The target path where the NFS storage will be mounted.
nfs_server (str): The NFS server address. e.g. `xxx.cn-shanghai.nas.aliyuncs.com`
nfs_path (str): The source path in the NFS storage, default to '/'.
read_only (bool): Indicates if the NFS storage should be mounted as read-only, default to False.
"""
self.mount_path = mount_path
self.nfs_path = nfs_path
self.read_only = read_only
self.nfs_server = nfs_server
def to_dict(self) -> Dict[str, Any]:
return {
"mount_path": self.mount_path,
"nfs": {
"path": self.nfs_path,
"readOnly": self.read_only,
"server": self.nfs_server,
},
}
class NodeStorageConfig(StorageConfigBase):
"""Use to mount the local node disk storage to the container."""
def __init__(self, mount_path) -> None:
"""
Args:
mount_path (str): The target path where the node disk storage will be mounted.
"""
self.mount_path = mount_path
def to_dict(self) -> Dict[str, Any]:
return {
"empty_dir": {},
"mount_path": self.mount_path,
}
class SharedMemoryConfig(StorageConfigBase):
"""Use to configure the shared memory for the container."""
def __init__(self, size_limit: int) -> None:
"""
Args:
size_limit (int): Size limit of the shared memory, in GB.
"""
self.size_limit = size_limit
def to_dict(self) -> Dict[str, Any]:
return {
"empty_dir": {
"medium": "memory",
"size_limit": self.size_limit,
},
"mount_path": "/dev/shm",
}
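# Illustrative sketch (not executed): how the storage configs above can be combined.
# The bucket name and NAS server address below are placeholders; the resulting list
# is typically passed to ``container_serving_spec(..., storage_configs=...)``.
#
#   storage_configs = [
#       OssStorageConfig(mount_path="/ml/data/", oss_path="oss://<YourBucket>/data/"),
#       NfsStorageConfig(mount_path="/ml/nas/", nfs_server="***.cn-hangzhou.nas.aliyuncs.com"),
#       NodeStorageConfig(mount_path="/ml/scratch/"),
#       SharedMemoryConfig(size_limit=4),
#   ]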
class ResourceConfig(object):
"""A class that represents the resource used by a PAI prediction service
instance."""
def __init__(self, cpu: int, memory: int, gpu: int = None, gpu_memory: int = None):
"""ResourceConfig initializer.
The public resource group does not support requesting GPU resources with
`ResourceConfig`. Use the 'gpu' and 'gpu_memory' parameters only for services
deployed to dedicated resource groups that provide GPU machine instances.
Args:
cpu (int): The number of CPUs that each instance requires.
memory (int): The amount of memory that each instance requires,
must be an integer, Unit: MB.
gpu (int): The number of GPUs that each instance requires.
gpu_memory (int): The amount of GPU memory that each instance requires.
The value must be an integer, Unit: GB.
PAI allows memory resources of a GPU to be allocated to multiple instances.
If you want multiple instances to share the memory resources of a GPU,
set the gpu parameter to 0. If you set the ``gpu`` parameter to 1, each
instance occupies a GPU and the gpu_memory parameter does not take effect.
.. note::
**Important** PAI does not enable the strict isolation of GPU memory.
To prevent out of memory (OOM) errors, make sure that the GPU memory
used by each instance does not exceed the requested amount.
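Example (a minimal illustration; fields left as ``None`` are dropped by ``to_dict``)::
>>> config = ResourceConfig(cpu=2, memory=4000)
>>> config.to_dict()
{'cpu': 2, 'memory': 4000}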
"""
self.cpu = cpu
self.memory = memory
self.gpu = gpu
self.gpu_memory = gpu_memory
def __repr__(self):
return (
f"ResourceConfig(cpu={self.cpu}, memory={self.memory}MB, gpu={self.gpu or 0},"
f" gpu_memory={self.gpu_memory or 0}GB)"
)
def __str__(self):
return self.__repr__()
def to_dict(self):
"""Transform the ResourceConfig instance to a dictionary.
Returns:
dict:
"""
res = {
"cpu": self.cpu,
"gpu": self.gpu,
"gpu_memory": self.gpu_memory,
"memory": self.memory,
}
return {k: v for k, v in res.items() if v is not None}
class InferenceSpec(object):
"""A class used to describe how to create a prediction service.
InferenceSpec is used to describe how the model is served in PAI. To view the
full list of supported parameters, please see the following hyperlink:
`Parameters of model services <https://help.aliyun.com/document_detail/450525.htm>`_.
Example of how to configure an InferenceSpec::
>>> # build an inference_spec that uses the XGBoost processor.
>>> infer_spec = InferenceSpec(processor="xgboost")
>>> infer_spec.metadata.rpc.keepalive = 1000
>>> infer_spec.warm_up_data_path = "oss://bucket-name/path/to/warmup-data"
>>> infer_spec.add_option("metadata.rpc.max_batch_size", 8)
>>> print(infer_spec.processor)
xgboost
>>> print(infer_spec.metadata.rpc.keepalive)
1000
>>> print(infer_spec.metadata.rpc.max_batch_size)
8
>>> print(infer_spec.to_dict())
{'processor': 'xgboost', 'metadata': {'rpc': {'keepalive': 1000, 'max_batch_size': 8}},
'warm_up_data_path': 'oss://bucket-name/path/to/warmup-data'}
"""
def __init__(self, *args, **kwargs):
"""InferenceSpec initializer.
Args:
**kwargs: Parameters of the inference spec.
"""
properties = kwargs.pop("__properties", [])
cfg_dict = copy.deepcopy(kwargs)
cfg_dict = {k: v for k, v in cfg_dict.items() if not k.startswith("_")}
if args:
if len(args) > 1:
raise TypeError()
cfg_dict.update(args[0])
super(InferenceSpec, self).__setattr__(
"_cfg_dict", self._transform_value(cfg_dict)
)
super(InferenceSpec, self).__setattr__("__properties", properties)
def __repr__(self):
return json.dumps(self.to_dict(), sort_keys=True, indent=4)
def _transform_value(self, value):
if isinstance(value, (List, Tuple)):
return [self._transform_value(item) for item in value]
elif isinstance(value, (Dict, AttrDict)):
return AttrDict(
{key: self._transform_value(value) for key, value in value.items()}
)
return value
def __missing__(self, name):
return self._cfg_dict.__missing__(name)
def __setitem__(self, name, value):
return self._cfg_dict.__setitem__(name, self._transform_value(value))
def __setattr__(self, name, value):
if name in getattr(self, "__properties"):
super(InferenceSpec, self).__setattr__(name, self._transform_value(value))
else:
self._cfg_dict.__setattr__(name, self._transform_value(value))
def __getattr__(self, item):
if item.startswith("_"):
# Raise AttributeError instead of calling getattr(self, item), which would
# recurse back into __getattr__ for missing private attributes.
raise AttributeError(item)
return self._cfg_dict.__getitem__(item)
def __contains__(self, item):
return item in self._cfg_dict
def to_dict(self) -> Dict:
"""Return a dictionary that represent the InferenceSpec."""
return self._cfg_dict.to_dict()
def add_option(self, name: str, value):
"""Add an option to the inference_spec instance.
Args:
name (str): Name of the option to set, represented as the JSON path of the
parameter for the InferenceSpec. To view the full supported parameters,
please see the following hyperlink: `Parameters of model services
<https://help.aliyun.com/document_detail/450525.htm>`_.
value: Value for the option.
Examples:
>>> infer_spec = InferenceSpec(processor="tensorflow_gpu_1.12")
>>> infer_spec.add_option("metadata.rpc.keepalive", 10000)
>>> infer_spec.metadata.rpc.keepalive
10000
>>> infer_spec.to_dict()
{'processor': 'tensorflow_gpu_1.12', 'metadata': {'rpc': {'keepalive': 10000}}}
"""
src = self._transform_value(value)
for k in reversed(name.split(".")):
src = {k: src}
self._cfg_dict.update(AttrDict(src))
def merge_options(self, options: Dict[str, Any]):
"""Merge options from a dictionary."""
for key, value in options.items():
self.add_option(key, value)
@classmethod
def from_dict(cls, config: Dict[str, Any]) -> "InferenceSpec":
"""Initialize a InferenceSpec from a dictionary.
You can use this method to initialize a InferenceSpec instance
from a dictionary.
Returns:
:class:`pai.model.InferenceSpec`: A InferenceSpec instance.
"""
config = config or dict()
return cls(**config)
def is_container_serving(self):
return "containers" in self._cfg_dict
@classmethod
def _upload_source_dir(cls, source_dir, session):
"""Upload source files to OSS bucket."""
if not os.path.exists(source_dir):
raise ValueError(f"Input source code path does not exist: {source_dir}.")
if not os.path.isdir(source_dir):
raise ValueError(
f"Input source code path should be a directory: {source_dir}."
)
target_dir = session.get_storage_path_by_category(
category=StoragePathCategory.InferenceSrc
)
# upload local script data to the OSS bucket.
uploaded_source_code = upload(
source_dir,
target_dir,
session.oss_bucket,
)
logger.debug("Uploaded source code to OSS: %s", uploaded_source_code)
return uploaded_source_code
def mount(
self,
source: str,
mount_path: str,
session: Session = None,
properties: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Mount a source storage to the running container.
.. note::
If source is a local path, it will be uploaded to the OSS bucket and
mounted. If source is an OSS path, it will be mounted directly.
Args:
source (str): The source storage to be attached; currently only an OSS
path in OSS URI format or a local path is supported.
mount_path (str): The mount path in the container.
session (Session, optional): A PAI session instance used for communicating
with PAI service.
Returns:
Dict[str, Any]: The storage config.
Raises:
DuplicatedMountException: If the mount path is already used or the source OSS
path is already mounted to the container.
Examples::
# Mount a OSS storage path to the running container.
>>> inference_spec.mount("oss://<YourOssBucket>/path/to/directory/model.json",
... "/ml/model/")
# 'Mount' a local path to the running container.
>>> inference_spec.mount("/path/to/your/data/", "/ml/model/")
"""
session = session or get_default_session()
# TODO: supports more storages, such as NAS, PAI Dataset, PAI CodeSource, etc.
if not isinstance(source, str):
raise ValueError(
"Parameter should be a string which represents an OSS storage path"
" or a local file path."
)
if "storage" in self._cfg_dict:
storages = copy.deepcopy(self._cfg_dict.get("storage", []))
else:
storages = []
configs = []
uris = set()
for s in storages:
# overwrite the existing mount path
if s.get("mount_path") == mount_path:
continue
oss_uri = s.get("oss", {}).get("path")
if oss_uri:
uris.add(oss_uri)
configs.append(s)
if is_oss_uri(source):
oss_uri_obj = OssUriObj(source)
storage_config = {
"mount_path": mount_path,
"oss": {"path": oss_uri_obj.get_dir_uri()},
}
elif os.path.exists(source):
# if source is a local path, upload it to OSS bucket and use OSS URI
# as storage source.
oss_path = session.get_storage_path_by_category(
StoragePathCategory.ModelData
)
oss_uri = upload(
source_path=source, oss_path=oss_path, bucket=session.oss_bucket
)
oss_uri_obj = OssUriObj(oss_uri)
storage_config = {
"mount_path": mount_path,
"oss": {"path": oss_uri_obj.get_dir_uri()},
}
else:
raise ValueError(
"Source path is not a valid OSS URI or a existing local path."
)
if properties:
storage_config.update({"properties": properties})
# check if the source OSS Path is already mounted to the container.
if oss_uri_obj.get_dir_uri() in uris:
raise DuplicatedMountException(
f"Source OSS path '{oss_uri_obj.get_dir_uri()}' is already "
f"mounted to the container."
)
configs.append(storage_config)
self.storage = configs
return storage_config
def set_model_data(self, model_data: str, mount_path: Optional[str] = None):
"""
Set the model data for the InferenceSpec instance.
Args:
model_data (str): The model data to be set. It must be an OSS URI.
mount_path (str, optional): The mount path in the container.
Raises:
DuplicatedMountException: If the model data is already mounted to the container.
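Example (an illustrative sketch; the OSS URI is a placeholder)::
>>> # For a processor-based spec this sets the ``model_path`` option; for a
>>> # container-based spec it mounts the OSS path into the container.
>>> inference_spec.set_model_data("oss://<YourOssBucket>/path/to/model/")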
"""
def is_model_storage(storage: Dict[str, Any]):
return (
"properties" in storage
and storage["properties"].get("resource_type") == "model"
)
if not model_data:
return
if not self.is_container_serving():
# if model_data is an OSS URI with endpoint, truncate the endpoint.
oss_uri_obj = OssUriObj(model_data)
model_path_uri = "oss://{bucket_name}/{key}".format(
bucket_name=oss_uri_obj.bucket_name,
key=oss_uri_obj.object_key,
)
self.add_option("model_path", model_path_uri)
else:
indexes = [idx for idx, s in enumerate(self.storage) if is_model_storage(s)]
# replace the first model storage with the model_data.
if indexes:
if len(indexes) > 1:
logger.warning(
"Multiple model storage found in the InferenceSpec,"
" use the first one."
)
idx = indexes[0]
oss_uri_obj = OssUriObj(model_data)
storage_config = {
"path": oss_uri_obj.get_dir_uri(),
}
if oss_uri_obj.endpoint:
storage_config.update(
{
"endpoint": oss_uri_obj.endpoint,
}
)
self.storage[idx].oss = self._transform_value(storage_config)
else:
try:
self.mount(
model_data,
mount_path=mount_path or DefaultServiceConfig.model_path,
properties={"resource_type": "model", "resource_use": "base"},
)
except DuplicatedMountException as e:
# ignore duplicated mount
logger.warning("Model is already mounted the container: %s", e)
def container_serving_spec(
command: str,
image_uri: Union[str, ImageInfo],
source_dir: Optional[str] = None,
git_config: Optional[Dict[str, Any]] = None,
port: Optional[int] = None,
environment_variables: Optional[Dict[str, str]] = None,
requirements: Optional[List[str]] = None,
requirements_path: Optional[str] = None,
health_check: Optional[Dict[str, Any]] = None,
storage_configs: Optional[List[StorageConfigBase]] = None,
session: Optional[Session] = None,
) -> InferenceSpec:
"""A convenient function to create an InferenceSpec instance that serving the model
with given container and script.
Examples::
infer_spec: InferenceSpec = container_serving_spec(
command="python run.py",
source_dir="./model_server/",
image_uri="<ServingImageUri>",
)
m = Model(
model_data="oss://<YourOssBucket>/path/to/your/model",
inference_spec=infer_spec,
)
m.deploy(
instance_type="ecs.c6.xlarge"
)
Args:
command (str): The command used to launch the Model server.
source_dir (str): A relative path or an absolute path to the source code
directory used to load the model and launch the HTTP server; it will be
uploaded to the OSS bucket and mounted to the container. If there is a
``requirements.txt`` file under the directory, it will be installed before
the prediction server starts.
If 'git_config' is provided, 'source_dir' should be a relative location
to a directory in the Git repo. With the following GitHub repo directory
structure:
.. code::
|----- README.md
|----- src
|----- train.py
|----- test.py
if you need 'src' directory as the source code directory, you can assign
source_dir='./src/'.
git_config (Dict[str, str]): Git configuration used to clone the repo.
Including ``repo``, ``branch``, ``commit``, ``username``, ``password`` and
``token``. The ``repo`` is required. All other fields are optional. ``repo``
specifies the Git repository. If you don't provide ``branch``, the default
value 'master' is used. If you don't provide ``commit``, the latest commit
in the specified branch is used. ``username``, ``password`` and ``token``
are for authentication purpose. For example, the following config:
.. code:: python
git_config = {
'repo': 'https://github.com/modelscope/modelscope.git',
'branch': 'master',
'commit': '9bfc4a9d83c4beaf8378d0a186261ffc1cd9f960'
}
results in cloning the repo specified in 'repo', then checking out the
'master' branch, and checking out the specified commit.
image_uri (str): The Docker image used to run the prediction service.
port (int): The exposed port of the server in the container; prediction requests
will be forwarded to this port. The environment variable ``LISTENING_PORT``
in the container will be set to this value. Default to 8000.
environment_variables (Dict[str, str], optional): Dictionary of environment
variable key-value pairs to set on the running container.
requirements (List[str], optional): A list of Python package dependencies; they
will be installed before the serving container runs.
requirements_path (str, optional): An absolute path to the requirements.txt file
in the container.
health_check (Dict[str, Any], optional): The health check configuration. If it
is not set, a TCP readiness probe will be used to check the health of the
HTTP server.
storage_configs (List[StorageConfigBase], optional): A list of storage configs
used to mount the storage to the container. The storage can be OSS, NFS,
SharedMemory, or NodeStorage, etc.
session (Session, optional): A PAI session instance used for communicating
with PAI service.
Returns:
:class:`pai.model.InferenceSpec`: An InferenceSpec instance.
"""
session = session or get_default_session()
if git_config:
updated_args = git_utils.git_clone_repo(
git_config=git_config,
source_dir=source_dir,
)
source_dir = updated_args["source_dir"]
if not port:
port = DefaultServiceConfig.listen_port
elif int(port) in _RESERVED_PORTS:
raise ValueError(
"Reserved port {} is not allowed to use as serving port.".format(port),
)
if source_dir:
if not os.path.exists(source_dir):
raise ValueError("Source directory {} does not exist.".format(source_dir))
if not os.path.isdir(source_dir):
raise ValueError(
"Source directory {} is not a directory.".format(source_dir)
)
code_mount_path = DefaultServiceConfig.code_path
# build the command for serving container.
command = textwrap.dedent(
f"""\
# change working directory to code mount path.
cd {code_mount_path}
{command}
"""
)
if not requirements_path and os.path.exists(
os.path.join(source_dir, "requirements.txt")
):
requirements_path = posixpath.join(code_mount_path, "requirements.txt")
else:
code_mount_path = None
requirements_path = None
if isinstance(image_uri, ImageInfo):
image_uri = image_uri.image_uri
environment_variables = environment_variables or dict()
container_spec = {
"image": image_uri,
"port": port,
"script": command,
"env": (
[
{"name": key, "value": str(value)}
for key, value in environment_variables.items()
]
if environment_variables
else []
),
}
if health_check:
container_spec["health_check"] = health_check
if requirements:
container_spec["prepare"] = {"pythonRequirements": requirements}
if requirements_path:
logger.warning(
"If the parameter 'requirements' is set, the requirements_path "
"parameter will be ignored."
)
elif requirements_path:
container_spec["prepare"] = {
"pythonRequirementsPath": requirements_path,
}
inference_spec = InferenceSpec(containers=[container_spec])
if storage_configs:
storage = [s.to_dict() for s in storage_configs]
inference_spec.storage = storage
# mount the uploaded serving scripts to the serving container.
if source_dir:
inference_spec.mount(
source_dir,
code_mount_path,
session=session,
)
return inference_spec
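# A slightly fuller sketch of ``container_serving_spec`` usage (illustrative only;
# the image URI, bucket name and package list are placeholders):
#
#   infer_spec = container_serving_spec(
#       command="python app.py",
#       source_dir="./serving_src/",
#       image_uri="<ServingImageUri>",
#       port=8000,
#       environment_variables={"MODEL_NAME": "demo"},
#       requirements=["fastapi", "uvicorn"],
#       storage_configs=[
#           OssStorageConfig(mount_path="/ml/data/", oss_path="oss://<YourBucket>/data/"),
#       ],
#   )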
class _BuiltinProcessor(object):
"""Helper class uses for getting the builtin processor"""
PMML = "pmml"
XGBoost = "xgboost"
SupportedFrameworkAcceleratorVersionConfig = {
"tensorflow": {
"cpu": [
"1.12",
"1.14",
"1.15",
"2.3",
],
"gpu": [
"1.12",
"1.14",
"1.15",
],
},
"pytorch": {
"cpu": [
"1.6",
],
"gpu": [
"1.6",
],
},
}
# Hard code default processor for specific model format.
ModelFormatDefaultProcessorMapping = {
ModelFormat.PMML: "pmml",
ModelFormat.SavedModel: "tensorflow_cpu_2.3",
ModelFormat.TorchScript: "pytorch_cpu_1.6",
ModelFormat.FrozenPb: "pytorch_cpu_1.6",
ModelFormat.CaffePrototxt: "caffe_cpu",
ModelFormat.ONNX: "onnx_cu100",
}
@classmethod
def get_default_by_model_format(cls, model_format: str) -> str:
"""Get the default processor for a specific model format."""
if model_format in cls.ModelFormatDefaultProcessorMapping:
return cls.ModelFormatDefaultProcessorMapping[model_format]
@classmethod
def from_framework_version(
cls, framework_name, framework_version, accelerator=None
):
accelerator = accelerator or "cpu"
versions = cls.SupportedFrameworkAcceleratorVersionConfig.get(
framework_name, dict()
).get(accelerator, [])
if framework_version in versions:
return "{}_{}_{}".format(framework_name, accelerator, framework_version)
else:
logger.warning(
"Could not find the processor for the framework_version({} {}), use the"
" latest processor".format(framework_name, framework_version)
)
return "{}_{}_{}".format(framework_name, accelerator, versions[-1])
class ModelBase(object):
"""A class represent ModelBase."""
def __init__(
self,
model_data: str,
inference_spec: Optional[InferenceSpec] = None,
session: Session = None,
):
self.model_data = model_data
self.inference_spec = inference_spec
self.session = session or get_default_session()
def download(self, target_dir: str):
"""Download the model data from OSS to local directory.
Args:
target_dir (str): The target directory to download the model data.
Returns:
str: Local directory path stores the model data.
"""
if not self.model_data:
raise ValueError("Could not find the model data for this model.")
if not is_oss_uri(self.model_data):
raise RuntimeError("Download method only support model data stored in OSS.")
self._download_model_data(target_dir)
return target_dir
def _download_model_data(self, target_dir):
if not self.model_data:
return
logger.info(f"Prepare model data to local directory: {target_dir}")
if self.model_data.startswith("oss://"):
oss_uri = OssUriObj(self.model_data)
oss_bucket = self.session.get_oss_bucket(oss_uri.bucket_name)
download(
oss_path=oss_uri.object_key,
local_path=target_dir,
bucket=oss_bucket,
un_tar=True,
)
else:
if not os.path.exists(self.model_data):
raise ValueError(f"Model data path does not exist: {self.model_data}")
os.makedirs(target_dir, exist_ok=True)
if os.path.isfile(self.model_data):
shutil.copy(
self.model_data,
os.path.join(target_dir, os.path.basename(self.model_data)),
)
else:
distutils.dir_util.copy_tree(self.model_data, target_dir)
def _upload_model_data(self):
"""Upload the model artifact to OSS bucket if self.model_data is a local
file path.
"""
if not self.model_data:
return
elif is_oss_uri(self.model_data):
return self.model_data
elif not os.path.exists(self.model_data):
raise RuntimeError(f"Model data path does not exist: {self.model_data}")
dest_oss_path = self.session.get_storage_path_by_category(
category=StoragePathCategory.ModelData
)
upload_model_data = upload(
source_path=self.model_data,
oss_path=dest_oss_path,
bucket=self.session.oss_bucket,
)
return upload_model_data
def list_model_files(self, uri_format: bool = False) -> Iterator[str]:
"""List model files under the model path.
Args:
uri_format (bool): If True, return the model file path in OSS URI format.
Returns:
Iterator[str]: Iterator of model files.
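Example (an illustrative sketch; the bucket and object keys are placeholders)::
>>> for path in model.list_model_files(uri_format=True):
...     print(path)
oss://<YourOssBucket>/path/to/model/config.json
oss://<YourOssBucket>/path/to/model/model.bin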
"""
if not self.model_data:
raise ValueError("Model data path is not specified.")
if not is_oss_uri(self.model_data):
raise ValueError("Method only support model data stored in OSS.")
oss_uri_obj = OssUriObj(self.model_data)
bucket = self.session.get_oss_bucket(
bucket_name=oss_uri_obj.bucket_name,
)
def _get_relative_path(obj_key: str):
# if the model_data references a single object, return the object file
# name.
if oss_uri_obj.object_key == obj_key:
return os.path.basename(obj_key)
path = obj_key[len(oss_uri_obj.object_key) :]
return path.lstrip("/") if path.startswith("/") else path
obj_iter = ObjectIterator(bucket=bucket, prefix=oss_uri_obj.object_key)
for obj_info in obj_iter:
if uri_format:
yield f"oss://{bucket.bucket_name}/{obj_info.key}"
else:
yield _get_relative_path(obj_info.key)
def _get_inference_spec(self, method: str) -> InferenceSpec:
if type(self)._is_multiple_inference_spec(self.inference_spec):
supported_methods = list(self.inference_spec.to_dict().keys())
if method and method not in supported_methods:
raise ValueError(
"The model does not support the given method:"
f" {method}. Supported methods are: {supported_methods}."
)
elif method:
spec = InferenceSpec(self.inference_spec.to_dict().get(method))
else:
method = supported_methods[0]
logger.warning(
f"Model contains multiple specs and method is not specified. "
f"Default method is used: '{method}'. Supported inference methods are:"
f" {supported_methods}."
)
spec = InferenceSpec(self.inference_spec.to_dict().get(method))
else:
if method:
raise ValueError(
"The model contains only one inference spec, do not specify the method."
)
else:
spec = self.inference_spec
return spec
def deploy(
self,
service_name: str,
instance_count: Optional[int] = 1,
instance_type: Optional[str] = None,
resource_config: Optional[Union[Dict[str, int], ResourceConfig]] = None,
resource_id: Optional[str] = None,
options: Optional[Dict[str, Any]] = None,
service_type: Optional[str] = None,
wait: bool = True,
serializer: Optional["SerializerBase"] = None,
inference_method: str = None,
**kwargs,
):
"""Deploy a prediction service with the model."""
if is_local_run_instance_type(instance_type):
return self._deploy_local(
instance_type=instance_type,
serializer=serializer,
wait=wait,
)
else:
return self._deploy(
service_name=service_name,
instance_count=instance_count,
instance_type=instance_type,
resource_config=resource_config,
resource_id=resource_id,
service_type=service_type,
options=options,
wait=wait,
serializer=serializer,
inference_method=inference_method,
**kwargs,
)
def _generate_service_name(self):
s = os.path.basename(self.model_data.rstrip("/")) + random_str(8)
return to_plain_text(s)
@classmethod
def _is_multiple_inference_spec(cls, spec: Dict[str, Any]) -> bool:
return not (
"metadata" in spec
or "cloud" in spec
or "containers" in spec
or "storage" in spec
)
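# Heuristic illustration (derived from the check above): a spec carrying any of the
# top-level keys "metadata", "cloud", "containers" or "storage" is treated as a
# single inference spec, while a dict keyed only by inference method names, e.g.
# {"chat": {...}, "generate": {...}}, is treated as a multi-method spec.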
def _deploy(
self,
service_name: str = None,
instance_count: int = 1,
instance_type: str = None,
resource_config: Union[Dict[str, int], ResourceConfig] = None,
resource_id: str = None,
service_type: str = None,
options: Dict[str, Any] = None,
wait: bool = True,
serializer: "SerializerBase" = None,
labels: Optional[Dict[str, str]] = None,
inference_method: str = None,
**kwargs,
):
"""Create a prediction service."""
if not service_name:
service_name = self._generate_service_name()
logger.info(
"Service name is not specified, using a generated service"
f" name to create the service: service_name={service_name}"
)
config = self._build_service_config(
service_name=service_name,
instance_count=instance_count,
instance_type=instance_type,
service_type=service_type,
resource_config=resource_config,
resource_id=resource_id,
options=options,
inference_method=inference_method,
)
service_name = self.session.service_api.create(config=config, labels=labels)
self._wait_service_visible(service_name)
if service_type == ServiceType.Async:
predictor = AsyncPredictor(
service_name=service_name,
session=self.session,
serializer=serializer,
)
else:
predictor = Predictor(
service_name=service_name,
session=self.session,
serializer=serializer,
)
print(
"View the service detail by accessing the console URI: \n{}".format(
predictor.console_uri
)
)
if wait:
predictor.wait_for_ready()
return predictor
def _wait_service_visible(self, service_name, attempts=3, interval=2):
"""Wait for the service to be visible in DescribeService API.
hack:
https://aone.alibaba-inc.com/v2/project/1134421/bug#viewIdentifier=5dfb195e2e2b84f6b2f24718&openWorkitemIdentifier=50192431
"""
while attempts > 0:
obj = self.session.service_api.get(service_name)
if "ServiceUid" in obj:
return
attempts -= 1
time.sleep(interval)
logger.warning("DescribeService API failed to get the Service object.")
def _build_service_config(
self,
service_name: str = None,
instance_count: int = None,
instance_type: str = None,
resource_config: Union[ResourceConfig, Dict[str, Any]] = None,
resource_id: str = None,
service_type: str = None,
options: Dict[str, Any] = None,
inference_method: str = None,
) -> Dict[str, Any]:
"""Build a service config dictionary used to create a PAI EAS service."""
self.model_data = self._upload_model_data()
resource_config = (
ResourceConfig(**resource_config)
if resource_config and isinstance(resource_config, dict)
else None
)
if resource_config and instance_type:
raise ValueError(
f"Only one of 'instance_type' and 'resource_config' "
f"is required, but both have been provided: instance_type"
f"={instance_type}, resource_config="
f"{resource_config}."
)
inference_spec = InferenceSpec(
self._get_inference_spec(method=inference_method).to_dict()
if self.inference_spec
else dict()
)
inference_spec.set_model_data(model_data=self.model_data)
if service_type:
inference_spec.add_option("metadata.type", service_type)
if inference_spec.is_container_serving():
inference_spec.add_option("metadata.rpc.proxy_path", "/")
if service_name:
inference_spec.add_option("name", service_name)
if instance_count:
inference_spec.add_option("metadata.instance", instance_count)
if instance_type:
inference_spec.add_option("cloud.computing.instance_type", instance_type)
elif resource_config:
inference_spec.add_option("metadata.cpu", resource_config.cpu)
inference_spec.add_option("metadata.memory", resource_config.memory)
if resource_config.gpu:
inference_spec.add_option("metadata.gpu", resource_config.gpu)
if resource_config.gpu_memory:
inference_spec.add_option(
"metadata.gpu_memory", resource_config.gpu_memory
)
if resource_config.gpu:
logger.warning(
"Parameters 'gpu' is set, the 'gpu_memory' parameter "
"does not take effect."
)
if resource_id:
inference_spec.add_option("metadata.resource", resource_id)
if options:
inference_spec.merge_options(options=options)
return inference_spec.to_dict()
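# Rough shape of the config built above for a processor-based model deployed with an
# instance type (illustrative only; names and paths are placeholders):
#   {
#       "processor": "xgboost",
#       "model_path": "oss://<YourBucket>/path/to/model/",
#       "name": "<service_name>",
#       "metadata": {"instance": 1},
#       "cloud": {"computing": {"instance_type": "ecs.c6.xlarge"}},
#   }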
def _deploy_local(
self,
instance_type: str,
serializer: SerializerBase = None,
wait: bool = True,
) -> LocalPredictor:
"""Deploy the model in local using docker."""
if not self.inference_spec.is_container_serving():
raise RuntimeError(
"Currently, only model using the InferenceSpec that serving with"
" container support local run."
)
if len(self.inference_spec.containers) > 1:
raise RuntimeError(
"InferenceSpec that serving with multiple container "
"does not support local run."
)
# prepare model data to local
work_dir = tempfile.mkdtemp()
model_dir = os.path.join(work_dir, "model")
self._download_model_data(target_dir=model_dir)
volumes = {
model_dir: {
"bind": DefaultServiceConfig.model_path,
"mode": "rw",
}
}
# prepare used storage to local directory.
if "storage" in self.inference_spec:
# only OSS storage configs are supported in local run.
if any(s for s in self.inference_spec.storage if "oss" not in s):
raise ValueError(
f"Local run only supports InferenceSpec using OSS storage config: "
f"{self.inference_spec.to_dict()}"
)
for idx, storage in enumerate(self.inference_spec.storage):
store_dir = os.path.join(work_dir, f"storage_{idx}")
os.makedirs(store_dir, exist_ok=True)
oss_uri = OssUriObj(storage.oss.path)
download(
oss_path=oss_uri.object_key,
local_path=store_dir,
bucket=self.session.get_oss_bucket(oss_uri.bucket_name),
)
volumes[store_dir] = {"bind": storage.mount_path, "mode": "rw"}
container_spec = self.inference_spec.containers[0].to_dict()
env_vars = {
item["name"]: item["value"] for item in container_spec.get("env", [])
}
# build local launch script
requirements_list = container_spec.get("prepare", dict()).get(
"pythonRequirements", []
)
requirements_path = container_spec.get("prepare", dict()).get(
"pythonRequirementsPath", None
)
# build command to install requirements
if requirements_list:
install_requirements = " ".join(
[
shlex.quote(s)
for s in ["python", "-m", "pip", "install"] + requirements_list
]
)
elif requirements_path:
install_requirements = " ".join(
[
shlex.quote(s)
for s in ["python", "-m", "pip", "install", "-r", requirements_path]
]
)
else:
install_requirements = ""
user_scripts = container_spec.get("script", "")
launch_script = textwrap.dedent(
f"""\
set -e
{install_requirements}
{user_scripts}
"""
)
gpu_count = -1 if instance_type == INSTANCE_TYPE_LOCAL_GPU else None
container_run = run_container(
image_uri=container_spec["image"],
port=container_spec.get("port"),
environment_variables=env_vars,
entry_point=[
"/bin/sh",
"-c",
launch_script,
],
volumes=volumes,
gpu_count=gpu_count,
)
predictor = LocalPredictor(
container_id=container_run.container.id,
port=container_run.port,
serializer=serializer,
)
if wait:
predictor.wait_for_ready()
return predictor
@classmethod
def _wait_local_server_ready(
cls,
container_run: ContainerRun,
interval: int = 5,
):
"""Wait for the local model server to be ready."""
while True:
try:
# Check whether the container is still running.
if not container_run.is_running():
raise RuntimeError(
"Container exited unexpectedly, status: {}".format(
container_run.status
)
)
# Make a HEAD request to the server.
requests.head(
f"http://127.0.0.1:{container_run.port}/",
)
break
except requests.ConnectionError:
# ConnectionError means server is not ready.
logger.debug("Waiting for the container to be ready...")
time.sleep(interval)
continue
def register(
self,
model_name: str,
version: str = None,
accessibility: Optional[str] = None,
version_labels: Optional[Dict[str, str]] = None,
version_description: Optional[str] = None,
format_type: Optional[str] = None,
framework_type: Optional[str] = None,
training_spec: Optional[Dict[str, Any]] = None,
evaluation_spec: Optional[Dict[str, Any]] = None,
approval_status: Optional[str] = None,
metrics: Optional[Dict[str, Any]] = None,
options: Optional[str] = None,
model_labels: Optional[Dict[str, str]] = None,
model_description: Optional[str] = None,
model_doc: Optional[str] = None,
origin: Optional[str] = None,
domain: Optional[str] = None,
task: Optional[str] = None,
) -> "RegisteredModel":
"""Register a model to the PAI model registry.
Use ``self.model_data`` to register a model to the PAI model registry.
Args:
model_name (str): The name of the model. If the model name already exists in
workspace, the model will be updated with a new model version,
parameters like ``model_labels``, ``model_description``, ``model_doc``,
``origin``, ``domain``, ``task``, ``accessibility`` will be ignored. If
the model name does not exist, a new model will be created.
version (str, optional): The version of the model. If not specified, a new
version will be created. If the version already exists, registration
will fail.
accessibility (str, optional): The accessibility of the model. The value
can be "PUBLIC" or "PRIVATE". Default to "PRIVATE".
version_labels (dict, optional): The labels of the model version.
version_description (str, optional): The description of the model version.
format_type (str, optional): The format type of the model version. The value
can be "OfflineModel", "SavedModel", "Keras H5", "Frozen Pb",
"Caffe Prototxt", "TorchScript", "XGBoost", "PMML", "AlinkModel",
"ONNX". Default to None.
framework_type (str, optional): The framework type of the model version. The
value can be "PyTorch", "TensorFlow", "Keras", "Caffe", "Alink",
"Xflow", "XGBoost". Default to None.
training_spec (dict, optional): The training spec of the model version.
Usually, it is obtained from the training job. Default to None.
evaluation_spec (dict, optional): The evaluation spec of the model version.
Usually, it is obtained from the processing job for evaluation. Default to None.
approval_status (str, optional): The approval status of the model version.
The value can be "APPROVED", "PENDING". Default to None.
metrics (dict, optional): The metrics of the model version.
options (str, optional): Any other options that you want to pass to the
model registry. Default to None.
model_labels (dict, optional): The labels of the model.
model_description (str, optional): The description of the model.
model_doc (str, optional): The documentation uri of the model.
origin (str, optional): The origin of the model. For example, "huggingface",
"modelscope" etc. Default to None.
domain (str, optional): The domain that the model is used for. For example,
"aigc", "audio", "nlp", "cv" etc. Default to None.
task (str, optional): The task that the model is used for. For example,
"large-language-model", "text-classification", "image-classification",
"sequence-labeling" etc. Default to None.
Returns:
:class:`pai.model.RegisteredModel`: The registered model object.
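Example (an illustrative sketch; the model data path and names are placeholders)::
>>> m = Model(model_data="oss://<YourOssBucket>/path/to/model/")
>>> registered = m.register(
...     model_name="my_model",
...     version="1.0.0",
...     format_type="SavedModel",
...     framework_type="TensorFlow",
... )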
"""
if not self.model_data:
raise ValueError(
"Register model failed, ``model_data`` is required to register a model."
)
# Ensure model data is uploaded to OSS.
self.model_data = self._upload_model_data()
# By specifying model_name with double quotes, the list api will process the
# precise search. Otherwise, the list api will process the fuzzy search.
resp = self.session.model_api.list(
model_name=f'"{model_name}"',
)
if resp.total_count == 0:
model_id = self.session.model_api.create(
model_name=model_name,
labels=model_labels,
model_description=model_description,
model_doc=model_doc,
origin=origin,
domain=domain,
task=task,
accessibility=accessibility,
)
else:
model_id = resp.items[0]["ModelId"]
# TODO: support registering a model with an evaluation spec
version_name = self.session.model_api.create_version(
model_id=model_id,
uri=self.model_data,
version_name=version,
labels=version_labels,
version_description=version_description,
format_type=format_type,
framework_type=framework_type,
training_spec=training_spec,
evaluation_spec=evaluation_spec,
inference_spec=(
self.inference_spec.to_dict() if self.inference_spec else None
),
approval_status=approval_status,
metrics=metrics,
options=options,
)
return RegisteredModel(model_name=model_name, model_version=version_name)
class Model(ModelBase):
"""The Class representing a ready-to-deploy model.
A Model instance includes the model artifact path and information on how to create
prediction service in PAI (specified by the inference_spec). By calling the
`model.deploy` method, a prediction service is created in PAI and a
:class:`pai.predictor.Predictor` instance is returned that can be used to make
prediction to the service.
Example::
from pai.model import Model
from pai.model import InferenceSpec
m: Model = Model(
inference_spec=InferenceSpec(processor="xgboost"),
model_data="oss://bucket-name/path/to/model",
)
# register model to PAI ModelRegistry
registered_model = m.register(
model_name="example_xgb_model"
version="1.0.0",
)
# Deploy the model to create a prediction service.
p: Predictor = m.deploy(
service_name="xgb_model_service",
instance_count=2,
instance_type="ecs.c6.large",
options={
"metadata.rpc.batching": true,
"metadata.rpc.keepalive": 10000
}
)
# make a prediction by sending the data to the prediction service.
result = p.predict([[2,3,4], [54.12, 2.9, 45.8]])
"""
def __init__(
self,
model_data: str = None,
inference_spec: InferenceSpec = None,
session: Session = None,
):
"""Model initializer.
Args:
model_data (str): An OSS URI or file path that specifies the location of the
model. If model_data is a local file path, it will be uploaded to the OSS
bucket before deployment or model registration.
inference_spec (:class:`pai.model.InferenceSpec`, optional): An
`InferenceSpec` object representing how to create the prediction service
using the model.
session (:class:`pai.session.Session`, optional): A pai session object
manages interactions with PAI REST API.
"""
super(Model, self).__init__(
model_data,
inference_spec,
session=session or get_default_session(),
)
def deploy(
self,
service_name: str,
instance_count: Optional[int] = 1,
instance_type: Optional[str] = None,
resource_config: Optional[Union[Dict[str, int], ResourceConfig]] = None,
resource_id: Optional[str] = None,
service_type: Optional[str] = None,
options: Optional[Dict[str, Any]] = None,
wait: bool = True,
serializer: Optional["SerializerBase"] = None,
**kwargs,
):
"""Deploy an online prediction service.
Args:
service_name (str, optional): Name for the online prediction service. The
name must be unique in a region.
instance_count (int): Number of instances requested for the service deployment
(Default 1).
instance_type (str, optional): Type of the machine instance, for example,
'ecs.c6.large'. For all supported instances, see the appendix of the
link:
https://help.aliyun.com/document_detail/144261.htm?#section-mci-qh9-4j7
resource_config (Union[ResourceConfig, Dict[str, int]], optional):
Request resource for each instance of the service. Required if
instance_type is not set. Example config:
.. code::
resource_config = {
"cpu": 2, # The number of CPUs that each instance requires
"memory: 4000, # The amount of memory that each instance
# requires, must be an integer, Unit: MB.
# "gpu": 1, # The number of GPUs that each instance
# requires.
# "gpu_memory": 3 # The amount of GPU memory that each
# instance requires, must be an integer,
# Unit: GB.
}
resource_id (str, optional): The ID of the resource group. The service
can be deployed to ``public resource group`` and
``dedicated resource group``.
* If `resource_id` is not specified, the service is deployed
to public resource group.
* If the service is deployed to a dedicated resource group, provide
the parameter as the ID of the resource group. Example:
"eas-r-6dbzve8ip0xnzte5rp".
service_type (str, optional): The type of the service.
options (Dict[str, Any], optional): Advanced deploy parameters used
to create the online prediction service.
wait (bool): Whether the call should wait until the online prediction
service is ready (Default True).
serializer (:class:`pai.predictor.serializers.BaseSerializer`, optional): A
serializer object used to serialize the prediction request and
deserialize the prediction response.
Returns:
A ``PredictorBase`` instance used for making prediction to the prediction
service.
"""
return super(Model, self).deploy(
service_name=service_name,
instance_count=instance_count,
instance_type=instance_type,
resource_config=resource_config,
resource_id=resource_id,
options=options,
wait=wait,
serializer=serializer,
service_type=service_type,
**kwargs,
)
class RegisteredModel(ModelBase):
"""A class that represents a registered model in PAI model registry.
A RegisteredModel instance has a unique tuple of (model_name, model_version,
model_provider), and can be used for downstream tasks such as creating an online
prediction service, or creating an AlgorithmEstimator to start a training job.
Examples::
from pai.model import RegisteredModel
# retrieve a registered model from PAI model registry by
# specifying the model_name, model_version and model_provider
m = RegisteredModel(
model_name="easynlp_pai_bert_small_zh",
model_version="0.1.0",
model_provider="pai",
)
# deploy the Registered Model to create an online prediction
# service if the model has inference_spec
m.deploy()
# create an AlgorithmEstimator to start a training job if the
# model has training_spec
est = m.get_estimator()
inputs = m.get_estimator_inputs()
est.fit(inputs)
"""
def __init__(
self,
model_name: str,
model_version: Optional[str] = None,
model_provider: Optional[str] = None,
session: Optional[Session] = None,
**kwargs,
):
"""Get a RegisteredModel instance from PAI model registry.
Args:
model_name (str): The name of the registered model.
model_version (str, optional): The version of the registered model. If not
provided, the latest version is retrieved from the model registry.
model_provider (str, optional): The provider of the registered model.
Currently, only "pai", "huggingface" or None are supported.
session (:class:`pai.session.Session`, optional): A PAI session object used
for interacting with PAI Service.
"""
self.session = session or get_default_session()
model_info = kwargs.pop("model_info", None)
if model_info:
self._model_version_info = kwargs.pop("model_version_info", {})
self._model_info = model_info
else:
self._model_info, self._model_version_info = self._get_model_version_obj(
model_name=model_name,
model_version=model_version,
model_provider=model_provider,
)
self.model_id = self._model_info.get("ModelId")
self.model_name = self._model_info.get("ModelName")
self.model_provider = self._model_info.get("Provider")
self.task = self._model_info.get("Task")
self.domain = self._model_info.get("Domain")
self.framework_type = self._model_version_info.get("FrameworkType")
self.source_type = self._model_version_info.get("SourceType")
self.source_id = self._model_version_info.get("SourceId")
self.format_type = self._model_version_info.get("FormatType")
self.uri = self._model_version_info.get("Uri")
self.model_version = self._model_version_info.get("VersionName")
self.training_spec = self._model_version_info.get("TrainingSpec")
self.evaluation_spec = self._model_version_info.get("EvaluationSpec")
self.compression_spec = self._model_version_info.get("CompressionSpec")
self.model_labels = {
lb["Key"]: lb["Value"] for lb in self._model_info.get("Labels", [])
}
self.version_labels = {
lb["Key"]: lb["Value"] for lb in self._model_version_info.get("Labels", [])
}
super(RegisteredModel, self).__init__(
model_data=self.uri,
inference_spec=InferenceSpec.from_dict(
self._model_version_info.get("InferenceSpec", dict())
),
)
def __eq__(self, other: "RegisteredModel") -> bool:
"""Compare two RegisteredModel instances."""
return (
isinstance(other, RegisteredModel)
and self.model_id == other.model_id
and self.model_version == other.model_version
)
def __repr__(self):
if not self.model_provider:
return generate_repr(
self,
"model_name",
"model_version",
)
else:
return generate_repr(self, "model_name", "model_version", "model_provider")
@classmethod
def list(
cls,
model_name: Optional[str] = None,
model_provider: Optional[str] = None,
task: Optional[str] = None,
session: Optional[Session] = None,
) -> Iterator["RegisteredModel"]:
"""List registered models in model registry.
Args:
model_name (str, optional): The name of the registered model. Default to
None.
model_provider (str, optional): The provider of the registered model.
Optional values are "pai", "huggingface" or None. If None, list
registered models in the workspace of the current session. Default to
None.
task (str, optional): The task of the registered model. Default to None.
session (Session, optional): A PAI session object used for interacting with
PAI Service.
Returns:
Iterator[RegisteredModel]: An iterator of RegisteredModel instances matching
the given criteria.
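Example (an illustrative sketch; listing PAI-provided models)::
>>> for m in RegisteredModel.list(model_provider="pai"):
...     print(m.model_name, m.model_version)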
"""
session = session or get_default_session()
page_size = 50
page_number = 1
while True:
result = session.model_api.list(
model_name=model_name,
provider=model_provider,
task=task,
page_size=page_size,
page_number=page_number,
).items
if not result:
break
for item in result:
model_version_info = item.pop("LatestVersion", {})
model_info = item
yield cls(
model_name=item["ModelName"],
session=session,
model_info=model_info,
model_version_info=model_version_info,
)
page_number += 1
def list_versions(
self, model_version: Optional[str] = None
) -> Iterator["RegisteredModel"]:
"""List all versions of the registered model.
Args:
model_version (str, optional): The version of the registered model. Default
to None.
"""
page_size = 50
page_number = 1
while True:
items = self.session.model_api.list_versions(
model_id=self.model_id,
page_number=page_number,
page_size=page_size,
version_name=model_version,
).items
if not items:
break
for item in items:
yield RegisteredModel(
model_name=self.model_name,
session=self.session,
model_info=self._model_info,
model_version_info=item,
)
page_number += 1
def _generate_service_name(self) -> str:
"""Generate a service name for the online prediction service."""
base_name = self.model_name.lower().replace("-", "_")[:36]
if base_name.endswith("_"):
base_name = base_name[:-1]
gen_name = f"{base_name}_{random_str(8)}"
return to_plain_text(gen_name)
def _get_model_version_obj(
self,
model_name: str,
model_version: Optional[str] = None,
model_provider: Optional[str] = None,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""Get the model info from PAI model registry.
Args:
model_name (str): The name of the registered model.
model_version (str, optional): The version of the registered model. If not
provided, the latest version is retrieved from the model registry.
model_provider (str, optional): The provider of the registered model.
Currently, only "pai" or None are supported. Set it to "pai" to retrieve
a PAI official model.
Returns:
A tuple of model object and model version object.
"""
if not model_name:
raise ValueError(
"Parameter model_name cannot be None or empty. Please provide a valid"
" model_name."
)
# Use model_name to get the model_id
# By specifying model_name with double quotes, the list api will process the
# precise search. Otherwise, the list api will process the fuzzy search.
result = self.session.model_api.list(
model_name=f'"{model_name}"', provider=model_provider
)
if result.total_count == 0:
raise RuntimeError(
f"Could not find any Registered Model with the specific"
f" name='{model_name}' and provider='{model_provider}'. Please check"
f" the arguments."
)
model_obj = result.items[0]
model_id = model_obj["ModelId"]
if model_version:
model_version_obj = self.session.model_api.get_version(
model_id=model_id, version=model_version
)
return model_obj, model_version_obj
else:
# Get the latest model version of the specific model if model_version
# is not provided.
if "LatestVersion" not in model_obj:
raise RuntimeError(
f"Could not find any model version under the specific"
f" name='{model_name}' and provider='{model_provider}'. Please"
f" check the arguments."
)
return model_obj, model_obj["LatestVersion"]
def delete(self, delete_all_version: bool = False):
"""Delete the specific registered model from PAI model registry.
Args:
delete_all_version (bool): Whether to delete all versions of the registered
model.
"""
if delete_all_version:
self.session.model_api.delete(self.model_id)
else:
if not self.model_version:
logger.warning(
"No model version is specified for the registered model, "
"skipping deletion."
)
return
self.session.model_api.delete_version(self.model_id, self.model_version)
def deploy(
self,
service_name: Optional[str] = None,
instance_count: Optional[int] = None,
instance_type: Optional[str] = None,
resource_config: Optional[Union[Dict[str, int], ResourceConfig]] = None,
resource_id: Optional[str] = None,
options: Optional[Dict[str, Any]] = None,
service_type: Optional[str] = None,
wait: bool = True,
serializer: Optional["SerializerBase"] = None,
inference_method: str = None,
**kwargs,
):
"""Deploy an online prediction service with the registered model.
If the RegisteredModel already has a registered inference_spec, then the model
can be deployed directly. Give more specific arguments to override the registered
inference_spec. Otherwise, the model will be deployed through the same process
as the :meth:`deploy` method of :class:`pai.model.Model`.
Args:
service_name (str, optional): Name for the online prediction service. The
name must be unique in a region.
instance_count (int, optional): Number of instances requested for the service
deployment.
instance_type (str, optional): Type of the machine instance, for example,
'ecs.c6.large'. For all supported instances, see the appendix of the
link:
https://help.aliyun.com/document_detail/144261.htm?#section-mci-qh9-4j7
resource_config (Union[ResourceConfig, Dict[str, int]], optional):
Request resource for each instance of the service. Required if
instance_type is not set. Example config:
.. code::
resource_config = {
"cpu": 2, # The number of CPUs that each instance requires
"memory: 4000, # The amount of memory that each instance
# requires, must be an integer, Unit: MB.
# "gpu": 1, # The number of GPUs that each instance
# requires.
# "gpu_memory": 3 # The amount of GPU memory that each
# instance requires, must be an integer,
# Unit: GB.
}
resource_id (str, optional): The ID of the resource group. The service
can be deployed to ``public resource group`` and
``dedicated resource group``.
* If `resource_id` is not specified, the service is deployed
to public resource group.
* If the service is deployed to a dedicated resource group, provide
the parameter as the ID of the resource group. Example:
"eas-r-6dbzve8ip0xnzte5rp".
service_type (str, optional): The type of the service.
options (Dict[str, Any], optional): Advanced deploy parameters used
to create the online prediction service.
wait (bool): Whether the call should wait until the online prediction
service is ready (Default True).
serializer (:class:`pai.predictor.serializers.BaseSerializer`, optional): A
serializer object used to serialize the prediction request and
deserialize the prediction response.
inference_method (str, optional): The inference method of the service.
**kwargs: Additional arguments for the service.
Returns:
A ``PredictorBase`` instance used for making prediction to the prediction
service.
"""
if not self._model_version_info:
raise ValueError("No model version is available for deployment.")
if not self.inference_spec:
raise RuntimeError("No inference_spec for the registered model.")
labels = kwargs.pop("labels", dict())
if self.model_provider == ProviderAlibabaPAI:
default_labels = {
"Task": self.task,
"RootModelName": self.model_name,
"RootModelVersion": self.model_version,
"RootModelID": self.model_id,
"Domain": self.domain,
"CreatedBy": MODEL_TASK_CREATED_BY_QUICKSTART,
"BaseModelUri": self.uri,
}
default_labels.update(labels)
labels = default_labels
if is_local_run_instance_type(instance_type):
return self._deploy_local(
instance_type=instance_type,
serializer=serializer,
wait=wait,
)
else:
return self._deploy(
service_name=service_name,
instance_count=instance_count,
instance_type=instance_type,
resource_config=resource_config,
resource_id=resource_id,
service_type=service_type,
options=options,
wait=wait,
serializer=serializer,
labels=labels,
inference_method=inference_method,
**kwargs,
)
def _build_service_config(
self,
service_name: str = None,
instance_count: int = None,
instance_type: str = None,
resource_config: Union[ResourceConfig, Dict[str, Any]] = None,
resource_id: str = None,
service_type: str = None,
options: Dict[str, Any] = None,
inference_method: str = None,
) -> Dict[str, Any]:
"""Build a service config dictionary with RegisteredModel's inference_spec.
When the RegisteredModel builds a service config, it will ignore the model_data
parameter and use the inference_spec of the RegisteredModel as default config.
User can override the inference_spec by providing more specific arguments.
"""
resource_config = (
ResourceConfig(**resource_config)
if resource_config and isinstance(resource_config, dict)
else None
)
if resource_config and instance_type:
raise ValueError(
f"Only one of 'instance_type' and 'resource_config' is required, but"
f" both have been provided: instance_type={instance_type},"
f" resource_config={resource_config}."
)
inference_spec = InferenceSpec(
self._get_inference_spec(method=inference_method).to_dict()
if self.inference_spec
else dict()
)
if service_type:
inference_spec.add_option("metadata.type", service_type)
if inference_spec.is_container_serving():
inference_spec.add_option("metadata.rpc.proxy_path", "/")
if service_name:
inference_spec.add_option("name", service_name)
if instance_count:
inference_spec.add_option("metadata.instance", instance_count)
if instance_type:
inference_spec.add_option("cloud.computing.instance_type", instance_type)
elif resource_config:
inference_spec.add_option("metadata.cpu", resource_config.cpu)
inference_spec.add_option("metadata.memory", resource_config.memory)
if resource_config.gpu:
inference_spec.add_option("metadata.gpu", resource_config.gpu)
if resource_config.gpu_memory:
inference_spec.add_option(
"metadata.gpu_memory", resource_config.gpu_memory
)
if resource_config.gpu:
logger.warning(
"Parameters 'gpu' is set, the 'gpu_memory' parameter does not"
" take effect."
)
if resource_id:
inference_spec.add_option("metadata.resource", resource_id)
if options:
inference_spec.merge_options(options=options)
return inference_spec.to_dict()
def get_recipe_spec(
self, recipe_type: "ModelRecipeType", method: Optional[str] = None
) -> ModelRecipeSpec:
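"""Get the recipe spec of the given recipe type from the registered model.
Args:
recipe_type (ModelRecipeType): The recipe type, one of "training",
"evaluation" or "compression".
method (str, optional): Selects a specific spec when the recipe contains
multiple specs (e.g. multiple training methods). Must not be provided
when the recipe contains a single spec.
Returns:
ModelRecipeSpec: The validated recipe spec.
"""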
from ._model_recipe import ModelRecipeType
if recipe_type == ModelRecipeType.TRAINING:
raw_spec = self.training_spec
elif recipe_type == ModelRecipeType.EVALUATION:
raw_spec = self.evaluation_spec
elif recipe_type == ModelRecipeType.COMPRESSION:
raw_spec = self.compression_spec
else:
raise ValueError(
f"Invalid recipe_type: {recipe_type}. Supported recipe types are:"
f" {ModelRecipeType.supported_types()}"
)
if type(self)._is_multiple_spec(raw_spec):
supported_methods = list(raw_spec.keys())
if method and method not in supported_methods:
raise ValueError(
"The model recipe does not support the given method:"
f" {method}. Supported methods are: {supported_methods}."
)
elif method:
spec = raw_spec.get(method)
else:
method = supported_methods[0]
logger.warning(
f"Model recipe contains multiple specs and method is not specified. "
f"Default method is used: '{method}'. Supported training methods are:"
f" {supported_methods}."
)
spec = raw_spec.get(method)
else:
if method:
raise ValueError(
"The model recipe contains only one spec, do not specify the method."
)
spec = raw_spec
return ModelRecipeSpec.model_validate(spec)
def get_training_spec(self, training_method: Optional[str] = None) -> ModelRecipeSpec:
from ._model_recipe import ModelRecipeType
return self.get_recipe_spec(ModelRecipeType.TRAINING, training_method)
def get_estimator(
self,
training_method: Optional[str] = None,
instance_type: Optional[str] = None,
instance_count: Optional[int] = None,
hyperparameters: Optional[Dict[str, Any]] = None,
base_job_name: Optional[str] = None,
output_path: Optional[str] = None,
max_run_time: Optional[int] = None,
**kwargs,
) -> "AlgorithmEstimator":
"""Generate an AlgorithmEstimator.
Generate an AlgorithmEstimator object from RegisteredModel's training_spec.
Args:
training_method (str, optional): Used to select the training algorithm
that supported by the model. If not specified, the default training
algorithm will be retrieved from the model version.
instance_type (str, optional): The machine instance type used to run the
training job. If not provided, the default instance type will be
retrieved from the algorithm definition. To view the supported machine
instance types, please refer to the document:
https://help.aliyun.com/document_detail/171758.htm#section-55y-4tq-84y.
instance_count (int, optional): The number of machines used to run the
training job. If not provided, the default instance count will be
retrieved from the algorithm definition.
hyperparameters (dict, optional): A dictionary that represents the
hyperparameters used in the training job. Default hyperparameters will
be retrieved from the algorithm definition.
base_job_name (str, optional): The base name used to generate the training
job name. If not provided, a default job name will be generated.
output_path (str, optional): An OSS URI to store the outputs of the training
jobs. If not provided, an OSS URI will be generated using the default
OSS bucket in the session. When the `estimator.fit` method is called,
a specific OSS URI under the output_path for each channel is generated
and mounted to the training container.
max_run_time (int, optional): The maximum time in seconds that the training
job can run. The training job will be terminated after the time is
reached (Default None).
Returns:
:class:`pai.estimator.AlgorithmEstimator`: An AlgorithmEstimator object.
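Example:
A minimal sketch; the instance type and hyperparameter name below are
illustrative placeholders.
.. code::
est = model.get_estimator(
instance_type="ecs.gn6i-c16g1.4xlarge",
hyperparameters={"learning_rate": 1e-5},
)
est.fit(inputs=model.get_estimator_inputs())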
"""
from ..estimator import AlgorithmEstimator
warnings.warn(
"`.get_estimator` is deprecated and will be removed in a future version, you can now use "
"`.training_recipe` instead.",
category=FutureWarning,
)
if not self.training_spec:
raise ValueError(
"The provided registered model does not contain training spec."
)
ts = self.get_training_spec(training_method=training_method)
hyperparameters = hyperparameters or {}
# TODO: validate the given hyperparameters via algorithm definition
for hp in ts.hyperparameters:
if hp.name not in hyperparameters:
hyperparameters[hp.name] = hp.value
if not base_job_name:
base_job_name = f"{self.model_name}_training" if self.model_name else None
if not max_run_time:
max_run_time = (
ts.scheduler.max_running_time_in_seconds if ts.scheduler else None
)
resource_id = kwargs.pop("resource_id", None)
instance_spec = kwargs.pop("instance_spec", None)
compute_resource = ts.compute_resource
if resource_id:
if instance_type:
logger.warning(
"The instance type is ignored when resource_id is provided."
)
instance_spec = instance_spec or compute_resource.instance_spec
if not instance_spec:
raise ValueError(
"Instance spec is required when resource_id is provided."
)
instance_spec = InstanceSpec.model_validate(instance_spec)
instance_count = (
instance_count
or compute_resource.instance_count
or compute_resource.ecs_count
or 1
)
else:
if instance_spec:
instance_spec = None
logger.warning(
"The instance spec is ignored when resource_id is not provided."
)
instance_type = instance_type or compute_resource.ecs_spec
instance_count = (
instance_count
or compute_resource.ecs_count
or compute_resource.instance_count
or 1
)
labels = kwargs.pop("labels", dict())
if self.model_provider == ProviderAlibabaPAI:
default_labels = {
"BaseModelUri": self.uri,
"CreatedBy": MODEL_TASK_CREATED_BY_QUICKSTART,
"Domain": self.domain,
"RootModelID": self.model_id,
"RootModelName": self.model_name,
"RootModelVersion": self.model_version,
"Task": self.task,
}
default_labels.update(labels)
labels = default_labels
return AlgorithmEstimator(
algorithm_name=ts.algorithm_name,
algorithm_version=ts.algorithm_version,
algorithm_provider=ts.algorithm_provider,
algorithm_spec=ts.algorithm_spec,
hyperparameters=hyperparameters,
base_job_name=base_job_name,
max_run_time=max_run_time,
instance_type=instance_type,
instance_count=instance_count,
instance_spec=instance_spec,
resource_id=resource_id,
output_path=output_path,
labels=labels,
**kwargs,
)
def get_estimator_inputs(self, training_method: Optional[str] = None) -> Dict[str, Any]:
"""Get the AlgorithmEstimator's default input channels
Get the AlgorithmEstimator's default input channels from RegisteredModel's
training_spec.
Args:
training_method (str, optional): The training method used to select the
specific training spec.
Returns:
Dict[str, Any]: A dict mapping each input channel name to its default input.
"""
warnings.warn(
"`.get_estimator_inputs` is deprecated and will be removed in a future version, you can now use "
"`.training_recipe().default_inputs` instead.",
category=FutureWarning,
)
default_inputs = (
self.get_training_spec(training_method=training_method).inputs or []
)
ret = {}
for item in default_inputs:
if isinstance(item, UriInput):
ret[item.name] = item.input_uri
else:
ret[item.name] = item
return ret
def get_eval_processor(
self,
base_job_name: Optional[str] = None,
output_path: Optional[str] = None,
parameters: Optional[Dict[str, Any]] = None,
max_run_time: Optional[int] = None,
instance_type: Optional[str] = None,
instance_count: Optional[int] = None,
user_vpc_config: Optional[UserVpcConfig] = None,
):
"""Generate a Processor for model evaluation.
Generate a Processor object from RegisteredModel's evaluation_spec.
Args:
parameters (dict, optional): A dictionary that represents the
parameters used in the job. Default parameters will
be retrieved from the evaluation spec.
base_job_name (str, optional): The base name used to generate the
job name. If not provided, a default job name will be generated.
output_path (str, optional): An OSS URI to store the outputs of the
jobs. If not provided, an OSS URI will be generated using the default
OSS bucket in the session. When the evaluation job runs, a specific OSS
URI under the output_path is generated for each channel and mounted to
the container.
max_run_time (int, optional): The maximum time in seconds that the
job can run. The job will be terminated after the time is
reached (Default None).
instance_type (str, optional): The machine instance type used to run the
job. If not provided, the default instance type will be
retrieved from the evaluation spec. To view the supported machine
instance types, please refer to the document:
https://help.aliyun.com/document_detail/171758.htm#section-55y-4tq-84y.
instance_count (int, optional): The number of machines used to run the
job. If not provided, the default instance count will be
retrieved from the evaluation spec.
user_vpc_config (:class:`pai.estimator.UserVpcConfig`, optional): The VPC
configuration used to enable the job instance to connect to the
specified user VPC. If provided, an Elastic Network Interface (ENI) will
be created and attached to the job instance, allowing the
instance to access the resources within the specified VPC. Default to
None.
Returns:
:class:`pai.processor.Processor`: A Processor object.
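Example:
A minimal sketch; the ``run`` call is assumed to accept the default input
channels in the same way as ``estimator.fit``.
.. code::
processor = model.get_eval_processor()
processor.run(inputs=model.get_evaluation_inputs())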
"""
from ..processor import Processor
warnings.warn(
"`.get_eval_processor` is deprecated and will be removed in a future version, you can now use "
"`.model_recipe` instead.",
category=FutureWarning,
)
eval_spec = self._get_evaluation_spec()
if not eval_spec:
raise ValueError(
"The provided registered model does not contain evaluation spec."
)
eval_spec = ModelRecipeSpec.model_validate(eval_spec)
if not eval_spec.algorithm_spec:
raise ValueError(
"Invalid evaluation spec, the evaluation spec does not contain any"
" configuration for the evaluation job."
)
if not base_job_name:
base_job_name = f"{self.model_name}_eval" if self.model_name else None
parameters = parameters or dict()
for item in eval_spec.hyperparameters:
if item.name not in parameters:
parameters[item.name] = item.value
if not max_run_time:
max_run_time = (
eval_spec.scheduler.max_running_time_in_seconds
if eval_spec.scheduler
else None
)
compute_resource = eval_spec.compute_resource
if compute_resource and (not instance_type or not instance_count):
# If instance_type or instance_count is not provided, use the default
instance_type = instance_type or compute_resource.ecs_spec
instance_count = instance_count or compute_resource.ecs_count
source_dir = None
code_dir = eval_spec.algorithm_spec.code_dir
if code_dir and code_dir.location_type == "oss":
oss_uri_obj = OssUriObj.from_bucket_key_endpoint(
bucket_name=code_dir.location_value.bucket,
object_key=code_dir.location_value.key,
endpoint=code_dir.location_value.endpoint,
)
source_dir = oss_uri_obj.uri
processor = Processor(
image_uri=eval_spec.algorithm_spec.image,
command=eval_spec.algorithm_spec.command,
source_dir=source_dir,
parameters=parameters,
max_run_time=max_run_time,
base_job_name=base_job_name,
output_path=output_path,
instance_type=instance_type,
instance_count=instance_count,
user_vpc_config=user_vpc_config,
session=self.session,
)
processor.set_input_channels(eval_spec.algorithm_spec.input_channels)
processor.set_output_channels(eval_spec.algorithm_spec.output_channels)
return processor
def get_evaluation_inputs(self) -> Dict[str, Any]:
"""Get the Processor's default input channels
Get the Processor's default input channels from RegisteredModel's
evaluation_spec.
Returns:
Dict[str, Any]: A dict mapping each input channel name to its default input.
"""
warnings.warn(
"`.get_eval_inputs` is deprecated and will be removed in a future version, you can now use "
"`.model_recipe().default_inputs` instead.",
category=FutureWarning,
)
if not self.evaluation_spec:
raise ValueError(
"The provided registered model does not contain evaluation spec."
)
eval_spec = ModelRecipeSpec.model_validate(self.evaluation_spec)
inputs = eval_spec.inputs or []
res = {}
for item in inputs:
res[item.name] = item.input_uri if isinstance(item, UriInput) else item
return res
@classmethod
def _is_multiple_spec(cls, spec: Dict[str, Any]) -> bool:
return not ("AlgorithmSpec" in spec or "AlgorithmName" in spec)
def _get_evaluation_spec(self):
"""Get the evaluation_spec of the registered model."""
return self.evaluation_spec
def training_recipe(self, method: Optional[str] = None) -> "ModelTrainingRecipe":
"""Get the training recipe of the registered model.
Args:
method (str, optional): The training method used to select the
specific training recipe.
Returns:
:class:`pai.model.ModelTrainingRecipe`: A ModelTrainingRecipe object.
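Example:
A minimal sketch; ``default_inputs`` holds the default input channels,
while the ``train`` call is an assumed entry point shown for illustration.
.. code::
recipe = model.training_recipe()
print(recipe.default_inputs)
recipe.train()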
"""
from ._model_recipe import ModelTrainingRecipe
return ModelTrainingRecipe(
model_name=self.model_name,
model_version=self.model_version,
model_provider=self.model_provider,
method=method,
)
def model_recipe(
self, recipe_type: "ModelRecipeType", method: Optional[str] = None
) -> "ModelRecipe":
"""Initialize a ModelRecipe object from the recipe spec of the registered model.
Args:
recipe_type (ModelRecipeType): The recipe type used to select the specific
model recipe. Supported recipe types are: "training", "evaluation",
and "compression".
method (str, optional): The method used to select the specific model recipe.
Returns:
:class:`pai.model.ModelRecipe`: A ModelRecipe object.
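Example:
A minimal sketch; ``ModelRecipeType`` is assumed to be importable from
``pai.model``.
.. code::
from pai.model import ModelRecipeType
recipe = model.model_recipe(recipe_type=ModelRecipeType.EVALUATION)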
"""
from ._model_recipe import ModelRecipe
return ModelRecipe(
model_name=self.model_name,
model_version=self.model_version,
model_provider=self.model_provider,
recipe_type=recipe_type,
method=method,
)