python/pipelines/feature_engineering_pipelines.py (315 lines of code) (raw):

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Kubeflow pipeline definitions for the feature engineering steps.

Each pipeline wires BigQuery components together: feature-building steps run
first, and the training / inference preparation steps are ordered after them.
"""

from typing import Optional

import kfp as kfp
import kfp.dsl as dsl

from pipelines.components.bigquery.component import bq_stored_procedure_exec as sp
from pipelines.components.bigquery.component import (
    bq_dynamic_query_exec_output,
    bq_dynamic_stored_procedure_exec_output_full_dataset_preparation)


@dsl.pipeline()
def auto_audience_segmentation_feature_engineering_pipeline(
    project_id: str,
    location: Optional[str],
    dataset: str,
    date_start: str,
    date_end: str,
    feature_table: str,
    mds_project_id: str,
    mds_dataset: str,
    stored_procedure_name: str,
    full_dataset_table: str,
    reg_expression: str,
    query_auto_audience_segmentation_inference_preparation: str,
    query_auto_audience_segmentation_training_preparation: str,
    perc_keep: int = 35,
    timeout: Optional[float] = 3600.0
):
    """
    This pipeline defines the steps for feature engineering for the auto audience segmentation model.

    The steps run strictly in sequence: feature table -> full dataset ->
    training preparation -> inference preparation.

    Args:
        project_id: The Google Cloud project ID.
        location: The Google Cloud region where the pipeline will be run.
        dataset: The BigQuery dataset where the raw data is stored.
        date_start: The start date for the data to be processed.
        date_end: The end date for the data to be processed.
        feature_table: The BigQuery table where the feature data will be stored.
        mds_project_id: The Google Cloud project ID where the Marketing Data Store (MDS) is located.
        mds_dataset: The MDS dataset where the product data is stored.
        stored_procedure_name: The name of the BigQuery stored procedure that will be used to prepare the full dataset.
        full_dataset_table: The BigQuery table where the full dataset will be stored.
        reg_expression: The regular expression that will be used to identify the pages to be included in the analysis.
        query_auto_audience_segmentation_inference_preparation: The SQL query that will be used to prepare the inference data.
        query_auto_audience_segmentation_training_preparation: The SQL query that will be used to prepare the training data.
        perc_keep: The percentage of pages to be included in the analysis.
        timeout: The timeout, in seconds, passed to the training and inference preparation steps.

    Returns:
        None
    """

    # Feature data preparation
    feature_table_preparation = bq_dynamic_query_exec_output(
        location=location,
        project_id=project_id,
        dataset=dataset,
        create_table=feature_table,
        mds_project_id=mds_project_id,
        mds_dataset=mds_dataset,
        date_start=date_start,
        date_end=date_end,
        perc_keep=perc_keep,
        reg_expression=reg_expression
    )

    # Full dataset preparation, fed by the table produced in the previous step
    full_dataset_table_preparation = bq_dynamic_stored_procedure_exec_output_full_dataset_preparation(
        location=location,
        project_id=project_id,
        dataset=dataset,
        mds_project_id=mds_project_id,
        mds_dataset=mds_dataset,
        dynamic_table_input=feature_table_preparation.outputs['destination_table'],
        stored_procedure_name=stored_procedure_name,
        full_dataset_table=full_dataset_table,
        reg_expression=reg_expression
    ).after(feature_table_preparation)

    # Training data preparation
    auto_audience_segmentation_training_prep = sp(
        project=project_id,
        location=location,
        query=query_auto_audience_segmentation_training_preparation,
        timeout=timeout
    ).after(full_dataset_table_preparation).set_display_name(
        'auto_audience_segmentation_training_preparation')

    # Inference data preparation (runs only after training preparation)
    sp(
        project=project_id,
        location=location,
        query=query_auto_audience_segmentation_inference_preparation,
        timeout=timeout
    ).after(auto_audience_segmentation_training_prep).set_display_name(
        'auto_audience_segmentation_inference_preparation')


@dsl.pipeline()
def aggregated_value_based_bidding_feature_engineering_pipeline(
    project_id: str,
    location: Optional[str],
    query_aggregated_value_based_bidding_training_preparation: str,
    query_aggregated_value_based_bidding_explanation_preparation: str,
    timeout: Optional[float] = 3600.0
):
    """
    This pipeline defines the steps for feature engineering for the aggregated value based bidding model.

    The two steps have no ordering constraint between them.

    Args:
        project_id: The Google Cloud project ID.
        location: The Google Cloud region where the pipeline will be run.
        query_aggregated_value_based_bidding_training_preparation: The SQL query that will be used to prepare the training data.
        query_aggregated_value_based_bidding_explanation_preparation: The SQL query that will be used to prepare the explanation data.
        timeout: The timeout, in seconds, passed to each stored procedure execution step.

    Returns:
        None
    """

    # Training data preparation
    sp(
        project=project_id,
        location=location,
        query=query_aggregated_value_based_bidding_training_preparation,
        timeout=timeout
    ).set_display_name('aggregated_value_based_bidding_training_preparation')

    # Explanation data preparation
    sp(
        project=project_id,
        location=location,
        query=query_aggregated_value_based_bidding_explanation_preparation,
        timeout=timeout
    ).set_display_name('aggregated_value_based_bidding_explanation_preparation')


@dsl.pipeline()
def audience_segmentation_feature_engineering_pipeline(
    project_id: str,
    location: Optional[str],
    query_user_lookback_metrics: str,
    query_user_segmentation_dimensions: str,
    query_audience_segmentation_inference_preparation: str,
    query_audience_segmentation_training_preparation: str,
    timeout: Optional[float] = 3600.0
):
    """
    This pipeline defines the steps for feature engineering for the audience segmentation model.

    Args:
        project_id: The Google Cloud project ID.
        location: The Google Cloud region where the pipeline will be run.
        query_user_lookback_metrics: The SQL query that will be used to calculate the user lookback metrics.
        query_user_segmentation_dimensions: The SQL query that will be used to calculate the user segmentation dimensions.
        query_audience_segmentation_inference_preparation: The SQL query that will be used to prepare the inference data.
        query_audience_segmentation_training_preparation: The SQL query that will be used to prepare the training data.
        timeout: The timeout, in seconds, passed to each stored procedure execution step.

    Returns:
        None
    """

    # Features Preparation: both preparation steps below wait on all of these
    feature_tasks = [
        sp(
            project=project_id,
            location=location,
            query=query_user_lookback_metrics,
            timeout=timeout
        ).set_display_name('user_lookback_metrics'),
        sp(
            project=project_id,
            location=location,
            query=query_user_segmentation_dimensions,
            timeout=timeout
        ).set_display_name('user_segmentation_dimensions'),
    ]

    # Training data preparation
    sp(
        project=project_id,
        location=location,
        query=query_audience_segmentation_training_preparation,
        timeout=timeout
    ).set_display_name('audience_segmentation_training_preparation').after(*feature_tasks)

    # Inference data preparation
    sp(
        project=project_id,
        location=location,
        query=query_audience_segmentation_inference_preparation,
        timeout=timeout
    ).set_display_name('audience_segmentation_inference_preparation').after(*feature_tasks)


@dsl.pipeline()
def lead_score_propensity_feature_engineering_pipeline(
    project_id: str,
    location: Optional[str],
    query_lead_score_propensity_label: str,
    query_user_dimensions: str,
    query_user_rolling_window_metrics: str,
    query_lead_score_propensity_inference_preparation: str,
    query_lead_score_propensity_training_preparation: str,
    timeout: Optional[float] = 3600.0
):
    """
    This pipeline defines the steps for feature engineering for the lead score propensity model.

    Args:
        project_id: The Google Cloud project ID.
        location: The Google Cloud region where the pipeline will be run.
        query_lead_score_propensity_label: The SQL query that will be used to calculate the lead score propensity label.
        query_user_dimensions: The SQL query that will be used to calculate the user dimensions.
        query_user_rolling_window_metrics: The SQL query that will be used to calculate the user rolling window metrics.
        query_lead_score_propensity_inference_preparation: The SQL query that will be used to prepare the inference data.
        query_lead_score_propensity_training_preparation: The SQL query that will be used to prepare the training data.
        timeout: The timeout, in seconds, passed to each stored procedure execution step.

    Returns:
        None
    """

    # Features Preparation: both preparation steps below wait on all of these
    feature_tasks = [
        sp(
            project=project_id,
            location=location,
            query=query_lead_score_propensity_label,
            timeout=timeout
        ).set_display_name('lead_score_propensity_label'),
        sp(
            project=project_id,
            location=location,
            query=query_user_dimensions,
            timeout=timeout
        ).set_display_name('user_dimensions'),
        sp(
            project=project_id,
            location=location,
            query=query_user_rolling_window_metrics,
            timeout=timeout
        ).set_display_name('user_rolling_window_metrics'),
    ]

    # Training data preparation
    sp(
        project=project_id,
        location=location,
        query=query_lead_score_propensity_training_preparation,
        timeout=timeout
    ).set_display_name('lead_score_propensity_training_preparation').after(*feature_tasks)

    # Inference data preparation
    sp(
        project=project_id,
        location=location,
        query=query_lead_score_propensity_inference_preparation,
        timeout=timeout
    ).set_display_name('lead_score_propensity_inference_preparation').after(*feature_tasks)


@dsl.pipeline()
def purchase_propensity_feature_engineering_pipeline(
    project_id: str,
    location: Optional[str],
    query_purchase_propensity_label: str,
    query_user_dimensions: str,
    query_user_rolling_window_metrics: str,
    query_purchase_propensity_inference_preparation: str,
    query_purchase_propensity_training_preparation: str,
    timeout: Optional[float] = 3600.0
):
    """
    This pipeline defines the steps for feature engineering for the purchase propensity model.

    Args:
        project_id: The Google Cloud project ID.
        location: The Google Cloud region where the pipeline will be run.
        query_purchase_propensity_label: The SQL query that will be used to calculate the purchase propensity label.
        query_user_dimensions: The SQL query that will be used to calculate the user dimensions.
        query_user_rolling_window_metrics: The SQL query that will be used to calculate the user rolling window metrics.
        query_purchase_propensity_inference_preparation: The SQL query that will be used to prepare the inference data.
        query_purchase_propensity_training_preparation: The SQL query that will be used to prepare the training data.
        timeout: The timeout, in seconds, passed to each stored procedure execution step.

    Returns:
        None
    """

    # Features Preparation: both preparation steps below wait on all of these
    feature_tasks = [
        sp(
            project=project_id,
            location=location,
            query=query_purchase_propensity_label,
            timeout=timeout
        ).set_display_name('purchase_propensity_label'),
        sp(
            project=project_id,
            location=location,
            query=query_user_dimensions,
            timeout=timeout
        ).set_display_name('user_dimensions'),
        sp(
            project=project_id,
            location=location,
            query=query_user_rolling_window_metrics,
            timeout=timeout
        ).set_display_name('user_rolling_window_metrics'),
    ]

    # Training data preparation
    sp(
        project=project_id,
        location=location,
        query=query_purchase_propensity_training_preparation,
        timeout=timeout
    ).set_display_name('purchase_propensity_training_preparation').after(*feature_tasks)

    # Inference data preparation
    sp(
        project=project_id,
        location=location,
        query=query_purchase_propensity_inference_preparation,
        timeout=timeout
    ).set_display_name('purchase_propensity_inference_preparation').after(*feature_tasks)


@dsl.pipeline()
def churn_propensity_feature_engineering_pipeline(
    project_id: str,
    location: Optional[str],
    query_churn_propensity_label: str,
    query_user_dimensions: str,
    query_user_rolling_window_metrics: str,
    query_churn_propensity_inference_preparation: str,
    query_churn_propensity_training_preparation: str,
    timeout: Optional[float] = 3600.0
):
    """
    This pipeline defines the steps for feature engineering for the churn propensity model.

    Args:
        project_id: The Google Cloud project ID.
        location: The Google Cloud region where the pipeline will be run.
        query_churn_propensity_label: The SQL query that will be used to calculate the churn propensity label.
        query_user_dimensions: The SQL query that will be used to calculate the user dimensions.
        query_user_rolling_window_metrics: The SQL query that will be used to calculate the user rolling window metrics.
        query_churn_propensity_inference_preparation: The SQL query that will be used to prepare the inference data.
        query_churn_propensity_training_preparation: The SQL query that will be used to prepare the training data.
        timeout: The timeout, in seconds, passed to each stored procedure execution step.

    Returns:
        None
    """

    # Features Preparation: both preparation steps below wait on all of these
    feature_tasks = [
        sp(
            project=project_id,
            location=location,
            query=query_churn_propensity_label,
            timeout=timeout
        ).set_display_name('churn_propensity_label'),
        sp(
            project=project_id,
            location=location,
            query=query_user_dimensions,
            timeout=timeout
        ).set_display_name('user_dimensions'),
        sp(
            project=project_id,
            location=location,
            query=query_user_rolling_window_metrics,
            timeout=timeout
        ).set_display_name('user_rolling_window_metrics'),
    ]

    # Training data preparation
    sp(
        project=project_id,
        location=location,
        query=query_churn_propensity_training_preparation,
        timeout=timeout
    ).set_display_name('churn_propensity_training_preparation').after(*feature_tasks)

    # Inference data preparation
    sp(
        project=project_id,
        location=location,
        query=query_churn_propensity_inference_preparation,
        timeout=timeout
    ).set_display_name('churn_propensity_inference_preparation').after(*feature_tasks)


@dsl.pipeline()
def customer_lifetime_value_feature_engineering_pipeline(
    project_id: str,
    location: Optional[str],
    query_customer_lifetime_value_label: str,
    query_user_lifetime_dimensions: str,
    query_user_rolling_window_lifetime_metrics: str,
    query_customer_lifetime_value_inference_preparation: str,
    query_customer_lifetime_value_training_preparation: str,
    timeout: Optional[float] = 3600.0
):
    """
    This pipeline defines the steps for feature engineering for the customer lifetime value model.

    Args:
        project_id: The Google Cloud project ID.
        location: The Google Cloud region where the pipeline will be run.
        query_customer_lifetime_value_label: The SQL query that will be used to calculate the customer lifetime value label.
        query_user_lifetime_dimensions: The SQL query that will be used to calculate the user lifetime dimensions.
        query_user_rolling_window_lifetime_metrics: The SQL query that will be used to calculate the user rolling window lifetime metrics.
        query_customer_lifetime_value_inference_preparation: The SQL query that will be used to prepare the inference data.
        query_customer_lifetime_value_training_preparation: The SQL query that will be used to prepare the training data.
        timeout: The timeout, in seconds, passed to each stored procedure execution step.

    Returns:
        None
    """

    # Features Preparation: both preparation steps below wait on all of these
    feature_tasks = [
        sp(
            project=project_id,
            location=location,
            query=query_customer_lifetime_value_label,
            timeout=timeout
        ).set_display_name('customer_lifetime_value_label'),
        sp(
            project=project_id,
            location=location,
            query=query_user_lifetime_dimensions,
            timeout=timeout
        ).set_display_name('user_lifetime_dimensions'),
        sp(
            project=project_id,
            location=location,
            query=query_user_rolling_window_lifetime_metrics,
            timeout=timeout
        ).set_display_name('user_rolling_window_lifetime_metrics'),
    ]

    # Training data preparation
    sp(
        project=project_id,
        location=location,
        query=query_customer_lifetime_value_training_preparation,
        timeout=timeout
    ).set_display_name('customer_lifetime_value_training_preparation').after(*feature_tasks)

    # Inference data preparation
    sp(
        project=project_id,
        location=location,
        query=query_customer_lifetime_value_inference_preparation,
        timeout=timeout
    ).set_display_name('customer_lifetime_value_inference_preparation').after(*feature_tasks)


@dsl.pipeline()
def reporting_preparation_pl(
    project_id: str,
    location: Optional[str],
    query_aggregate_last_day_predictions: str,
    timeout: Optional[float] = 3600.0
):
    """
    This pipeline defines the steps for preparing the reporting data.

    Args:
        project_id: The Google Cloud project ID.
        location: The Google Cloud region where the pipeline will be run.
        query_aggregate_last_day_predictions: The SQL query that will be used to aggregate the last day predictions.
        timeout: The timeout, in seconds, passed to the stored procedure execution step.

    Returns:
        None
    """

    # Reporting Preparation
    # `timeout` is forwarded so the pipeline parameter actually takes effect,
    # consistent with the other pipelines in this module.
    sp(
        project=project_id,
        location=location,
        query=query_aggregate_last_day_predictions,
        query_parameters=[],
        timeout=timeout
    ).set_display_name('aggregate_predictions')


@dsl.pipeline()
def gemini_insights_pl(
    project_id: str,
    location: Optional[str],
    query_invoke_user_scoped_metrics: str,
    query_invoke_user_behaviour_revenue_insights: str,
    timeout: Optional[float] = 3600.0
):
    """
    This pipeline defines the steps for invoking the user scoped metrics query
    followed by the user behaviour revenue insights query.

    Args:
        project_id: The Google Cloud project ID.
        location: The Google Cloud region where the pipeline will be run.
        query_invoke_user_scoped_metrics: The SQL query that will be used to invoke the user scoped metrics.
        query_invoke_user_behaviour_revenue_insights: The SQL query that will be used to invoke the user behaviour revenue gemini insights.
        timeout: The timeout, in seconds, passed to each stored procedure execution step.

    Returns:
        None
    """

    # User Scoped Metrics
    # `timeout` is forwarded so the pipeline parameter actually takes effect,
    # consistent with the other pipelines in this module.
    user_scoped_metrics = sp(
        project=project_id,
        location=location,
        query=query_invoke_user_scoped_metrics,
        query_parameters=[],
        timeout=timeout
    ).set_display_name('user_scoped_metrics')

    # User behaviour revenue insights (runs only after the user scoped metrics)
    sp(
        project=project_id,
        location=location,
        query=query_invoke_user_behaviour_revenue_insights,
        query_parameters=[],
        timeout=timeout
    ).after(user_scoped_metrics).set_display_name('user_behaviour_revenue_insights')