pai/processor.py (154 lines of code) (raw):
# Copyright 2024 Alibaba, Inc. or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import posixpath
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from .common.consts import JobType, StoragePathCategory
from .common.logging import get_logger
from .common.utils import experimental, random_str, to_plain_text
from .job import (
    AlgorithmSpec,
    Channel,
    CodeDir,
    ExperimentConfig,
    SpotSpec,
    TrainingJob,
    UriOutput,
    UserVpcConfig,
    _TrainingJobSubmitter,
)
from .job._training_job import ResourceType
from .session import Session, get_default_session
logger = get_logger(__name__)
@experimental
class Processor(_TrainingJobSubmitter):
    """Submit a command that runs inside a container image as a job.

    A processor bundles an image, a command and (optionally) a local source
    directory into a temporary :class:`AlgorithmSpec` and submits it through
    the shared job submitter. Input and output channel definitions can be
    attached with :meth:`set_input_channels` / :meth:`set_output_channels`
    before calling :meth:`run`.
    """

    def __init__(
        self,
        image_uri: str,
        command: Union[str, List[str]],
        source_dir: Optional[str] = None,
        job_type: str = JobType.PyTorchJob,
        parameters: Optional[Dict[str, Any]] = None,
        environments: Optional[Dict[str, str]] = None,
        requirements: Optional[List[str]] = None,
        max_run_time: Optional[int] = None,
        base_job_name: Optional[str] = None,
        output_path: Optional[str] = None,
        instance_type: Optional[str] = None,
        spot_spec: Optional[SpotSpec] = None,
        resource_type: Optional[Union[str, ResourceType]] = None,
        instance_count: Optional[int] = None,
        user_vpc_config: Optional[UserVpcConfig] = None,
        experiment_config: Optional[ExperimentConfig] = None,
        settings: Optional[Dict[str, Any]] = None,
        labels: Optional[Dict[str, str]] = None,
        session: Optional[Session] = None,
    ):
        """Processor constructor.

        Args:
            image_uri (str): The image used in the job. It can be an image
                provided by PAI or a user customized image. To view the images
                provided by PAI, please refer to the document:
                https://help.aliyun.com/document_detail/202834.htm.
            command (Union[str, List[str]]): The command used to run the job.
                A string is executed through ``sh -c``; a list is used as the
                command as-is.
            source_dir (str, optional): The local source code directory used in
                the job. The directory will be packaged and uploaded to an OSS
                bucket, then downloaded to the `/ml/usercode` directory in the
                job container. If there is a `requirements.txt` file in the
                source code directory, the corresponding dependencies will be
                installed before the script runs.
            job_type (str): The type of job, which can be TFJob, PyTorchJob,
                XGBoostJob, etc. Default value is PyTorchJob.
            parameters (dict, optional): A dictionary that represents the
                parameters used in the job. The parameters will be stored in
                the `/ml/input/config/hyperparameters.json` as a JSON
                dictionary in the container.
            environments (Dict[str, str], optional): A dictionary that maps
                environment variable names to their values. This optional field
                allows you to provide a set of environment variables that will
                be applied to the context where the code is executed.
            requirements (list, optional): An optional list of strings that
                specifies the Python package dependencies with their versions.
                Each string in the list should be in the format 'package' or
                'package==version'. This is similar to the contents of a
                requirements.txt file used in Python projects. If
                requirements.txt is provided in user code directory,
                requirements will override the conflict dependencies directly.
            max_run_time (int, optional): The maximum time in seconds that the
                job can run. The job will be terminated after the time is
                reached (Default None).
            base_job_name (str, optional): The base name used to generate the
                job name.
            output_path (str, optional): An OSS URI to store the outputs of the
                jobs. If not provided, an OSS URI will be generated using the
                default OSS bucket in the session. When the `run` method is
                called, a specific OSS URI under the output_path for each
                channel is generated and mounted to the container.

                A completed container directory structure example::

                    /ml
                    |-- usercode          // User source code directory.
                    |   |-- requirements.txt
                    |   `-- train.py
                    |-- input             // Job input.
                    |   |-- config
                    |   |   `-- hyperparameters.json
                    |   |                 // Hyperparameters in JSON
                    |   |                 // dictionary format for the job.
                    |   `-- data          // Job input channels. Each
                    |       |             // directory under the
                    |       |             // `/ml/input/data/` is an input
                    |       |             // channel, and the directory
                    |       |             // name is the channel name.
                    |       |-- test-data
                    |       |   `-- test.csv
                    |       `-- train-data
                    |           `-- train.csv
                    `-- output            // Job output channels. Each
                        |                 // directory under the
                        |                 // `/ml/output/` is an output
                        |                 // channel, and the directory
                        |                 // name is the channel name.
                        |-- model
                        `-- checkpoints

            instance_type (str, optional): The machine instance type used to
                run the job. To view the supported machine instance types,
                please refer to the document:
                https://help.aliyun.com/document_detail/171758.htm#section-55y-4tq-84y.
            spot_spec (:class:`pai.job.SpotSpec`, optional): The spot instance
                specification forwarded to the job submitter. Default to None.
            resource_type (str, optional): The resource type used to run the
                job. By default, general computing resource is used. If the
                resource_type is 'Lingjun', Lingjun computing resource is used.
            instance_count (int, optional): The number of machines used to run
                the job. Defaults to 1 when not provided.
            user_vpc_config (:class:`pai.job.UserVpcConfig`, optional): The VPC
                configuration used to enable the job instance to connect to the
                specified user VPC. If provided, an Elastic Network Interface
                (ENI) will be created and attached to the job instance,
                allowing the instance to access the resources within the
                specified VPC. Default to None.
            experiment_config (:class:`pai.job.ExperimentConfig`, optional):
                The experiment configuration used to construct the relationship
                between the job and the experiment. If provided, the job will
                belong to the specified experiment, in which case the job will
                use artifact_uri of experiment as default output path. Default
                to None.
            settings (dict, optional): A dictionary that represents the
                additional settings for job, such as AIMaster configurations.
            labels (Dict[str, str], optional): A dictionary that maps label
                names to their values. This optional field allows you to
                provide a set of labels that will be applied to the job.
            session (Session, optional): A PAI session instance used for
                communicating with PAI service. The default session is used
                when not provided.
        """
        self.image_uri = image_uri
        self.command = command
        self.source_dir = source_dir
        # Guard against an explicit ``job_type=None`` from callers.
        self.job_type = job_type or JobType.PyTorchJob
        self.parameters = parameters or {}
        self.session = session or get_default_session()
        # Channel definitions are opt-in; see set_input_channels /
        # set_output_channels.
        self._input_channels = None
        self._output_channels = None

        super().__init__(
            resource_type=resource_type,
            spot_spec=spot_spec,
            base_job_name=base_job_name,
            output_path=output_path,
            experiment_config=experiment_config,
            instance_type=instance_type,
            instance_count=instance_count or 1,
            user_vpc_config=user_vpc_config,
            max_run_time=max_run_time,
            environments=environments,
            requirements=requirements,
            labels=labels,
            settings=settings,
        )

    def run(
        self,
        inputs: Optional[Dict[str, Any]] = None,
        outputs: Optional[Dict[str, Any]] = None,
        wait: bool = True,
        show_logs: bool = True,
    ) -> TrainingJob:
        """Submit a job with the given input and output channels.

        Args:
            inputs (Dict[str, Any], optional): A dictionary representing the
                input data for the job. Each key/value pair in the dictionary
                is an input channel, the key is the channel name, and the value
                is the input data. The input data can be an OSS URI or a NAS
                URI object and will be mounted to the
                `/ml/input/data/{channel_name}` directory in the job container.
            outputs (Dict[str, Any], optional): A dictionary representing the
                output data for the job. Each key/value pair in the dictionary
                is an output channel, the key is the channel name, and the
                value is the output path. The output path can be an OSS URI or
                a NAS URI object and will be mounted to the
                `/ml/output/{channel_name}` directory in the job container.
            wait (bool): Specifies whether to block until the job is completed,
                either succeeded, failed, or stopped. (Default True).
            show_logs (bool): Whether to show the logs of the job. Default to
                True. Note that the logs will be shown only when the `wait` is
                set to True.

        Returns:
            :class:`pai.job.TrainingJob`: A submitted job.

        Raises:
            UnExpectedStatusException: If the job fails.
        """
        inputs = inputs or {}
        outputs = outputs or {}
        job_name = self._gen_job_display_name()

        # Upload location for the packaged source code of this job.
        code_dest = Session.get_storage_path_by_category(
            StoragePathCategory.ProcessingSrc, to_plain_text(job_name)
        )
        code_dir = self._build_code_input(job_name, self.source_dir, code_dest)
        algo_spec = self._build_algorithm_spec(code_input=code_dir)

        # Resolve the user supplied data against the channel definitions
        # carried by the algorithm spec.
        inputs = self.build_inputs(inputs, input_channels=algo_spec.input_channels)
        outputs = self.build_outputs(
            job_name=job_name,
            output_channels=algo_spec.output_channels,
            outputs=outputs,
        )

        return self._submit(
            instance_count=self.instance_count,
            instance_type=self.instance_type,
            job_name=job_name,
            hyperparameters=self.parameters,
            environments=self.environments,
            requirements=self.requirements,
            max_run_time=self.max_run_time,
            inputs=inputs,
            outputs=outputs,
            algorithm_spec=algo_spec,
            user_vpc_config=(
                self.user_vpc_config.model_dump() if self.user_vpc_config else None
            ),
            experiment_config=(
                self.experiment_config.model_dump() if self.experiment_config else None
            ),
            labels=self.labels,
            wait=wait,
            show_logs=show_logs,
        )

    def _gen_job_display_name(self, job_name=None):
        """Generate job display name.

        Returns ``job_name`` unchanged when it is given; otherwise combines
        ``base_job_name`` (or ``"processing_job"``) with the current local
        time formatted as ``YYYYmmdd_HHMMSS``.
        """
        if job_name:
            return job_name
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        return f"{self.base_job_name or 'processing_job'}_{ts}"

    def _build_algorithm_spec(self, code_input: CodeDir) -> AlgorithmSpec:
        """Build a temporary AlgorithmSpec used for submitting the Job."""
        if isinstance(self.command, list):
            command = self.command
        else:
            # A plain string is handed to the shell so that it may contain
            # pipes, redirects, etc.
            command = ["sh", "-c", self.command]

        return AlgorithmSpec(
            command=command,
            image=self.image_uri,
            job_type=self.job_type,
            code_dir=code_input,
            input_channels=self._input_channels or [],
            output_channels=self._output_channels or [],
        )

    def _training_job_base_output(self, job_name: str) -> str:
        """Generate the base output path for the job.

        Args:
            job_name (str): The display name of the job.

        Returns:
            str: ``<output_path>/<job_name>_<random>`` when ``output_path`` is
            configured, otherwise an ``oss://`` URI in the session's OSS
            bucket.
        """
        # replace non-alphanumeric character in job name.
        job_dir = f"{to_plain_text(job_name)}_{random_str(6)}"

        if self.output_path:
            # The output path is a URI, not a local filesystem path: always
            # join with "/" (os.path.join would use "\\" on Windows).
            return posixpath.join(self.output_path, job_dir)

        job_output_path = Session.get_storage_path_by_category(
            StoragePathCategory.ProcessingJob, job_dir
        )
        # Only touch the session bucket when it is actually needed, so that an
        # explicit output_path works without a default bucket.
        bucket_name = self.session.oss_bucket.bucket_name
        return f"oss://{bucket_name}/{job_output_path}"

    def get_outputs_data(self) -> Dict[str, str]:
        """Show all outputs data paths.

        Returns:
            dict[str, str]: A dictionary that maps each output channel name of
            the latest job to its output URI (for :class:`UriOutput` channels)
            or its dataset id (for any other output).

        Raises:
            RuntimeError: If no job has been submitted by this processor.
        """
        if not self.latest_job:
            raise RuntimeError("Current no Job for the processor.")

        return {
            ch.name: ch.output_uri if isinstance(ch, UriOutput) else ch.dataset_id
            for ch in self.latest_job.outputs
        }

    def set_input_channels(self, channels: List[Channel]) -> None:
        """Set the input channel definitions used by subsequent jobs.

        Args:
            channels (List[Channel]): The input channel definitions placed in
                the algorithm spec built for each job.
        """
        self._input_channels = channels

    def set_output_channels(self, channels: List[Channel]) -> None:
        """Set the output channel definitions used by subsequent jobs.

        Args:
            channels (List[Channel]): The output channel definitions placed in
                the algorithm spec built for each job.
        """
        self._output_channels = channels