in retail/recommendation-system/bqml-mlops/part_2/pipeline.py [0:0]
def training_pipeline(project_id:str, dataset_name:str, model_storage:str, base_image_path:str):
import json
#############################
#Defining pipeline execution graph
dataset = dataset_name
base_image = 'YOUR BASE IMAGE CONTAINER URL FOR COMPONENTS BELOW'
#Minimum threshold for model metric to determine if model will be deployed to inference: 0.5 is a basically a coin toss with 50/50 chance
mf_msqe_threshold = 0.5
class_auc_threshold = 0.8
#Defining function containers
ddlop = comp.func_to_container_op(run_bigquery_ddl, packages_to_install=['google-cloud-bigquery'])
evaluate_mf_op = comp.func_to_container_op(evaluate_matrix_factorization_model, base_image=base_image, packages_to_install=['google-cloud-bigquery','pandas'])
evaluate_class_op = comp.func_to_container_op(evaluate_class, base_image=base_image, packages_to_install=['google-cloud-bigquery','pandas'])
export_bqml_model_op = comp.func_to_container_op(export_bqml_model, base_image=base_image, packages_to_install=['google-cloud-bigquery'])
#Train matrix factorization model
mf_model_output = train_matrix_factorization_model(ddlop, project_id, dataset).set_display_name('train matrix factorization model')
mf_model_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'
mf_model = mf_model_output.outputs['created_table']
#Evaluate matrix factorization model
mf_eval_output = evaluate_mf_op(project_id, mf_model).set_display_name('evaluate matrix factorization model')
mf_eval_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'
#mean squared quantization error
with dsl.Condition(mf_eval_output.outputs['msqe'] < mf_msqe_threshold):
#Create features for Classification model
user_features_output = create_user_features(ddlop, project_id, dataset, mf_model).set_display_name('create user factors features')
user_features = user_features_output.outputs['created_table']
user_features_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'
hotel_features_output = create_hotel_features(ddlop, project_id, dataset, mf_model).set_display_name('create hotel factors features')
hotel_features = hotel_features_output.outputs['created_table']
hotel_features_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'
total_features_output = combine_features(ddlop, project_id, dataset, mf_model, hotel_features, user_features).set_display_name('combine all features')
total_features = total_features_output.outputs['created_table']
total_features_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'
#Train XGBoost model
class_model_output = train_xgboost_model(ddlop, project_id, dataset, total_features).set_display_name('train XGBoost model')
class_model = class_model_output.outputs['created_table']
class_model_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'
class_model = 'hotel_recommendations.recommender_hybrid_xgboost_prod'
#Evaluate XGBoost model
class_eval_output = evaluate_class_op(project_id, dataset, class_model, total_features).set_display_name('evaluate XGBoost model')
class_eval_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'
with dsl.Condition(class_eval_output.outputs['roc_auc'] > class_auc_threshold):
#Export model
export_destination_output = export_bqml_model_op(project_id, class_model, model_storage).set_display_name('export XGBoost model')
export_destination_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'
export_destination = export_destination_output.outputs['destination']