def training_pipeline()

in retail/recommendation-system/bqml-mlops/part_2/pipeline.py [0:0]


def training_pipeline(project_id: str, dataset_name: str, model_storage: str, base_image_path: str):
    """Define the execution graph of the recommender training pipeline.

    The graph built here is:

    1. Train a matrix factorization model and evaluate it.
    2. Only if the evaluation's ``msqe`` output is below ``0.5``: create the
       user and hotel feature tables, combine them, train an XGBoost model
       on the combined features and evaluate it.
    3. Only if that evaluation's ``roc_auc`` output is above ``0.8``: export
       the XGBoost model.

    Every step has its maximum cache staleness set to ``'P0D'`` so that
    results of previous runs are not reused.

    Args:
        project_id: Passed to every training, feature, evaluation and export
            step.
        dataset_name: Passed to the training, feature and classifier
            evaluation steps.
        model_storage: Passed to the export step together with the model
            (presumably the export destination; confirm against
            ``export_bqml_model``).
        base_image_path: Not read by this function; see the review note on
            ``base_image`` below.
    """

    #############################
    # Defining pipeline execution graph

    # NOTE(review): the components below are built with this placeholder and
    # `base_image_path` is never used. The placeholder has to be replaced with
    # a real image URL before the pipeline is compiled; whether the parameter
    # can be used instead depends on how this function is compiled - confirm.
    base_image = 'YOUR BASE IMAGE CONTAINER URL FOR COMPONENTS BELOW'

    # Quality gates. The matrix factorization model must score an 'msqe' below
    # mf_msqe_threshold for the pipeline to continue, and the classifier must
    # score a 'roc_auc' above class_auc_threshold to be exported (a ROC AUC of
    # 0.5 is no better than a coin toss).
    mf_msqe_threshold = 0.5
    class_auc_threshold = 0.8

    def _as_uncached_step(task, display_name):
        """Set the task's display name and force it to ignore cached results."""
        task = task.set_display_name(display_name)
        task.execution_options.caching_strategy.max_cache_staleness = 'P0D'
        return task

    # Defining function containers
    ddlop = comp.func_to_container_op(
        run_bigquery_ddl,
        packages_to_install=['google-cloud-bigquery'],
    )
    evaluate_mf_op = comp.func_to_container_op(
        evaluate_matrix_factorization_model,
        base_image=base_image,
        packages_to_install=['google-cloud-bigquery', 'pandas'],
    )
    evaluate_class_op = comp.func_to_container_op(
        evaluate_class,
        base_image=base_image,
        packages_to_install=['google-cloud-bigquery', 'pandas'],
    )
    export_bqml_model_op = comp.func_to_container_op(
        export_bqml_model,
        base_image=base_image,
        packages_to_install=['google-cloud-bigquery'],
    )

    # Train matrix factorization model
    mf_model_output = _as_uncached_step(
        train_matrix_factorization_model(ddlop, project_id, dataset_name),
        'train matrix factorization model',
    )
    mf_model = mf_model_output.outputs['created_table']

    # Evaluate matrix factorization model
    mf_eval_output = _as_uncached_step(
        evaluate_mf_op(project_id, mf_model),
        'evaluate matrix factorization model',
    )

    # Continue only when the matrix factorization model is good enough.
    with dsl.Condition(mf_eval_output.outputs['msqe'] < mf_msqe_threshold):

        # Create features for Classification model
        user_features_output = _as_uncached_step(
            create_user_features(ddlop, project_id, dataset_name, mf_model),
            'create user factors features',
        )
        user_features = user_features_output.outputs['created_table']

        hotel_features_output = _as_uncached_step(
            create_hotel_features(ddlop, project_id, dataset_name, mf_model),
            'create hotel factors features',
        )
        hotel_features = hotel_features_output.outputs['created_table']

        total_features_output = _as_uncached_step(
            combine_features(ddlop, project_id, dataset_name, mf_model, hotel_features, user_features),
            'combine all features',
        )
        total_features = total_features_output.outputs['created_table']

        # Train XGBoost model
        class_model_output = _as_uncached_step(
            train_xgboost_model(ddlop, project_id, dataset_name, total_features),
            'train XGBoost model',
        )
        # Use the model produced by the training step (not a hard-coded model
        # name) so that evaluation and export act on the freshly trained model
        # and are scheduled after training through this data dependency.
        class_model = class_model_output.outputs['created_table']

        # Evaluate XGBoost model
        class_eval_output = _as_uncached_step(
            evaluate_class_op(project_id, dataset_name, class_model, total_features),
            'evaluate XGBoost model',
        )

        # Export only when the classifier clears the AUC gate.
        with dsl.Condition(class_eval_output.outputs['roc_auc'] > class_auc_threshold):
            # Export model
            _as_uncached_step(
                export_bqml_model_op(project_id, class_model, model_storage),
                'export XGBoost model',
            )