# python/pipelines/components/bigquery/component.py
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, List
from kfp.dsl import component, Output, Artifact, Model, Input, Metrics, Dataset
import os
import yaml
config_file_path = os.path.join(os.path.dirname(
__file__), '../../../../config/config.yaml')
base_image = None
if os.path.exists(config_file_path):
with open(config_file_path, encoding='utf-8') as fh:
configs = yaml.full_load(fh)
vertex_components_params = configs['vertex_ai']['components']
repo_params = configs['artifact_registry']['pipelines_docker_repo']
# target_image = f"{repo_params['region']}-docker.pkg.dev/{repo_params['project_id']}/{repo_params['name']}/{vertex_components_params['image_name']}:{vertex_components_params['tag']}"
base_image = f"{repo_params['region']}-docker.pkg.dev/{repo_params['project_id']}/{repo_params['name']}/{vertex_components_params['base_image_name']}:{vertex_components_params['base_image_tag']}"
# This component makes it possible to invoke a BigQuery Stored Procedure
@component(base_image=base_image)
def bq_stored_procedure_exec(
project: str,
location: str,
query: str,
query_parameters: Optional[list] = [],
timeout: Optional[float] = 1800
) -> None:
"""Executes a BigQuery stored procedure.
Args:
project: The project containing the stored procedure.
location: The location of the stored procedure.
query: The query to execute.
query_parameters: The query parameters to pass to the stored procedure.
timeout: The timeout for the query, in seconds.
"""
from google.cloud import bigquery
import logging
from google.api_core.gapic_v1.client_info import ClientInfo
USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION= 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'
client = bigquery.Client(
project=project,
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_FEATURES)
)
params = []
for i in query_parameters:
i['value'] = None if i['value'] == "None" else i['value']
params.append(bigquery.ScalarQueryParameter(i['name'], i['type'], i['value']))
job_config = bigquery.QueryJobConfig(
query_parameters=params
)
query_job = client.query(
query=query,
location=location,
job_config=job_config)
query_job.result(timeout=timeout)
# This component creates and trains a BQML KMEANS model
@component(base_image=base_image)
def bq_clustering_exec(
model: Output[Artifact],
project_id: str,
location: str,
model_dataset_id: str,
model_name_bq_prefix: str,
vertex_model_name: str,
training_data_bq_table: str,
exclude_features: list,
model_parameters: Optional[Input[Dataset]] = None,
km_num_clusters: int = 4,
km_init_method: str = "KMEANS++",
km_distance_type: str = "EUCLIDEAN",
km_standardize_features: str = "TRUE",
km_max_interations: int = 20,
km_early_stop: str = "TRUE",
km_min_rel_progress: float = 0.01,
km_warm_start: str = "FALSE",
use_split_column: Optional[str] = "FALSE",
use_hparams_tuning: Optional[str] = "FALSE"
) -> None:
"""Creates and trains a BigQuery ML KMEANS model.
Args:
model: Output artifact for the trained model.
project_id: The project containing the model.
location: The location of the model.
model_dataset_id: The dataset ID of the model.
model_name_bq_prefix: The prefix of the model name.
vertex_model_name: The name of the model in Vertex AI.
training_data_bq_table: The BigQuery table containing the training data.
exclude_features: A list of features to exclude from the model.
model_parameters: Optional Dataset artifact whose metadata (NUM_CLUSTERS, MAX_ITERATIONS, MIN_REL_PROGRESS, KMEANS_INIT_METHOD, STANDARDIZE_FEATURES) overrides the corresponding km_* defaults.
km_num_clusters: The number of clusters to create.
km_init_method: The initialization method to use.
km_distance_type: The distance type to use.
km_standardize_features: Whether to standardize the features.
km_max_interations: The maximum number of iterations to run.
km_early_stop: Whether to use early stopping.
km_min_rel_progress: The minimum relative progress to stop early.
km_warm_start: Whether to use warm start.
use_split_column: If "TRUE", trains only on rows where data_split = 'TRAIN'.
use_hparams_tuning: If "TRUE", creates the model with hyperparameter tuning over NUM_CLUSTERS.
"""
from google.cloud import bigquery
import logging
from datetime import datetime
from google.api_core.gapic_v1.client_info import ClientInfo
USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION= 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'
model_bq_name = f"{model_name_bq_prefix}_{str(int(datetime.now().timestamp()))}"
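# Build an optional EXCEPT(...) clause so excluded feature columns are dropped from
# the SELECT used to train the model (e.g. exclude_features=["user_id"] yields
# " EXCEPT (`user_id`) ").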
exclude_sql=""
if len(exclude_features)>0:
for i in exclude_features:
i = f'`{i}`'
exclude_sql = f" EXCEPT ({', '.join(exclude_features)}) "
# Filter from data split column
if use_split_column == "TRUE":
    filter_clause = "data_split = 'TRAIN'"
else:
    filter_clause = "TRUE"
# Use model parameters if provided
if model_parameters is not None:
km_num_clusters = model_parameters.metadata["NUM_CLUSTERS"]
km_max_interations = model_parameters.metadata["MAX_ITERATIONS"]
km_min_rel_progress = model_parameters.metadata["MIN_REL_PROGRESS"]
km_init_method = model_parameters.metadata["KMEANS_INIT_METHOD"]
km_standardize_features = model_parameters.metadata["STANDARDIZE_FEATURES"]
# Create model
if use_hparams_tuning == "TRUE":
query = f"""CREATE OR REPLACE MODEL
`{project_id}.{model_dataset_id}.{model_bq_name}` OPTIONS (model_type='KMEANS',
NUM_CLUSTERS=HPARAM_RANGE(2, {km_num_clusters}),
MAX_ITERATIONS={km_max_interations},
MIN_REL_PROGRESS={km_min_rel_progress},
KMEANS_INIT_METHOD='{km_init_method}',
DISTANCE_TYPE='{km_distance_type}',
EARLY_STOP={km_early_stop},
STANDARDIZE_FEATURES={km_standardize_features},
WARM_START={km_warm_start},
NUM_TRIALS = 100,
HPARAM_TUNING_ALGORITHM = 'RANDOM_SEARCH',
HPARAM_TUNING_OBJECTIVES = 'DAVIES_BOULDIN_INDEX',
MODEL_REGISTRY='VERTEX_AI',
VERTEX_AI_MODEL_ID='{vertex_model_name}' ) AS (
SELECT DISTINCT * {exclude_sql} FROM `{training_data_bq_table}` WHERE {filter_clause}
)"""
else:
query = f"""CREATE OR REPLACE MODEL
`{project_id}.{model_dataset_id}.{model_bq_name}` OPTIONS (model_type='KMEANS',
NUM_CLUSTERS={km_num_clusters},
MAX_ITERATIONS={km_max_interations},
MIN_REL_PROGRESS={km_min_rel_progress},
KMEANS_INIT_METHOD='{km_init_method}',
DISTANCE_TYPE='{km_distance_type}',
EARLY_STOP={km_early_stop},
STANDARDIZE_FEATURES={km_standardize_features},
WARM_START={km_warm_start},
MODEL_REGISTRY='VERTEX_AI',
VERTEX_AI_MODEL_ID='{vertex_model_name}' ) AS (
SELECT DISTINCT * {exclude_sql} FROM `{training_data_bq_table}` WHERE {filter_clause}
)"""
client = bigquery.Client(
project=project_id,
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_TRAINING)
)
logging.info(f"BQML Model Training Query: {query}")
query_job = client.query(
query=query,
location=location
)
r = query_job.result()
project, dataset = project_id, model_dataset_id
model.metadata = {"projectId": project, "datasetId": dataset,
"modelId": model_bq_name, 'vertex_model_name': vertex_model_name}
#TODO: Implement TRAINING info summary on the metrics
# SELECT * FROM ML.TRAINING_INFO(MODEL `<project-id>.<datasets>.audience_segmentation_model`)
# This component runs ML.EVALUATE on a BQML model and logs the results into Metrics
@component(base_image=base_image)
def bq_evaluate(
model: Input[Artifact],
project: str,
location: str,
metrics: Output[Metrics]
):
"""Submits a BigQuery ML Model Evaluate and logs the results into Metrics.
Args:
model: Input artifact for the trained model.
project: The project containing the model.
location: The location of the model.
metrics: Output artifact for the evaluation metrics.
"""
from google.cloud import bigquery
import json, google.auth, logging
from google.api_core.gapic_v1.client_info import ClientInfo
USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION= 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'
query = f"""SELECT * FROM ML.EVALUATE(MODEL `{model.metadata["projectId"]}.{model.metadata["datasetId"]}.{model.metadata["modelId"]}`)"""
client = bigquery.Client(
project=project,
location=location
)
query_job = client.query(
query=query,
location=location
)
r = query_job.result()
r = list(r)
for i in r:
for k,v in i.items():
metrics.log_metric(k, v)
## NOT USED
@component(base_image=base_image)
def bq_evaluation_table(
eval: Input[Artifact],
metrics: Output[Metrics]
) -> None:
for row in eval.metadata["rows"]:
for idx, f in enumerate(row["f"]):
metrics.log_metric(eval.metadata["schema"]["fields"][idx]["name"], f["v"])
@component(base_image=base_image)
def bq_select_best_kmeans_model(
project_id: str,
location: str,
dataset_id: str,
model_prefix: str,
metric_name: str,
metric_threshold: float,
number_of_models_considered: int,
metrics_logger: Output[Metrics],
elected_model: Output[Artifact]) -> None:
"""Selects the best KMeans model from a set of models based on a given metric.
Args:
project_id: The project ID of the models.
location: The location of the models.
dataset_id: The dataset ID of the models.
model_prefix: The prefix of the model IDs.
metric_name: The name of the metric to use for comparison.
metric_threshold: The minimum value of the metric that is acceptable.
number_of_models_considered: The number of models to consider.
metrics_logger: The output artifact to log the metrics of the selected model.
elected_model: The output artifact to store the metadata of the selected model.
"""
from google.cloud import bigquery
import logging
from enum import Enum
from google.api_core.gapic_v1.client_info import ClientInfo
USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION= 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'
class MetricsEnum(Enum):
DAVIES_BOULDIN_INDEX = 'davies_bouldin_index'
MEAN_SQUARED_DISTANCE = 'mean_squared_distance'
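# For both supported metrics, lower values indicate a better clustering, hence the
# strict less-than comparison below.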
def is_new_metric_better(self, new_value: float, old_value: float):
return new_value < old_value
@classmethod
def list(cls):
return list(map(lambda c: c.value, cls))
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_PREDICTION)
)
logging.info(f"Getting models from: {project_id}.{dataset_id}")
models = client.list_models(f"{dataset_id}") # Make an API request.
models_to_compare = []
counter = 0
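# Keep only the `number_of_models_considered` most recently created models whose
# BigQuery model IDs start with the given prefix.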
for model in models:
if model.model_id.startswith(model_prefix):
# logging.info(f"{model.model_id} - {model.created}")
if (counter < number_of_models_considered):
models_to_compare.append(model)
counter += 1
else:
candidate = model
for idx, m in enumerate(models_to_compare):
    # Checks whether the current candidate is newer than one already in the list
    if candidate.created.timestamp() > m.created.timestamp():
        tmp = m
        models_to_compare[idx] = candidate
        candidate = tmp
# logging.info(f"{counter} {models_to_compare}")
if len(models_to_compare) == 0:
raise Exception(f"No models in vertex model registry match '{model_prefix}'")
best_model = dict()
best_eval_metrics = dict()
for i in models_to_compare:
logging.info(i.path)
model_bq_name = f"{i.project}.{i.dataset_id}.{i.model_id}"
query = f"""
SELECT * FROM ML.EVALUATE(MODEL `{model_bq_name}`)
"""
query_job = client.query(
query=query,
location=location
)
r = list(query_job.result())[0]
logging.info(f"keys {r.keys()}")
logging.info(f"{metric_name} {r.get(metric_name)}")
if (metric_name not in best_model) or MetricsEnum(metric_name).is_new_metric_better(r.get(metric_name), best_model[metric_name]):
for k in r.keys():
best_eval_metrics[k] = r.get(k)
best_model[metric_name] = r.get(metric_name)
best_model["resource_name"] = i.path
best_model["uri"] = model_bq_name
logging.info(
f"New Model/Version elected | name: {model_bq_name} | metric name: {metric_name} | metric value: {best_model[metric_name]} ")
if MetricsEnum(metric_name).is_new_metric_better(metric_threshold, best_model[metric_name]):
raise ValueError(
f"Model evaluation metric {metric_name} of value {best_model[metric_name]} does not meet minumum criteria of threshold{metric_threshold}")
for k, v in best_eval_metrics.items():
if k in MetricsEnum.list():
metrics_logger.log_metric(k, v)
# elected_model.uri = f"bq://{best_model['uri']}"
pId, dId, mId = best_model['uri'].split('.')
elected_model.metadata = {
"projectId": pId,
"datasetId": dId,
"modelId": mId,
"resourceName": best_model["resource_name"]}
@component(base_image=base_image)
def bq_clustering_predictions(
model: Input[Model],
project_id: str,
location: str,
bigquery_source: str,
bigquery_destination_prefix: str,
destination_table: Output[Dataset]
) -> None:
"""Generates predictions for a BigQuery ML KMeans model.
Args:
model: Input artifact for the trained model.
project_id: The project ID of the model.
location: The location of the model.
bigquery_source: The BigQuery table containing the data to predict.
bigquery_destination_prefix: The prefix of the destination table name.
destination_table: Output artifact for the predictions.
"""
from datetime import datetime
from google.cloud import bigquery
import logging
from google.api_core.gapic_v1.client_info import ClientInfo
USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION= 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'
timestamp = str(int(datetime.now().timestamp()))
destination_table.metadata["table_id"] = f"{bigquery_destination_prefix}_{timestamp}"
model_uri = f"{model.metadata['projectId']}.{model.metadata['datasetId']}.{model.metadata['modelId']}"
client = bigquery.Client(
project=project_id,
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_PREDICTION)
)
query = f"""
SELECT * FROM ML.PREDICT(MODEL `{model_uri}`,
TABLE `{bigquery_source}`)
"""
query_job = client.query(
query=query,
location=location,
job_config=bigquery.QueryJobConfig(
destination=destination_table.metadata["table_id"])
)
r = query_job.result()
if (query_job.done()):
logging.info(f"Job Completed: {query_job.state}")
destination_table.metadata["predictions_column_prefix"] = "CENTROID_ID"
@component(base_image=base_image)
def bq_flatten_tabular_binary_prediction_table(
destination_table: Output[Dataset],
project_id: str,
location: str,
source_table: str,
predictions_table: Input[Dataset],
bq_unique_key: str,
threashold: float = 0.5,
positive_label: str = 'true'
):
"""Flattens a BigQuery table containing binary prediction results from a tabular model.
Args:
destination_table: Output artifact for the flattened table.
project_id: The project ID of the predictions table.
location: The location of the predictions table.
source_table: The BigQuery table containing the data to predict.
predictions_table: Input artifact for the predictions table.
bq_unique_key: The unique key column in the source table.
threashold: The threshold for determining the predicted class.
positive_label: The label to assign to predictions above the threshold.
"""
from google.cloud import bigquery
import logging
from google.api_core.gapic_v1.client_info import ClientInfo
USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION= 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_PROPENSITY_PREDICTION)
)
# Inspect the metadata set on destination_table and predictions_table
logging.info(destination_table.metadata)
logging.info(predictions_table.metadata)
# Make an API request.
bq_table = client.get_table(predictions_table.metadata['table_id'])
destination_table.metadata["table_id"] = f"{predictions_table.metadata['table_id']}_view"
destination_table.metadata["predictions_column"] = 'prediction'
# View table properties
logging.info(
"Got table '{}.{}.{} located at {}'.".format(
bq_table.project, bq_table.dataset_id, bq_table.table_id, bq_table.location)
)
predictions_column = None
for i in bq_table.schema:
if (i.name.startswith(predictions_table.metadata['predictions_column_prefix'])):
predictions_column = i.name
if predictions_column is None:
raise Exception(
f"no prediction field found in given table {predictions_table.metadata['table_id']}")
query = f"""
CREATE OR REPLACE TEMP TABLE prediction_indexes AS (
SELECT
(SELECT offset from UNNEST({predictions_column}.classes) c with offset where c = "0") AS index_z,
(SELECT offset from UNNEST({predictions_column}.classes) c with offset where c = "1") AS index_one,
{predictions_column} as {predictions_column},
* EXCEPT({predictions_column})
FROM `{predictions_table.metadata['table_id']}`
);
CREATE OR REPLACE TEMP TABLE prediction_greatest_scores AS (
SELECT
{predictions_column}.scores[SAFE_OFFSET(index_z)] AS score_zero,
{predictions_column}.scores[SAFE_OFFSET(index_one)] AS score_one,
GREATEST({predictions_column}.scores[SAFE_OFFSET(index_z)], {predictions_column}.scores[SAFE_OFFSET(index_one)]) AS greatest_score,
LEAST({predictions_column}.scores[SAFE_OFFSET(index_z)], {predictions_column}.scores[SAFE_OFFSET(index_one)]) AS least_score,
*
FROM prediction_indexes
);
CREATE OR REPLACE TABLE `{destination_table.metadata["table_id"]}` AS (
SELECT
CASE
WHEN a.score_zero > {threashold} THEN 'false'
WHEN a.score_one > {threashold} THEN 'true'
ELSE 'false'
END AS {destination_table.metadata["predictions_column"]},
CASE
WHEN a.score_zero > {threashold} THEN a.least_score
WHEN a.score_one > {threashold} THEN a.greatest_score
ELSE a.least_score
END as prediction_prob,
b.*
FROM prediction_greatest_scores as a
INNER JOIN `{source_table}` as b on a.{bq_unique_key}=b.{bq_unique_key}
);
"""
job_config = bigquery.QueryJobConfig()
job_config.write_disposition = 'WRITE_TRUNCATE'
# Reconstruct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=bq_table.location,
client_info=ClientInfo(user_agent=USER_AGENT_PROPENSITY_PREDICTION)
)
query_job = client.query(
query=query,
location=bq_table.location
)
results = query_job.result()
logging.info(query)
for row in results:
logging.info("row info: {}".format(row))
@component(base_image=base_image)
def bq_flatten_tabular_regression_table(
project_id: str,
location: str,
source_table: str,
predictions_table: Input[Dataset],
bq_unique_key: str,
destination_table: Output[Dataset]
):
"""Flattens a BigQuery table containing regression prediction results from a tabular model.
Args:
project_id: The project ID of the predictions table.
location: The location of the predictions table.
source_table: The BigQuery table containing the data to predict.
predictions_table: Input artifact for the predictions table.
bq_unique_key: The unique key column in the source table.
destination_table: Output artifact for the flattened table.
"""
from google.cloud import bigquery
import logging
from google.api_core.gapic_v1.client_info import ClientInfo
USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION= 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_REGRESSION_PREDICTION)
)
# Inspect the metadata set on destination_table and predictions_table
logging.info(destination_table.metadata)
logging.info(predictions_table.metadata)
# Make an API request.
bq_table = client.get_table(predictions_table.metadata['table_id'])
destination_table.metadata["table_id"] = f"{predictions_table.metadata['table_id']}_view"
destination_table.metadata["predictions_column"] = 'prediction'
# View table properties
logging.info(
"Got table '{}.{}.{} located at {}'.".format(
bq_table.project, bq_table.dataset_id, bq_table.table_id, bq_table.location)
)
predictions_column = None
for i in bq_table.schema:
if (i.name.startswith(predictions_table.metadata['predictions_column_prefix'])):
predictions_column = i.name
if predictions_column is None:
raise Exception(
f"no prediction field found in given table {predictions_table.metadata['table_id']}")
query = f"""
CREATE OR REPLACE TABLE `{destination_table.metadata["table_id"]}` AS (SELECT
GREATEST(0.0,{predictions_column}.value) AS {destination_table.metadata["predictions_column"]}, b.*
FROM `{predictions_table.metadata['table_id']}` as a
INNER JOIN `{source_table}` as b on a.{bq_unique_key}=b.{bq_unique_key}
)
"""
job_config = bigquery.QueryJobConfig()
job_config.write_disposition = 'WRITE_TRUNCATE'
# Reconstruct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=bq_table.location,
client_info=ClientInfo(user_agent=USER_AGENT_REGRESSION_PREDICTION)
)
query_job = client.query(
query=query,
location=bq_table.location,
)
results = query_job.result()
logging.info(query)
for row in results:
logging.info("row info: {}".format(row))
@component(base_image=base_image)
def bq_flatten_kmeans_prediction_table(
project_id: str,
location: str,
source_table: Input[Dataset],
destination_table: Output[Dataset]
):
"""Flattens a BigQuery table containing KMeans prediction results.
Args:
project_id: The project ID of the predictions table.
location: The location of the predictions table.
source_table: Input artifact for the predictions table.
destination_table: Output artifact for the flattened table.
"""
from google.cloud import bigquery
import logging
from google.api_core.gapic_v1.client_info import ClientInfo
USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION= 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_PREDICTION)
)
# Make an API request.
bq_table = client.get_table(source_table.metadata['table_id'])
destination_table.metadata["table_id"] = f"{source_table.metadata['table_id']}_view"
destination_table.metadata["predictions_column"] = 'prediction'
# View table properties
logging.info(
"Got table '{}.{}.{}'.".format(
bq_table.project, bq_table.dataset_id, bq_table.table_id)
)
predictions_column = None
for i in bq_table.schema:
if (i.name.startswith(source_table.metadata['predictions_column_prefix'])):
predictions_column = i.name
if predictions_column is None:
raise Exception(
f"no prediction field found in given table {source_table.metadata['table_id']}")
query = f"""
CREATE OR REPLACE VIEW `{destination_table.metadata["table_id"]}` AS (SELECT
{predictions_column} AS {destination_table.metadata["predictions_column"]}
, * EXCEPT({predictions_column})
FROM `{source_table.metadata['table_id']}`)
"""
job_config = bigquery.QueryJobConfig()
job_config.write_disposition = 'WRITE_TRUNCATE'
"""
# Make an API request to create the view.
view = bigquery.Table(f"{table.metadata['table_id']}_view")
view.view_query = query
view = client.create_table(table = view)
logging.info(f"Created {view.table_type}: {str(view.reference)}")
"""
query_job = client.query(
query=query,
location=location
)
results = query_job.result()
for row in results:
logging.info("row info: {}".format(row))
@component(base_image=base_image)
def bq_dynamic_query_exec_output(
location: str,
project_id: str,
dataset: str,
create_table: str,
mds_project_id: str,
mds_dataset: str,
date_start: str,
date_end: str,
reg_expression: str,
destination_table: Output[Dataset],
perc_keep: int = 35,
) -> None:
"""Executes a dynamic BigQuery query and stores the results in a BigQuery table.
Args:
location: The location of the BigQuery dataset.
project_id: The project ID of the BigQuery dataset.
dataset: The dataset ID of the BigQuery dataset.
create_table: The name of the BigQuery table to create.
mds_project_id: The project ID of the Marketing Data Store dataset.
mds_dataset: The dataset ID of the Marketing Data Store dataset.
date_start: The start date of the query.
date_end: The end date of the query.
reg_expression: The regular expression to use to extract features from the page_path column.
destination_table: Output artifact for the BigQuery table.
perc_keep: The percentage of features to keep in the output table.
"""
from google.cloud import bigquery
import logging
import numpy as np
import pandas as pd
import jinja2
import re
from google.api_core.gapic_v1.client_info import ClientInfo
USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION= 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_TRAINING)
)
# Construct query template
template = jinja2.Template("""
CREATE OR REPLACE TABLE `{{project_id}}.{{dataset}}.{{create_table}}` AS (
SELECT DISTINCT
feature,
ROUND(100 * SUM(users) OVER (ORDER BY users DESC) / SUM(users) OVER (), 2) as cumulative_traffic_percent,
FROM (
SELECT
REGEXP_EXTRACT(page_path, '{{re_page_path}}') as feature,
COUNT(DISTINCT user_pseudo_id) as users
FROM (
SELECT
user_pseudo_id,
user_id,
LOWER(page_location) as page_path
FROM `{{mds_project_id}}.{{mds_dataset}}.event`
WHERE
event_name = 'page_view'
--AND DATE(event_timestamp) BETWEEN '{{date_start}}' AND '{{date_end}}'
)
GROUP BY 1
)
WHERE
feature IS NOT NULL
QUALIFY
cumulative_traffic_percent <= {{perc_keep}}
ORDER BY 2 ASC
)
""")
# Apply parameters to template
sql = template.render(
project_id=project_id,
dataset=dataset,
create_table=create_table,
mds_project_id=mds_project_id,
mds_dataset=mds_dataset,
re_page_path=reg_expression,
date_start=date_start,
date_end=date_end,
perc_keep=perc_keep
)
logging.info(f"{sql}")
# Run the BQ query
query_job = client.query(
query=sql,
location=location
)
results = query_job.result()
for row in results:
logging.info("row info: {}".format(row))
# Extract rows values
sql = f"""SELECT feature FROM `{project_id}.{dataset}.{create_table}`"""
query_df = client.query(query=sql).to_dataframe()
# Prepare component output
destination_table.metadata["table_id"] = f"{project_id}.{dataset}.{create_table}"
destination_table.metadata["features"] = list(query_df.feature.tolist())
@component(base_image=base_image)
def bq_dynamic_stored_procedure_exec_output_full_dataset_preparation(
project_id: str,
location: str,
dataset: str,
mds_project_id: str,
mds_dataset: str,
dynamic_table_input: Input[Dataset],
full_dataset_table_output: Output[Dataset],
reg_expression: str,
stored_procedure_name: str,
full_dataset_table: str,
timeout: Optional[float] = 1800
) -> None:
"""Executes a dynamic BigQuery stored procedure to create a full dataset preparation table.
Args:
project_id: The project ID of the BigQuery dataset.
location: The location of the BigQuery dataset.
dataset: The dataset ID of the BigQuery dataset.
mds_project_id: The project ID of the Marketing Data Store dataset.
mds_dataset: The dataset ID of the Marketing Data Store dataset.
dynamic_table_input: Input artifact for the dynamic table.
full_dataset_table_output: Output artifact for the full dataset preparation table.
reg_expression: The regular expression to use to extract features from the page_path column.
stored_procedure_name: The name of the stored procedure to execute.
full_dataset_table: The name of the full dataset preparation table to create.
timeout: The timeout for the query, in seconds.
"""
from google.cloud import bigquery
import logging
from google.api_core.gapic_v1.client_info import ClientInfo
USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION= 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_TRAINING)
)
def _create_auto_audience_segmentation_full_dataset_preparation_procedure(
project_id,
location,
dataset,
mds_project_id,
mds_dataset,
dynamic_table_input,
reg_expression,
stored_procedure_name,
full_dataset_table
) -> None:
import logging
import numpy as np
import pandas as pd
import jinja2
import re
def _clean_column_values(f):
if f == '/' or f == '' or f is None: return 'homepage'
if f.startswith('/'): f = f[1:]
if f.endswith('/'): f = f[:-1]
if f[0].isdigit(): f = '_' + f
return re.sub('[^0-9a-zA-Z]+', '_', f)
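# Examples of the normalization above: '/' -> 'homepage', '/shop/cart/' -> 'shop_cart',
# '2023-sale' -> '_2023_sale'.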
template = jinja2.Template("""
CREATE OR REPLACE PROCEDURE `{{project_id}}.{{dataset}}.{{stored_procedure_name}}`(
DATE_START DATE,
DATE_END DATE,
LOOKBACK_DAYS INT64
)
BEGIN
DECLARE RE_PAGE_PATH STRING DEFAULT "{{reg_expression|e}}";
CREATE OR REPLACE TABLE `{{project_id}}.{{dataset}}.{{full_dataset_table}}`
AS
WITH
visitor_pool AS (
SELECT
user_pseudo_id,
user_id,
MAX(event_timestamp) as feature_timestamp,
DATE(MAX(event_timestamp)) - LOOKBACK_DAYS as date_lookback
FROM `{{mds_project_id}}.{{mds_dataset}}.event`
WHERE DATE(event_timestamp) BETWEEN DATE_START AND DATE_END
GROUP BY 1, 2
)
SELECT
user_pseudo_id,
user_id,
feature_timestamp,
{% for f in features %}COUNTIF( REGEXP_EXTRACT(page_path, RE_PAGE_PATH) = '{{ f }}' ) as {{ clean_column_values(f) }},
{% endfor %}
FROM (
SELECT
vp.feature_timestamp,
ga.user_pseudo_id,
ga.user_id,
page_location as page_path
FROM `{{mds_project_id}}.{{mds_dataset}}.event` as ga
INNER JOIN visitor_pool as vp
ON vp.user_pseudo_id = ga.user_pseudo_id
AND DATE(ga.event_timestamp) >= vp.date_lookback
WHERE
event_name = 'page_view'
AND DATE(ga.event_timestamp) BETWEEN DATE_START AND DATE_END
)
GROUP BY 1, 2, 3;
END
""")
template.globals.update({'clean_column_values': _clean_column_values})
sql = template.render(
project_id=project_id,
dataset=dataset,
stored_procedure_name = stored_procedure_name,
full_dataset_table=full_dataset_table,
mds_project_id=mds_project_id,
mds_dataset=mds_dataset,
reg_expression=reg_expression,
features=dynamic_table_input.metadata['features'] if isinstance(dynamic_table_input.metadata['features'], list) else list(dynamic_table_input.metadata['features'])
)
logging.info(f"{sql}")
return sql
sql = _create_auto_audience_segmentation_full_dataset_preparation_procedure(project_id,
location,
dataset,
mds_project_id,
mds_dataset,
dynamic_table_input,
reg_expression,
stored_procedure_name,
full_dataset_table)
# Run the BQ query
query_job = client.query(
query=sql,
location=location
)
results = query_job.result()
for row in results:
logging.info("row info: {}".format(row))
# Prepare component output
full_dataset_table_output.metadata["table_id"] = f"{project_id}.{dataset}.{full_dataset_table}"
full_dataset_table_output.metadata["stored_procedure_name"] = f"{project_id}.{dataset}.{stored_procedure_name}"
##TODO: improve code
@component(base_image=base_image)
def bq_union_predictions_tables(
project_id: str,
location: str,
predictions_table_propensity: Input[Dataset],
predictions_table_regression: Input[Dataset],
table_propensity_bq_unique_key: str,
table_regression_bq_unique_key: str,
destination_table: Output[Dataset],
threashold: float
):
"""Unions the predictions from two BigQuery tables into a single table.
Args:
project_id: The project ID of the BigQuery dataset.
location: The location of the BigQuery dataset.
predictions_table_propensity: Input artifact for the propensity predictions table.
predictions_table_regression: Input artifact for the regression predictions table.
table_propensity_bq_unique_key: The unique key column in the propensity predictions table.
table_regression_bq_unique_key: The unique key column in the regression predictions table.
destination_table: Output artifact for the unioned predictions table.
threashold: The threshold for determining the predicted class for the propensity predictions.
"""
from google.cloud import bigquery
import logging
from google.api_core.gapic_v1.client_info import ClientInfo
USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION= 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_REGRESSION_PREDICTION)
)
# Inspect the metadata set on destination_table and predictions_table
logging.info(destination_table.metadata)
logging.info(predictions_table_propensity.metadata)
logging.info(predictions_table_regression.metadata)
# Get BigQuery Table Object
bq_table_propensity = client.get_table(predictions_table_propensity.metadata['table_id'])
# View table properties
logging.info(
"Got table '{}.{}.{} located at {}'.".format(
bq_table_propensity.project, bq_table_propensity.dataset_id, bq_table_propensity.table_id, bq_table_propensity.location)
)
# Get BigQuery Table Object
bq_table_regression = client.get_table(predictions_table_regression.metadata['table_id'])
# View table properties
logging.info(
"Got table '{}.{}.{} located at {}'.".format(
bq_table_regression.project, bq_table_regression.dataset_id, bq_table_regression.table_id, bq_table_regression.location)
)
# Get table prediction column
predictions_column_propensity = None
for i in bq_table_propensity.schema:
if (i.name.startswith(predictions_table_propensity.metadata['predictions_column_prefix'])):
predictions_column_propensity = i.name
if predictions_column_propensity is None:
raise Exception(
f"no prediction field found in given table {predictions_table_propensity.metadata['table_id']}")
predictions_column_regression = None
for i in bq_table_regression.schema:
if (i.name.startswith(predictions_table_regression.metadata['predictions_column'])):
predictions_column_regression = i.name
if predictions_column_regression is None:
raise Exception(
f"no prediction field found in given table {predictions_table_regression.metadata['table_id']}")
destination_table.metadata["table_id"] = f"{predictions_table_regression.metadata['table_id']}_final"
destination_table.metadata["predictions_column"] = 'prediction'
query = f"""
CREATE OR REPLACE TEMP TABLE prediction_indexes AS (
SELECT
(SELECT offset from UNNEST({predictions_column_propensity}.classes) c with offset where c = "0") AS index_zero,
(SELECT offset from UNNEST({predictions_column_propensity}.classes) c with offset where c = "1") AS index_one,
{predictions_column_propensity},
* EXCEPT({predictions_column_propensity})
FROM `{predictions_table_propensity.metadata['table_id']}`
);
CREATE OR REPLACE TEMP TABLE prediction_greatest_scores AS (
SELECT
{predictions_column_propensity}.scores[SAFE_OFFSET(index_zero)] AS score_zero,
{predictions_column_propensity}.scores[SAFE_OFFSET(index_one)] AS score_one,
GREATEST({predictions_column_propensity}.scores[SAFE_OFFSET(index_zero)], {predictions_column_propensity}.scores[SAFE_OFFSET(index_one)]) AS greatest_score,
* EXCEPT({predictions_column_propensity})
FROM prediction_indexes
);
CREATE OR REPLACE TEMP TABLE flattened_prediction AS (
SELECT
CASE
WHEN a.score_zero > {threashold} THEN 'false'
WHEN a.score_one > {threashold} THEN 'true'
ELSE 'false'
END AS {predictions_column_regression},
a.score_one AS prediction_prob,
a.*
FROM prediction_greatest_scores AS a
);
CREATE OR REPLACE TEMP TABLE non_purchasers_prediction AS (
SELECT
B.{table_regression_bq_unique_key},
0.0 AS clv_prediction,
B.* EXCEPT({table_regression_bq_unique_key}, {predictions_column_regression})
FROM
flattened_prediction A
INNER JOIN
`{predictions_table_regression.metadata['table_id']}` B
ON
A.prediction_prob <= {threashold}
AND A.{table_propensity_bq_unique_key} = B.{table_regression_bq_unique_key}
);
CREATE OR REPLACE TEMP TABLE purchasers_prediction AS (
SELECT
B.{table_regression_bq_unique_key},
COALESCE(B.{predictions_column_regression}, 0.0) AS clv_prediction,
B.* EXCEPT({table_regression_bq_unique_key}, {predictions_column_regression})
FROM
flattened_prediction A
INNER JOIN
`{predictions_table_regression.metadata['table_id']}` B
ON
A.prediction_prob > {threashold}
AND A.{table_propensity_bq_unique_key} = B.{table_regression_bq_unique_key}
);
CREATE OR REPLACE TABLE `{destination_table.metadata["table_id"]}` AS
SELECT
A.clv_prediction AS {destination_table.metadata["predictions_column"]},
A.* EXCEPT(clv_prediction)
FROM
non_purchasers_prediction A
UNION ALL
SELECT
B.clv_prediction AS {destination_table.metadata["predictions_column"]},
B.* EXCEPT(clv_prediction)
FROM
purchasers_prediction B
;
"""
logging.info(query)
job_config = bigquery.QueryJobConfig()
job_config.write_disposition = 'WRITE_TRUNCATE'
# Reconstruct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=bq_table_regression.location,
client_info=ClientInfo(user_agent=USER_AGENT_REGRESSION_PREDICTION)
)
query_job = client.query(
query=query,
location=bq_table_regression.location,
)
results = query_job.result()
for row in results:
logging.info("row info: {}".format(row))
# This component writes Tabular Workflows feature importance values to a BigQuery table
@component(base_image=base_image)
def write_tabular_model_explanation_to_bigquery(
project: str,
location: str,
data_location: str,
destination_table: str,
model_explanation: Input[Dataset],
):
"""Writess tabular model explanation values to a BigQuery table.
Args:
project: project ID or project number of the Cloud project you want to use.
location: location of the BigQuery tables and datasets
data_location: location of the BigQuery tables and datasets
destination_table: table to be written to
model_explanation: Input artifact to be provided for extracting the model explanation values.
"""
import logging
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.api_core.retry import Retry, if_exception_type
from google.api_core import exceptions
import time
from google.api_core.gapic_v1.client_info import ClientInfo
USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION= 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'
client = bigquery.Client(
project=project,
location=data_location,
client_info=ClientInfo(user_agent=USER_AGENT_VBB_EXPLANATION)
)
feature_names = model_explanation.metadata['feature_names']
values = model_explanation.metadata['values']
model_id = model_explanation.metadata['model_id']
model_name = model_explanation.metadata['model_name']
model_version = model_explanation.metadata['model_version']
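# These metadata fields are expected to be populated by the upstream component that
# extracts the tabular model explanation (feature importances).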
# Create (or replace) the destination table with the expected schema
query = """CREATE OR REPLACE TABLE `"""+ destination_table +"""` (
processed_timestamp TIMESTAMP,
model_id STRING,
model_name STRING,
model_version STRING,
feature_names STRING,
values FLOAT64
)
OPTIONS (
description = "A table with feature names and numerical values"
);
"""
# Execute the query as a job
try:
query_job = client.query(query)
# Wait for the query job to complete
query_job.result() # Waits for job to finish
# Get query results and convert to pandas DataFrame
df = query_job.to_dataframe()
logging.info(df)
except NotFound as e:
logging.error(f"Error during vbb weights CREATE query job execution: {e}")
# Build the INSERT query
insert_query = "INSERT INTO `{}` (processed_timestamp, model_id, model_name, model_version, feature_names, values) VALUES ".format(destination_table)
for i in range(len(feature_names)):
insert_query += "(CURRENT_TIMESTAMP(), '{}', '{}', '{}', '{}', {}), ".format(model_id, model_name, model_version, feature_names[i], values[i])
insert_query = insert_query[:-2]
logging.info(insert_query)
# Execute the INSERT as a job with retries because the freshly created table is
# sometimes not yet visible to the API right after creation.
# Retry configuration: retry only on NotFound, which is raised while the new table
# is still propagating.
retry_delay = 15  # Seconds to wait between status checks
retry_predicate = Retry(
    predicate=if_exception_type(exceptions.NotFound),
)
def execute_query_with_retries(query):
"""Executes the query with retries."""
query_job = client.query(query, retry=retry_predicate)
while not query_job.done(): # Check if the query job is complete
print("Query running...")
time.sleep(retry_delay) # Wait before checking status again
query_job.reload() # Reload the job state
if query_job.errors:
raise RuntimeError(f"Query errors: {query_job.errors}")
return query_job.result() # Return the results
# Execute the query
try:
result = execute_query_with_retries(insert_query)
except RuntimeError as e:
logging.error(f"Query failed after retries: {e}")