# python/pipelines/tabular_pipelines.py
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional

import kfp
import kfp.dsl as dsl
from pipelines.components.vertex.component import (
    elect_best_tabular_model,
    batch_prediction,
    get_tabular_model_explanation,
)
from pipelines.components.bigquery.component import (
    bq_flatten_tabular_binary_prediction_table,
    bq_flatten_tabular_regression_table,
    bq_union_predictions_tables,
    write_tabular_model_explanation_to_bigquery,
)
from pipelines.components.pubsub.component import send_pubsub_activation_msg


# Function containing a KFP definition for a prediction pipeline that uses a Tabular Workflow Model.
# This pipeline is for a binary classification model.
@dsl.pipeline()
def prediction_binary_classification_pl(
project_id: str,
location: Optional[str],
model_display_name: str,
model_metric_name: str,
model_metric_threshold: float,
number_of_models_considered: int,
pubsub_activation_topic: str,
pubsub_activation_type: str,
bigquery_source: str,
bigquery_destination_prefix: str,
bq_unique_key: str,
job_name_prefix: str,
machine_type: str = "n1-standard-4",
max_replica_count: int = 10,
batch_size: int = 64,
accelerator_count: int = 0,
    accelerator_type: Optional[str] = None,
generate_explanation: bool = False,
threashold: float = 0.5,
positive_label: str = 'true',
):
"""
    This function defines a KFP pipeline for binary classification prediction using an AutoML Tabular Workflow Model.

    Args:
project_id: The Google Cloud project ID.
location: The Google Cloud region where the pipeline will be deployed.
model_display_name: The name of the Tabular Workflow Model to be used for prediction.
model_metric_name: The name of the metric used to select the best model.
model_metric_threshold: The threshold value for the metric used to select the best model.
number_of_models_considered: The number of models to consider when selecting the best model.
pubsub_activation_topic: The name of the Pub/Sub topic to send activation messages to.
pubsub_activation_type: The type of activation message to send.
bigquery_source: The BigQuery table containing the data to be predicted.
bigquery_destination_prefix: The prefix for the BigQuery table where the predictions will be stored.
bq_unique_key: The name of the column in the BigQuery table that uniquely identifies each row.
job_name_prefix: The prefix for the Vertex AI Batch Prediction job name.
machine_type: The machine type to use for the Vertex AI Batch Prediction job.
max_replica_count: The maximum number of replicas to use for the Vertex AI Batch Prediction job.
batch_size: The batch size to use for the Vertex AI Batch Prediction job.
accelerator_count: The number of accelerators to use for the Vertex AI Batch Prediction job.
accelerator_type: The type of accelerators to use for the Vertex AI Batch Prediction job.
generate_explanation: Whether to generate explanations for the predictions.
threashold: The threshold value used to convert the predicted probabilities into binary labels.
positive_label: The label to assign to predictions with a probability greater than or equal to the threshold.
"""
# Elect best model based on a metric and a threshold
    purchase_propensity_best_model = elect_best_tabular_model(
project=project_id,
location=location,
display_name=model_display_name,
metric_name=model_metric_name,
metric_threshold=model_metric_threshold,
number_of_models_considered=number_of_models_considered,
).set_display_name('elect_best_model')
# Submits a Vertex AI Batch prediction job
predictions = batch_prediction(
bigquery_source=bigquery_source,
bigquery_destination_prefix=bigquery_destination_prefix,
job_name_prefix=job_name_prefix,
        model=purchase_propensity_best_model.outputs['elected_model'],
machine_type=machine_type,
max_replica_count=max_replica_count,
batch_size=batch_size,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
generate_explanation=generate_explanation
)
# Flattens prediction table in BigQuery
flatten_predictions = bq_flatten_tabular_binary_prediction_table(
project_id=project_id,
location=location,
source_table=bigquery_source,
predictions_table=predictions.outputs['destination_table'],
bq_unique_key=bq_unique_key,
threashold=threashold,
positive_label=positive_label
)
# Sends pubsub message for activation
send_pubsub_activation_msg(
project=project_id,
topic_name=pubsub_activation_topic,
activation_type=pubsub_activation_type,
predictions_table=flatten_predictions.outputs['destination_table'],
)
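
# Example (a minimal sketch, kept as a comment): compiling this pipeline into a
# deployable spec with the KFP compiler. The output filename is an assumption
# for illustration.
#
#   from kfp import compiler
#
#   compiler.Compiler().compile(
#       pipeline_func=prediction_binary_classification_pl,
#       package_path='prediction_binary_classification_pl.yaml',
#   )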


# Function containing a KFP definition for a prediction pipeline that uses a Tabular Workflow Model.
# This pipeline is for a regression model.
@dsl.pipeline()
def prediction_regression_pl(
project_id: str,
location: Optional[str],
model_display_name: str,
model_metric_name: str,
model_metric_threshold: float,
number_of_models_considered: int,
pubsub_activation_topic: str,
pubsub_activation_type: str,
bigquery_source: str,
bigquery_destination_prefix: str,
bq_unique_key: str,
job_name_prefix: str,
machine_type: str = "n1-standard-4",
max_replica_count: int = 10,
batch_size: int = 64,
accelerator_count: int = 0,
    accelerator_type: Optional[str] = None,
generate_explanation: bool = False
):
"""
    This function defines a KFP pipeline for regression prediction using an AutoML Tabular Workflow Model.

    Args:
project_id: The Google Cloud project ID.
location: The Google Cloud region where the pipeline will be deployed.
model_display_name: The name of the Tabular Workflow Model to be used for prediction.
model_metric_name: The name of the metric used to select the best model.
model_metric_threshold: The threshold value for the metric used to select the best model.
number_of_models_considered: The number of models to consider when selecting the best model.
pubsub_activation_topic: The name of the Pub/Sub topic to send activation messages to.
pubsub_activation_type: The type of activation message to send.
bigquery_source: The BigQuery table containing the data to be predicted.
bigquery_destination_prefix: The prefix for the BigQuery table where the predictions will be stored.
bq_unique_key: The name of the column in the BigQuery table that uniquely identifies each row.
job_name_prefix: The prefix for the Vertex AI Batch Prediction job name.
machine_type: The machine type to use for the Vertex AI Batch Prediction job.
max_replica_count: The maximum number of replicas to use for the Vertex AI Batch Prediction job.
batch_size: The batch size to use for the Vertex AI Batch Prediction job.
accelerator_count: The number of accelerators to use for the Vertex AI Batch Prediction job.
accelerator_type: The type of accelerators to use for the Vertex AI Batch Prediction job.
generate_explanation: Whether to generate explanations for the predictions.
"""
# Elect best model based on a metric and a threshold
customer_lifetime_value_model = elect_best_tabular_model(
project=project_id,
location=location,
display_name=model_display_name,
metric_name=model_metric_name,
metric_threshold=model_metric_threshold,
number_of_models_considered=number_of_models_considered,
).set_display_name('elect_best_clv_model')
# Submits a Vertex AI Batch prediction job
predictions = batch_prediction(
bigquery_source=bigquery_source,
bigquery_destination_prefix=bigquery_destination_prefix,
job_name_prefix=job_name_prefix,
model=customer_lifetime_value_model.outputs['elected_model'],
machine_type=machine_type,
max_replica_count=max_replica_count,
batch_size=batch_size,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
generate_explanation=generate_explanation
)
# Flattens prediction table in BigQuery
flatten_predictions = bq_flatten_tabular_regression_table(
project_id=project_id,
location=location,
source_table=bigquery_source,
predictions_table=predictions.outputs['destination_table'],
bq_unique_key=bq_unique_key
)
# Sends pubsub message for activation
send_pubsub_activation_msg(
project=project_id,
topic_name=pubsub_activation_topic,
activation_type=pubsub_activation_type,
predictions_table=flatten_predictions.outputs['destination_table'],
)
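
# Example (a minimal sketch): running the compiled spec as a Vertex AI
# PipelineJob with the google-cloud-aiplatform SDK. Project, region, template
# path and parameter values below are placeholders, not values from this
# repository.
#
#   from google.cloud import aiplatform
#
#   aiplatform.init(project='my-project', location='us-central1')
#   aiplatform.PipelineJob(
#       display_name='clv-prediction',
#       template_path='prediction_regression_pl.yaml',
#       parameter_values={
#           'project_id': 'my-project',
#           'location': 'us-central1',
#           # ... remaining pipeline parameters ...
#       },
#   ).run()


# Function containing a KFP definition for a combined prediction pipeline that
# uses Tabular Workflow Models. It runs the purchase propensity (binary
# classification) and customer lifetime value (regression) predictions, then
# unions the results before activation.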
@dsl.pipeline()
def prediction_binary_classification_regression_pl(
project_id: str,
location: Optional[str],
purchase_bigquery_source: str,
purchase_bigquery_destination_prefix: str,
purchase_bq_unique_key: str,
purchase_job_name_prefix: str,
clv_bigquery_source: str,
clv_bigquery_destination_prefix: str,
clv_bq_unique_key: str,
clv_job_name_prefix: str,
purchase_model_display_name: str,
purchase_model_metric_name: str,
purchase_model_metric_threshold: float,
number_of_purchase_models_considered: int,
clv_model_display_name: str,
clv_model_metric_name: str,
clv_model_metric_threshold: float,
number_of_clv_models_considered: int,
pubsub_activation_topic: str,
pubsub_activation_type: str,
machine_type: str = "n1-standard-4",
max_replica_count: int = 10,
batch_size: int = 64,
accelerator_count: int = 0,
    accelerator_type: Optional[str] = None,
generate_explanation: bool = False,
threashold: float = 0.5,
positive_label: str = 'true',
):
"""
    This function defines a combined KFP pipeline for binary classification and regression prediction using AutoML Tabular Workflow Models.

    Args:
project_id: The Google Cloud project ID.
location: The Google Cloud region where the pipeline will be deployed.
purchase_bigquery_source: The BigQuery table containing the data to be predicted for purchase propensity.
purchase_bigquery_destination_prefix: The prefix for the BigQuery table where the purchase propensity predictions will be stored.
purchase_bq_unique_key: The name of the column in the BigQuery table that uniquely identifies each row for purchase propensity.
purchase_job_name_prefix: The prefix for the Vertex AI Batch Prediction job name for purchase propensity.
clv_bigquery_source: The BigQuery table containing the data to be predicted for customer lifetime value.
clv_bigquery_destination_prefix: The prefix for the BigQuery table where the customer lifetime value predictions will be stored.
clv_bq_unique_key: The name of the column in the BigQuery table that uniquely identifies each row for customer lifetime value.
clv_job_name_prefix: The prefix for the Vertex AI Batch Prediction job name for customer lifetime value.
purchase_model_display_name: The name of the Tabular Workflow Model to be used for purchase propensity prediction.
purchase_model_metric_name: The name of the metric used to select the best model for purchase propensity.
purchase_model_metric_threshold: The threshold value for the metric used to select the best model for purchase propensity.
number_of_purchase_models_considered: The number of models to consider when selecting the best model for purchase propensity.
clv_model_display_name: The name of the Tabular Workflow Model to be used for customer lifetime value prediction.
clv_model_metric_name: The name of the metric used to select the best model for customer lifetime value.
clv_model_metric_threshold: The threshold value for the metric used to select the best model for customer lifetime value.
number_of_clv_models_considered: The number of models to consider when selecting the best model for customer lifetime value.
pubsub_activation_topic: The name of the Pub/Sub topic to send activation messages to.
pubsub_activation_type: The type of activation message to send.
machine_type: The machine type to use for the Vertex AI Batch Prediction job.
max_replica_count: The maximum number of replicas to use for the Vertex AI Batch Prediction job.
batch_size: The batch size to use for the Vertex AI Batch Prediction job.
accelerator_count: The number of accelerators to use for the Vertex AI Batch Prediction job.
accelerator_type: The type of accelerators to use for the Vertex AI Batch Prediction job.
generate_explanation: Whether to generate explanations for the predictions.
threashold: The threshold value used to convert the predicted probabilities into binary labels for purchase propensity.
positive_label: The label to assign to predictions with a probability greater than or equal to the threshold for purchase propensity.
"""
# Elects the best purchase propensity model based on a metric and a threshold
purchase_propensity_best_model = elect_best_tabular_model(
project=project_id,
location=location,
display_name=purchase_model_display_name,
metric_name=purchase_model_metric_name,
metric_threshold=purchase_model_metric_threshold,
number_of_models_considered=number_of_purchase_models_considered,
).set_display_name('elect_best_purchase_propensity_model')
# Submits a Vertex AI Batch Prediction job for purchase propensity
propensity_predictions = batch_prediction(
bigquery_source=purchase_bigquery_source,
bigquery_destination_prefix=purchase_bigquery_destination_prefix,
job_name_prefix=purchase_job_name_prefix,
model=purchase_propensity_best_model.outputs['elected_model'],
machine_type=machine_type,
max_replica_count=max_replica_count,
batch_size=batch_size,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
generate_explanation=generate_explanation
).set_display_name('propensity_predictions')
# Elects the best customer lifetime value regression model based on a metric and a threshold
customer_lifetime_value_model = elect_best_tabular_model(
project=project_id,
location=location,
display_name=clv_model_display_name,
metric_name=clv_model_metric_name,
metric_threshold=clv_model_metric_threshold,
number_of_models_considered=number_of_clv_models_considered,
).set_display_name('elect_best_clv_model')
# Submits a Vertex AI Batch Prediction job for customer lifetime value
clv_predictions = batch_prediction(
bigquery_source=clv_bigquery_source,
bigquery_destination_prefix=clv_bigquery_destination_prefix,
job_name_prefix=clv_job_name_prefix,
model=customer_lifetime_value_model.outputs['elected_model'],
machine_type=machine_type,
max_replica_count=max_replica_count,
batch_size=batch_size,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
generate_explanation=generate_explanation
).set_display_name('clv_predictions')
# Flattens the prediction table for the customer lifetime value model
clv_flatten_predictions = bq_flatten_tabular_regression_table(
project_id=project_id,
location=location,
source_table=clv_bigquery_source,
predictions_table=clv_predictions.outputs['destination_table'],
bq_unique_key=clv_bq_unique_key
).set_display_name('clv_flatten_predictions')
    # Unions the two prediction tables: the flattened CLV predictions and the purchase propensity predictions
union_predictions = bq_union_predictions_tables(
project_id=project_id,
location=location,
predictions_table_propensity=propensity_predictions.outputs['destination_table'],
predictions_table_regression=clv_flatten_predictions.outputs['destination_table'],
table_propensity_bq_unique_key=purchase_bq_unique_key,
table_regression_bq_unique_key=clv_bq_unique_key,
threashold=threashold
).set_display_name('union_predictions')
# Sends pubsub message for activation
send_pubsub_activation_msg(
project=project_id,
topic_name=pubsub_activation_topic,
activation_type=pubsub_activation_type,
predictions_table=union_predictions.outputs['destination_table'],
)
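
# Example (a minimal sketch): the shape of the parameter values this combined
# pipeline expects. Every value below is a placeholder; real values come from
# the deployment configuration, not from this file.
#
#   parameter_values = {
#       'project_id': 'my-project',
#       'location': 'us-central1',
#       'purchase_bigquery_source': 'my-project.dataset.purchase_inference',
#       'purchase_bigquery_destination_prefix': 'my-project.dataset.predictions',
#       'purchase_bq_unique_key': 'user_id',
#       'purchase_job_name_prefix': 'propensity-batch-predict',
#       'clv_bigquery_source': 'my-project.dataset.clv_inference',
#       'clv_bigquery_destination_prefix': 'my-project.dataset.predictions',
#       'clv_bq_unique_key': 'user_id',
#       'clv_job_name_prefix': 'clv-batch-predict',
#       'purchase_model_display_name': 'propensity-model',
#       'purchase_model_metric_name': 'logLoss',
#       'purchase_model_metric_threshold': 0.5,
#       'number_of_purchase_models_considered': 1,
#       'clv_model_display_name': 'clv-model',
#       'clv_model_metric_name': 'meanAbsoluteError',
#       'clv_model_metric_threshold': 500.0,
#       'number_of_clv_models_considered': 1,
#       'pubsub_activation_topic': 'activation-topic',
#       'pubsub_activation_type': 'purchase-propensity-and-clv',
#   }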


# Function containing a KFP definition for an explanation pipeline that uses a Tabular Workflow Model.
# This explanation pipeline outputs the feature attributions of the elected model.
@dsl.pipeline()
def explanation_tabular_workflow_regression_pl(
project: str,
location: str,
data_location: str,
model_display_name: str,
model_metric_name: str,
model_metric_threshold: float,
number_of_models_considered: int,
bigquery_destination_prefix: str,
):
"""
    This function defines a KFP explanation pipeline that uses a Tabular Workflow Model.
    It outputs the feature attributions of the elected model.

    Args:
project: The Google Cloud project ID.
location: The Google Cloud region where the pipeline will be deployed.
data_location: The location of the data to be used for explanation.
model_display_name: The name of the Tabular Workflow Model to be used for explanation.
model_metric_name: The name of the metric used to select the best model.
model_metric_threshold: The threshold value for the metric used to select the best model.
number_of_models_considered: The number of models to consider when selecting the best model.
bigquery_destination_prefix: The prefix for the BigQuery table where the explanation will be stored.
"""
# Elect best model based on a metric and a threshold
value_based_bidding_model = elect_best_tabular_model(
project=project,
location=location,
display_name=model_display_name,
metric_name=model_metric_name,
metric_threshold=model_metric_threshold,
number_of_models_considered=number_of_models_considered,
).set_display_name('elect_best_vbb_model')
# Get the model explanation
value_based_bidding_model_explanation = get_tabular_model_explanation(
project=project,
location=location,
model=value_based_bidding_model.outputs['elected_model'],
).set_display_name('get_vbb_model_explanation')
# Write the model explanation to BigQuery
value_based_bidding_flatten_explanation = write_tabular_model_explanation_to_bigquery(
project=project,
location=location,
        data_location=data_location,
model_explanation=value_based_bidding_model_explanation.outputs['model_explanation'],
destination_table=bigquery_destination_prefix,
).set_display_name('write_vbb_model_explanation')
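

# Minimal sketch of a local entry point for compiling the pipelines above. The
# output filenames are assumptions for illustration; the repository's own
# deployment tooling may compile and parameterize these pipelines differently.
if __name__ == '__main__':
    from kfp import compiler

    for pipeline_func, package_path in (
        (prediction_binary_classification_pl, 'prediction_binary_classification_pl.yaml'),
        (prediction_regression_pl, 'prediction_regression_pl.yaml'),
        (prediction_binary_classification_regression_pl,
         'prediction_binary_classification_regression_pl.yaml'),
        (explanation_tabular_workflow_regression_pl,
         'explanation_tabular_workflow_regression_pl.yaml'),
    ):
        # Compile each KFP pipeline function into a deployable pipeline spec.
        compiler.Compiler().compile(pipeline_func=pipeline_func, package_path=package_path)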