retail/recommendation-system/bqml-mlops/part_3/vertex_ai_pipeline.ipynb (766 lines of code) (raw):
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Copyright 2020 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Tutorial Overview \n",
"\n",
"This is part three of the tutorial where you will learn how to run same code in [Part One](../README.md) (with minor changes) in Google's new Vertex AI pipeline. Vertex Pipelines helps you to automate, monitor, and govern your ML systems by orchestrating your ML workflow in a serverless manner, and storing your workflow's artifacts using Vertex ML Metadata. By storing the artifacts of your ML workflow in Vertex ML Metadata, you can analyze the lineage of your workflow's artifacts — for example, an ML model's lineage may include the training data, hyperparameters, and code that were used to create the model.\n",
"\n",
"You will also learn how to export the final BQML model and hosted on the Google Vertex AI Endpoint. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prerequisites\n",
"* Download the [Expedia Hotel Recommendation Dataset](https://www.kaggle.com/c/expedia-hotel-recommendations) from Kaggle. You will be mostly working with the train.csv dataset for this tutorial\n",
"* Upload the dataset to BigQuery by following the how-to guide [Loading CSV Data](https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-csv)\n",
"* Follow the how-to guide [create flex slots, reservation and assignment in BigQuery](https://cloud.google.com/bigquery/docs/reservations-get-started) for training ML models. <strong>Make sure to create Flex slots and not month/year slots so you can delete them after the tutorial.</strong> \n",
"* Build and push a docker image using [this dockerfile](dockerfile) as the base image for the Kubeflow pipeline components. \n",
"* Create or use a [Google Cloud Storage](https://console.cloud.google.com/storage) bucket to export the finalized model to. <strong>Make sure to create the bucket in the same region where you will create Vertex AI Endpoint to host your model.</strong> \n",
"* If you do not specify a service account, Vertex Pipelines uses the Compute Engine default service account to run your pipelines. The Compute Engine default service account has the Project Editor role by default so it should have access to BigQuery as well as Google Cloud Storage.\n",
"* Change the following cell to reflect your setup"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"PATH=%env PATH\n",
"%env PATH={PATH}:/home/jupyter/.local/bin\n",
"\n",
"# CHANGE the following settings\n",
"BASE_IMAGE='gcr.io/your-image-name' #This is the image built from the Dockfile in the same folder\n",
"REGION='vertex-ai-region' #For example, us-central1, note that Vertex AI endpoint deployment region must match MODEL_STORAGE bucket region\n",
"MODEL_STORAGE = 'gs://your-bucket-name/folder-name' #Make sure this bucket is created in the same region defined above\n",
"BQ_DATASET_NAME=\"hotel_recommendations\" #This is the name of the target dataset where you model and predictions will be stored\n",
"PROJECT_ID=\"your-project-id\" #This is your GCP project ID that can be found in the GCP console\n",
"\n",
"# Required Parameters for Vertex AI\n",
"USER = 'your-user-name'\n",
"BUCKET_NAME = 'your-bucket-name' \n",
"PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(BUCKET_NAME, USER) #Cloud Storage URI that your pipelines service account can access.\n",
"ENDPOINT_NAME='bqml-hotel-recommendations' #Vertex AI Endpoint Name\n",
"DEPLOY_COMPUTE='n1-standard-4' #Could be any supported Vertex AI Instance Types\n",
"DEPLOY_IMAGE='us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.0-82:latest'#Do not change, BQML XGBoost is currently compatible with 0.82\n",
"\n",
"\n",
"print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))"
]
},
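{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, you can build and push the Kubeflow component base image directly from this notebook. The next cell is a minimal sketch that assumes Docker is available in this environment and that `BASE_IMAGE` above points to a registry you can push to; adapt it to your own build tooling (for example, Cloud Build) if needed."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sketch: build and push the component base image (assumes Docker is installed\n",
"# in this environment and that you can push to the registry referenced by BASE_IMAGE).\n",
"# The dockerfile in this folder is used as the build file.\n",
"!docker build -f dockerfile -t {BASE_IMAGE} .\n",
"!docker push {BASE_IMAGE}"
]
},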
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check the KFP version, The KFP version should be >= 1.6. If lower, run !pip3 install --user kfp --upgrade, then restart the kernel\n",
"!python3 -c \"import kfp; print('KFP version: {}'.format(kfp.__version__))\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create BigQuery function\n",
"\n",
"Create a generic BigQuery function that runs a BigQuery query and returns the table/model created. This will be re-used to return BigQuery results for all the different segments of the BigQuery process in the Kubeflow Pipeline. You will see later in the tutorial where this function is being passed as parameter (ddlop) to other functions to perform certain BigQuery operation."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from typing import NamedTuple\n",
"import json\n",
"import os\n",
"\n",
"def run_bigquery_ddl(project_id: str, query_string: str, location: str) -> NamedTuple(\n",
" 'DDLOutput', [('created_table', str), ('query', str)]):\n",
" \"\"\"\n",
" Runs BigQuery query and returns a table/model name\n",
" \"\"\"\n",
" print(query_string)\n",
" \n",
" from google.cloud import bigquery\n",
" from google.api_core.future import polling\n",
" from google.cloud import bigquery\n",
" from google.cloud.bigquery import retry as bq_retry\n",
" \n",
" bqclient = bigquery.Client(project=project_id, location=location)\n",
" job = bqclient.query(query_string, retry=bq_retry.DEFAULT_RETRY)\n",
" job._retry = polling.DEFAULT_RETRY\n",
" \n",
" print('bq version: {}'.format(bigquery.__version__))\n",
" \n",
" while job.running():\n",
" from time import sleep\n",
" sleep(30)\n",
" print('Running ...')\n",
" \n",
" tblname = '{}.{}'.format(job.ddl_target_table.dataset_id, job.ddl_target_table.table_id)\n",
" print('{} created in {}'.format(tblname, job.ended - job.started))\n",
" \n",
" from collections import namedtuple\n",
" result_tuple = namedtuple('DDLOutput', ['created_table', 'query'])\n",
" return result_tuple(tblname, query_string)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train Matrix Factorization model and evaluate it\n",
"\n",
"We will start by training a matrix factorization model that will allow us to understand the latent relationship between user and hotel clusters. The reason why we are doing this is because matrix factorization approach can only find latent relationship between a user and a hotel. However, there are other intuitive useful predictors (such as is_mobile, location, and etc) that can improve the model performance. So togther, we can feed the resulting weights/factors as features among with other features to train the final XGBoost model. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def train_matrix_factorization_model(ddlop, project_id: str, dataset: str):\n",
" query = \"\"\"\n",
" CREATE OR REPLACE MODEL `{project_id}.{dataset}.my_implicit_mf_model_quantiles_demo_binary_prod`\n",
" OPTIONS\n",
" (model_type='matrix_factorization',\n",
" feedback_type='implicit',\n",
" user_col='user_id',\n",
" item_col='hotel_cluster',\n",
" rating_col='rating',\n",
" l2_reg=30,\n",
" num_factors=15) AS\n",
"\n",
" SELECT\n",
" user_id,\n",
" hotel_cluster,\n",
" if(sum(is_booking) > 0, 1, sum(is_booking)) AS rating\n",
" FROM `{project_id}.{dataset}.hotel_train`\n",
" group by 1,2\n",
" \"\"\".format(project_id = project_id, dataset = dataset)\n",
" return ddlop(project_id, query, 'US')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def evaluate_matrix_factorization_model(project_id:str, mf_model: str, location: str='US')-> NamedTuple('MFMetrics', [('msqe', float)]):\n",
" \n",
" query = \"\"\"\n",
" SELECT * FROM ML.EVALUATE(MODEL `{project_id}.{mf_model}`)\n",
" \"\"\".format(project_id = project_id, mf_model = mf_model)\n",
"\n",
" print(query)\n",
"\n",
" from google.cloud import bigquery\n",
" import json\n",
"\n",
" bqclient = bigquery.Client(project=project_id, location=location)\n",
" job = bqclient.query(query)\n",
" metrics_df = job.result().to_dataframe()\n",
" from collections import namedtuple\n",
" result_tuple = namedtuple('MFMetrics', ['msqe'])\n",
" return result_tuple(metrics_df.loc[0].to_dict()['mean_squared_error'])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating embedding features for users and hotels \n",
"\n",
"We will use the matrix factorization model to create corresponding user factors, hotel factors and embed them together with additional features such as total visits and distinct cities to create a new training dataset to an XGBoost classifier which will try to predict the the likelihood of booking for any user/hotel combination. Also note that we aggregated and grouped the orginal dataset by user_id."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def create_user_features(ddlop, project_id:str, dataset:str, mf_model:str):\n",
" #Feature engineering for useres\n",
" query = \"\"\"\n",
" CREATE OR REPLACE TABLE `{project_id}.{dataset}.user_features_prod` AS\n",
" WITH u as \n",
" (\n",
" select\n",
" user_id,\n",
" count(*) as total_visits,\n",
" count(distinct user_location_city) as distinct_cities,\n",
" sum(distinct site_name) as distinct_sites,\n",
" sum(is_mobile) as total_mobile,\n",
" sum(is_booking) as total_bookings,\n",
" FROM `{project_id}.{dataset}.hotel_train`\n",
" GROUP BY 1\n",
" )\n",
" SELECT\n",
" u.*,\n",
" (SELECT ARRAY_AGG(weight) FROM UNNEST(factor_weights)) AS user_factors\n",
" FROM\n",
" u JOIN ML.WEIGHTS( MODEL `{mf_model}`) w\n",
" ON processed_input = 'user_id' AND feature = CAST(u.user_id AS STRING)\n",
" \"\"\".format(project_id = project_id, dataset = dataset, mf_model=mf_model)\n",
" return ddlop(project_id, query, 'US')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def create_hotel_features(ddlop, project_id:str, dataset:str, mf_model:str):\n",
" #Feature eingineering for hotels\n",
" query = \"\"\"\n",
" CREATE OR REPLACE TABLE `{project_id}.{dataset}.hotel_features_prod` AS\n",
" WITH h as \n",
" (\n",
" select\n",
" hotel_cluster,\n",
" count(*) as total_cluster_searches,\n",
" count(distinct hotel_country) as distinct_hotel_countries,\n",
" sum(distinct hotel_market) as distinct_hotel_markets,\n",
" sum(is_mobile) as total_mobile_searches,\n",
" sum(is_booking) as total_cluster_bookings,\n",
" FROM `{project_id}.{dataset}.hotel_train`\n",
" group by 1\n",
" )\n",
" SELECT\n",
" h.*,\n",
" (SELECT ARRAY_AGG(weight) FROM UNNEST(factor_weights)) AS hotel_factors\n",
" FROM\n",
" h JOIN ML.WEIGHTS( MODEL `{mf_model}`) w\n",
" ON processed_input = 'hotel_cluster' AND feature = CAST(h.hotel_cluster AS STRING)\n",
" \"\"\".format(project_id = project_id, dataset = dataset, mf_model=mf_model)\n",
" return ddlop(project_id, query, 'US')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Function below combines all the features selected (total_mobile_searches) and engineered (user factors and hotel factors) into a training dataset for the XGBoost classifier. Note the target variable is rating which is converted into a binary classfication."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def combine_features(ddlop, project_id:str, dataset:str, mf_model:str, hotel_features:str, user_features:str):\n",
" #Combine user and hotel embedding features with the rating associated with each combination\n",
" query = \"\"\"\n",
" CREATE OR REPLACE TABLE `{project_id}.{dataset}.total_features_prod` AS\n",
" with ratings as(\n",
" SELECT\n",
" user_id,\n",
" hotel_cluster,\n",
" if(sum(is_booking) > 0, 1, sum(is_booking)) AS rating\n",
" FROM `{project_id}.{dataset}.hotel_train`\n",
" group by 1,2\n",
" )\n",
" select\n",
" h.* EXCEPT(hotel_cluster),\n",
" u.* EXCEPT(user_id),\n",
" IFNULL(rating,0) as rating\n",
" from `{hotel_features}` h, `{user_features}` u\n",
" LEFT OUTER JOIN ratings r\n",
" ON r.user_id = u.user_id AND r.hotel_cluster = h.hotel_cluster\n",
" \"\"\".format(project_id = project_id, dataset = dataset, mf_model=mf_model, hotel_features=hotel_features, user_features=user_features)\n",
" return ddlop(project_id, query, 'US')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We will create a couple of BigQuery user-defined functions (UDF) to convert arrays to a struct and its array elements are the fields in the struct. <strong>Be sure to change the BigQuery dataset name to your dataset name. </strong>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%bigquery --project $PROJECT_ID\n",
"CREATE OR REPLACE FUNCTION `hotel_recommendations.arr_to_input_15_hotels`(h ARRAY<FLOAT64>)\n",
"RETURNS \n",
" STRUCT<\n",
" h1 FLOAT64,\n",
" h2 FLOAT64,\n",
" h3 FLOAT64,\n",
" h4 FLOAT64,\n",
" h5 FLOAT64,\n",
" h6 FLOAT64,\n",
" h7 FLOAT64,\n",
" h8 FLOAT64,\n",
" h9 FLOAT64,\n",
" h10 FLOAT64,\n",
" h11 FLOAT64,\n",
" h12 FLOAT64,\n",
" h13 FLOAT64,\n",
" h14 FLOAT64,\n",
" h15 FLOAT64\n",
" > AS (STRUCT(\n",
" h[OFFSET(0)],\n",
" h[OFFSET(1)],\n",
" h[OFFSET(2)],\n",
" h[OFFSET(3)],\n",
" h[OFFSET(4)],\n",
" h[OFFSET(5)],\n",
" h[OFFSET(6)],\n",
" h[OFFSET(7)],\n",
" h[OFFSET(8)],\n",
" h[OFFSET(9)],\n",
" h[OFFSET(10)],\n",
" h[OFFSET(11)],\n",
" h[OFFSET(12)],\n",
" h[OFFSET(13)],\n",
" h[OFFSET(14)]\n",
"));\n",
"\n",
"CREATE OR REPLACE FUNCTION `hotel_recommendations.arr_to_input_15_users`(u ARRAY<FLOAT64>)\n",
"RETURNS \n",
" STRUCT<\n",
" u1 FLOAT64,\n",
" u2 FLOAT64,\n",
" u3 FLOAT64,\n",
" u4 FLOAT64,\n",
" u5 FLOAT64,\n",
" u6 FLOAT64,\n",
" u7 FLOAT64,\n",
" u8 FLOAT64,\n",
" u9 FLOAT64,\n",
" u10 FLOAT64,\n",
" u11 FLOAT64,\n",
" u12 FLOAT64,\n",
" u13 FLOAT64,\n",
" u14 FLOAT64,\n",
" u15 FLOAT64\n",
" > AS (STRUCT(\n",
" u[OFFSET(0)],\n",
" u[OFFSET(1)],\n",
" u[OFFSET(2)],\n",
" u[OFFSET(3)],\n",
" u[OFFSET(4)],\n",
" u[OFFSET(5)],\n",
" u[OFFSET(6)],\n",
" u[OFFSET(7)],\n",
" u[OFFSET(8)],\n",
" u[OFFSET(9)],\n",
" u[OFFSET(10)],\n",
" u[OFFSET(11)],\n",
" u[OFFSET(12)],\n",
" u[OFFSET(13)],\n",
" u[OFFSET(14)]\n",
"));"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train XGBoost model and evaluate it"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def train_xgboost_model(ddlop, project_id:str, dataset:str, total_features:str):\n",
" #Combine user and hotel embedding features with the rating associated with each combination\n",
" query = \"\"\"\n",
" CREATE OR REPLACE MODEL `{project_id}.{dataset}.recommender_hybrid_xgboost_prod` \n",
" OPTIONS(model_type='boosted_tree_classifier', input_label_cols=['rating'], AUTO_CLASS_WEIGHTS=True)\n",
" AS\n",
"\n",
" SELECT\n",
" * EXCEPT(user_factors, hotel_factors),\n",
" {dataset}.arr_to_input_15_users(user_factors).*,\n",
" {dataset}.arr_to_input_15_hotels(hotel_factors).*\n",
" FROM\n",
" `{total_features}`\n",
" \"\"\".format(project_id = project_id, dataset = dataset, total_features=total_features)\n",
" return ddlop(project_id, query, 'US')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def evaluate_class(project_id:str, dataset:str, class_model:str, total_features:str, location:str='US')-> NamedTuple('ClassMetrics', [('roc_auc', float)]):\n",
" \n",
" query = \"\"\"\n",
" SELECT\n",
" *\n",
" FROM ML.EVALUATE(MODEL `{class_model}`, (\n",
" SELECT\n",
" * EXCEPT(user_factors, hotel_factors),\n",
" {dataset}.arr_to_input_15_users(user_factors).*,\n",
" {dataset}.arr_to_input_15_hotels(hotel_factors).*\n",
" FROM\n",
" `{total_features}`\n",
" ))\n",
" \"\"\".format(dataset = dataset, class_model = class_model, total_features = total_features)\n",
"\n",
" print(query)\n",
"\n",
" from google.cloud import bigquery\n",
"\n",
" bqclient = bigquery.Client(project=project_id, location=location)\n",
" job = bqclient.query(query)\n",
" metrics_df = job.result().to_dataframe()\n",
" from collections import namedtuple\n",
" result_tuple = namedtuple('ClassMetrics', ['roc_auc'])\n",
" return result_tuple(metrics_df.loc[0].to_dict()['roc_auc'])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Export XGBoost model and host it on Vertex AI\n",
"\n",
"One of the nice features of BigQuery ML is the ability to import and export machine learning models. In the function defined below, we are going to export the trained XGBoost model to a Google Cloud Storage bucket. We will later have Google Cloud AI Platform host this model for predictions. It is worth mentioning that you can host this model on any platform that supports Booster (XGBoost 0.82). Check out [the documentation](https://cloud.google.com/bigquery-ml/docs/exporting-models) for more information on exporting BigQuery ML models and their formats. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def export_bqml_model(project_id:str, model:str, destination:str) -> NamedTuple('ModelExport', [('destination', str)]):\n",
" import subprocess\n",
" import shutil\n",
" #command='bq extract -destination_format=ML_XGBOOST_BOOSTER -m {}:{} {}'.format(project_id, model, destination)\n",
" model_name = '{}:{}'.format(project_id, model)\n",
" print (model_name)\n",
" #subprocess.run(['bq', 'extract', '-destination_format=ML_XGBOOST_BOOSTER', '-m', model_name, destination], check=True)\n",
" subprocess.run(\n",
" (\n",
" shutil.which(\"bq\"),\n",
" \"extract\",\n",
" \"-destination_format=ML_XGBOOST_BOOSTER\",\n",
" \"--project_id=\" + project_id,\n",
" \"-m\",\n",
" model_name,\n",
" destination\n",
" ),\n",
" stderr=subprocess.PIPE,\n",
" check=True)\n",
"\n",
" from collections import namedtuple\n",
" result_tuple = namedtuple('ModelExport', ['destination'])\n",
" return result_tuple(destination)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def deploy_bqml_model_vertexai(project_id:str, region:str, model_name:str, endpoint_name:str, model_dir:str, deploy_image:str, deploy_compute:str):\n",
" from google.cloud import aiplatform\n",
" \n",
" parent = \"projects/\" + project_id + \"/locations/\" + region\n",
" client_options = {\"api_endpoint\": \"{}-aiplatform.googleapis.com\".format(region)}\n",
" clients = {}\n",
"\n",
" #upload the model to Vertex AI\n",
" clients['model'] = aiplatform.gapic.ModelServiceClient(client_options=client_options)\n",
" model = {\n",
" \"display_name\": model_name,\n",
" \"metadata_schema_uri\": \"\",\n",
" \"artifact_uri\": model_dir,\n",
" \"container_spec\": {\n",
" \"image_uri\": deploy_image,\n",
" \"command\": [],\n",
" \"args\": [],\n",
" \"env\": [],\n",
" \"ports\": [{\"container_port\": 8080}],\n",
" \"predict_route\": \"\",\n",
" \"health_route\": \"\"\n",
" }\n",
" }\n",
" upload_model_response = clients['model'].upload_model(parent=parent, model=model)\n",
" print(\"Long running operation on uploading the model:\", upload_model_response.operation.name)\n",
" model_info = clients['model'].get_model(name=upload_model_response.result(timeout=180).model)\n",
"\n",
" #Create an endpoint on Vertex AI to host the model\n",
" clients['endpoint'] = aiplatform.gapic.EndpointServiceClient(client_options=client_options)\n",
" create_endpoint_response = clients['endpoint'].create_endpoint(parent=parent, endpoint={\"display_name\": endpoint_name})\n",
" print(\"Long running operation on creating endpoint:\", create_endpoint_response.operation.name)\n",
" endpoint_info = clients['endpoint'].get_endpoint(name=create_endpoint_response.result(timeout=180).name)\n",
"\n",
" #Deploy the model to the endpoint\n",
" dmodel = {\n",
" \"model\": model_info.name,\n",
" \"display_name\": 'deployed_'+model_name,\n",
" \"dedicated_resources\": {\n",
" \"min_replica_count\": 1,\n",
" \"max_replica_count\": 1,\n",
" \"machine_spec\": {\n",
" \"machine_type\": deploy_compute,\n",
" \"accelerator_count\": 0,\n",
" }\n",
" } \n",
" }\n",
"\n",
" traffic = {\n",
" '0' : 100\n",
" }\n",
"\n",
" deploy_model_response = clients['endpoint'].deploy_model(endpoint=endpoint_info.name, deployed_model=dmodel, traffic_split=traffic)\n",
" print(\"Long running operation on deploying the model:\", deploy_model_response.operation.name)\n",
" deploy_model_result = deploy_model_response.result()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Defining the Kubeflow Pipelines\n",
"\n",
"Now we have the necessary functions defined, we are now ready to create a workflow using Kubeflow Pipeline. The workflow implemented by the pipeline is defined using a Python based Domain Specific Language (DSL). \n",
"The pipeline's DSL has been designed to avoid hardcoding any environment specific settings like file paths or connection strings. These settings are provided to the pipeline code through a set of environment variables.\n",
"\n",
"The pipeline performs the following steps - \n",
"* Trains a Matrix Factorization model \n",
"* Evaluates the trained Matrix Factorization model and if the Mean Square Error is less than threadshold, it will continue to the next step, otherwise, the pipeline will stop\n",
"* Engineers new user factors feature with the Matrix Factorization model\n",
"* Engineers new hotel factors feature with the Matrix Factorization model\n",
"* Combines all the features selected (total_mobile_searches) and engineered (user factors and hotel factors) into a training dataset for the XGBoost classifier \n",
"* Trains a XGBoost classifier\n",
"* Evalutes the trained XGBoost model and if the ROC AUC score is more than threadshold, it will continue to the next step, otherwise, the pipeline will stop\n",
"* Exports the XGBoost model to a Google Cloud Storage bucket\n",
"* Deploys the XGBoost model from the Google Cloud Storage bucket to Google Cloud AI Platform for prediction"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp.v2.dsl as dsl\n",
"import kfp.v2.components as comp\n",
"import time\n",
"\n",
"@dsl.pipeline(\n",
" name='hotel-recs-pipeline',\n",
" description='training pipeline for hotel recommendation prediction'\n",
")\n",
"def training_pipeline():\n",
"\n",
" import json\n",
" \n",
" #Minimum threshold for model metric to determine if model will be deployed to inference: 0.5 is a basically a coin toss with 50/50 chance\n",
" mf_msqe_threshold = 0.5\n",
" class_auc_threshold = 0.8\n",
" \n",
" #Defining function containers\n",
" ddlop = comp.func_to_container_op(run_bigquery_ddl, base_image=BASE_IMAGE, packages_to_install=['google-cloud-bigquery'])\n",
" \n",
" evaluate_mf_op = comp.func_to_container_op(evaluate_matrix_factorization_model, base_image=BASE_IMAGE, packages_to_install=['google-cloud-bigquery', 'google-cloud-bigquery-storage', 'pandas', 'pyarrow'], output_component_file='mf_eval.yaml')\n",
" \n",
" evaluate_class_op = comp.func_to_container_op(evaluate_class, base_image=BASE_IMAGE, packages_to_install=['google-cloud-bigquery','pandas', 'pyarrow'])\n",
" \n",
" export_bqml_model_op = comp.func_to_container_op(export_bqml_model, base_image=BASE_IMAGE, output_component_file='export_bqml.yaml')\n",
" \n",
" deploy_bqml_model_op = comp.func_to_container_op(deploy_bqml_model_vertexai, base_image=BASE_IMAGE, packages_to_install=['google-cloud-aiplatform'])\n",
"\n",
" \n",
" ############################# \n",
" #Defining pipeline execution graph\n",
" dataset = BQ_DATASET_NAME\n",
" \n",
" #Train matrix factorization model\n",
" mf_model_output = train_matrix_factorization_model(ddlop, PROJECT_ID, dataset).set_display_name('train matrix factorization model')\n",
" mf_model_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'\n",
" mf_model = mf_model_output.outputs['created_table']\n",
" \n",
" #Evaluate matrix factorization model\n",
" mf_eval_output = evaluate_mf_op(PROJECT_ID, mf_model).set_display_name('evaluate matrix factorization model')\n",
" mf_eval_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'\n",
" \n",
" #mean squared quantization error \n",
" with dsl.Condition(mf_eval_output.outputs['msqe'] < mf_msqe_threshold):\n",
" \n",
" #Create features for Classification model\n",
" user_features_output = create_user_features(ddlop, PROJECT_ID, dataset, mf_model).set_display_name('create user factors features')\n",
" user_features = user_features_output.outputs['created_table']\n",
" user_features_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'\n",
" \n",
" hotel_features_output = create_hotel_features(ddlop, PROJECT_ID, dataset, mf_model).set_display_name('create hotel factors features')\n",
" hotel_features = hotel_features_output.outputs['created_table']\n",
" hotel_features_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'\n",
"\n",
" total_features_output = combine_features(ddlop, PROJECT_ID, dataset, mf_model, hotel_features, user_features).set_display_name('combine all features')\n",
" total_features = total_features_output.outputs['created_table']\n",
" total_features_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'\n",
"\n",
" #Train XGBoost model\n",
" class_model_output = train_xgboost_model(ddlop, PROJECT_ID, dataset, total_features).set_display_name('train XGBoost model')\n",
" class_model = class_model_output.outputs['created_table']\n",
" class_model_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'\n",
"\n",
" #Evaluate XGBoost model\n",
" class_eval_output = evaluate_class_op(PROJECT_ID, dataset, class_model, total_features).set_display_name('evaluate XGBoost model')\n",
" class_eval_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'\n",
" \n",
" with dsl.Condition(class_eval_output.outputs['roc_auc'] > class_auc_threshold):\n",
" #Export model\n",
" export_destination_output = export_bqml_model_op(PROJECT_ID, class_model, MODEL_STORAGE).set_display_name('export XGBoost model')\n",
" export_destination_output.execution_options.caching_strategy.max_cache_staleness = 'P0D'\n",
" export_destination = export_destination_output.outputs['destination']\n",
" deploy_model = deploy_bqml_model_op(PROJECT_ID, REGION, class_model, ENDPOINT_NAME, MODEL_STORAGE, DEPLOY_IMAGE, DEPLOY_COMPUTE).set_display_name('Deploy XGBoost model')\n",
" deploy_model.execution_options.caching_strategy.max_cache_staleness = 'P0D'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Submitting pipeline runs \n",
"\n",
"You can trigger pipeline runs using an API from the KFP SDK or using KFP CLI. To submit the run using KFP CLI, execute the following commands. Notice how the pipeline's parameters are passed to the pipeline run."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import kfp.v2 as kfp\n",
"from kfp.v2 import compiler\n",
"\n",
"pipeline_func = training_pipeline\n",
"compiler.Compiler().compile(pipeline_func=pipeline_func, \n",
" package_path='hotel_rec_pipeline_job.json')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from kfp.v2.google.client import AIPlatformClient\n",
"\n",
"api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)\n",
"\n",
"response = api_client.create_run_from_job_spec(\n",
" job_spec_path='hotel_rec_pipeline_job.json', \n",
" enable_caching=False,\n",
" pipeline_root=PIPELINE_ROOT # optional- use if want to override compile-time value\n",
" #parameter_values={'text': 'Hello world!'}\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Monitoring the run \n",
"The Pipeline will take several hours to train two models, you can monitor the run using [Vertex AI Pipelins Console](https://console.cloud.google.com/vertex-ai/pipelines). "
]
},
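{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, you can also check the state of recent pipeline runs from the SDK instead of the console. The next cell is a minimal sketch that assumes the `google-cloud-aiplatform` package is installed in this kernel."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sketch: list recent pipeline runs and their states with the Vertex AI SDK\n",
"# (assumes the google-cloud-aiplatform package is installed in this kernel).\n",
"from google.cloud import aiplatform\n",
"\n",
"aiplatform.init(project=PROJECT_ID, location=REGION)\n",
"for job in aiplatform.PipelineJob.list(order_by='create_time desc'):\n",
"    print(job.display_name, job.state)\n",
"    break  # only show the most recent run"
]
},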
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cleaning up\n",
"\n",
"* Delete the Model in [Cloud AI Platform](https://console.cloud.google.com/ai-platform/models), note you may have to delete all versions first before deleting the model.\n",
"* Delete the bucket or model content in [Google Cloud Storage](https://console.cloud.google.com/storage).\n",
"* Delete the dataset in [BigQuery](https://console.cloud.google.com/bigquery), it will delete the models, tables and UDFs created in BigQuery.\n",
"* Follow how-to guide to [delete Flex commitment](https://console.cloud.google.com/bigquery/docs/reservations-get-started#cleaning-up)\n",
"* Delete the container from the [Google Container Registry](https://console.cloud.google.com/gcr/images)\n",
"* Delete the [Vertex AI](https://console.cloud.google.com/vertex-ai), undeploy the model within Endpoint first, then delete the Endpoint and finally delete the Model\n"
]
},
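{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, some of the cleanup steps above can be run from this notebook. The next cell is a minimal sketch that reuses the variables from the configuration cell; the operations are destructive, so review them carefully before running."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional cleanup sketch (destructive!) -- review before running.\n",
"# Assumes PROJECT_ID, BQ_DATASET_NAME and MODEL_STORAGE from the configuration cell are still set.\n",
"from google.cloud import bigquery\n",
"\n",
"# Delete the BigQuery dataset along with the models, tables and UDFs it contains.\n",
"bqclient = bigquery.Client(project=PROJECT_ID)\n",
"bqclient.delete_dataset(BQ_DATASET_NAME, delete_contents=True, not_found_ok=True)\n",
"\n",
"# Remove the exported model artifacts from Cloud Storage.\n",
"!gsutil -m rm -r {MODEL_STORAGE}"
]
},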
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"environment": {
"name": "tf2-2-3-gpu.2-3.m59",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/tf2-2-3-gpu.2-3:m59"
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.8"
}
},
"nbformat": 4,
"nbformat_minor": 4
}