in python/pipelines/pipeline_ops.py [0:0]
def run_pipeline_from_func(
        pipeline_func: Callable,
        pipeline_root: str,
        project_id: str,
        location: str,
        service_account: str,
        pipeline_parameters: Optional[Dict[str, Any]],
        pipeline_parameters_substitutions: Optional[Dict[str, Any]] = None,
        enable_caching: bool = False,
        experiment_name: Optional[str] = None,
        job_id: Optional[str] = None,
        labels: Optional[Dict[str, str]] = None,
        credentials: Optional[credentials.Credentials] = None,
        encryption_spec_key_name: Optional[str] = None,
        wait: bool = False) -> PipelineJob:
    """
    Builds a PipelineJob from a pipeline function and submits it.

    If substitutions are supplied, they are applied to the pipeline parameters
    via ``substitute_pipeline_params`` before the job is created. The job is
    then created with ``PipelineJob.from_pipeline_func`` and submitted under
    the given service account and experiment. When ``wait`` is true, the call
    blocks until ``PipelineJob.wait()`` returns and then checks the job for
    failure.

    Args:
        pipeline_func: The pipeline function to run.
        pipeline_root: Passed to the job as ``output_artifacts_gcs_dir``
            (presumably a GCS location for pipeline artifacts).
        project_id: The project the job is created in.
        location: The location (region) the job is created in.
        service_account: The service account the job is submitted with.
        pipeline_parameters: Parameter values passed to the pipeline.
        pipeline_parameters_substitutions: Optional substitutions applied to
            ``pipeline_parameters``. Skipped only when this is ``None``; an
            empty dict is still forwarded to ``substitute_pipeline_params``.
        enable_caching: Whether to enable caching for the pipeline.
        experiment_name: Optional experiment name the job is submitted under.
        job_id: Optional ID for the pipeline job.
        labels: Optional labels to apply to the pipeline job.
        credentials: Optional credentials used when creating the job.
        encryption_spec_key_name: Optional encryption key name for the job.
        wait: Whether to block until the pipeline job completes.

    Returns:
        The submitted PipelineJob object.

    Raises:
        RuntimeError: If ``wait`` is true and the job reports failure after
            waiting.
    """
    # Compare against the None singleton by identity, so an empty
    # substitutions dict is still passed through to the helper.
    if pipeline_parameters_substitutions is not None:
        pipeline_parameters = substitute_pipeline_params(
            pipeline_parameters, pipeline_parameters_substitutions)

    pl = PipelineJob.from_pipeline_func(
        pipeline_func=pipeline_func,
        parameter_values=pipeline_parameters,
        enable_caching=enable_caching,
        job_id=job_id,
        output_artifacts_gcs_dir=pipeline_root,
        project=project_id,
        location=location,
        credentials=credentials,
        encryption_spec_key_name=encryption_spec_key_name,
        labels=labels,
    )

    pl.submit(service_account=service_account, experiment_name=experiment_name)

    if wait:
        pl.wait()
        # Failure is only checked once the job has been waited on; a job that
        # was merely submitted is returned to the caller as-is.
        if pl.has_failed:
            raise RuntimeError("Pipeline execution failed")

    return pl