in tfx/orchestration/kubeflow/v2/pipeline_builder.py [0:0]
def build(self) -> pipeline_pb2.PipelineSpec:
  """Assemble the PipelineSpec proto for the wrapped pipeline.

  The pipeline name and default image are validated, the pipeline is
  finalized, and every component in `self._pipeline.components` is converted
  to task specs by `step_builder.StepBuilder`, in iteration order.

  Without an exit handler the component tasks are placed directly in the root
  DAG. With an exit handler they are stored under the `utils.TFX_DAG_NAME`
  component, and the root DAG receives a task referencing that component plus
  the exit handler task.

  Side effects visible in this method: `self._pipeline.finalize()` is called;
  when an exit handler is set, a component whose id equals
  `utils.TFX_DAG_NAME` gets a generated suffix appended to its id; and the
  exit handler is registered in the pipeline's DSL context registry.

  Returns:
    The populated `pipeline_pb2.PipelineSpec`.

  Raises:
    ValueError: If an exec property is a placeholder that
      `channel_utils.unwrap_simple_channel_placeholder` rejects.
  """
  _check_name(self._pipeline_info.pipeline_name)
  _check_default_image(self._default_image)

  deploy_cfg = pipeline_pb2.PipelineDeploymentConfig()
  info_proto = pipeline_pb2.PipelineInfo(
      name=self._pipeline_info.pipeline_name
  )

  self._pipeline.finalize()

  # (upstream node id, output key) -> type name of the channel that feeds a
  # dynamic exec property.
  dynamic_props: dict[tuple[str, str], str] = {}
  for node in self._pipeline.components:
    for name, prop in node.exec_properties.items():
      if not isinstance(prop, placeholder.Placeholder):
        continue
      try:
        # Only the plain channel.future()[0].value form is accepted; every
        # other placeholder expression is rejected.
        src = channel_utils.unwrap_simple_channel_placeholder(prop)
      except ValueError as e:
        raise ValueError(f'Invalid placeholder for exec prop {name}') from e
      producer_id = src.producer_component_id
      dynamic_props[
          # cast() exists only so pytype knows output_key is not None.
          (producer_id, typing.cast(str, src.output_key))
      ] = src.type.TYPE_NAME

  task_specs = {}
  comp_defs = {}
  # (producer component id, output key) -> (replacement producer component
  # id, output key).
  redirects = {}

  def _build_step(node, image, enable_cache, **extra):
    """Runs StepBuilder for `node` with the settings shared by every step."""
    return step_builder.StepBuilder(
        node=node,
        deployment_config=deploy_cfg,
        component_defs=comp_defs,
        dynamic_exec_properties=dynamic_props,
        dsl_context_reg=self._pipeline.dsl_context_registry,
        image=image,
        image_cmds=self._default_commands,
        beam_pipeline_args=self._pipeline.beam_pipeline_args,
        enable_cache=enable_cache,
        pipeline_info=self._pipeline_info,
        channel_redirect_map=redirects,
        use_pipeline_spec_2_1=self._use_pipeline_spec_2_1,
        **extra,
    ).build()

  with parameter_utils.ParameterContext() as param_ctx:
    for node in self._pipeline.components:
      if self._exit_handler and node.id == utils.TFX_DAG_NAME:
        # NOTE(review): dynamic_props keys were read before this rename;
        # confirm lookups still match when a renamed node is a producer.
        node.with_id(node.id + _generate_component_name_suffix())
        logging.warning(
            (
                'tfx-dag is system reserved name for pipeline with exit'
                ' handler, added suffix to your component name: %s'
            ),
            node.id,
        )
      # Components must be visited in topological order: an upstream node
      # that causes a channel redirection is processed first, so its entry
      # in `redirects` already exists when its consumers are built.
      node_image = _get_component_image(self._default_image, node.id)
      task_specs.update(
          _build_step(node, node_image, self._pipeline.enable_cache)
      )

  spec = pipeline_pb2.PipelineSpec(pipeline_info=info_proto)

  if not self._exit_handler:
    for name, task_spec in task_specs.items():
      spec.root.dag.tasks[name].CopyFrom(task_spec)
  else:
    # With an exit handler, all TFX tasks live under the tfx_dag component;
    # the exit handler is a separate component triggered by tfx_dag.
    for name, task_spec in task_specs.items():
      spec.components[utils.TFX_DAG_NAME].dag.tasks[name].CopyFrom(task_spec)

    handler = self._exit_handler
    handler_image = _get_component_image(self._default_image, handler.id)
    registry = self._pipeline.dsl_context_registry
    with registry.temporary_mutable():
      registry.put_node(handler)

    # Build the root DAG: the tfx_dag task plus the exit handler task.
    handler_tasks = _build_step(
        handler, handler_image, False, is_exit_handler=True
    )
    dag_task = spec.root.dag.tasks[utils.TFX_DAG_NAME]
    dag_task.component_ref.name = utils.TFX_DAG_NAME
    dag_task.task_info.name = utils.TFX_DAG_NAME
    spec.root.dag.tasks[handler.id].CopyFrom(handler_tasks[handler.id])

  spec.deployment_spec.update(json_format.MessageToDict(deploy_cfg))
  for name, comp_def in comp_defs.items():
    spec.components[name].CopyFrom(comp_def)

  # Expose every runtime parameter collected by the parameter context as an
  # input parameter of the root.
  for param in param_ctx.parameters:
    spec.root.input_definitions.parameters[param.name].CopyFrom(
        self._parameter_type_spec_builder_func(param)
    )
  return spec