# pai/model/_model_recipe.py

#  Copyright 2023 Alibaba, Inc. or its affiliates.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import dataclasses
import enum
import shutil
from typing import Any, Dict, List, Optional, Tuple, Union

from ..common.logging import get_logger
from ..common.oss_utils import download, is_oss_uri
from ..job._training_job import (
    DEFAULT_OUTPUT_MODEL_CHANNEL_NAME,
    AlgorithmSpec,
    Channel,
    ComputeResource,
    DatasetConfig,
    ExperimentConfig,
    HyperParameterDefinition,
    InstanceSpec,
    ModelRecipeSpec,
    OssLocation,
    ResourceType,
    SpotSpec,
    TrainingJob,
    UriInput,
    UserVpcConfig,
    _TrainingJobSubmitter,
)
from ..predictor import Predictor
from ..session import get_default_session
from ._model import InferenceSpec, Model, RegisteredModel, ResourceConfig

logger = get_logger(__name__)


@dataclasses.dataclass
class RecipeInitKwargs(object):
    """Resolved constructor arguments for a :class:`ModelRecipe`.

    Produced by ``ModelRecipe._init_kwargs``: the user-supplied arguments
    merged with defaults taken from the registered model's recipe spec.
    """

    # Identity of the registered model the recipe is derived from.
    model_name: Optional[str]
    model_version: Optional[str]
    model_provider: Optional[str]
    method: Optional[str]
    # Following fields are generated from the model recipe spec or overridden
    # by the user-supplied arguments.
    model_channel_name: Optional[str]
    model_uri: Optional[str]
    hyperparameters: Optional[Dict[str, Any]]
    hyperparameter_definitions: Optional[List[HyperParameterDefinition]]
    job_type: Optional[str]
    image_uri: Optional[str]
    source_dir: Optional[str]
    command: Union[str, List[str]]
    resource_id: Optional[str]
    instance_count: Optional[int]
    instance_type: Optional[str]
    instance_spec: Optional[InstanceSpec]
    max_run_time: Optional[int]
    labels: Optional[Dict[str, str]]
    requirements: Optional[List[str]]
    environments: Optional[Dict[str, str]]
    input_channels: Optional[List[Channel]]
    output_channels: Optional[List[Channel]]
    default_inputs: Optional[Union[UriInput, DatasetConfig]]
    customization: Optional[Dict[str, Any]]
    supported_instance_types: Optional[List[str]]


class ModelRecipeType(enum.Enum):
    """Kinds of jobs a model recipe can describe."""

    TRAINING = "training"
    EVALUATION = "evaluation"
    COMPRESSION = "compression"

    @classmethod
    def supported_types(cls):
        """Return the list of recipe types supported by the SDK."""
        return [cls.TRAINING, cls.EVALUATION, cls.COMPRESSION]


class ModelRecipe(_TrainingJobSubmitter):
    """A recipe describing how to run a job (training/evaluation/compression)
    for a model.

    The recipe merges user-supplied configuration with the defaults declared
    in the registered model's recipe spec (algorithm, image, command,
    hyperparameters, compute resource, inputs/outputs).
    """

    # Default name of the input channel that carries the (pretrained) model.
    MODEL_CHANNEL_NAME = "model"

    def __init__(
        self,
        model_name: Optional[str] = None,
        model_version: Optional[str] = None,
        model_provider: Optional[str] = None,
        model_uri: Optional[str] = None,
        recipe_type: ModelRecipeType = ModelRecipeType.TRAINING,
        method: Optional[str] = None,
        source_dir: Optional[str] = None,
        model_channel_name: Optional[str] = "model",
        hyperparameters: Optional[Dict[str, Any]] = None,
        job_type: Optional[str] = None,
        image_uri: Optional[str] = None,
        command: Union[str, List[str]] = None,
        instance_count: Optional[int] = None,
        instance_type: Optional[str] = None,
        instance_spec: Optional[InstanceSpec] = None,
        resource_id: Optional[str] = None,
        resource_type: Optional[Union[str, ResourceType]] = None,
        spot_spec: Optional[SpotSpec] = None,
        user_vpc_config: Optional[UserVpcConfig] = None,
        labels: Optional[Dict[str, str]] = None,
        requirements: Optional[List[str]] = None,
        environments: Optional[Dict[str, str]] = None,
        experiment_config: Optional[ExperimentConfig] = None,
        input_channels: Optional[List[Channel]] = None,
        output_channels: Optional[List[Channel]] = None,
        max_run_time: Optional[int] = None,
        default_inputs: Optional[Dict[str, Any]] = None,
        base_job_name: Optional[str] = None,
        supported_instance_type: Optional[List[str]] = None,
        settings: Optional[Dict[str, Any]] = None,
    ):
        """Initialize a ModelRecipe, resolving defaults from the registered
        model's recipe spec when a ``model_name`` is given."""
        init_kwargs = self._init_kwargs(
            model_name=model_name,
            model_version=model_version,
            model_provider=model_provider,
            recipe_type=recipe_type,
            method=method,
            # get from model or override
            model_uri=model_uri,
            model_channel_name=model_channel_name,
            hyperparameters=hyperparameters,
            job_type=job_type,
            image_uri=image_uri,
            source_dir=source_dir,
            command=command,
            instance_count=instance_count,
            instance_spec=instance_spec,
            instance_type=instance_type,
            # Fix: forward resource_id so that _get_compute_resource_config
            # takes the dedicated-resource-group branch (instance_spec) when
            # a resource group is provided. Previously resource_id was never
            # passed here, so instance_spec was always warned about/ignored.
            resource_id=resource_id,
            labels=labels,
            requirements=requirements,
            environments=environments,
            input_channels=input_channels,
            output_channels=output_channels,
            default_inputs=default_inputs,
            max_run_time=max_run_time,
            supported_instance_types=supported_instance_type,
        )
        self.model_name = init_kwargs.model_name
        self.model_version = init_kwargs.model_version
        self.model_provider = init_kwargs.model_provider
        self.method = init_kwargs.method
        self.model_uri = init_kwargs.model_uri
        self.model_channel_name = init_kwargs.model_channel_name
        self.job_type = init_kwargs.job_type
        self.hyperparameters = init_kwargs.hyperparameters
        self.image_uri = init_kwargs.image_uri
        self.command = init_kwargs.command
        self.source_dir = init_kwargs.source_dir
        self.default_inputs = init_kwargs.default_inputs
        self.customization = init_kwargs.customization
        self.supported_instance_types = init_kwargs.supported_instance_types
        self.input_channels = init_kwargs.input_channels
        self.output_channels = init_kwargs.output_channels
        self.hyperparameter_definitions = init_kwargs.hyperparameter_definitions
        super().__init__(
            resource_type=resource_type,
            base_job_name=base_job_name,
            experiment_config=experiment_config,
            resource_id=resource_id,
            user_vpc_config=user_vpc_config,
            spot_spec=spot_spec,
            instance_type=init_kwargs.instance_type,
            instance_count=init_kwargs.instance_count,
            instance_spec=init_kwargs.instance_spec,
            max_run_time=init_kwargs.max_run_time,
            environments=init_kwargs.environments,
            requirements=init_kwargs.requirements,
            labels=init_kwargs.labels,
            settings=settings,
        )

    @classmethod
    def _init_kwargs(
        cls,
        model_name: Optional[str] = None,
        model_version: Optional[str] = None,
        model_provider: Optional[str] = None,
        recipe_type: ModelRecipeType = ModelRecipeType.TRAINING,
        method: Optional[str] = None,
        model_channel_name: Optional[str] = "model",
        model_uri: Optional[str] = None,
        hyperparameters: Optional[Dict[str, Any]] = None,
        job_type: Optional[str] = None,
        image_uri: Optional[str] = None,
        source_dir: Optional[str] = None,
        command: Union[str, List[str]] = None,
        instance_count: Optional[int] = None,
        instance_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        instance_spec: Optional[InstanceSpec] = None,
        max_run_time: Optional[int] = None,
        labels: Optional[Dict[str, str]] = None,
        requirements: Optional[List[str]] = None,
        environments: Optional[Dict[str, str]] = None,
        input_channels: List[Channel] = None,
        output_channels: List[Channel] = None,
        default_inputs: Optional[Union[UriInput, DatasetConfig]] = None,
        supported_instance_types: Optional[List[str]] = None,
    ) -> RecipeInitKwargs:
        """Merge user-supplied arguments with defaults from the registered
        model's recipe spec.

        Returns:
            RecipeInitKwargs: Resolved arguments; user-supplied values always
                take precedence over values from the recipe spec.
        """
        model = (
            RegisteredModel(
                model_name=model_name,
                model_version=model_version,
                model_provider=model_provider,
            )
            if model_name
            else None
        )
        model_recipe_spec = (
            model.get_recipe_spec(recipe_type=recipe_type, method=method)
            if model
            else None
        )
        model_uri = model_uri or (model and model.uri)
        customization = None
        if not model_recipe_spec:
            # No recipe spec available: return the user-supplied arguments
            # unchanged (no defaults to merge).
            return RecipeInitKwargs(
                model_name=model_name,
                model_version=model_version,
                model_provider=model_provider,
                method=method,
                model_channel_name=model_channel_name,
                model_uri=model_uri,
                hyperparameters=hyperparameters,
                job_type=job_type,
                image_uri=image_uri,
                source_dir=source_dir,
                command=command,
                instance_count=instance_count,
                instance_type=instance_type,
                instance_spec=instance_spec,
                resource_id=resource_id,
                labels=labels,
                requirements=requirements,
                environments=environments,
                input_channels=input_channels,
                output_channels=output_channels,
                max_run_time=max_run_time,
                default_inputs=default_inputs,
                customization=customization,
                supported_instance_types=supported_instance_types,
                hyperparameter_definitions=None,
            )
        if not model_uri:
            # Fall back to the model input channel declared in the recipe spec.
            input_ = next(
                (
                    item
                    for item in model_recipe_spec.inputs
                    if item.name == model_channel_name
                ),
                None,
            )
            if input_:
                if isinstance(input_, UriInput):
                    model_uri = input_.input_uri
                else:
                    logger.warning(
                        "Input channel '%s' is not a URI input: %s",
                        model_channel_name,
                        type(input_),
                    )
        if not default_inputs and model_recipe_spec.inputs:
            # Default inputs map channel name -> URI (for URI inputs) or the
            # raw input config object (e.g. DatasetConfig).
            default_inputs = {}
            for item in model_recipe_spec.inputs:
                if isinstance(item, UriInput):
                    default_inputs[item.name] = item.input_uri
                else:
                    default_inputs[item.name] = item

        algorithm_spec = cls._get_algorithm_spec(model_recipe_spec)
        supported_instance_types = (
            supported_instance_types or model_recipe_spec.supported_instance_types
        )
        hyperparameter_definitions = None
        if algorithm_spec:
            if (
                not source_dir
                and algorithm_spec.code_dir
                and isinstance(algorithm_spec.code_dir.location_value, OssLocation)
            ):
                # Reconstruct the OSS URI of the algorithm source code.
                oss_location = algorithm_spec.code_dir.location_value
                if oss_location.endpoint:
                    source_dir = f"oss://{oss_location.bucket}.{oss_location.endpoint}/{oss_location.key.lstrip('/')}"
                else:
                    source_dir = (
                        f"oss://{oss_location.bucket}/{oss_location.key.lstrip('/')}"
                    )
            image_uri = image_uri or algorithm_spec.image
            command = command or algorithm_spec.command
            job_type = job_type or algorithm_spec.job_type
            input_channels = input_channels or algorithm_spec.input_channels
            output_channels = output_channels or algorithm_spec.output_channels
            customization = algorithm_spec.customization
            # NOTE(review): falling back to `supported_channel_types` for
            # *instance* types looks like a copy-paste slip (channel types vs
            # instance types) — confirm against the AlgorithmSpec schema
            # before changing; behavior preserved here.
            supported_instance_types = (
                supported_instance_types or algorithm_spec.supported_channel_types
            )
            hyperparameter_definitions = algorithm_spec.hyperparameter_definitions
        instance_type, instance_spec, instance_count = cls._get_compute_resource_config(
            instance_type=instance_type,
            instance_spec=instance_spec,
            instance_count=instance_count,
            resource_id=resource_id,
            compute_resource=model_recipe_spec.compute_resource,
            supported_instance_types=supported_instance_types,
        )
        # Precedence (lowest to highest): algorithm hyperparameter defaults,
        # recipe-spec hyperparameters, user-supplied hyperparameters.
        hyperparameters = hyperparameters or {}
        default_hps = {
            hp.name: hp.default_value
            for hp in ((algorithm_spec and algorithm_spec.hyperparameter_definitions) or [])
            if hp.default_value is not None and hp.default_value != ""
        }
        recipe_hps = {hp.name: hp.value for hp in model_recipe_spec.hyperparameters}
        hyperparameters = {**default_hps, **recipe_hps, **hyperparameters}
        requirements = requirements or model_recipe_spec.requirements
        environments = environments or model_recipe_spec.environments
        return RecipeInitKwargs(
            model_name=model_name,
            model_version=model_version,
            model_provider=model_provider,
            method=method,
            model_uri=model_uri,
            model_channel_name=model_channel_name,
            hyperparameters=hyperparameters,
            job_type=job_type,
            image_uri=image_uri,
            source_dir=source_dir,
            command=command,
            instance_count=instance_count,
            instance_spec=instance_spec,
            instance_type=instance_type,
            max_run_time=max_run_time,
            labels=labels,
            requirements=requirements,
            environments=environments,
            input_channels=input_channels,
            output_channels=output_channels,
            resource_id=resource_id,
            default_inputs=default_inputs,
            customization=customization,
            supported_instance_types=supported_instance_types,
            hyperparameter_definitions=hyperparameter_definitions,
        )

    @staticmethod
    def _get_compute_resource_config(
        instance_type: str,
        instance_count: int,
        instance_spec: InstanceSpec,
        resource_id: str,
        compute_resource: ComputeResource,
        supported_instance_types: List[str],
    ) -> Tuple[str, InstanceSpec, int]:
        """Resolve the compute resource configuration for the job.

        When ``resource_id`` is set the job runs in a dedicated resource
        group and an ``instance_spec`` is required; otherwise a public
        ``instance_type`` is resolved (explicit value, recipe default, or the
        first supported instance type).

        Raises:
            ValueError: If neither an explicit value nor a usable default can
                be resolved for the required field.
        """
        if resource_id:
            if instance_type:
                logger.warning(
                    "The instance type is ignored when resource_id is provided."
                )
            instance_spec = instance_spec or (
                compute_resource and compute_resource.instance_spec
            )
            if not instance_spec:
                raise ValueError(
                    "Running in dedicated resource group, please provide instance spec"
                    " for the training job."
                )
            instance_count = (
                instance_count
                or (compute_resource and compute_resource.instance_count)
                or 1
            )
        else:
            if instance_spec:
                logger.warning(
                    "The instance spec is ignored when resource_id is not provided."
                )
            instance_type = instance_type or (
                compute_resource and compute_resource.ecs_spec
            )
            if not instance_type:
                if not supported_instance_types:
                    raise ValueError(
                        "No instance type is specified for the training job"
                    )
                else:
                    instance_type = supported_instance_types[0]
            instance_count = (
                instance_count
                or (compute_resource and compute_resource.ecs_count)
                or 1
            )
        return instance_type, instance_spec, instance_count

    @staticmethod
    def _get_algorithm_spec(model_recipe_spec: ModelRecipeSpec) -> AlgorithmSpec:
        """Return the algorithm spec for the recipe.

        Uses the inline ``algorithm_spec`` when present; otherwise fetches the
        registered algorithm by name/provider/version through the session API.

        Raises:
            ValueError: If the recipe declares neither an inline algorithm
                spec nor an algorithm name.
        """
        session = get_default_session()
        if model_recipe_spec.algorithm_spec:
            return model_recipe_spec.algorithm_spec
        if not model_recipe_spec.algorithm_name:
            raise ValueError(
                "Both algorithm_name and algorithm_spec are not provided "
                "in the model training spec."
            )
        algo = session.algorithm_api.get_by_name(
            algorithm_name=model_recipe_spec.algorithm_name,
            algorithm_provider=model_recipe_spec.algorithm_provider,
        )
        raw_algo_version_spec = session.algorithm_api.get_version(
            algorithm_id=algo["AlgorithmId"],
            algorithm_version=model_recipe_spec.algorithm_version,
        )
        return AlgorithmSpec.model_validate(raw_algo_version_spec["AlgorithmSpec"])

    def _build_algorithm_spec(
        self, code_input, inputs: Dict[str, Any]
    ) -> AlgorithmSpec:
        """Build an ad-hoc AlgorithmSpec from the recipe's configuration.

        A string command is wrapped as ``["sh", "-c", command]``; missing
        channels fall back to defaults derived from ``inputs`` keys.
        """
        algorithm_spec = AlgorithmSpec(
            command=(
                self.command
                if isinstance(self.command, list)
                else ["sh", "-c", self.command]
            ),
            image=self.image_uri,
            job_type=self.job_type,
            code_dir=code_input,
            output_channels=self.output_channels
            or self._default_training_output_channels(),
            input_channels=self.input_channels
            or [
                Channel(name=channel_name, required=False)
                for channel_name in inputs.keys()
            ],
            customization=self.customization,
        )
        return algorithm_spec

    def retrieve_scripts(self, local_path: str) -> str:
        """Retrieve the training scripts to the local file system.

        Args:
            local_path (str): The local path where the training scripts are saved.

        Returns:
            str: The local path where the training scripts are saved.

        Raises:
            RuntimeError: If the recipe has no source code configured.
        """
        if not self.source_dir:
            raise RuntimeError("Source code is not available for the training job.")
        if is_oss_uri(self.source_dir):
            # Download (and untar) the code archive from OSS.
            return download(self.source_dir, local_path, un_tar=True)
        else:
            shutil.copytree(self.source_dir, local_path)
            return local_path

    def run(
        self,
        inputs: Optional[Dict[str, Union[str, DatasetConfig]]] = None,
        outputs: Optional[Dict[str, Union[str, DatasetConfig]]] = None,
        wait: bool = True,
        job_name: Optional[str] = None,
        show_logs: bool = True,
    ) -> TrainingJob:
        """Start a training job with the given inputs.

        Args:
            inputs (Dict[str, Union[str, DatasetConfig]], optional): A dictionary of
                inputs used in the training job. The keys are the channel name and
                the values are the URIs of the input data. If not specified, the
                default inputs will be used.
            outputs (Dict[str, Union[str, DatasetConfig]], optional): A dictionary
                of outputs used in the training job. The keys are the channel name
                and the values are the URIs or Dataset of the output data.
            wait (bool): Whether to wait for the job to complete before returning.
                Default to True.
            job_name (str, optional): The name of the training job. If not provided,
                a default job name will be generated.
            show_logs (bool): Whether to show the logs of the training job. Default
                to True.

        Returns:
            :class:`pai.training.TrainingJob`: A submitted training job.
        """
        job_name = self.job_name(job_name)
        # Copy to avoid mutating the caller's dict when the model channel is
        # injected below.
        inputs = dict(inputs) if inputs else {}
        code_input = self._build_code_input(job_name, source_dir=self.source_dir)
        algo_spec = self._build_algorithm_spec(
            code_input=code_input,
            inputs=inputs,
        )
        if self.model_channel_name not in inputs:
            inputs[self.model_channel_name] = self.model_uri
        # Apply the recipe's default inputs only when the user supplied no
        # inputs of their own (i.e. only the injected model channel is set).
        if len(inputs.keys()) == 1 and self.model_channel_name in inputs:
            default_inputs = self.default_inputs
        else:
            default_inputs = None
        inputs = self.build_inputs(
            inputs=inputs,
            input_channels=algo_spec.input_channels,
            default_inputs=default_inputs,
        )
        outputs = self.build_outputs(
            job_name=job_name,
            output_channels=algo_spec.output_channels,
            outputs=outputs,
        )
        return self._submit(
            job_name=job_name,
            algorithm_spec=algo_spec,
            instance_spec=self.instance_spec,
            instance_type=self.instance_type,
            instance_count=self.instance_count,
            resource_id=self.resource_id,
            hyperparameters=self.hyperparameters,
            environments=self.environments,
            requirements=self.requirements,
            max_run_time=self.max_run_time,
            inputs=inputs,
            outputs=outputs,
            user_vpc_config=self.user_vpc_config,
            # NOTE: experiment_config is intentionally not forwarded here
            # (left disabled in the original source).
            labels=self.labels,
            wait=wait,
            show_logs=show_logs,
        )


class ModelTrainingRecipe(ModelRecipe):
    """A recipe used to train a model."""

    def __init__(
        self,
        model_name: Optional[str] = None,
        model_version: Optional[str] = None,
        model_provider: Optional[str] = None,
        model_uri: Optional[str] = None,
        method: Optional[str] = None,
        source_dir: Optional[str] = None,
        model_channel_name: Optional[str] = "model",
        hyperparameters: Optional[Dict[str, Any]] = None,
        job_type: Optional[str] = None,
        image_uri: Optional[str] = None,
        command: Union[str, List[str]] = None,
        instance_count: Optional[int] = None,
        instance_type: Optional[str] = None,
        spot_spec: Optional[SpotSpec] = None,
        instance_spec: Optional[InstanceSpec] = None,
        resource_id: Optional[str] = None,
        resource_type: Optional[Union[str, ResourceType]] = None,
        user_vpc_config: Optional[UserVpcConfig] = None,
        labels: Optional[Dict[str, str]] = None,
        requirements: Optional[List[str]] = None,
        environments: Optional[Dict[str, str]] = None,
        experiment_config: Optional[ExperimentConfig] = None,
        input_channels: Optional[List[Channel]] = None,
        output_channels: Optional[List[Channel]] = None,
        max_run_time: Optional[int] = None,
        default_training_inputs: Optional[Dict[str, Any]] = None,
        base_job_name: Optional[str] = None,
        **kwargs,
    ):
        """Initialize a ModelTrainingRecipe object.

        Args:
            model_name (str, optional): The name of the registered model.
                Default to None.
            model_version (str, optional): The version of the registered model.
                Default to None.
            model_provider (str, optional): The provider of the registered model.
                Optional values are "pai", "huggingface" or None. If None, list
                registered models in the workspace of the current session.
                Default to None.
            method (str, optional): The training method used to select the specific
                training recipe while the registered model contains multiple model
                training specs. Default to None.
            model_channel_name (str, optional): The name of the model channel.
                Default to "model".
            model_uri (str, optional): The URI of the input pretrained model. If
                the URI is not provided, the model from the registered model will
                be used. Default to None.
            hyperparameters (dict, optional): A dictionary of hyperparameters used
                in the training job. Default to None.
            job_type (str, optional): The type of the job, supported values are
                "PyTorch", "TfJob", "XGBoostJob" etc.
            image_uri (str, optional): The URI of the Docker image. Default to None.
            source_dir (str, optional): The source code using in the training job,
                which is a directory containing the training script or an OSS URI.
                Default to None.
            command (str or list, optional): The command to execute in the training
                job. Default to None.
            requirements (list, optional): A list of Python requirements used to
                install the dependencies in the training job. Default to None.
            instance_count (int, optional): The number of instances to use for
                training. Default to None.
            instance_type (str, optional): The instance type to use for training.
                Default to None.
            instance_spec (:class:`pai.model.InstanceSpec`, optional): The resource
                config for each instance of the training job. The dedicated
                resource group must be provided when the instance spec is set.
                Default to None.
            resource_id (str, optional): The ID of the resource group used to run
                the training job. Default to None.
            spot_spec (:class:`pai.model.SpotSpec`, optional): The spot instance
                config used to run the training job. If provided, spot instance
                will be used.
            resource_type (str, optional): The resource type used to run the
                training job. By default, general computing resource is used. If
                the resource_type is 'Lingjun', Lingjun computing resource is used.
            user_vpc_config (:class:`pai.model.UserVpcConfig`, optional): The VPC
                configuration used to enable the job instance to connect to the
                specified user VPC. Default to None.
            environments (dict, optional): A dictionary of environment variables
                used in the training job. Default to None.
            experiment_config (:class:`pai.model.ExperimentConfig`, optional): The
                experiment configuration for the training job. Default to None.
            labels (dict, optional): A dictionary of labels used to tag the
                training job. Default to None.
        """
        super().__init__(
            model_name=model_name,
            model_version=model_version,
            model_provider=model_provider,
            model_uri=model_uri,
            method=method,
            recipe_type=ModelRecipeType.TRAINING,
            source_dir=source_dir,
            model_channel_name=model_channel_name,
            hyperparameters=hyperparameters,
            job_type=job_type,
            image_uri=image_uri,
            command=command,
            instance_count=instance_count,
            instance_type=instance_type,
            instance_spec=instance_spec,
            resource_type=resource_type,
            resource_id=resource_id,
            spot_spec=spot_spec,
            user_vpc_config=user_vpc_config,
            labels=labels,
            requirements=requirements,
            environments=environments,
            experiment_config=experiment_config,
            input_channels=input_channels,
            output_channels=output_channels,
            max_run_time=max_run_time,
            default_inputs=default_training_inputs,
            base_job_name=base_job_name,
            **kwargs,
        )

    def train(
        self,
        inputs: Optional[Dict[str, Union[str, DatasetConfig]]] = None,
        outputs: Optional[Dict[str, Union[str, DatasetConfig]]] = None,
        wait: bool = True,
        job_name: Optional[str] = None,
        show_logs: bool = True,
    ) -> TrainingJob:
        """Start a training job with the given inputs.

        Args:
            inputs (Dict[str, Union[str, DatasetConfig]], optional): A dictionary of
                inputs used in the training job. The keys are the channel name and
                the values are the URIs of the input data. If not specified, the
                default inputs will be used.
            outputs (Dict[str, Union[str, DatasetConfig]], optional): A dictionary
                of outputs used in the training job. The keys are the channel name
                and the values are the URIs or Dataset of the output data.
            wait (bool): Whether to wait for the job to complete before returning.
                Default to True.
            job_name (str, optional): The name of the training job. If not provided,
                a default job name will be generated.
            show_logs (bool): Whether to show the logs of the training job. Default
                to True. Note that the logs will be shown only when the `wait` is
                set to True.

        Returns:
            :class:`pai.training.TrainingJob`: A submitted training job.
        """
        return self.run(
            inputs=inputs,
            outputs=outputs,
            wait=wait,
            job_name=job_name,
            show_logs=show_logs,
        )

    def deploy(
        self,
        service_name: str,
        instance_type: Optional[str] = None,
        instance_count: int = 1,
        resource_config: Optional[Union[ResourceConfig, Dict[str, int]]] = None,
        resource_id: str = None,
        options: Optional[Dict[str, Any]] = None,
        wait=True,
        inference_spec: Optional[InferenceSpec] = None,
        **kwargs,
    ) -> Predictor:
        """Deploy the training job output model as a online prediction service.

        Args:
            service_name (str): The name of the online prediction service.
            instance_type (str, optional): The instance type used to run the
                service.
            instance_count (int, optional): The number of instances used to run
                the service. Default to 1.
            resource_config (Union[ResourceConfig, Dict[str, int]], optional):
                The resource config for the service. Default to None.
            resource_id (str, optional): The ID of the resource group used to run
                the service. Default to None.
            options (Dict[str, Any], optional): The options used to deploy the
                service. Default to None.
            wait (bool, optional): Whether to wait for the service endpoint to
                be ready.
            inference_spec (:class:`pai.model.InferenceSpec`, optional): The
                inference spec used to deploy the service. If not provided, the
                `inference_spec` of the model will be used. Default to None.
            kwargs: Additional keyword arguments used to deploy the service.

        Returns:
            :class:`pai.predictor.Predictor`: A predictor object refers to the
                created service.

        Raises:
            RuntimeError: If no inference spec can be resolved for deployment.
        """
        if not inference_spec and self.model_name:
            # Fall back to the inference spec of the registered model.
            model = RegisteredModel(
                model_name=self.model_name,
                model_version=self.model_version,
                model_provider=self.model_provider,
            )
            inference_spec = model.inference_spec
        if not inference_spec:
            raise RuntimeError("No inference_spec is available for model deployment.")
        m = Model(
            model_data=self.model_data(),
            inference_spec=inference_spec,
        )
        p = m.deploy(
            service_name=service_name,
            instance_type=instance_type,
            instance_count=instance_count,
            resource_config=resource_config,
            resource_id=resource_id,
            options=options,
            wait=wait,
            **kwargs,
        )
        return p

    def model_data(self):
        """Return the output model path of the latest training job.

        Raises:
            RuntimeError: If no training job has been submitted yet.
        """
        if not self._training_jobs:
            raise RuntimeError("No training job is available for deployment.")
        if not self.latest_job.is_succeeded():
            logger.warning(
                "The latest training job is not succeeded, the deployment may not work."
            )
        return self.latest_job.output_path(
            channel_name=DEFAULT_OUTPUT_MODEL_CHANNEL_NAME
        )