pai/pipeline/step.py (254 lines of code) (raw):
# Copyright 2023 Alibaba, Inc. or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import itertools
import six
from pai.pipeline.types import PipelineVariable
from pai.pipeline.types.parameter import ConditionExpr, LoopItems, PipelineParameter
from pai.pipeline.types.spec import load_input_output_spec
class _ComponentRefer(object):
    """Base type for the references a pipeline step keeps to its component.

    The base class stores nothing; the subclasses in this module override
    ``to_dict`` to emit their identifying fields.
    """

    def __init__(self):
        """Create the reference; the base class has no state to set up."""

    def to_dict(self):
        """Return the serialized reference; the base implementation yields None."""
        return None
class RegisteredComponentRefer(_ComponentRefer):
    """Reference to a registered component by identifier, provider and version."""

    def __init__(self, identifier, provider, version):
        """Store the three fields that identify the registered component.

        Args:
            identifier: Identifier of the registered component.
            provider: Provider of the registered component.
            version: Version of the registered component.
        """
        super().__init__()
        self.identifier, self.provider, self.version = identifier, provider, version

    def to_dict(self):
        """Return the reference as a dict with identifier, provider and version."""
        return dict(
            identifier=self.identifier,
            provider=self.provider,
            version=self.version,
        )
class UnRegisteredComponentRefer(_ComponentRefer):
    """Reference to a component that is identified only by its guid."""

    def __init__(self, guid):
        """Store the guid of the referenced component.

        Args:
            guid: The guid attribute taken from the component.
        """
        super().__init__()
        self.guid = guid

    def to_dict(self):
        """Return the reference as a single-entry dict holding the guid."""
        return dict(guid=self.guid)
class PipelineStep(object):
    """Represents an execution step in a PAI pipeline.

    Pipeline steps can be configured together to construct a Pipeline, which is
    presented as a workflow in the PAI ML pipeline service.
    """

    def __init__(
        self,
        inputs=None,
        name=None,
        depends=None,
        component=None,
    ):
        """Construct a step which represents a component execution in a pipeline.

        Args:
            inputs (dict): Inputs for the step in dict: key is the component input
                name, value could be the output artifact/parameter from other step,
                input of the pipeline, or actual value for the step.
            name (str): Name of the step in pipeline, must be unique in the pipeline.
            depends (list): A list of PipelineStep which this step depends on.
            component (OperatorBase): The component used by the constructed step.

        Raises:
            ValueError: If ``depends`` contains anything that is not a PipelineStep.
        """
        # Function-scope import, kept as in the original module
        # (presumably to avoid a circular import -- TODO confirm).
        from pai.pipeline.component import RegisteredComponent

        # Always hold dependencies in a private set, so that ``after`` can add to
        # it and the caller's container is never aliased or mutated.
        self._depends = self._normalize_depends(depends)
        self._assigned = set()

        if isinstance(component, RegisteredComponent):
            self.component_ref = RegisteredComponentRefer(
                identifier=component.identifier,
                version=component.version,
                provider=component.provider,
            )
        else:
            self.component_ref = UnRegisteredComponentRefer(guid=component.guid)

        self._name = name
        self._component = component

        inputs_spec, outputs_spec = load_input_output_spec(
            self, component.io_spec_to_dict()
        )

        self.parent = None
        self.inputs = inputs_spec
        self.outputs = outputs_spec

        self._assign_inputs(inputs)
        self._repeated_artifact_config = {}

    @staticmethod
    def _normalize_depends(depends):
        """Validate ``depends`` and return it as a new set of PipelineStep."""
        steps = list(depends or ())
        if any(not isinstance(step, PipelineStep) for step in steps):
            raise ValueError("Invalid variable in depends, expected PipelineStep")
        return set(steps)

    @property
    def is_component_registered(self):
        """bool: True when the step refers to its component as a registered one."""
        return isinstance(self.component_ref, RegisteredComponentRefer)

    @property
    def component(self):
        """The component object this step was constructed with."""
        return self._component

    def gen_name_prefix(self):
        """Return the component identifier if registered, else the component name."""
        if self.is_component_registered:
            return self.component_ref.identifier
        return self.component.name

    @classmethod
    def from_registered_component(
        cls,
        identifier,
        provider=None,
        version="v1",
        inputs=None,
        name=None,
        depends=None,
    ):
        """Build the PipelineStep from a registered component reference.

        Args:
            identifier: Identifier of the registered component.
            provider: Provider of the registered component; when falsy, the
                provider of the default session is used.
            version: Version of the registered component.
            inputs: Inputs for the building step.
            name: Name for the building step.
            depends: Depended steps of the building step.

        Returns:
            PipelineStep: The built step instantiated from the given registered
                component and inputs.

        Raises:
            ValueError: If the component lookup returns a falsy result.
        """
        from pai.pipeline.component import RegisteredComponent
        from pai.session import get_default_session

        provider = provider or get_default_session().provider
        c = RegisteredComponent.get_by_identifier(
            identifier=identifier, provider=provider, version=version
        )
        if not c:
            raise ValueError(
                "Specific register component not found: identifier={0}, provider={1}, version={2}".format(
                    identifier, provider, version
                )
            )
        return cls(
            inputs=inputs or [],
            name=name,
            depends=depends,
            component=c,
        )

    @property
    def repeated_io_config(self):
        """The repeated-artifact config mapping created (empty) in ``__init__``."""
        return self._repeated_artifact_config

    def set_artifact_count(self, artifact_name, count):
        """Set the ``count`` of a repeated artifact of this step.

        Args:
            artifact_name: Name of an input or output artifact of the step.
            count: Value stored on the artifact's ``count`` attribute.

        Returns:
            PipelineStep: This step, so calls can be chained.

        Raises:
            ValueError: If no artifact has that name, or it is not repeated.
        """
        # Outputs are listed first, so an input artifact wins on a name clash.
        artifacts = {
            item.name: item
            for item in itertools.chain(self.outputs.artifacts, self.inputs.artifacts)
        }
        artifact = artifacts.get(artifact_name)
        if not artifact:
            raise ValueError("artifact is not exists: %s" % artifact_name)
        if not artifact.repeated:
            # Format the name into the message; passing it as a second argument
            # would leave a literal "%s" in the exception text.
            raise ValueError("artifact is not repeated: %s" % artifact_name)
        artifact.count = count
        return self

    # TODO: Confirm pipeline step name restriction
    @classmethod
    def _validate_name(cls, name):
        """Check a step name: None is accepted, else a non-empty str of <= 30 chars.

        Raises:
            ValueError: If the name is not a string, is empty, or is too long.
        """
        if name is None:
            return
        if not isinstance(name, six.string_types):
            raise ValueError("PipelineStep name should be string type")
        if not name:
            raise ValueError("PipelineStep name should not be empty str")
        if len(name) > 30:
            raise ValueError("Given invalid pipeline step name.")

    def _assign_inputs(self, inputs):
        """Assign inputs to the step and record the steps they come from.

        Inputs could be inputs definition of pipeline, output of other steps or
        actual value.

        Args:
            inputs: Inputs for the step; nothing happens when it is empty/None.
        """
        if not inputs:
            return

        assign_items = self.inputs.assign(inputs)
        self._assigned = self._assigned.union(item.name for item in assign_items)

        if isinstance(inputs, dict):
            inputs = inputs.values()

        # Flatten one level: a list/tuple value contributes each of its members.
        values = []
        for ipt in inputs:
            if isinstance(ipt, (list, tuple)):
                values.extend(ipt)
            else:
                values.append(ipt)

        from pai.pipeline.types.artifact import PipelineArtifactElement

        # The owner (``parent``) of each pipeline variable becomes a dependency.
        input_steps = set()
        for value in values:
            if isinstance(value, PipelineVariable) and value.parent:
                input_steps.add(value.parent)
            elif isinstance(value, PipelineArtifactElement) and value.artifact.parent:
                input_steps.add(value.artifact.parent)

        self._depends = input_steps.union(self._depends)

    @property
    def depends(self):
        """list: A new list with the steps this step depends on."""
        return list(self._depends)

    @property
    def name(self):
        """The name of the step."""
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @classmethod
    def get_component(cls, identifier, provider, version):
        """Look up a registered component by identifier, provider and version."""
        from pai.pipeline.component import RegisteredComponent

        component = RegisteredComponent.get_by_identifier(
            identifier=identifier, provider=provider, version=version
        )
        return component

    def after(self, *steps):
        """Declare that this step runs after the given steps.

        Args:
            *steps: Steps to add to this step's dependencies.

        Raises:
            ValueError: If this step or any given step already has a ``parent``.
        """
        if self.parent or any(step.parent for step in steps):
            raise ValueError(
                "Not allow operation, pipeline step has been included in a pipeline"
            )
        self._depends.update(steps)

    @property
    def ref_name(self):
        """str: The step name prefixed with ``pipelines.``."""
        return "pipelines.{}".format(self.name)

    def _convert_spec_to_json(self):
        """Build the ``spec`` dict: assigned arguments plus optional dependencies."""
        assigned_inputs = [ipt for ipt in self.inputs if ipt.name in self._assigned]

        # Repeated output artifacts with a count get a list of ``count`` None
        # placeholders as their argument value.
        repeated_artifact_config = [
            {
                "name": opt.name,
                "value": [None] * opt.count,
            }
            for opt in self.outputs.artifacts
            if opt.repeated and opt.count
        ]

        spec = {
            "arguments": {
                "parameters": [
                    ipt.to_argument()
                    for ipt in assigned_inputs
                    if ipt.variable_category == "parameters"
                ],
                "artifacts": [
                    ipt.to_argument()
                    for ipt in assigned_inputs
                    if ipt.variable_category == "artifacts"
                ]
                + repeated_artifact_config,
            }
        }

        if self._depends:
            spec["dependencies"] = [step.name for step in self.depends]
        return spec

    def to_dict(self):
        """Return the step as a dict with ``metadata`` and ``spec`` entries."""
        metadata = {"name": self.name}
        metadata.update(self.component_ref.to_dict())
        d = {
            "metadata": metadata,
            "spec": self._convert_spec_to_json(),
        }
        return d
class ConditionStep(PipelineStep):
    """Represent a conditional execution step in pipeline."""

    def __init__(self, condition, inputs=None, name=None, depends=None, component=None):
        """Construct a ConditionStep to support conditional execution in pipeline.

        A ConditionStep only executes if the condition evaluates to true.

        Args:
            condition (Union[str, ConditionExpr]): Condition expression used to
                determine if the step should be executed, could be ConditionExpr
                or str.
            inputs (dict): Inputs for the step in dict: key is the component input
                name, value could be the output artifact/parameter from other step,
                input of the pipeline, or actual value for the step.
            name (str): Name of the step in pipeline, must be unique in the pipeline.
            depends (list): A list of PipelineStep which this step depends on.
            component (ComponentBase): The component used by the constructed step.

        Raises:
            ValueError: If ``condition`` is neither a ConditionExpr nor a str.
        """
        if not isinstance(condition, (ConditionExpr, str)):
            raise ValueError("Not supported condition type: %s" % type(condition))

        if isinstance(condition, ConditionExpr):
            # Steps referenced by the expression become dependencies too. Both
            # sides are copied into lists first, so ``depends`` may be any
            # iterable (list, tuple, set) and is never mutated.
            condition_depends = list(condition.get_depends_steps() or ())
            depends = [
                step for step in list(depends or ()) + condition_depends if step
            ]

        super(ConditionStep, self).__init__(
            inputs=inputs, name=name, depends=depends, component=component
        )
        self.condition = condition

    def to_dict(self):
        """Return the step dict with the condition stored under ``spec.when``."""
        d = super(ConditionStep, self).to_dict()
        d["spec"]["when"] = (
            self.condition.to_expr()
            if isinstance(self.condition, ConditionExpr)
            else str(self.condition)
        )
        return d
class LoopStep(PipelineStep):
    """Represent a parallel execution step in pipeline."""

    DEFAULT_PARALLELISM_COUNT = 5

    def __init__(
        self,
        items,
        parallelism=None,
        inputs=None,
        name=None,
        depends=None,
        component=None,
    ):
        """Construct a LoopStep to support for-loop execution in pipeline.

        A LoopStep invokes the component in for-loop execution style.

        Args:
            items: The items the loop runs over; the value is wrapped in
                ``LoopItems``. When it is a PipelineParameter whose parent is a
                PipelineStep, that step is added to the dependencies.
            parallelism (int): Max execution parallelism of the step; a falsy
                value selects ``DEFAULT_PARALLELISM_COUNT``.
            inputs (dict): Inputs for the step in dict: key is the component input
                name, value could be the output artifact/parameter from other step,
                input of the pipeline, or actual value for the step.
            name (str): Name of the step in pipeline, must be unique in the pipeline.
            depends (list): A list of PipelineStep which this step depends on.
            component (OperatorBase): The component used by the constructed step.
        """
        if (
            isinstance(items, PipelineParameter)
            and items.parent
            and isinstance(items.parent, PipelineStep)
        ):
            # Build a new list instead of appending in place: the caller's
            # ``depends`` must not be mutated, and it may be a set or tuple.
            depends = list(depends or ()) + [items.parent]

        super(LoopStep, self).__init__(
            inputs=inputs, name=name, depends=depends, component=component
        )
        self.items = LoopItems(items)
        self.parallelism = int(parallelism or type(self).DEFAULT_PARALLELISM_COUNT)

    def to_dict(self):
        """Return the step dict extended with the loop items and parallelism."""
        d = super(LoopStep, self).to_dict()
        d["spec"].update(self.items.to_dict())
        if self.parallelism:
            d["spec"]["parallelism"] = self.parallelism
        return d