def build()

in tfx/orchestration/kubeflow/v2/pipeline_builder.py [0:0]


  def build(self) -> pipeline_pb2.PipelineSpec:
    """Build a pipeline PipelineSpec.

    Walks the pipeline's components (relying on their topological order),
    builds a task spec and component definition for each, and assembles the
    result into a single PipelineSpec proto. When an exit handler is
    configured, all regular TFX tasks are nested under a sub-DAG component
    named `utils.TFX_DAG_NAME`, and the exit handler is emitted as a separate
    root-level task triggered by that sub-DAG.

    Returns:
      A `pipeline_pb2.PipelineSpec` carrying the pipeline info, deployment
      spec, component definitions, task DAG, and root input parameters.

    Raises:
      ValueError: If the pipeline name or default image fails validation, or
        if a dynamic exec property uses a placeholder expression other than
        the supported ``channel.future()[0].value`` form.
    """

    _check_name(self._pipeline_info.pipeline_name)
    _check_default_image(self._default_image)

    deployment_config = pipeline_pb2.PipelineDeploymentConfig()
    pipeline_info = pipeline_pb2.PipelineInfo(
        name=self._pipeline_info.pipeline_name)

    self._pipeline.finalize()

    # Map from (upstream_node_id, output_key) to output_type (ValueArtifact)
    dynamic_exec_properties: dict[tuple[str, str], str] = {}
    for component in self._pipeline.components:
      for name, value in component.exec_properties.items():
        if isinstance(value, placeholder.Placeholder):
          try:
            # This unwraps channel.future()[0].value and disallows any other
            # placeholder expressions.
            channel = channel_utils.unwrap_simple_channel_placeholder(value)
          except ValueError as e:
            raise ValueError(f'Invalid placeholder for exec prop {name}') from e
          node_id = channel.producer_component_id
          dynamic_exec_properties[
              # The cast() is just to tell pytype that it's not None.
              (node_id, typing.cast(str, channel.output_key))
          ] = channel.type.TYPE_NAME
    tfx_tasks = {}
    component_defs = {}
    # Map from (producer component id, output key) to (new producer component
    # id, output key)
    channel_redirect_map = {}
    with parameter_utils.ParameterContext() as pc:
      for component in self._pipeline.components:
        if self._exit_handler and component.id == utils.TFX_DAG_NAME:
          # A user component must not collide with the reserved sub-DAG name;
          # rename it in place before building its task spec.
          component.with_id(component.id + _generate_component_name_suffix())
          logging.warning(
              (
                  'tfx-dag is system reserved name for pipeline with exit'
                  ' handler, added suffix to your component name: %s'
              ),
              component.id,
          )
        # Here the topological order of components is required.
        # If a channel redirection is needed, redirect mapping is expected to be
        # available because the upstream node (which is the cause for
        # redirecting) is processed before the downstream consumer nodes.
        component_image = _get_component_image(
            self._default_image, component.id
        )
        built_tasks = step_builder.StepBuilder(
            node=component,
            deployment_config=deployment_config,
            component_defs=component_defs,
            dynamic_exec_properties=dynamic_exec_properties,
            dsl_context_reg=self._pipeline.dsl_context_registry,
            image=component_image,
            image_cmds=self._default_commands,
            beam_pipeline_args=self._pipeline.beam_pipeline_args,
            enable_cache=self._pipeline.enable_cache,
            pipeline_info=self._pipeline_info,
            channel_redirect_map=channel_redirect_map,
            use_pipeline_spec_2_1=self._use_pipeline_spec_2_1,
        ).build()
        tfx_tasks.update(built_tasks)

    result = pipeline_pb2.PipelineSpec(pipeline_info=pipeline_info)

    # if exit handler is defined, put all the TFX tasks under tfx_dag,
    # exit handler is a separate component triggered by tfx_dag.
    if self._exit_handler:
      for name, task_spec in tfx_tasks.items():
        result.components[utils.TFX_DAG_NAME].dag.tasks[name].CopyFrom(
            task_spec)
      exit_handler_image = _get_component_image(
          self._default_image, self._exit_handler.id
      )
      # The exit handler node is only registered temporarily; the StepBuilder
      # must run inside the context so it can see the node in the registry
      # before the temporary mutation is rolled back.
      with self._pipeline.dsl_context_registry.temporary_mutable():
        self._pipeline.dsl_context_registry.put_node(self._exit_handler)
        # construct root with exit handler
        exit_handler_task = step_builder.StepBuilder(
            node=self._exit_handler,
            deployment_config=deployment_config,
            component_defs=component_defs,
            dsl_context_reg=self._pipeline.dsl_context_registry,
            dynamic_exec_properties=dynamic_exec_properties,
            image=exit_handler_image,
            image_cmds=self._default_commands,
            beam_pipeline_args=self._pipeline.beam_pipeline_args,
            enable_cache=False,
            pipeline_info=self._pipeline_info,
            channel_redirect_map=channel_redirect_map,
            is_exit_handler=True,
            use_pipeline_spec_2_1=self._use_pipeline_spec_2_1,
        ).build()
      result.root.dag.tasks[
          utils.TFX_DAG_NAME].component_ref.name = utils.TFX_DAG_NAME
      result.root.dag.tasks[
          utils.TFX_DAG_NAME].task_info.name = utils.TFX_DAG_NAME
      result.root.dag.tasks[self._exit_handler.id].CopyFrom(
          exit_handler_task[self._exit_handler.id])
    else:
      for name, task_spec in tfx_tasks.items():
        result.root.dag.tasks[name].CopyFrom(task_spec)

    result.deployment_spec.update(json_format.MessageToDict(deployment_config))
    for name, component_def in component_defs.items():
      result.components[name].CopyFrom(component_def)

    # Attach runtime parameter to root's input parameter
    for param in pc.parameters:
      result.root.input_definitions.parameters[param.name].CopyFrom(
          self._parameter_type_spec_builder_func(param)
      )

    return result