pai/pipeline/types/artifact.py (615 lines of code) (raw):

#  Copyright 2023 Alibaba, Inc. or its affiliates.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from __future__ import absolute_import

import json
import re
from abc import ABCMeta, abstractmethod

import six
from odps.df import DataFrame as ODPSDataFrame
from odps.models import Partition as ODPSPartition
from odps.models import Table as ODPSTable
from odps.models import Volume as ODPSVolume
from odps.models.ml.offlinemodel import OfflineModel as ODPSOfflineModel
from six import with_metaclass

from pai.common.oss_utils import OssUriObj
from pai.common.utils import is_iterable
from pai.pipeline.types.variable import PipelineVariable


class ArtifactMetadataUtils(object):
    """Factory helpers that build commonly used artifact metadata objects."""

    @staticmethod
    def maxc_table():
        """Return metadata describing a dataset stored in a MaxCompute table."""
        return LocationArtifactMetadata(
            data_type=DataType.DataSet,
            location_type=LocationType.MaxComputeTable,
        )

    @staticmethod
    def oss_dataset():
        """Return metadata describing a dataset stored in OSS."""
        return LocationArtifactMetadata(
            data_type=DataType.DataSet, location_type=LocationType.OSS
        )

    @staticmethod
    def maxc_offlinemodel():
        """Return metadata describing a MaxCompute offline model."""
        return LocationArtifactMetadata(
            data_type=DataType.Model, location_type=LocationType.MaxComputeOfflineModel
        )

    @staticmethod
    def maxc_volume():
        """Return metadata describing a dataset stored in a MaxCompute volume."""
        return LocationArtifactMetadata(
            data_type=DataType.DataSet, location_type=LocationType.MaxComputeVolume
        )

    @staticmethod
    def raw():
        """Return "raw" metadata: data type ``Any`` and no location type."""
        return LocationArtifactMetadata(data_type=DataType.Any)


class DataType(object):
    """String constants naming the data type of an artifact."""

    DataSet = "DataSet"
    Model = "Model"
    Any = "Any"
    ModelEvaluation = "ModelEvaluation"


class LocationType(object):
    """String constants naming where an artifact is stored."""

    MaxComputeTable = "MaxComputeTable"
    MaxComputeVolume = "MaxComputeVolume"
    MaxComputeOfflineModel = "MaxComputeOfflineModel"
    OSS = "OSS"


class ModelType(object):
    """String constants naming model formats."""

    OfflineModel = "OfflineModel"
    PMML = "PMML"


class PipelineArtifact(PipelineVariable):
    """Input/output artifact definition of the Pipeline and operator.

    Examples:

        from pai.operator.types import MetadataBuilder, PipelineArtifact
        from pai.operator import ContainerOperator

        op = ContainerOperator(
            image_uri="python:3",
            inputs=[
                PipelineArtifact(name="foo", metadata=MetadataBuilder.to_dict()),
            ]
        )

    """

    variable_category = "artifacts"

    def __init__(
        self,
        name,
        metadata=None,
        path=None,
        desc=None,
        io_type="inputs",
        value=None,
        from_=None,
        required=False,
        repeated=False,
        parent=None,
    ):
        """Initialize the artifact definition.

        Args:
            name: Name of the artifact.
            metadata: Artifact metadata; either an ``ArtifactMetadataBase``
                instance or an already serialized value (both forms are
                handled by ``_translate_value`` and ``to_dict``).
            path: Optional path, emitted by ``to_dict`` when not None.
            desc: Description forwarded to ``PipelineVariable``.
            io_type: IO direction forwarded to ``PipelineVariable``.
            value: Initial value forwarded to ``PipelineVariable``.
            from_: Source variable forwarded to ``PipelineVariable``.
            required: Whether the artifact is required.
            repeated: Whether the artifact holds a list of elements.
            parent: Owner forwarded to ``PipelineVariable``.
        """
        super(PipelineArtifact, self).__init__(
            name=name,
            desc=desc,
            io_type=io_type,
            value=value,
            from_=from_,
            required=required,
            parent=parent,
        )
        self.metadata = metadata
        self.path = path
        self.repeated = repeated
        self._count = None

    @property
    def count(self):
        """Number of elements of a repeated artifact, or None if unset."""
        return self._count

    @count.setter
    def count(self, value):
        """Set the element count.

        Raises:
            ValueError: If the artifact is not repeated, or ``value`` is not a
                positive integer.
        """
        if not self.repeated:
            raise ValueError("no repeated artifact do not has count attribute.")
        if not isinstance(value, six.integer_types) or value <= 0:
            raise ValueError("invalid count for repeated artifact:%s" % value)
        self._count = value

    def reset_count(self):
        """Clear the element count."""
        self._count = None

    def validate_from(self, arg):
        """Check whether ``arg`` may be used as the source of this artifact.

        Args:
            arg: A ``PipelineArtifact`` or ``PipelineArtifactElement``.

        Returns:
            bool: False when both sides carry metadata and they compare
            unequal, True otherwise.

        Raises:
            ValueError: If ``arg`` is a repeated artifact without a parent, or
                is neither an artifact nor an artifact element.
        """
        if isinstance(arg, PipelineArtifact):
            source = arg
            if source.repeated and not source.parent:
                raise ValueError("repeated artifact could not assign entirely.")
        elif isinstance(arg, PipelineArtifactElement):
            source = arg.artifact
        else:
            # The message previously named 'PipelineParameter', which is not
            # what this method accepts.
            raise ValueError(
                "arg is expected to be type of 'PipelineArtifact' or "
                "'PipelineArtifactElement' but was actually of type '%s'" % type(arg)
            )

        if (
            source.metadata is not None
            and self.metadata is not None
            and source.metadata != self.metadata
        ):
            return False
        return True

    def assign(self, arg):
        """Assign a source artifact or a concrete value to this artifact.

        For a repeated artifact ``arg`` is expanded into a list (a
        non-iterable ``arg`` becomes a single-element list), every item is
        validated, and the list is stored as ``self.value``. Otherwise an
        artifact/element is stored in ``self.from_`` and any other input is
        translated and stored in ``self.value``.

        Raises:
            ValueError: If an item fails ``validate_from``/``validate_value``.
        """
        artifact_types = (PipelineArtifact, PipelineArtifactElement)

        def _validate(item):
            if isinstance(item, artifact_types):
                if not self.validate_from(item):
                    raise ValueError(
                        "invalid assignment. %s left: %s, right: %s"
                        % (self.fullname, self, arg)
                    )
            elif not self.validate_value(item):
                raise ValueError("Value(%s) is invalid value for %s" % (item, self))

        if self.repeated:
            value = list(arg) if is_iterable(arg) else [arg]
            for item in value:
                _validate(item)
            self.value = value
            return

        _validate(arg)
        if isinstance(arg, artifact_types):
            self.from_ = arg
        else:
            # We need to transform artifact input value input to standard artifact
            # value which PAIFlow service recognized.
            self.value = self._translate_value(arg)["value"]

    def __getitem__(self, key):
        """Return the element(s) of a repeated artifact selected by ``key``.

        Args:
            key: An integer index or a slice with an explicit ``stop``.

        Returns:
            A ``PipelineArtifactElement`` for an integer key, or a list of
            elements for a slice.

        Raises:
            KeyError: If the artifact is not repeated or the key type is
                unsupported.
            ValueError: If a slice has no ``stop``.
        """
        if not self.repeated:
            raise KeyError("no repeated artifact is not indexable.")

        if isinstance(key, six.integer_types):
            return PipelineArtifactElement(self, key)
        elif isinstance(key, slice):
            if key.stop is None:
                raise ValueError(
                    "slice of repeated artifact should have stop property."
                )
            # The artifact has no intrinsic length, so the slice is normalized
            # against its own stop value.
            start, stop, step = key.indices(key.stop)
            indexes = range(stop)[start:stop:step]
            return [
                PipelineArtifactElement(artifact=self, index=idx) for idx in indexes
            ]
        raise KeyError("please provide integer to index the artifact element.")

    # TODO: Artifact value validation
    def validate_value(self, val):
        """Validate a concrete value; currently accepts everything."""
        return True

    def normalized_name(self, index):
        """Return the name of the ``index``-th element: ``<name>_<index>``."""
        return "%s_%s" % (self.name, index)

    def depend_steps(self):
        """Return the parents of the source artifact(s) this artifact uses.

        Returns:
            The result of ``filter(None, ...)`` over the parents found via
            ``from_`` or, for a repeated artifact, via the items of
            ``value``; None when neither is set.
        """

        def _depend_step(value):
            if isinstance(value, PipelineArtifact) and value.parent:
                return value.parent
            elif isinstance(value, PipelineArtifactElement) and value.artifact.parent:
                return value.artifact.parent

        if self.from_:
            return filter(None, [_depend_step(self.from_)])
        elif self.repeated and self.value:
            return filter(None, [_depend_step(item) for item in self.value])

    def translate_argument(self, arg):
        """Build the argument dict for a concrete input ``arg``.

        Returns:
            dict: ``{"name", "value"}`` whose value is a list of per-element
            dicts for a repeated artifact; ``{"name", "value", "metadata"}``
            otherwise.
        """
        argument = {"name": self.name}

        if self.repeated:
            arg_list = arg if isinstance(arg, list) else [arg]
            argument["value"] = [
                {"name": self.normalized_name(idx), "value": self._translate_value(v)}
                for idx, v in enumerate(arg_list)
            ]
        else:
            art_value = self._translate_value(arg)
            argument["value"] = art_value["value"]
            argument["metadata"] = art_value["metadata"]
        return argument

    def _translate_value(self, val):
        """Translate an input into ``{"value": ..., "metadata": ...}``.

        A resource recognized by ``LocationArtifactValue.from_resource`` is
        serialized as a JSON string with sorted keys; anything else is passed
        through unchanged. When this artifact's metadata is raw, metadata is
        inferred from the input where possible.
        """
        if isinstance(self.metadata, ArtifactMetadataBase):
            md_val = self.metadata.to_dict()
        else:
            md_val = self.metadata

        try:
            af_value = LocationArtifactValue.from_resource(val)
            value = json.dumps(af_value.to_dict(), sort_keys=True)
        except ValueError:
            # Not a recognized resource: keep the input as given.
            value = val

        if (
            isinstance(self.metadata, LocationArtifactMetadata)
            and self.metadata.is_raw()
        ):
            metadata = LocationArtifactValue.metadata_from_value(val)
            if metadata:
                md_val = metadata.to_dict()

        return {
            "value": value,
            "metadata": md_val,
        }

    def to_argument(self):
        """Build the argument dict from the assigned source or value.

        Returns:
            dict: For a repeated artifact, ``value`` is a list with one entry
            per item (``from`` for artifact sources, ``value: None`` for
            anything else); otherwise either ``from`` or ``value`` is set.
        """
        argument = {"name": self.name}

        if self.repeated:
            values = []
            # A repeated artifact with nothing assigned yields an empty list
            # instead of failing on iteration over None.
            for idx, item in enumerate(self.value or []):
                if isinstance(item, (PipelineArtifact, PipelineArtifactElement)):
                    values.append(
                        {
                            "name": self.normalized_name(idx),
                            "from": item.enclosed_fullname,
                        }
                    )
                else:
                    # NOTE(review): literal values of repeated artifacts are
                    # emitted as None here - confirm this is intended.
                    values.append(
                        {
                            "name": self.normalized_name(idx),
                            "value": None,
                        }
                    )
            argument["value"] = values
        elif self.from_:
            argument["from"] = self.from_.enclosed_fullname
        else:
            argument["value"] = self.value
        return argument

    def to_dict(self):
        """Serialize the artifact definition into a dict.

        Extends the dict produced by ``PipelineVariable.to_dict`` with
        ``metadata``, ``value``, ``path``, ``required`` and ``repeated``.
        """
        d = super(PipelineArtifact, self).to_dict()

        # Metadata may be a metadata object, an already serialized value, or
        # None (the constructor default); only objects need conversion.
        if isinstance(self.metadata, ArtifactMetadataBase):
            d["metadata"] = self.metadata.to_dict()
        elif self.metadata is not None:
            d["metadata"] = self.metadata

        if self.value is not None:
            if isinstance(self.value, LocationArtifactValue):
                d["value"] = self.value.to_dict()
            else:
                d["value"] = self.value
        if self.path is not None:
            d["path"] = self.path
        d["required"] = self.required
        if self.repeated:
            d["repeated"] = self.repeated
        return d


class PipelineArtifactElement(object):
    """A single indexed element of a repeated ``PipelineArtifact``."""

    def __init__(self, artifact, index):
        """
        Args:
            artifact: The repeated artifact this element belongs to.
            index: Index of the element within the artifact.
        """
        self.artifact = artifact
        self.index = index
        self._from = None
        self._value = None

    @property
    def name(self):
        """Element name: ``<artifact name>_<index>``."""
        return "%s_%s" % (self.artifact.name, self.index)

    @property
    def fullname(self):
        """Dotted reference: parent ref name, io type, category, element name."""
        return ".".join(
            [
                self.artifact.parent.ref_name,
                self.artifact.io_type,
                self.artifact.variable_category,
                self.name,
            ]
        )

    @property
    def enclosed_fullname(self):
        """The full name wrapped in double curly braces."""
        return "{{%s}}" % self.fullname

    @property
    def parent(self):
        """Parent of the owning artifact."""
        return self.artifact.parent


class ArtifactMetadataBase(with_metaclass(ABCMeta)):
    """Abstract base class of artifact metadata."""

    @abstractmethod
    def to_dict(self):
        """Serialize the metadata into a dict."""
        pass


class LocationArtifactMetadata(ArtifactMetadataBase):
    """Artifact metadata made of a data type, location type and attributes."""

    def __init__(self, data_type, location_type=None, type_attributes=None):
        """
        Args:
            data_type: One of the ``DataType`` constants.
            location_type: One of the ``LocationType`` constants, or None.
            type_attributes: Extra attributes merged into the serialized type
                entry; defaults to an empty dict.
        """
        self.data_type = data_type
        self.location_type = location_type
        self.type_attributes = type_attributes or dict()

    def __str__(self):
        return "%s:data_type=%s:location_type=%s" % (
            type(self).__name__,
            self.data_type,
            self.location_type,
        )

    def is_raw(self):
        """Return True if the data type is ``Any`` and no location is set."""
        return self.data_type == DataType.Any and self.location_type is None

    def to_dict(self):
        """Serialize as ``{"type": {<data_type>: {"locationType": ..., ...}}}``."""
        d = {
            "type": {
                self.data_type: {
                    "locationType": self.location_type,
                }
            }
        }

        d["type"][self.data_type].update(self.type_attributes)
        return d

    @property
    def value(self):
        """Serialized form of the metadata; same as ``to_dict()``."""
        return self.to_dict()

    def __eq__(self, other):
        """Compatibility-style equality.

        Metadata with data type ``Any`` equals anything. Otherwise two
        metadata objects are equal if the other one is ``Any`` or all of data
        type, location type and type attributes match. A dict operand is
        parsed with ``from_dict`` first.
        """
        if self.data_type == DataType.Any:
            return True
        elif isinstance(other, LocationArtifactMetadata):
            return other.data_type == DataType.Any or (
                self.data_type == other.data_type
                and self.location_type == other.location_type
                and self.type_attributes == other.type_attributes
            )
        elif isinstance(other, dict):
            other = self.from_dict(other)
            return self.__eq__(other)
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    @classmethod
    def from_dict(cls, d):
        """Build metadata from the dict layout produced by ``to_dict``.

        The first key under ``"type"`` is taken as the data type.
        """
        af_typ = d["type"]
        data_type = list(af_typ.keys())[0]
        type_attributes = af_typ[data_type].copy()
        location_type = type_attributes.pop("locationType", None)
        return cls(
            data_type=data_type,
            location_type=location_type,
            type_attributes=type_attributes,
        )


class LocationArtifactValue(object):
    """Base class of artifact values that point to a storage location."""

    def __init__(self):
        pass

    @classmethod
    def from_resource(cls, resource, metadata=None):
        """Get artifact value from input resource and artifact metadata.

        Args:
            resource: Input resource which can be ODPS url, OSS url, PyODPS table
                object, etc.
            metadata: Metadata for the input artifact; required when
                ``resource`` is a JSON string.

        Returns:
            LocationArtifactValue: A artifact value parsed from input resource.

        Raises:
            ValueError: If the resource is not supported or cannot be parsed.
        """
        # Imported lazily; presumably to avoid a circular import.
        from pai.pipeline.artifact import ArchivedArtifact

        if isinstance(resource, six.string_types):
            if resource.startswith("odps://"):
                val, _ = MaxComputeResourceArtifact.from_resource_url(resource)
                return val
            elif resource.startswith("oss://"):
                val, _ = OSSArtifact.from_resource_url(resource)
                return val
            else:
                try:
                    resource_dict = json.loads(resource)
                except ValueError:
                    # json.JSONDecodeError subclasses ValueError on Python 3
                    # and does not exist on Python 2.
                    raise ValueError("Not support artifact url schema:%s" % resource)
                return cls.from_raw_value(resource_dict, metadata)
        elif isinstance(
            resource,
            (ODPSTable, ODPSPartition, ODPSDataFrame, ODPSVolume, ODPSOfflineModel),
        ):
            return MaxComputeResourceArtifact.from_odps_resource(resource)
        elif isinstance(resource, ArchivedArtifact):
            return resource.value
        elif isinstance(resource, LocationArtifactValue):
            return resource
        raise ValueError("Not supported artifact resource:%s" % type(resource))

    @classmethod
    def metadata_from_value(cls, resource):
        """Try to get metadata from raw input for the artifact.

        Args:
            resource: Input for the artifact.

        Returns:
            (ArtifactMetadataBase): Metadata for the input artifact value, or
            None if it cannot be inferred.
        """
        # Imported lazily; presumably to avoid a circular import.
        from pai.pipeline.artifact import ArchivedArtifact

        if isinstance(resource, six.string_types):
            if resource.startswith("odps://"):
                _, metadata = MaxComputeResourceArtifact.from_resource_url(resource)
            elif resource.startswith("oss://"):
                _, metadata = OSSArtifact.from_resource_url(resource)
            else:
                return None
        elif isinstance(resource, (ODPSTable, ODPSPartition, ODPSDataFrame)):
            metadata = ArtifactMetadataUtils.maxc_table()
        elif isinstance(resource, ODPSOfflineModel):
            metadata = ArtifactMetadataUtils.maxc_offlinemodel()
        elif isinstance(resource, ArchivedArtifact):
            try:
                metadata = LocationArtifactMetadata.from_dict(resource.metadata)
            except (KeyError, ValueError):
                return None
        else:
            return None
        return metadata

    @classmethod
    def from_raw_value(cls, value, metadata):
        """Build an artifact value from a dict, dispatching on location type.

        Args:
            value: Dict representation of the artifact value.
            metadata: Metadata whose ``location_type`` selects the value class.

        Raises:
            ValueError: If ``metadata`` is None or its location type is not
                supported.
        """
        if metadata is None:
            raise ValueError(
                "ArtifactMetadata should provide while parse artifact value from "
                "dict data."
            )
        if metadata.location_type == LocationType.OSS:
            return OSSArtifact.from_dict(value)
        elif metadata.location_type == LocationType.MaxComputeTable:
            return MaxComputeTableArtifact.from_dict(value)
        elif metadata.location_type == LocationType.MaxComputeOfflineModel:
            return MaxComputeOfflineModelArtifact.from_dict(value)
        elif metadata.location_type == LocationType.MaxComputeVolume:
            return MaxComputeVolumeArtifact.from_dict(value)
        else:
            raise ValueError(
                "Not support artifact location_type: %s" % metadata.location_type
            )

    @classmethod
    def get_param_ref(cls, param):
        """Return the template reference for a pipeline parameter.

        Args:
            param: A ``PipelineParameter`` or a parameter name.

        Returns:
            str: ``param.enclosed_fullname`` for a ``PipelineParameter``,
            ``"{{inputs.parameters.<name>}}"`` for a string, or None when
            ``param`` is falsy.

        Raises:
            ValueError: If ``param`` has any other type.
        """
        from pai.pipeline.types import PipelineParameter

        if not param:
            return
        elif isinstance(param, PipelineParameter):
            ref = param.enclosed_fullname
        elif isinstance(param, six.string_types):
            ref = "{{inputs.parameters.%s}}" % param
        else:
            raise ValueError(
                "expect PipelineParameter or string, but given %s" % type(param)
            )
        return ref

    def to_dict(self):
        """Serialize the value into a dict; overridden by subclasses."""
        pass


class MaxComputeResourceArtifact(LocationArtifactValue):
    """Base class of artifact values stored in a MaxCompute (ODPS) project."""

    # odps://<project>/<tables|volumes|offlinemodels>/<name>[/<sub>][?<args>]
    # The resource name stops at "/" or "?", and the optional trailing group
    # captures the query string (the previous "[.*]" was a character class
    # matching a single "." or "*").
    MaxComputeResourceUrlPattern = re.compile(
        r"odps://(?P<project>[^/]+)/(?P<resource_type>(?:tables)|(?:volumes)|(?:offlinemodels))/"
        r"(?P<resource_name>[^/?]+)/?(?P<sub_resource>[^\?]+)?(?P<arguments>\?.*)?"
    )

    def __init__(self, project, endpoint=None):
        """
        Args:
            project: MaxCompute project name.
            endpoint: Optional MaxCompute endpoint.
        """
        super(MaxComputeResourceArtifact, self).__init__()
        self.project = project
        self.endpoint = endpoint

    def to_dict(self):
        """Serialize as ``{"location": {"project": ..., ["endpoint": ...]}}``."""
        d = {
            "location": {
                "project": self.project,
            }
        }
        if self.endpoint:
            d["location"]["endpoint"] = self.endpoint
        return d

    @classmethod
    def from_resource_url(cls, resource_url):
        """Parse MaxCompute(ODPS) resource in url schema and returns artifact value
        and metadata.

        Args:
            resource_url: An ODPS(MaxCompute) table, tablePartition, offline-model
                or volume in url schema.

        Returns:
            tuple: A tuple of MaxCompute artifact value and artifact metadata.

        Raises:
            ValueError: If the url does not match the expected format, or a
                volume url carries no sub-resource.
        """
        matches = cls.MaxComputeResourceUrlPattern.match(resource_url)
        if not matches:
            raise ValueError("Not support MaxCompute resource url format.")
        resource_type = matches.group("resource_type")
        project = matches.group("project")
        if resource_type == "tables":
            table = matches.group("resource_name")
            partition = matches.group("sub_resource")
            return (
                MaxComputeTableArtifact(
                    project=project,
                    table=table,
                    partition=partition.strip("/") if partition else None,
                ),
                ArtifactMetadataUtils.maxc_table(),
            )
        elif resource_type == "volumes":
            volume = matches.group("resource_name")
            sub_resource = (matches.group("sub_resource") or "").strip("/")
            if not sub_resource:
                raise ValueError(
                    "MaxCompute volume url should contain a file path: %s"
                    % resource_url
                )
            if "/" in sub_resource:
                partition, file_name = sub_resource.split("/", 1)
            else:
                # A single path segment is kept as the file name, as before;
                # previously the partition became that segment minus its last
                # character because str.find returned -1.
                partition, file_name = None, sub_resource
            return (
                MaxComputeVolumeArtifact(
                    project=project,
                    volume=volume,
                    partition=partition,
                    file_name=file_name,
                ),
                ArtifactMetadataUtils.maxc_volume(),
            )
        elif resource_type == "offlinemodels":
            name = matches.group("resource_name")
            return (
                MaxComputeOfflineModelArtifact(
                    project=project,
                    offline_model=name,
                ),
                ArtifactMetadataUtils.maxc_offlinemodel(),
            )
        else:
            raise ValueError("Not support MaxCompute resource type :%s" % resource_type)

    @classmethod
    def from_odps_resource(cls, resource):
        """Build an artifact value from a PyODPS object.

        Args:
            resource: A PyODPS table, partition, DataFrame, offline model or
                volume.

        Returns:
            MaxComputeResourceArtifact: The matching artifact value.

        Raises:
            ValueError: If the resource type is not supported. Volumes are
                not supported yet.
        """
        # ODPSOfflineModel is part of the guard so that its branch below is
        # reachable.
        if not isinstance(
            resource,
            (ODPSTable, ODPSPartition, ODPSDataFrame, ODPSVolume, ODPSOfflineModel),
        ):
            raise ValueError("Not support resource type:%s" % type(resource))

        if isinstance(resource, ODPSDataFrame):
            # Unwrap the DataFrame to the object it is backed by.
            resource = resource.data

        if isinstance(resource, ODPSTable):
            project = resource.project.name
            table = resource.name
            return MaxComputeTableArtifact(
                project=project,
                table=table,
            )
        elif isinstance(resource, ODPSPartition):
            table = resource.table.name
            project = resource.table.project.name
            partition = ",".join(
                ["%s=%s" % (k, v) for k, v in resource.partition_spec.kv.items()]
            )
            return MaxComputeTableArtifact(
                project=project, partition=partition, table=table
            )
        elif isinstance(resource, ODPSOfflineModel):
            project = resource.project.name
            offlinemodel = resource.name
            return MaxComputeOfflineModelArtifact(
                project=project, offline_model=offlinemodel
            )

        # TODO: ODPSVolume Support
        # Fail explicitly instead of returning None, which callers would
        # dereference.
        raise ValueError("Not support resource type:%s" % type(resource))


class MaxComputeTableArtifact(MaxComputeResourceArtifact):
    """Artifact value pointing to a MaxCompute table or table partition."""

    def __init__(self, table, project, endpoint=None, partition=None):
        """
        Args:
            table: Table name.
            project: MaxCompute project name.
            endpoint: Optional MaxCompute endpoint.
            partition: Optional partition spec.
        """
        super(MaxComputeTableArtifact, self).__init__(
            project=project, endpoint=endpoint
        )
        self.table = table
        self.partition = partition

    def to_dict(self):
        """Serialize; adds ``table`` and, if set, ``partition`` to the location."""
        d = super(MaxComputeTableArtifact, self).to_dict()
        d["location"]["table"] = self.table
        if self.partition:
            d["location"]["partition"] = self.partition
        return d

    @classmethod
    def from_dict(cls, d):
        """Build from the dict layout produced by ``to_dict``."""
        location = d["location"]
        return cls(
            table=location["table"],
            project=location.get("project"),
            endpoint=location.get("endpoint"),
            partition=location.get("partition"),
        )

    @classmethod
    def value_from_param(cls, table_name, partition=None):
        """Return a JSON value whose table/partition reference parameters.

        Args:
            table_name: ``PipelineParameter`` or parameter name for the table.
            partition: Optional ``PipelineParameter`` or parameter name for
                the partition.

        Raises:
            ValueError: If ``table_name`` is empty.
        """
        table_ref = cls.get_param_ref(table_name)
        partition_ref = cls.get_param_ref(partition)

        if not table_ref:
            raise ValueError("MaxComputeTableArtifact require table not be None")
        d = {
            "location": {
                "table": table_ref,
            }
        }
        if partition_ref is not None:
            d["location"]["partition"] = partition_ref
        return json.dumps(d)


class MaxComputeOfflineModelArtifact(MaxComputeResourceArtifact):
    """Artifact value pointing to a MaxCompute offline model."""

    def __init__(self, offline_model, project, endpoint=None):
        """
        Args:
            offline_model: Offline model name.
            project: MaxCompute project name.
            endpoint: Optional MaxCompute endpoint.
        """
        super(MaxComputeOfflineModelArtifact, self).__init__(
            project=project, endpoint=endpoint
        )
        self.offline_model = offline_model

    def to_dict(self):
        """Serialize; the model name is written to ``location.name`` and ``name``."""
        d = super(MaxComputeOfflineModelArtifact, self).to_dict()
        d["location"]["name"] = self.offline_model
        d["name"] = self.offline_model
        return d

    @classmethod
    def from_dict(cls, d):
        """Build from the dict layout produced by ``to_dict``."""
        location = d["location"]
        project = location["project"]
        name = location["name"]
        endpoint = location.get("endpoint")
        return cls(offline_model=name, project=project, endpoint=endpoint)


class MaxComputeVolumeArtifact(MaxComputeResourceArtifact):
    """Artifact value pointing to a file in a MaxCompute volume."""

    def __init__(self, volume, project, file_name, endpoint=None, partition=None):
        """
        Args:
            volume: Volume name.
            project: MaxCompute project name.
            file_name: File name within the volume.
            endpoint: Optional MaxCompute endpoint.
            partition: Optional volume partition.
        """
        super(MaxComputeVolumeArtifact, self).__init__(
            project=project, endpoint=endpoint
        )
        self.volume = volume
        self.partition = partition
        self.file_name = file_name

    def to_dict(self):
        """Serialize; adds ``volume``, ``volumePartition`` and ``file``."""
        d = super(MaxComputeVolumeArtifact, self).to_dict()
        d["location"]["volume"] = self.volume
        d["location"]["volumePartition"] = self.partition
        d["location"]["file"] = self.file_name
        return d

    @classmethod
    def from_dict(cls, d):
        """Build from the dict layout produced by ``to_dict``."""
        location = d["location"]
        endpoint = location.get("endpoint", None)
        project = location["project"]
        volume = location["volume"]
        partition = location.get("volumePartition", None)
        file_name = location.get("file", None)
        return cls(
            volume=volume,
            project=project,
            file_name=file_name,
            endpoint=endpoint,
            partition=partition,
        )


class OSSArtifact(LocationArtifactValue):
    """OSSArtifact instance represent a OSS artifact value."""

    def __init__(self, bucket, key, endpoint, role_arn=None):
        """
        Args:
            bucket: OSS bucket name.
            key: OSS object key.
            endpoint: Endpoint for the OSS bucket.
            role_arn: RoleArn used for OSS access.
        """
        super(OSSArtifact, self).__init__()
        self.bucket = bucket
        self.endpoint = endpoint
        self.role_arn = role_arn
        self.key = key

    def to_dict(self):
        """Serialize; ``rolearn`` is only emitted when a role ARN is set."""
        d = {
            "location": {
                "endpoint": self.endpoint,
                "bucket": self.bucket,
                "key": self.key,
            }
        }
        if self.role_arn:
            d["location"]["rolearn"] = self.role_arn
        return d

    @classmethod
    def from_resource_url(cls, resource):
        """Initialize a OSSArtifact instance from URL in OSS schema.

        Args:
            resource: URL in OSS schema.

        Returns:
            tuple: The OSSArtifact instance and the OSS dataset metadata.
        """
        bucket_name, object_key, endpoint, role_arn = OssUriObj.parse(resource)
        return (
            cls(
                bucket=bucket_name, key=object_key, endpoint=endpoint, role_arn=role_arn
            ),
            ArtifactMetadataUtils.oss_dataset(),
        )

    @classmethod
    def from_dict(cls, d):
        """Build from the dict layout produced by ``to_dict``.

        Returns:
            OSSArtifact, or None when the location has no bucket.
        """
        if not d["location"].get("bucket"):
            return
        bucket = d["location"]["bucket"]
        key = d["location"]["key"]
        endpoint = d["location"].get("endpoint")
        rolearn = d["location"].get("rolearn")
        return OSSArtifact(bucket=bucket, key=key, endpoint=endpoint, role_arn=rolearn)