tfx/orchestration/portable/outputs_utils.py
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Portable library for output artifacts resolution including caching decision.
"""

import collections
import copy
import datetime
import os
from typing import Any, Dict, List, Mapping, Optional, Sequence, Union
import uuid

from absl import logging
from tfx import types
from tfx import version
from tfx.dsl.io import fileio
from tfx.orchestration import data_types_utils
from tfx.orchestration import node_proto_view
from tfx.orchestration.portable import data_types
from tfx.proto.orchestration import execution_result_pb2
from tfx.proto.orchestration import pipeline_pb2
from tfx.types import artifact as tfx_artifact
from tfx.types import artifact_utils
from tfx.types.value_artifact import ValueArtifact
from tfx.utils import proto_utils

from ml_metadata.proto import metadata_store_pb2

_SYSTEM = '.system'
_EXECUTOR_EXECUTION = 'executor_execution'
_DRIVER_EXECUTION = 'driver_execution'
_STATEFUL_WORKING_DIR = 'stateful_working_dir'
_DRIVER_OUTPUT_FILE = 'driver_output.pb'
_EXECUTOR_OUTPUT_FILE = 'executor_output.pb'
_VALUE_ARTIFACT_FILE_NAME = 'value'
# The fixed special value to indicate that the binary will set the output URI
# value during its execution.
# LINT.IfChange
RESOLVED_AT_RUNTIME = '{resolved_at_runtime}'
# LINT.ThenChange(<Internal source code>)
_ORCHESTRATOR_GENERATED_BCL_DIR = 'orchestrator_generated_bcl'
_STATEFUL_WORKING_DIR_INDEX = '__stateful_working_dir_index__'


def make_output_dirs(
    output_dict: Mapping[str, Sequence[types.Artifact]]) -> None:
  """Makes directories for output artifacts' URIs."""
for _, artifact_list in output_dict.items():
for artifact in artifact_list:
# Omit lifecycle management for external artifacts.
if artifact.is_external:
continue
if isinstance(artifact, ValueArtifact):
# If this is a ValueArtifact, create the file if it does not exist.
if not fileio.exists(artifact.uri):
artifact_dir = os.path.dirname(artifact.uri)
fileio.makedirs(artifact_dir)
with fileio.open(artifact.uri, 'w') as f:
# Because fileio.open won't create an empty file, we write an
# empty string to it to force the creation.
f.write('')
else:
# Otherwise create a dir.
fileio.makedirs(artifact.uri)
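

# Example usage (illustrative sketch; the paths and artifacts below are
# hypothetical, not part of this module):
#
#   from tfx.types import standard_artifacts
#
#   model = standard_artifacts.Model()
#   model.uri = '/tmp/pipeline_root/trainer/model/42'
#   make_output_dirs({'model': [model]})  # Creates the model directory.
#
#   result = standard_artifacts.String()  # A ValueArtifact.
#   result.uri = '/tmp/pipeline_root/trainer/result/42/value'
#   make_output_dirs({'result': [result]})  # Creates parent dir + empty file.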


def remove_output_dirs(
    output_dict: Mapping[str, Sequence[types.Artifact]]) -> None:
  """Removes directories of output artifacts' URIs."""
for _, artifact_list in output_dict.items():
for artifact in artifact_list:
# Omit lifecycle management for external artifacts.
if artifact.is_external:
continue
if fileio.isdir(artifact.uri):
fileio.rmtree(artifact.uri)
else:
fileio.remove(artifact.uri)


def remove_stateful_working_dir(stateful_working_dir: str) -> None:
  """Removes stateful_working_dir."""
try:
fileio.rmtree(stateful_working_dir)
logging.info('Deleted stateful_working_dir %s', stateful_working_dir)
except fileio.NotFoundError:
    logging.warning(
        'stateful_working_dir %s not found; skipping deletion.',
        stateful_working_dir)


def _attach_artifact_properties(spec: pipeline_pb2.OutputSpec.ArtifactSpec,
                                artifact: types.Artifact):
  """Attaches properties of an artifact using ArtifactSpec."""
for key, value in spec.additional_properties.items():
if not value.HasField('field_value'):
raise RuntimeError('Property value is not a field_value for %s' % key)
if value.field_value.HasField('proto_value'):
      # Proto properties need to be unpacked from the google.protobuf.Any
      # message to their concrete message type before setting the artifact
      # property.
property_value = proto_utils.unpack_proto_any(
value.field_value.proto_value)
else:
property_value = data_types_utils.get_metadata_value(value.field_value)
setattr(artifact, key, property_value)
for key, value in spec.additional_custom_properties.items():
if not value.HasField('field_value'):
raise RuntimeError('Property value is not a field_value for %s' % key)
value_type = value.field_value.WhichOneof('value')
if value_type == 'int_value':
artifact.set_int_custom_property(key, value.field_value.int_value)
elif value_type == 'string_value':
artifact.set_string_custom_property(key, value.field_value.string_value)
elif value_type == 'double_value':
artifact.set_float_custom_property(key, value.field_value.double_value)
elif value_type == 'proto_value':
proto_value = proto_utils.unpack_proto_any(value.field_value.proto_value)
artifact.set_proto_custom_property(key, proto_value)
else:
raise RuntimeError(f'Unexpected value_type: {value_type}')
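

# Example (illustrative sketch; `artifact` stands for any TFX artifact
# instance):
#
#   spec = pipeline_pb2.OutputSpec.ArtifactSpec()
#   spec.additional_custom_properties['tag'].field_value.string_value = 'gold'
#   _attach_artifact_properties(spec, artifact)
#   # artifact now carries the custom property 'tag' == 'gold'.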


def get_node_dir(
pipeline_runtime_spec: pipeline_pb2.PipelineRuntimeSpec, node_id: str
) -> str:
"""Gets node dir for the given pipeline node."""
return os.path.join(
pipeline_runtime_spec.pipeline_root.field_value.string_value, node_id
)
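

# Example (illustrative; the pipeline root below is hypothetical):
#
#   spec = pipeline_pb2.PipelineRuntimeSpec()
#   spec.pipeline_root.field_value.string_value = '/tmp/pipeline_root'
#   get_node_dir(spec, 'trainer')  # -> '/tmp/pipeline_root/trainer'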


class OutputsResolver:
  """Handles launcher output-related logic."""

  def __init__(self,
pipeline_node: Union[pipeline_pb2.PipelineNode,
node_proto_view.NodeProtoView],
pipeline_info: pipeline_pb2.PipelineInfo,
pipeline_runtime_spec: pipeline_pb2.PipelineRuntimeSpec,
execution_mode: 'pipeline_pb2.Pipeline.ExecutionMode' = (
pipeline_pb2.Pipeline.SYNC)):
self._pipeline_node = pipeline_node
self._pipeline_info = pipeline_info
self._pipeline_root = (
pipeline_runtime_spec.pipeline_root.field_value.string_value)
self._pipeline_run_id = (
pipeline_runtime_spec.pipeline_run_id.field_value.string_value)
self._execution_mode = execution_mode
self._node_dir = get_node_dir(
pipeline_runtime_spec, pipeline_node.node_info.id
)

  def generate_output_artifacts(
self, execution_id: int) -> Dict[str, List[types.Artifact]]:
"""Generates output artifacts given execution_id."""
return generate_output_artifacts(
execution_id=execution_id,
outputs=self._pipeline_node.outputs.outputs,
node_dir=self._node_dir)

  def get_executor_output_uri(self, execution_id: int) -> str:
"""Generates executor output uri given execution_id."""
return get_executor_output_uri(self._node_dir, execution_id)

  def get_driver_output_uri(self) -> str:
    """Generates a unique driver output uri under the node's system dir."""
driver_output_dir = os.path.join(
self._node_dir, _SYSTEM, _DRIVER_EXECUTION,
str(int(datetime.datetime.now().timestamp() * 1000000)))
fileio.makedirs(driver_output_dir)
return os.path.join(driver_output_dir, _DRIVER_OUTPUT_FILE)

  def get_stateful_working_directory(
      self,
      execution: metadata_store_pb2.Execution,
  ) -> str:
    """Generates stateful working directory.

    Args:
      execution: execution containing stateful_working_dir_index.

    Returns:
      Path to stateful working directory.
    """
return get_stateful_working_directory(
self._node_dir,
execution=execution,
)

  def make_tmp_dir(self, execution_id: int) -> str:
"""Generates a temporary directory."""
return make_tmp_dir(self._node_dir, execution_id)
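

# Example usage of OutputsResolver (illustrative sketch; `node`, `info` and
# `runtime_spec` stand for already-deserialized pipeline IR protos):
#
#   resolver = OutputsResolver(node, info, runtime_spec)
#   output_artifacts = resolver.generate_output_artifacts(execution_id=42)
#   executor_output_uri = resolver.get_executor_output_uri(execution_id=42)
#   tmp_dir = resolver.make_tmp_dir(execution_id=42)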


def _generate_output_artifact(
output_spec: pipeline_pb2.OutputSpec) -> types.Artifact:
"""Generates each output artifact given output_spec."""
artifact = artifact_utils.deserialize_artifact(output_spec.artifact_spec.type)
_attach_artifact_properties(output_spec.artifact_spec, artifact)
if output_spec.artifact_spec.is_async:
# Mark the artifact state as REFERENCE to distinguish it from PUBLISHED
# (LIVE in MLMD) intermediate artifacts emitted during a component's
# execution. At the end of the component's execution, its state will remain
# REFERENCE instead of changing to PUBLISHED.
artifact.state = tfx_artifact.ArtifactState.REFERENCE
return artifact


def generate_output_artifacts(
    execution_id: int,
    outputs: Mapping[str, pipeline_pb2.OutputSpec],
    node_dir: str) -> Dict[str, List[types.Artifact]]:
  """Generates output artifacts.

  Args:
    execution_id: The id of the execution.
    outputs: Mapping from artifact key to its OutputSpec value in pipeline IR.
    node_dir: The root directory of the node.

  Returns:
    Mapping from artifact key to the list of TFX artifacts.
  """
output_artifacts = collections.defaultdict(list)
for key, output_spec in outputs.items():
artifact = _generate_output_artifact(output_spec)
if output_spec.artifact_spec.external_artifact_uris:
for external_uri in output_spec.artifact_spec.external_artifact_uris:
external_artifact = copy.deepcopy(artifact)
external_artifact.uri = external_uri
external_artifact.is_external = True
logging.debug('Creating external output artifact uri %s',
external_artifact.uri)
output_artifacts[key].append(external_artifact)
else:
artifact.uri = os.path.join(node_dir, key, str(execution_id))
if isinstance(artifact, ValueArtifact):
artifact.uri = os.path.join(artifact.uri, _VALUE_ARTIFACT_FILE_NAME)
logging.debug('Creating output artifact uri %s', artifact.uri)
output_artifacts[key].append(artifact)
return output_artifacts
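

# For a non-external output under key 'model' and execution id 42, the
# generated URIs follow this layout (illustrative):
#
#   <node_dir>/model/42          # regular artifacts get a directory
#   <node_dir>/model/42/value    # ValueArtifacts point at a file inside it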


def get_executor_output_dir(execution_info: data_types.ExecutionInfo) -> str:
"""Generates executor output directory for a given execution info."""
return os.path.dirname(execution_info.execution_output_uri)


def get_executor_output_uri(node_dir: str, execution_id: int) -> str:
  """Generates executor output uri for a given execution_id."""
execution_dir = os.path.join(node_dir, _SYSTEM, _EXECUTOR_EXECUTION,
str(execution_id))
fileio.makedirs(execution_dir)
return os.path.join(execution_dir, _EXECUTOR_OUTPUT_FILE)
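

# Example (illustrative): get_executor_output_uri('/root/trainer', 42) returns
# '/root/trainer/.system/executor_execution/42/executor_output.pb', creating
# the enclosing directory as a side effect.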


def get_stateful_working_dir_index(
    execution: Optional[metadata_store_pb2.Execution] = None,
) -> str:
  """Gets the stateful working directory index.

  Returns the UUID stored in the execution. If the execution is not provided
  or no UUID is found in it, a new UUID is returned.

  Args:
    execution: execution that stores the stateful_working_dir_index.

  Returns:
    An index for the stateful working dir.
  """
index = None
if (
execution is not None
and _STATEFUL_WORKING_DIR_INDEX in execution.custom_properties
):
index = data_types_utils.get_metadata_value(
execution.custom_properties[_STATEFUL_WORKING_DIR_INDEX])
return str(index) if index is not None else str(uuid.uuid4())
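

# Example (illustrative sketch; the execution proto below is hypothetical):
#
#   execution = metadata_store_pb2.Execution()
#   execution.custom_properties[
#       _STATEFUL_WORKING_DIR_INDEX].string_value = 'fixed-index'
#   get_stateful_working_dir_index(execution)  # -> 'fixed-index'
#   get_stateful_working_dir_index()           # -> a fresh uuid4 string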


def get_stateful_working_directory(
    node_dir: str,
    execution: metadata_store_pb2.Execution,
) -> str:
  """Generates stateful working directory.

  The generated stateful working directory will have the following pattern:
  {node_dir}/.system/stateful_working_dir/{stateful_working_dir_index}. The
  stateful_working_dir_index is a UUID stored as a custom property in the
  execution. If no UUID is found in the execution, a new UUID will be
  generated and used as the directory suffix.

  Args:
    node_dir: root directory of the node.
    execution: execution containing stateful_working_dir_index.

  Returns:
    Path to stateful working directory.
  """
# NOTE: If this directory structure is changed, please update
# the remove_stateful_working_dir function in this file accordingly.
# Create stateful working dir for the execution.
stateful_working_dir_index = get_stateful_working_dir_index(execution)
stateful_working_dir = os.path.join(
node_dir, _SYSTEM, _STATEFUL_WORKING_DIR, stateful_working_dir_index
)
if not fileio.exists(stateful_working_dir):
try:
fileio.makedirs(stateful_working_dir)
except Exception: # pylint: disable=broad-except
logging.exception(
'Failed to make stateful working dir: %s',
stateful_working_dir,
)
raise
return stateful_working_dir
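

# Example (illustrative): for node_dir '/root/trainer' and an execution whose
# __stateful_working_dir_index__ custom property is 'abc', the returned (and
# created) path is '/root/trainer/.system/stateful_working_dir/abc'.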


def make_tmp_dir(node_dir: str, execution_id: int) -> str:
"""Generates a temporary directory."""
result = os.path.join(node_dir, _SYSTEM, _EXECUTOR_EXECUTION,
str(execution_id), '.temp', '')
fileio.makedirs(result)
return result
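

# Example (illustrative): make_tmp_dir('/root/trainer', 42) creates and returns
# '/root/trainer/.system/executor_execution/42/.temp/' (the join with '' adds
# the trailing separator).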


def tag_output_artifacts_with_version(
    output_artifacts: Optional[Mapping[str, Sequence[types.Artifact]]] = None):
  """Tags output artifacts with the current TFX version."""
if not output_artifacts:
return
for unused_key, artifact_list in output_artifacts.items():
for artifact in artifact_list:
if not artifact.has_custom_property(
artifact_utils.ARTIFACT_TFX_VERSION_CUSTOM_PROPERTY_KEY):
artifact.set_string_custom_property(
artifact_utils.ARTIFACT_TFX_VERSION_CUSTOM_PROPERTY_KEY,
version.__version__)


def populate_output_artifact(
    executor_output: execution_result_pb2.ExecutorOutput,
    output_dict: Mapping[str, Sequence[types.Artifact]]):
  """Populates output_dict into executor_output."""
for key, artifact_list in output_dict.items():
artifacts = execution_result_pb2.ExecutorOutput.ArtifactList()
for artifact in artifact_list:
artifacts.artifacts.append(artifact.mlmd_artifact)
executor_output.output_artifacts[key].CopyFrom(artifacts)
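

# Example (illustrative sketch; `model` stands for a hypothetical artifact):
#
#   executor_output = execution_result_pb2.ExecutorOutput()
#   populate_output_artifact(executor_output, {'model': [model]})
#   # executor_output.output_artifacts['model'].artifacts now holds the
#   # artifact's underlying MLMD proto.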


def populate_exec_properties(
    executor_output: execution_result_pb2.ExecutorOutput,
    exec_properties: Mapping[str, Any]):
  """Populates exec_properties into executor_output."""
for key, value in exec_properties.items():
v = metadata_store_pb2.Value()
if isinstance(value, str):
v.string_value = value
elif isinstance(value, int):
v.int_value = value
elif isinstance(value, float):
v.double_value = value
    else:
      logging.info(
          'Value type %s of key %s in exec_properties is not supported; '
          'dropping it.', type(value), key)
      continue
executor_output.execution_properties[key].CopyFrom(v)
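

# Example (illustrative sketch):
#
#   executor_output = execution_result_pb2.ExecutorOutput()
#   populate_exec_properties(
#       executor_output,
#       {'steps': 100, 'rate': 0.5, 'name': 'trainer', 'bad': [1, 2]})
#   # 'steps' -> int_value, 'rate' -> double_value, 'name' -> string_value;
#   # 'bad' has an unsupported type, so it is logged and dropped.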


def get_orchestrator_generated_bcl_dir(
    pipeline_runtime_spec: pipeline_pb2.PipelineRuntimeSpec, node_id: str
) -> str:
  """Generates a root directory to hold orchestrator generated BCLs for the given node.

  Args:
    pipeline_runtime_spec: pipeline runtime specifications.
    node_id: unique id of the node within the pipeline.

  Returns:
    Path to the orchestrator generated BCL root dir, which has the format
    `<node_dir>/.system/orchestrator_generated_bcl`.
  """
node_dir = get_node_dir(pipeline_runtime_spec, node_id)
orchestrator_generated_bcl_dir = os.path.join(
node_dir, _SYSTEM, _ORCHESTRATOR_GENERATED_BCL_DIR
)
if not fileio.exists(orchestrator_generated_bcl_dir):
try:
fileio.makedirs(orchestrator_generated_bcl_dir)
except Exception: # pylint: disable=broad-except
logging.exception(
'Failed to make orchestrator generated bcl dir: %s',
orchestrator_generated_bcl_dir,
)
raise
return orchestrator_generated_bcl_dir
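

# Example (illustrative): for pipeline root '/root' and node_id 'trainer', the
# returned (and created) directory is
# '/root/trainer/.system/orchestrator_generated_bcl'.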