def run_pipeline_from_func()

in python/pipelines/pipeline_ops.py [0:0]


def run_pipeline_from_func(
        pipeline_func: Callable,
        pipeline_root: str,
        project_id: str,
        location: str,
        service_account: str,
        pipeline_parameters: Optional[Dict[str, Any]],
        pipeline_parameters_substitutions: Optional[Dict[str, Any]] = None,
        enable_caching: bool = False,
        experiment_name: Optional[str] = None,
        job_id: Optional[str] = None,
        labels: Optional[Dict[str, str]] = None,
        credentials: Optional[credentials.Credentials] = None,
        encryption_spec_key_name: Optional[str] = None,
        wait: bool = False) -> "PipelineJob":
    """
    Build a Vertex AI PipelineJob from a pipeline function and submit it.

    If ``pipeline_parameters_substitutions`` is provided, the pipeline
    parameters are first passed through ``substitute_pipeline_params``.
    A ``PipelineJob`` is then created with ``PipelineJob.from_pipeline_func``
    and submitted. When ``wait`` is true the call blocks until the job
    finishes and raises if the job reports failure.

    Args:
        pipeline_func: The pipeline function to run.
        pipeline_root: Passed to the job as ``output_artifacts_gcs_dir``.
        project_id: The project the pipeline job is created in.
        location: The location the pipeline job is created in.
        service_account: The service account the job is submitted with.
        pipeline_parameters: The parameter values to pass to the pipeline.
        pipeline_parameters_substitutions: Substitutions applied to
            ``pipeline_parameters`` before the job is created. Skipped
            when ``None``.
        enable_caching: Whether to enable caching for the pipeline.
        experiment_name: The experiment name the job is submitted under.
        job_id: The ID of the pipeline job.
        labels: The labels to apply to the pipeline job.
        credentials: The credentials used to create the pipeline job.
        encryption_spec_key_name: The encryption key to use for the job.
        wait: Whether to block until the pipeline completes.

    Returns:
        The submitted PipelineJob object.

    Raises:
        RuntimeError: If ``wait`` is true and the pipeline job has failed.
    """

    # Identity check: only an explicit None skips substitution (an empty
    # substitutions dict is still handed to the helper, as before).
    if pipeline_parameters_substitutions is not None:
        pipeline_parameters = substitute_pipeline_params(
            pipeline_parameters, pipeline_parameters_substitutions)

    # NOTE: inside this function ``credentials`` is the parameter, which
    # shadows the module of the same name used in the signature annotation.
    pl = PipelineJob.from_pipeline_func(
        pipeline_func=pipeline_func,
        parameter_values=pipeline_parameters,
        enable_caching=enable_caching,
        job_id=job_id,
        output_artifacts_gcs_dir=pipeline_root,
        project=project_id,
        location=location,
        credentials=credentials,
        encryption_spec_key_name=encryption_spec_key_name,
        labels=labels,
    )
    pl.submit(service_account=service_account, experiment_name=experiment_name)

    if wait:
        pl.wait()
        # Failure is only inspected after waiting; without ``wait`` the job
        # is returned immediately after submission.
        if pl.has_failed:
            raise RuntimeError("Pipeline execution failed")
    return pl