in python/pipelines/pipeline_ops.py [0:0]
def compile_pipeline(
pipeline_func: Callable,
template_path: str,
pipeline_name: str,
pipeline_parameters: Optional[Dict[str, Any]] = None,
pipeline_parameters_substitutions: Optional[Dict[str, Any]] = None,
enable_caching: bool = True,
type_check: bool = True) -> str:
"""
Compiles a Vertex AI Pipeline.
This function takes a pipeline function, a template path, a pipeline name, and optional pipeline parameters and substitutions, and compiles them into a Vertex AI Pipeline YAML file.
Args:
pipeline_func: The pipeline function to compile.
template_path: The path to the pipeline template file.
pipeline_name: The name of the pipeline.
pipeline_parameters: The parameters to pass to the pipeline.
pipeline_parameters_substitutions: A dictionary of substitutions to apply to the pipeline parameters.
enable_caching: Whether to enable caching for the pipeline.
type_check: Whether to perform type checking on the pipeline parameters.
Returns:
The path to the compiled pipeline YAML file.
Raises:
Exception: If an error occurs while compiling the pipeline.
"""
if pipeline_parameters_substitutions != None:
pipeline_parameters = substitute_pipeline_params(
pipeline_parameters, pipeline_parameters_substitutions)
logging.info("Pipeline parameters: {}".format(pipeline_parameters))
#pipeline_parameters.pop('columns_to_skip', None)
# The function uses the compiler.Compiler() class to compile the pipeline defined by the pipeline_func function.
# The compiled pipeline is saved to the template_path file.
compiler.Compiler().compile(
pipeline_func=pipeline_func,
package_path=template_path,
pipeline_name=pipeline_name,
pipeline_parameters=pipeline_parameters,
type_check=type_check,
)
# The function opens the compiled pipeline template file and loads the configuration using the yaml.safe_load() function.
with open(template_path, 'r') as file:
configuration = yaml.safe_load(file)
# The function sets the enable_caching value of the configuration to the enable_caching parameter.
_set_enable_caching_value(pipeline_spec=configuration,
enable_caching=enable_caching)
# Saves the updated pipeline configuration back to the template_path file.
with open(template_path, 'w') as yaml_file:
yaml.dump(configuration, yaml_file)
return template_path