python/pipelines/scheduler.py

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
from argparse import ArgumentParser, ArgumentTypeError

import yaml

from pipelines.pipeline_ops import pause_schedule, schedule_pipeline, delete_schedules


# Ensures that the provided file path exists and is a valid YAML file.
def check_extention(file_path: str, type: str = '.yaml'):
    if os.path.exists(file_path):
        if not file_path.lower().endswith(type):
            raise ArgumentTypeError(f"File provided must be {type}: {file_path}")
    else:
        raise FileNotFoundError(f"{file_path} does not exist")
    return file_path
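
# Usage sketch (illustrative only): because check_extention raises
# ArgumentTypeError on a bad extension, it can also serve as an argparse
# `type=` callable, e.g. parser.add_argument("-c", type=check_extention, ...).
#
#   check_extention('config.yaml')   # returns 'config.yaml' if the file exists
#   check_extention('config.json')   # raises ArgumentTypeError (wrong extension)
#   check_extention('missing.yaml')  # raises FileNotFoundError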

# config path : pipeline module and function name.
# Keys must match the pipeline names as they appear in the config.yaml files
# for automatic compilation.
pipelines_list = {
    'vertex_ai.pipelines.feature-creation-auto-audience-segmentation.execution': "pipelines.feature_engineering_pipelines.auto_audience_segmentation_feature_engineering_pipeline",
    'vertex_ai.pipelines.feature-creation-aggregated-value-based-bidding.execution': "pipelines.feature_engineering_pipelines.aggregated_value_based_bidding_feature_engineering_pipeline",
    'vertex_ai.pipelines.feature-creation-audience-segmentation.execution': "pipelines.feature_engineering_pipelines.audience_segmentation_feature_engineering_pipeline",
    'vertex_ai.pipelines.feature-creation-purchase-propensity.execution': "pipelines.feature_engineering_pipelines.purchase_propensity_feature_engineering_pipeline",
    'vertex_ai.pipelines.feature-creation-churn-propensity.execution': "pipelines.feature_engineering_pipelines.churn_propensity_feature_engineering_pipeline",
    'vertex_ai.pipelines.feature-creation-customer-ltv.execution': "pipelines.feature_engineering_pipelines.customer_lifetime_value_feature_engineering_pipeline",
    'vertex_ai.pipelines.feature-creation-lead-score-propensity.execution': "pipelines.feature_engineering_pipelines.lead_score_propensity_feature_engineering_pipeline",
    'vertex_ai.pipelines.purchase_propensity.training': None,  # tabular workflow pipelines are precompiled
    'vertex_ai.pipelines.purchase_propensity.prediction': "pipelines.tabular_pipelines.prediction_binary_classification_pl",
    'vertex_ai.pipelines.lead_score_propensity.training': None,  # tabular workflow pipelines are precompiled
    'vertex_ai.pipelines.lead_score_propensity.prediction': "pipelines.tabular_pipelines.prediction_binary_classification_pl",
    'vertex_ai.pipelines.churn_propensity.training': None,  # tabular workflow pipelines are precompiled
    'vertex_ai.pipelines.churn_propensity.prediction': "pipelines.tabular_pipelines.prediction_binary_classification_pl",
    'vertex_ai.pipelines.segmentation.training': "pipelines.segmentation_pipelines.training_pl",
    'vertex_ai.pipelines.segmentation.prediction': "pipelines.segmentation_pipelines.prediction_pl",
    'vertex_ai.pipelines.auto_segmentation.training': "pipelines.auto_segmentation_pipelines.training_pl",
    'vertex_ai.pipelines.auto_segmentation.prediction': "pipelines.auto_segmentation_pipelines.prediction_pl",
    'vertex_ai.pipelines.propensity_clv.training': None,  # tabular workflow pipelines are precompiled
    'vertex_ai.pipelines.clv.training': None,  # tabular workflow pipelines are precompiled
    'vertex_ai.pipelines.clv.prediction': "pipelines.tabular_pipelines.prediction_binary_classification_regression_pl",
    'vertex_ai.pipelines.value_based_bidding.training': None,  # tabular workflow pipelines are precompiled
    'vertex_ai.pipelines.value_based_bidding.explanation': "pipelines.tabular_pipelines.explanation_tabular_workflow_regression_pl",
    'vertex_ai.pipelines.reporting_preparation.execution': "pipelines.feature_engineering_pipelines.reporting_preparation_pl",
    'vertex_ai.pipelines.gemini_insights.execution': "pipelines.feature_engineering_pipelines.gemini_insights_pl",
}


if __name__ == "__main__":
    """
    A command-line script for creating and deleting Vertex AI pipeline
    schedules. It uses the pipelines_list dictionary to map pipeline names to
    their corresponding module and function names.

    The script takes the following arguments:
        -c: Path to the configuration YAML file.
        -p: Pipeline key name as it appears in the config.yaml file.
        -i: The compiled pipeline input filename.
        -d: (Optional) Flag to delete the scheduled pipeline.
    """
    logging.basicConfig(level=logging.INFO)

    parser = ArgumentParser()
    parser.add_argument("-c", "--config-file",
                        dest="config",
                        required=True,
                        help="path to config YAML file (config.yaml)")
    parser.add_argument("-p", '--pipeline-config-name',
                        dest="pipeline",
                        required=True,
                        choices=list(pipelines_list.keys()),
                        help='Pipeline key name as it is in config.yaml')
    parser.add_argument("-i", '--input-file',
                        dest="input",
                        required=True,
                        help='the compiled pipeline input filename')
    parser.add_argument("-d", '--delete',
                        dest="delete",
                        required=False,
                        action='store_true',
                        help='if the flag is set, delete the scheduled pipeline')
    args = parser.parse_args()

    # Read the configuration YAML file and extract the parameters for the
    # pipeline and the artifact registry, then walk the dotted pipeline key
    # (e.g. vertex_ai.pipelines.segmentation.training) down to the matching
    # pipeline configuration block.
    with open(args.config, encoding='utf-8') as fh:
        params = yaml.full_load(fh)

    repo_params = params['artifact_registry']['pipelines_repo']
    generic_pipeline_vars = params['vertex_ai']['pipelines']

    my_pipeline_vars = params
    for i in args.pipeline.split('.'):
        my_pipeline_vars = my_pipeline_vars[i]

    if my_pipeline_vars['name'] is None:
        raise Exception("No pipeline display_name provided for deleting schedules.")

    template_artifact_uri = f"https://{repo_params['region']}-kfp.pkg.dev/{repo_params['project_id']}/{repo_params['name']}/{my_pipeline_vars['name']}/latest"
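
    # Expected config.yaml shape (illustrative sketch inferred from the keys
    # read above; actual values depend on the deployment):
    #
    #   artifact_registry:
    #     pipelines_repo:
    #       project_id: ...
    #       region: ...
    #       name: ...
    #   vertex_ai:
    #     pipelines:
    #       project_id: ...
    #       region: ...
    #       service_account: ...
    #       root_path: ...
    #       <pipeline>:
    #         <step>:
    #           name: ...
    #           pipeline_parameters: {...}
    #           pipeline_parameters_substitutions: {...}
    #           schedule:
    #             cron: ...
    #             max_concurrent_run_count: ...
    #             start_time: ...
    #             end_time: ...
    #             subnetwork: ...
    #             use_private_service_access: ...
    #             state: ACTIVE or PAUSED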
    if args.delete:
        # If the -d flag is set, delete the existing schedules for the pipeline.
        logging.info(f"Deleting scheduler for {args.pipeline}")
        delete_schedules(
            project_id=generic_pipeline_vars['project_id'],
            region=generic_pipeline_vars['region'],
            pipeline_name=my_pipeline_vars['name'])
    else:
        logging.info(f"Creating scheduler for {args.pipeline}")
        # Create a new schedule for the pipeline and return the schedule
        # object. If the config marks the pipeline as PAUSED, pause the newly
        # created schedule with pause_schedule.
        schedule = schedule_pipeline(
            project_id=generic_pipeline_vars['project_id'],
            region=generic_pipeline_vars['region'],
            template_path=args.input,
            pipeline_parameters=my_pipeline_vars['pipeline_parameters'],
            pipeline_parameters_substitutions=my_pipeline_vars['pipeline_parameters_substitutions'],
            pipeline_sa=generic_pipeline_vars['service_account'],
            pipeline_name=my_pipeline_vars['name'],
            pipeline_root=generic_pipeline_vars['root_path'],
            cron=my_pipeline_vars['schedule']['cron'],
            max_concurrent_run_count=my_pipeline_vars['schedule']['max_concurrent_run_count'],
            start_time=my_pipeline_vars['schedule']['start_time'],
            end_time=my_pipeline_vars['schedule']['end_time'],
            subnetwork=my_pipeline_vars['schedule']['subnetwork'],
            use_private_service_access=my_pipeline_vars['schedule']['use_private_service_access'],
        )

        if my_pipeline_vars['schedule']['state'] == 'PAUSED':
            logging.info(f"Pausing scheduler for {args.pipeline}")
            pause_schedule(
                project_id=generic_pipeline_vars['project_id'],
                region=generic_pipeline_vars['region'],
                pipeline_name=my_pipeline_vars['name'])
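
# Example invocations (illustrative; the module path, config path, pipeline
# key, and compiled template filename are assumptions about a typical
# deployment, not values from this repo):
#
#   # Create (and possibly pause) a schedule:
#   python -m pipelines.scheduler -c config.yaml \
#       -p vertex_ai.pipelines.segmentation.training -i segmentation_training.yaml
#
#   # Delete the existing schedules for the same pipeline:
#   python -m pipelines.scheduler -c config.yaml \
#       -p vertex_ai.pipelines.segmentation.training -i segmentation_training.yaml -d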