python/pipelines/compiler.py (79 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import importlib, yaml, logging from pipelines.pipeline_ops import compile_pipeline, compile_automl_tabular_pipeline from argparse import ArgumentParser ''' example: python -m pipelines.compiler -c ../config/conf.yaml -p train_pipeline -o my_comp_pl.yaml ''' # config path : pipeline module and function name # This dictionary maps pipeline names to their corresponding module and function names. # This allows the script to dynamically import the correct pipeline function based on the provided pipeline name. pipelines_list = { 'vertex_ai.pipelines.feature-creation-auto-audience-segmentation.execution': "pipelines.feature_engineering_pipelines.auto_audience_segmentation_feature_engineering_pipeline", 'vertex_ai.pipelines.feature-creation-aggregated-value-based-bidding.execution': "pipelines.feature_engineering_pipelines.aggregated_value_based_bidding_feature_engineering_pipeline", 'vertex_ai.pipelines.feature-creation-audience-segmentation.execution': "pipelines.feature_engineering_pipelines.audience_segmentation_feature_engineering_pipeline", 'vertex_ai.pipelines.feature-creation-purchase-propensity.execution': "pipelines.feature_engineering_pipelines.purchase_propensity_feature_engineering_pipeline", 'vertex_ai.pipelines.feature-creation-churn-propensity.execution': "pipelines.feature_engineering_pipelines.churn_propensity_feature_engineering_pipeline", 'vertex_ai.pipelines.feature-creation-customer-ltv.execution': "pipelines.feature_engineering_pipelines.customer_lifetime_value_feature_engineering_pipeline", 'vertex_ai.pipelines.feature-creation-lead-score-propensity.execution': "pipelines.feature_engineering_pipelines.lead_score_propensity_feature_engineering_pipeline", 'vertex_ai.pipelines.auto_segmentation.training': "pipelines.auto_segmentation_pipelines.training_pl", 'vertex_ai.pipelines.auto_segmentation.prediction': "pipelines.auto_segmentation_pipelines.prediction_pl", 'vertex_ai.pipelines.segmentation.training': "pipelines.segmentation_pipelines.training_pl", 'vertex_ai.pipelines.segmentation.prediction': "pipelines.segmentation_pipelines.prediction_pl", 'vertex_ai.pipelines.purchase_propensity.training': None, # tabular workflows pipelines is precompiled 'vertex_ai.pipelines.purchase_propensity.prediction': "pipelines.tabular_pipelines.prediction_binary_classification_pl", 'vertex_ai.pipelines.churn_propensity.training': None, # tabular workflows pipelines is precompiled 'vertex_ai.pipelines.churn_propensity.prediction': "pipelines.tabular_pipelines.prediction_binary_classification_pl", 'vertex_ai.pipelines.lead_score_propensity.training': None, # tabular workflows pipelines is precompiled 'vertex_ai.pipelines.lead_score_propensity.prediction': "pipelines.tabular_pipelines.prediction_binary_classification_pl", 'vertex_ai.pipelines.propensity_clv.training': None, # tabular workflows pipelines is precompiled 'vertex_ai.pipelines.clv.training': None, # tabular workflows pipelines is precompiled 'vertex_ai.pipelines.clv.prediction': "pipelines.tabular_pipelines.prediction_binary_classification_regression_pl", 'vertex_ai.pipelines.value_based_bidding.training': None, # tabular workflows pipelines is precompiled 'vertex_ai.pipelines.value_based_bidding.explanation': "pipelines.tabular_pipelines.explanation_tabular_workflow_regression_pl", 'vertex_ai.pipelines.reporting_preparation.execution': "pipelines.feature_engineering_pipelines.reporting_preparation_pl", 'vertex_ai.pipelines.gemini_insights.execution': "pipelines.feature_engineering_pipelines.gemini_insights_pl", } # key should match pipeline names as in the `config.yaml.tftpl` files for automatic compilation if __name__ == "__main__": """ This Python code defines a script for compiling Vertex AI pipelines. This script provides a convenient way to compile Vertex AI pipelines from a configuration file. It allows users to specify the pipeline name, parameters, and output filename, and it automatically handles the compilation process. It takes three arguments: -c: Path to the configuration YAML file (config.yaml) -p: Pipeline key name as it is in config.yaml -o: The compiled pipeline output filename """ logging.basicConfig(level=logging.INFO) parser = ArgumentParser() parser.add_argument("-c", "--config-file", dest="config", required=True, help="path to config YAML file (config.yaml)") parser.add_argument("-p", '--pipeline-config-name', dest="pipeline", required=True, choices=list(pipelines_list.keys()), help='Pipeline key name as it is in config.yaml') parser.add_argument("-o", '--output-file', dest="output", required=True, help='the compiled pipeline output filename') # Parses the provided command-line arguments. It retrieves the path to the configuration file, the pipeline name, and the output filename. args = parser.parse_args() pipeline_params={} # Opens the configuration file and uses the yaml module to parse it. # It extracts the pipeline parameters based on the provided pipeline name. with open(args.config, encoding='utf-8') as fh: pipeline_params = yaml.full_load(fh) for i in args.pipeline.split('.'): print(i) pipeline_params = pipeline_params[i] logging.info(pipeline_params) # The script checks the pipeline type: # If the pipeline type is tabular-workflows, it uses the compile_automl_tabular_pipeline function to compile the pipeline. # Otherwise, it uses the compile_pipeline function to compile the pipeline. # Both functions take the following arguments: # template_path: Path to the compiled pipeline template file. # pipeline_name: Name of the pipeline. # pipeline_parameters: Parameters to pass to the pipeline. # pipeline_parameters_substitutions: Substitutions to apply to the pipeline parameters. # enable_caching: Whether to enable caching for the pipeline. # type_check: Whether to perform type checking on the pipeline parameters. # The compile_automl_tabular_pipeline function also takes the following arguments: # parameters_path: Path to the pipeline parameters file. # exclude_features: List of features to exclude from the pipeline. if pipeline_params['type'] == 'tabular-workflows': compile_automl_tabular_pipeline( template_path = args.output, parameters_path="params.yaml", pipeline_name=pipeline_params['name'], pipeline_parameters=pipeline_params['pipeline_parameters'], pipeline_parameters_substitutions= pipeline_params['pipeline_parameters_substitutions'], exclude_features = pipeline_params['exclude_features'], enable_caching=False, ) else: module_name = '.'.join(pipelines_list[args.pipeline].split('.')[:-1]) function_name = pipelines_list[args.pipeline].split('.')[-1] compile_pipeline( pipeline_func = getattr(importlib.import_module(module_name),function_name), template_path = args.output, pipeline_name = pipeline_params['name'], pipeline_parameters = pipeline_params['pipeline_parameters'], pipeline_parameters_substitutions = pipeline_params['pipeline_parameters_substitutions'], enable_caching=False, type_check=False, )