retail/recommendation-system/bqml-mlops/part_2/pipeline.py (134 lines of code) (raw):
from typing import NamedTuple
import json
import os
import fire
def run_bigquery_ddl(project_id: str, query_string: str, location: str) -> NamedTuple(
    'DDLOutput', [('created_table', str), ('query', str)]):
    """Run a BigQuery DDL statement and return the name of the object it created.

    This function is turned into a container component with
    ``func_to_container_op`` elsewhere in this file, so every import it needs
    is performed inside the body.

    Args:
        project_id: Project used to construct the BigQuery client.
        query_string: The DDL statement to execute.
        location: BigQuery location passed to the client.

    Returns:
        A ``DDLOutput`` namedtuple ``(created_table, query)`` where
        ``created_table`` is ``'<dataset_id>.<table_id>'`` taken from the job's
        DDL target and ``query`` is ``query_string`` unchanged.

    Raises:
        google.api_core.exceptions.GoogleAPICallError: If the job failed
            (raised by ``job.result()``).
        ValueError: If the finished job reports no DDL target.
    """
    print(query_string)

    from collections import namedtuple
    from time import sleep

    from google.api_core.future import polling
    from google.cloud import bigquery
    from google.cloud.bigquery import retry as bq_retry

    bqclient = bigquery.Client(project=project_id, location=location)
    job = bqclient.query(query_string, retry=bq_retry.DEFAULT_RETRY)
    # NOTE(review): this overrides a private attribute of the job, presumably so
    # that status polling uses the polling retry policy -- confirm it is still
    # needed with the installed client version.
    job._retry = polling.DEFAULT_RETRY

    while job.running():
        sleep(0.1)
        print('Running ...')

    # Surface a failed job as its real error instead of letting the code below
    # trip over a missing DDL target.
    job.result()

    target = job.ddl_target_table
    if target is None:
        raise ValueError(
            'Query finished without a DDL target; was it a CREATE statement?')

    tblname = '{}.{}'.format(target.dataset_id, target.table_id)
    print('{} created in {}'.format(tblname, job.ended - job.started))

    result_tuple = namedtuple('DDLOutput', ['created_table', 'query'])
    return result_tuple(tblname, query_string)
def train_matrix_factorization_model(ddlop, project_id: str, dataset: str):
    """Build the CREATE MODEL statement for the implicit matrix-factorization
    model and hand it to ``ddlop``.

    Args:
        ddlop: Callable invoked as ``ddlop(project_id, query, location)``.
        project_id: Project substituted into the model and source table names.
        dataset: Dataset substituted into the model and source table names.

    Returns:
        Whatever ``ddlop`` returns. The location argument is always ``'US'``.
    """
    sql_template = """
CREATE OR REPLACE MODEL `{project_id}.{dataset}.my_implicit_mf_model_quantiles_demo_binary_prod`
OPTIONS
(model_type='matrix_factorization',
feedback_type='implicit',
user_col='user_id',
item_col='hotel_cluster',
rating_col='rating',
l2_reg=30,
num_factors=15) AS
SELECT
user_id,
hotel_cluster,
if(sum(is_booking) > 0, 1, sum(is_booking)) AS rating
FROM `{project_id}.{dataset}.hotel_train`
group by 1,2
"""
    rendered_sql = sql_template.format(dataset=dataset, project_id=project_id)
    return ddlop(project_id, rendered_sql, 'US')
def evaluate_matrix_factorization_model(project_id:str, mf_model:str, location:str='US')-> NamedTuple('MFMetrics', [('msqe', float)]):
    """Run ML.EVALUATE on the matrix-factorization model and return its
    mean squared error.

    Imports are kept inside the body because the function is packaged with
    ``func_to_container_op`` elsewhere in this file.

    Args:
        project_id: Project for the BigQuery client and the model reference.
        mf_model: Model name appended to ``project_id`` in the query.
        location: BigQuery location passed to the client (default ``'US'``).

    Returns:
        An ``MFMetrics`` namedtuple whose single field ``msqe`` holds the
        ``mean_squared_error`` value of the first result row.
    """
    eval_sql = """
SELECT * FROM ML.EVALUATE(MODEL `{project_id}.{mf_model}`)
""".format(mf_model=mf_model, project_id=project_id)
    print(eval_sql)

    from collections import namedtuple
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id, location=location)
    metrics_frame = client.query(eval_sql).result().to_dataframe()
    first_row = metrics_frame.loc[0].to_dict()

    MFMetrics = namedtuple('MFMetrics', ['msqe'])
    return MFMetrics(first_row['mean_squared_error'])
def create_user_features(ddlop, project_id:str, dataset:str, mf_model:str):
    """Build the per-user feature table and hand the statement to ``ddlop``.

    The statement aggregates ``hotel_train`` by ``user_id`` and joins the
    result with the user factor weights read from ``ML.WEIGHTS`` of
    ``mf_model``.

    Args:
        ddlop: Callable invoked as ``ddlop(project_id, query, location)``.
        project_id: Project substituted into the table names.
        dataset: Dataset substituted into the table names.
        mf_model: Model reference placed verbatim inside ``ML.WEIGHTS``.

    Returns:
        Whatever ``ddlop`` returns. The location argument is always ``'US'``.
    """
    # Feature engineering for users.
    # distinct_sites uses COUNT(DISTINCT ...) so the column holds what its name
    # says; SUM(DISTINCT site_name) would add up the site ids instead.
    query = """
CREATE OR REPLACE TABLE `{project_id}.{dataset}.user_features_prod` AS
WITH u as
(
select
user_id,
count(*) as total_visits,
count(distinct user_location_city) as distinct_cities,
count(distinct site_name) as distinct_sites,
sum(is_mobile) as total_mobile,
sum(is_booking) as total_bookings,
FROM `{project_id}.{dataset}.hotel_train`
GROUP BY 1
)
SELECT
u.*,
(SELECT ARRAY_AGG(weight) FROM UNNEST(factor_weights)) AS user_factors
FROM
u JOIN ML.WEIGHTS( MODEL `{mf_model}`) w
ON processed_input = 'user_id' AND feature = CAST(u.user_id AS STRING)
""".format(project_id=project_id, dataset=dataset, mf_model=mf_model)
    return ddlop(project_id, query, 'US')
def create_hotel_features(ddlop, project_id:str, dataset:str, mf_model:str):
    """Build the per-hotel-cluster feature table and hand the statement to
    ``ddlop``.

    The statement aggregates ``hotel_train`` by ``hotel_cluster`` and joins the
    result with the hotel factor weights read from ``ML.WEIGHTS`` of
    ``mf_model``.

    Args:
        ddlop: Callable invoked as ``ddlop(project_id, query, location)``.
        project_id: Project substituted into the table names.
        dataset: Dataset substituted into the table names.
        mf_model: Model reference placed verbatim inside ``ML.WEIGHTS``.

    Returns:
        Whatever ``ddlop`` returns. The location argument is always ``'US'``.
    """
    # Feature engineering for hotels.
    # distinct_hotel_markets uses COUNT(DISTINCT ...) so the column holds what
    # its name says; SUM(DISTINCT hotel_market) would add up the market ids.
    query = """
CREATE OR REPLACE TABLE `{project_id}.{dataset}.hotel_features_prod` AS
WITH h as
(
select
hotel_cluster,
count(*) as total_cluster_searches,
count(distinct hotel_country) as distinct_hotel_countries,
count(distinct hotel_market) as distinct_hotel_markets,
sum(is_mobile) as total_mobile_searches,
sum(is_booking) as total_cluster_bookings,
FROM `{project_id}.{dataset}.hotel_train`
group by 1
)
SELECT
h.*,
(SELECT ARRAY_AGG(weight) FROM UNNEST(factor_weights)) AS hotel_factors
FROM
h JOIN ML.WEIGHTS( MODEL `{mf_model}`) w
ON processed_input = 'hotel_cluster' AND feature = CAST(h.hotel_cluster AS STRING)
""".format(project_id=project_id, dataset=dataset, mf_model=mf_model)
    return ddlop(project_id, query, 'US')
def combine_features(ddlop, project_id:str, dataset:str, mf_model:str, hotel_features:str, user_features:str):
    """Build the table that pairs every hotel row with every user row and
    attaches the rating for that pair, then hand the statement to ``ddlop``.

    Pairs with no matching row in the ratings sub-query get a rating of 0
    (``IFNULL(rating,0)``).

    Args:
        ddlop: Callable invoked as ``ddlop(project_id, query, location)``.
        project_id: Project substituted into the table names.
        dataset: Dataset substituted into the table names.
        mf_model: Accepted for signature compatibility; the SQL text has no
            placeholder for it.
        hotel_features: Table reference used as the ``h`` side of the join.
        user_features: Table reference used as the ``u`` side of the join.

    Returns:
        Whatever ``ddlop`` returns. The location argument is always ``'US'``.
    """
    substitutions = {
        'project_id': project_id,
        'dataset': dataset,
        'hotel_features': hotel_features,
        'user_features': user_features,
    }
    sql_template = """
CREATE OR REPLACE TABLE `{project_id}.{dataset}.total_features_prod` AS
with ratings as(
SELECT
user_id,
hotel_cluster,
if(sum(is_booking) > 0, 1, sum(is_booking)) AS rating
FROM `{project_id}.{dataset}.hotel_train`
group by 1,2
)
select
h.* EXCEPT(hotel_cluster),
u.* EXCEPT(user_id),
IFNULL(rating,0) as rating
from `{hotel_features}` h, `{user_features}` u
LEFT OUTER JOIN ratings r
ON r.user_id = u.user_id AND r.hotel_cluster = h.hotel_cluster
"""
    combined_sql = sql_template.format(**substitutions)
    return ddlop(project_id, combined_sql, 'US')
def train_xgboost_model(ddlop, project_id:str, dataset:str, total_features:str):
    """Build the CREATE MODEL statement for the boosted-tree classifier and
    hand it to ``ddlop``.

    The training SELECT drops the two factor-array columns and replaces them
    with the expanded output of the ``arr_to_input_15_users`` /
    ``arr_to_input_15_hotels`` functions looked up in ``dataset``.

    Args:
        ddlop: Callable invoked as ``ddlop(project_id, query, location)``.
        project_id: Project substituted into the model name.
        dataset: Dataset substituted into the model name and function names.
        total_features: Table reference the model is trained on.

    Returns:
        Whatever ``ddlop`` returns. The location argument is always ``'US'``.
    """
    sql_template = """
CREATE OR REPLACE MODEL `{project_id}.{dataset}.recommender_hybrid_xgboost_prod`
OPTIONS(model_type='boosted_tree_classifier', input_label_cols=['rating'], AUTO_CLASS_WEIGHTS=True)
AS
SELECT
* EXCEPT(user_factors, hotel_factors),
{dataset}.arr_to_input_15_users(user_factors).*,
{dataset}.arr_to_input_15_hotels(hotel_factors).*
FROM
`{total_features}`
"""
    training_sql = sql_template.format(
        total_features=total_features,
        dataset=dataset,
        project_id=project_id,
    )
    return ddlop(project_id, training_sql, 'US')
def evaluate_class(project_id:str, dataset:str, class_model:str, total_features:str, location:str='US')-> NamedTuple('ClassMetrics', [('roc_auc', float)]):
    """Run ML.EVALUATE on the classifier against ``total_features`` and return
    its ROC AUC.

    Imports are kept inside the body because the function is packaged with
    ``func_to_container_op`` elsewhere in this file.

    Args:
        project_id: Project used to construct the BigQuery client.
        dataset: Dataset holding the ``arr_to_input_15_*`` functions.
        class_model: Model reference placed verbatim inside ``ML.EVALUATE``.
        total_features: Table reference providing the evaluation rows.
        location: BigQuery location passed to the client (default ``'US'``).

    Returns:
        A ``ClassMetrics`` namedtuple whose single field ``roc_auc`` holds the
        ``roc_auc`` value of the first result row.
    """
    eval_sql = """
SELECT
*
FROM ML.EVALUATE(MODEL `{class_model}`, (
SELECT
* EXCEPT(user_factors, hotel_factors),
{dataset}.arr_to_input_15_users(user_factors).*,
{dataset}.arr_to_input_15_hotels(hotel_factors).*
FROM
`{total_features}`
))
""".format(total_features=total_features, class_model=class_model, dataset=dataset)
    print(eval_sql)

    from collections import namedtuple
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id, location=location)
    metrics_frame = client.query(eval_sql).result().to_dataframe()
    first_row = metrics_frame.loc[0].to_dict()

    ClassMetrics = namedtuple('ClassMetrics', ['roc_auc'])
    return ClassMetrics(first_row['roc_auc'])
def export_bqml_model(project_id:str, model:str, destination:str) -> NamedTuple('ModelExport', [('destination', str)]):
    """Export a BigQuery ML model with the ``bq extract`` command line tool.

    The model is exported in the ``ML_XGBOOST_BOOSTER`` destination format.
    ``check=True`` makes a non-zero exit status of ``bq`` raise.

    Args:
        project_id: Project part of the ``<project>:<model>`` identifier.
        model: Model part of the ``<project>:<model>`` identifier.
        destination: Destination argument handed to ``bq extract``.

    Returns:
        A ``ModelExport`` namedtuple carrying ``destination`` unchanged.

    Raises:
        subprocess.CalledProcessError: If ``bq`` exits with a non-zero status.
    """
    import subprocess
    from collections import namedtuple

    qualified_model = '{}:{}'.format(project_id, model)
    print (qualified_model)

    bq_command = [
        'bq',
        'extract',
        '-destination_format=ML_XGBOOST_BOOSTER',
        '-m',
        qualified_model,
        destination,
    ]
    subprocess.run(bq_command, check=True)

    ModelExport = namedtuple('ModelExport', ['destination'])
    return ModelExport(destination)
import kfp.dsl as dsl
import kfp.components as comp
import time
@dsl.pipeline(
    name='Training pipeline for hotel recommendation prediction',
    description='Training pipeline for hotel recommendation prediction'
)
def training_pipeline(project_id:str, dataset_name:str, model_storage:str, base_image_path:str):
    """Define the execution graph of the hotel recommendation training pipeline.

    Graph, in order:
      1. Train the matrix-factorization model and evaluate it.
      2. If its ``msqe`` output is below ``mf_msqe_threshold``: create user and
         hotel feature tables, combine them, train the XGBoost classifier and
         evaluate it.
      3. If the classifier's ``roc_auc`` output is above
         ``class_auc_threshold``: export the classifier to ``model_storage``.

    Args:
        project_id: Project passed to every step.
        dataset_name: Dataset passed to every step that builds SQL.
        model_storage: Destination passed to the export step.
        base_image_path: Declared pipeline parameter that is not referenced in
            this body; the component base image comes from the ``base_image``
            placeholder string below.
    """
    #############################
    #Defining pipeline execution graph
    dataset = dataset_name
    base_image = 'YOUR BASE IMAGE CONTAINER URL FOR COMPONENTS BELOW'

    #Gates for continuing the pipeline:
    #  - the matrix factorization model must have a mean squared error BELOW mf_msqe_threshold
    #  - the classifier must have a ROC AUC ABOVE class_auc_threshold before it is exported
    mf_msqe_threshold = 0.5
    class_auc_threshold = 0.8

    #Defining function containers
    ddlop = comp.func_to_container_op(run_bigquery_ddl, packages_to_install=['google-cloud-bigquery'])
    evaluate_mf_op = comp.func_to_container_op(evaluate_matrix_factorization_model, base_image=base_image, packages_to_install=['google-cloud-bigquery','pandas'])
    evaluate_class_op = comp.func_to_container_op(evaluate_class, base_image=base_image, packages_to_install=['google-cloud-bigquery','pandas'])
    export_bqml_model_op = comp.func_to_container_op(export_bqml_model, base_image=base_image, packages_to_install=['google-cloud-bigquery'])

    #Every step sets max_cache_staleness to 'P0D' (a zero-day duration),
    #presumably so that no cached step result is reused -- see the KFP caching docs.

    #Train matrix factorization model
    mf_model_output = train_matrix_factorization_model(ddlop, project_id, dataset).set_display_name('train matrix factorization model')
    mf_model_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'
    mf_model = mf_model_output.outputs['created_table']

    #Evaluate matrix factorization model
    mf_eval_output = evaluate_mf_op(project_id, mf_model).set_display_name('evaluate matrix factorization model')
    mf_eval_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'

    #'msqe' carries the mean_squared_error column of ML.EVALUATE
    with dsl.Condition(mf_eval_output.outputs['msqe'] < mf_msqe_threshold):
        #Create features for Classification model
        user_features_output = create_user_features(ddlop, project_id, dataset, mf_model).set_display_name('create user factors features')
        user_features = user_features_output.outputs['created_table']
        user_features_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'

        hotel_features_output = create_hotel_features(ddlop, project_id, dataset, mf_model).set_display_name('create hotel factors features')
        hotel_features = hotel_features_output.outputs['created_table']
        hotel_features_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'

        total_features_output = combine_features(ddlop, project_id, dataset, mf_model, hotel_features, user_features).set_display_name('combine all features')
        total_features = total_features_output.outputs['created_table']
        total_features_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'

        #Train XGBoost model
        class_model_output = train_xgboost_model(ddlop, project_id, dataset, total_features).set_display_name('train XGBoost model')
        #Use the training step's own output as the model reference: this follows
        #dataset_name and makes the evaluate/export steps wait for training.
        class_model = class_model_output.outputs['created_table']
        class_model_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'

        #Evaluate XGBoost model
        class_eval_output = evaluate_class_op(project_id, dataset, class_model, total_features).set_display_name('evaluate XGBoost model')
        class_eval_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'

        with dsl.Condition(class_eval_output.outputs['roc_auc'] > class_auc_threshold):
            #Export model
            export_destination_output = export_bqml_model_op(project_id, class_model, model_storage).set_display_name('export XGBoost model')
            export_destination_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'
def main(**args):
    """Compile ``training_pipeline`` and submit one run of it.

    The pipeline is compiled to ``training_pipeline.zip`` in the current
    working directory and submitted under the experiment
    ``'hotel_recommender_experiment'``.

    Keyword Args:
        project_id: Forwarded as the ``project_id`` pipeline argument.
        dataset_name: Forwarded as the ``dataset_name`` pipeline argument.
        model_storage: Forwarded as the ``model_storage`` pipeline argument.
        kfp_host: Passed to ``kfp.Client``.
        base_image_path: Optional; forwarded as the ``base_image_path``
            pipeline argument when supplied.

    Raises:
        KeyError: If one of the required keyword arguments is missing.
    """
    import kfp
    import kfp.compiler as compiler

    #Specify pipeline argument values
    arguments = {
        'project_id': args['project_id'],
        'dataset_name': args['dataset_name'],
        'model_storage': args['model_storage'],
    }
    #The pipeline also declares base_image_path; pass it through when the
    #caller provides one so the declared parameter can receive a value.
    if 'base_image_path' in args:
        arguments['base_image_path'] = args['base_image_path']

    pipeline_func = training_pipeline
    pipeline_filename = pipeline_func.__name__ + '.zip'
    compiler.Compiler().compile(pipeline_func, pipeline_filename)

    #Get or create an experiment and submit a pipeline run
    client = kfp.Client(args['kfp_host'])
    experiment = client.create_experiment('hotel_recommender_experiment')

    #Submit a pipeline run
    run_name = pipeline_func.__name__ + ' run'
    client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
# Script entry point: python-fire maps command-line flags onto main()'s
# keyword arguments.
if __name__ == '__main__':
    fire.Fire(main)