{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.8"
},
"colab": {
"name": "Propensity to purchase using BQML",
"provenance": [],
"collapsed_sections": [],
"toc_visible": true
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "vf9yado5waoK",
"colab_type": "text"
},
"source": [
"# License"
]
},
{
"cell_type": "code",
"metadata": {
"id": "c1t7SIoPwaoL",
"colab_type": "code",
"colab": {}
},
"source": [
"# Copyright 2020 Google LLC\n",
"\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License."
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "vTuyPqeywaoQ",
"colab_type": "text"
},
"source": [
"# Overview\n",
"Propensity to purchase use cases are widely applicable across many industry verticals such as Retail, Finance and more. In this article, we will show you how to build an end to end solution using [BigQuery ML](https://cloud.google.com/bigquery-ml/docs) and [Kubeflow Pipelines (KFP)](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/) using a [Google Analytics dataset](https://console.cloud.google.com/marketplace/details/obfuscated-ga360-data/obfuscated-ga360-data?filter=solution-type:dataset) to determine which customers have the propensity to purchase. You could use the solution to reach out to your targeted customers in an offline campaign via email or postal channels. You could also use it in an online campaign via on the spot decision, when the customer is browsing your products in your website, to recommend some products or trigger a personalized email for the customer.\n",
"\n",
"Propensity to purchase use case is a subset of personalization use case. It is a key driver of how many organizations do marketing today. In today's changing times, you need to ensure that you are targeting the right messages to the right customers at the right time. “Personalization at scale has the potential to create $1.7 trillion to $3 trillion in new value” \\([McKinsey study](https://www.mckinsey.com/business-functions/marketing-and-sales/our-insights/a-technology-blueprint-for-personalization-at-scale)\\). Propensity modeling helps companies to identify these \"right\" customers and prospects that have a high likelihood to purchase a particular product or service.\n",
"\n",
"Propensity models are important as it is a mechanism for targeting sales outreach with personalized messages as they are keys to the success of getting attention of the customers. By using a propensity to purchase model, you can more effectively target customers who are most likely to purchase certain products.\n",
"\n",
"# What is BigQuery ML?\n",
"[BigQuery ML](https://cloud.google.com/bigquery-ml/docs/bigqueryml-intro) enables users to create and execute machine learning models in BigQuery by using standard SQL queries. This means, if your data is already in BigQuery, you don’t need to export your data to train and deploy machine learning models — by training, you’re also deploying in the same step. Combined with BigQuery’s auto-scaling of compute resources, you won’t have to worry about spinning up a cluster or building a model training and deployment pipeline. This means you’ll be saving time building your machine learning pipeline, enabling your business to focus more on the value of machine learning instead of spending time setting up the infrastructure.\n",
"\n",
"## Scope of this notebook\n",
"### Dataset\n",
"The Google Analytics dataset is hosted publicly on BigQuery and is a dataset that provides 12 months (August 2016 to August 2017) of obfuscated Google Analytics 360 data from the [Google Merchandise Store](https://www.googlemerchandisestore.com/), a real e-commerce store that sells Google-branded merchandise.\n",
"\n",
"### Objective\n",
" To help you be conversant on the following:\n",
"1. Environment Setup\n",
"1. KFP Setup\n",
"1. Data Exploration using BigQuery, Pandas, matplotlib\n",
"1. SDLC methodologies Adherence (opinionated)\n",
"1. KFP knowledge share (demonstration)\n",
"\n",
"### Costs\n",
"This tutorial uses billable components of Google Cloud:\n",
"* [BigQuery](https://cloud.google.com/bigquery)\n",
"* [BigQuery ML](https://cloud.google.com/bigquery-ml/docs)\n",
"* [Cloud Storage](https://cloud.google.com/storage)\n",
"* [Cloud AI Platform Pipelines](https://cloud.google.com/ai-platform/pipelines/docs/) \\(uses [Google Cloud Kubernetes Engine (GKE)](https://cloud.google.com/kubernetes-engine)\\)\n",
"* [Cloud AI Prediction](https://cloud.google.com/ai-platform/prediction/docs)\n",
"\n",
"Use the [Pricing Calculator[(https://cloud.google.com/products/calculator/) to generate a cost estimate based on your projected usage.\n",
"\n",
"## Before you begin\n",
"For this reference guide, you need a [Google Cloud project](https://console.cloud.google.com/cloud-resource-manager).\n",
"\n",
"You can create a new one, or select a project you already created.\n",
"The following steps are required, regardless where you are running your notebook (local or in Cloud AI Platform Notebook).\n",
"* [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.\n",
"* [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project). \n",
"* (When using non-Google Cloud local envirionments)Install Google Cloud SDK [Google Cloud SDK](https://cloud.google.com/sdk/)\n",
"\n",
"### Mandatory variables\n",
"You must set the below variables:\n",
"* RPM_GCP_PROJECT to [Your Google Cloud Project]\n",
"* RPM_GCP_KFP_HOST to [Your KFP pipeline host]: See the instruction later to find out how to get that\n",
"* RPM_GCP_APPLICATION_CREDENTIALS to [Full path with the file name to the Service Account json file]"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "rn0zRs76waoQ",
"colab_type": "text"
},
"source": [
"# Setup local environment"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "n0QAU9KKwaoz",
"colab_type": "text"
},
"source": [
"## PIP install appropriate packages"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "Vidii1fJwao0",
"colab_type": "code",
"colab": {}
},
"source": [
"!pip install google-cloud-storage #for Storage Account\n",
"!pip install google-cloud #for cloud sdk\n",
"!pip install google-cloud-bigquery #for BigQuery\n",
"!pip install google-cloud-bigquery-storage #for BigQuery Storage client\n",
"!pip install kfp # Install the KFP AI SDK"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "98mkp-d-GWtn",
"colab_type": "text"
},
"source": [
"## Setup KFP Host\n",
"Prepare the Cloud AI Platform Pipelines in Google Cloud:\n",
"* [Setting up AI Platform Pipelines](https://cloud.google.com/ai-platform/pipelines/docs/getting-started#set_up_your_instance).\n",
"* [Optional] Read the 'Introducing Cloud AI Platform Pipelines' [blogpost](https://cloud.google.com/blog/products/ai-machine-learning/introducing-cloud-ai-platform-pipelines)\n",
"* Follow the [instructions](https://cloud.google.com/ai-platform/pipelines/docs/connecting-with-sdk#using_the_to_connect_to_an_cluster) to collect the Kubeflow Pipelines host:\n",
"1. Copy the host from the code snippet in the popup dialog box.\n",
"1. Find the hostname in the URL of the Kubeflow Pipelines dashboard. The hostname is the portion of the URL between https:// and /#/start, and should match the pattern *.pipelines.googleusercontent.com.\n",
"\n",
"\n",
" "
]
},
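{
"cell_type": "markdown",
"metadata": {
"colab_type": "text"
},
"source": [
"The hostname rule described above can be sketched in a few lines of Python. This is only an illustration and not part of the pipeline: the helper name `kfp_host_from_dashboard_url` and the sample URL are hypothetical, and the check simply encodes the `*.pipelines.googleusercontent.com` pattern mentioned in the instructions.\n",
"\n",
"```python\n",
"from urllib.parse import urlparse\n",
"\n",
"def kfp_host_from_dashboard_url(dashboard_url):\n",
"    # Hypothetical helper: keep only the part between https:// and /#/start\n",
"    host = urlparse(dashboard_url).netloc\n",
"    if not host.endswith('.pipelines.googleusercontent.com'):\n",
"        raise ValueError(f'Unexpected KFP host: {host!r}')\n",
"    return host\n",
"\n",
"# Made-up dashboard URL, for illustration only\n",
"print(kfp_host_from_dashboard_url(\n",
"    'https://1234abcd-dot-us-central1.pipelines.googleusercontent.com/#/start'))\n",
"```\n",
"\n",
"You could paste the returned value into `RPM_GCP_KFP_HOST` in the next cell."
]
},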
{
"cell_type": "code",
"metadata": {
"id": "w2q-UGy_waoR",
"colab_type": "code",
"colab": {}
},
"source": [
"RPM_GCP_KFP_HOST = \"<Your KFP pipeline host>\""
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "3647-gHRwaoU",
"colab_type": "text"
},
"source": [
"## Setup Google Cloud Project"
]
},
{
"cell_type": "code",
"metadata": {
"id": "-aDrzxH5waoV",
"colab_type": "code",
"colab": {}
},
"source": [
"# set the Google Cloud project id\n",
"RPM_GCP_PROJECT = \"<Your Google Cloud project>\" #for local !bash"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "poJmkPFlwaoY",
"colab_type": "text"
},
"source": [
"## Setup Google Cloud Credentials"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "3wrGKHIqwaoY",
"colab_type": "text"
},
"source": [
"### Specify the location of the ServiceAccount Key file"
]
},
{
"cell_type": "code",
"metadata": {
"id": "OJ8peThgwaoZ",
"colab_type": "code",
"colab": {}
},
"source": [
"# download the ServiceAccount key and provide the path to the file below\n",
"RPM_GCP_APPLICATION_CREDENTIALS = \"<Full path with the file name to the above downloaded json file>\""
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "_tY9FxElwaoc",
"colab_type": "text"
},
"source": [
"### Upload your service account file (colab specific code block)"
]
},
{
"cell_type": "code",
"metadata": {
"id": "_sLTsZcwwaoc",
"colab_type": "code",
"colab": {}
},
"source": [
"# uncomment the below code in a Colab environment\n",
"# from google.colab import files\n",
"# # Upload service account key\n",
"# keyfile_upload = files.upload()\n",
"# RPM_GCP_APPLICATION_CREDENTIALS = list(keyfile_upload.keys())[0]"
],
"execution_count": null,
"outputs": []
},
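{
"cell_type": "markdown",
"metadata": {
"colab_type": "text"
},
"source": [
"Before moving on, it may help to confirm that the three mandatory variables no longer hold their placeholder text. The sketch below is an assumption-laden illustration: `find_unset_rpm_variables` is a hypothetical helper, and it treats any empty value or any value of the form `<...>` as unset.\n",
"\n",
"```python\n",
"def find_unset_rpm_variables(values):\n",
"    # Hypothetical helper: a value counts as unset when it is empty\n",
"    # or still looks like a '<...>' placeholder from the cells above.\n",
"    return [name for name, value in values.items()\n",
"            if not value or (value.startswith('<') and value.endswith('>'))]\n",
"\n",
"# Made-up values; in the notebook you would pass the real variables\n",
"print(find_unset_rpm_variables({\n",
"    'RPM_GCP_PROJECT': 'my-project',\n",
"    'RPM_GCP_KFP_HOST': '<Your KFP pipeline host>',\n",
"    'RPM_GCP_APPLICATION_CREDENTIALS': '/path/to/key.json',\n",
"}))  # ['RPM_GCP_KFP_HOST']\n",
"```"
]
},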
{
"cell_type": "markdown",
"metadata": {
"id": "usviVg5dwaof",
"colab_type": "text"
},
"source": [
"## Setup your local runtime environment\n"
]
},
{
"cell_type": "code",
"metadata": {
"id": "3tB93tQjwaof",
"colab_type": "code",
"colab": {}
},
"source": [
"!export RPM_GCP_PROJECT\n",
"!echo $RPM_GCP_PROJECT"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "KNexaYJcwaoi",
"colab_type": "code",
"colab": {}
},
"source": [
"# Set the desired Google Cloud project\n",
"!gcloud config set project $RPM_GCP_PROJECT"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "PGlZ6hOiwaol",
"colab_type": "code",
"colab": {}
},
"source": [
"# Validate that the Google Cloud project has been set properly.\n",
"!gcloud info --format='value(config.project)'"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "RZ5pwiIrwaon",
"colab_type": "code",
"colab": {}
},
"source": [
"import os\n",
"os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = RPM_GCP_APPLICATION_CREDENTIALS"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "NocjwMIYwaop",
"colab_type": "code",
"colab": {}
},
"source": [
"# set the account\n",
"!gcloud auth activate-service-account --key-file=$RPM_GCP_APPLICATION_CREDENTIALS"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "wjgaMOYhwaos",
"colab_type": "text"
},
"source": [
"# Enable the below Google Cloud Services for the solution"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "azRj5bbjwaov",
"colab_type": "code",
"colab": {}
},
"source": [
"# enable the required Google Cloud services\n",
"!gcloud services enable \\\n",
" storage-component.googleapis.com \\\n",
" bigquery.googleapis.com \\\n",
" ml.googleapis.com \\\n",
" notebooks.googleapis.com"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "kQ97mGj_waox",
"colab_type": "code",
"colab": {}
},
"source": [
"# validate that all the required services have been enabled.\n",
"!gcloud services list | grep 'storage-component.googleapis.com\\|bigquery.googleapis.com\\|ml.googleapis.com\\|notebooks.googleapis.com'"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "K-UMCR1-wao5",
"colab_type": "text"
},
"source": [
"# Adjust other variables"
]
},
{
"cell_type": "code",
"metadata": {
"id": "Asjw1Qc4wao5",
"colab_type": "code",
"colab": {}
},
"source": [
"# load_params\n",
"import json\n",
"def load_params():\n",
"    \"\"\"The variables are used in the pipeline.\n",
"\n",
"    Provide appropriate variables for your environments\n",
"    Set appropriate variables with the pattern RPM_*\n",
"    (these are IMMUTABLE variables that act as defaults)\n",
"    You could print all the variables used in your environment\n",
"    (e.g. local environment) which start with RPM_* or rpm_*\n",
"    (comes in handy while troubleshooting)\n",
"\n",
"    Returns:\n",
"        dict: all python variables used in the pipeline\n",
"    \"\"\"\n",
"    return {\n",
"        'RPM_GCP_PROJECT': RPM_GCP_PROJECT,\n",
"        'RPM_LOCATION': 'us-central1-b',  # KFP/K8s cluster zone\n",
"        'RPM_PVC_DATASET_FOLDER_NAME': 'rpm_ds',\n",
"        'RPM_PVC_NAME': 'rpm-vol',\n",
"        # create the bucket if it doesn't exist\n",
"        'RPM_GCP_STORAGE_BUCKET': '',\n",
"        # create the folder if it doesn't exist\n",
"        'RPM_GCP_STORAGE_BUCKET_FOLDER': '',\n",
"        'RPM_DEFAULT_BUCKET_EXT': '_retail_propensity_model_assets',\n",
"        'RPM_DEFAULT_BUCKET_FOLDER_NAME': 'rpm_data_set',\n",
"        'RPM_BQ_DATASET_NAME': '',  # create the dataset if it doesn't exist\n",
"        'RPM_BQ_TABLE_NAME': '',\n",
"        'RPM_DEFAULT_BQ_TABLE_NAME_EXT': '_tbl',\n",
"        'RPM_DEFAULT_DATA_SET_NAME_EXT': '_rpm_data_set',\n",
"        'RPM_MODEL_NAME': '',\n",
"        'RPM_DEFAULT_MODEL_NAME': 'rpm_bqml_model',\n",
"        'RPM_MODEL_EXPORT_PATH': '',\n",
"        'RPM_DEFAULT_MODEL_EXPORT_PATH': 'bqml/model/export/',\n",
"        'RPM_MODEL_VER_PREFIX': 'V_',\n",
"        'RPM_RUNTIME_VERSION': '1.15',  # do not change\n",
"        'RPM_PYTHON_VERSION': '3.7',  # do not change\n",
"        'RPM_CLUSTER_NAME': 'cluster-1',  # KFP/K8s cluster name\n",
"        # variables created by the program\n",
"        # from user supplied set or from the program defaults\n",
"        'rpm_bq_ds_name': '',\n",
"        'rpm_gcs_rpm_ds_url': '',\n",
"        'rpm_file_name': '',\n",
"        'rpm_table_id': '',\n",
"        'rpm_bqml_model': '',\n",
"        'rpm_bqml_model_export_path': '',\n",
"        'rpm_model_version': '',\n",
"        'rpm_model_uri': '',\n",
"        'rpm_pred_table_id': '',\n",
"    }\n",
"all_vars = load_params()\n",
"RPM_DS_DOWNLOAD_EXPERIMENT_NAME = 'GoogleStore Retail Pipeline'"
],
"execution_count": null,
"outputs": []
},
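{
"cell_type": "markdown",
"metadata": {
"colab_type": "text"
},
"source": [
"The docstring above suggests printing every variable whose name starts with `RPM_` or `rpm_` while troubleshooting. A minimal sketch of that idea follows; `print_rpm_variables` is a hypothetical helper and the dict passed to it is a stand-in for `all_vars` or `local_context`.\n",
"\n",
"```python\n",
"def print_rpm_variables(variables):\n",
"    # Hypothetical helper: list every RPM_*/rpm_* entry, sorted by name\n",
"    for name in sorted(variables):\n",
"        if name.lower().startswith('rpm_'):\n",
"            print(f'{name} = {variables[name]!r}')\n",
"\n",
"# Stand-in dict; in the notebook you could pass all_vars instead\n",
"print_rpm_variables({\n",
"    'RPM_GCP_PROJECT': 'my-project',\n",
"    'rpm_table_id': '',\n",
"    'unrelated': 1,\n",
"})\n",
"```"
]
},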
{
"cell_type": "markdown",
"metadata": {
"id": "qZnuhxRawao7",
"colab_type": "text"
},
"source": [
"# Reset the local context for local development if needed"
]
},
{
"cell_type": "code",
"metadata": {
"id": "GkAns6Jtwao8",
"colab_type": "code",
"colab": {}
},
"source": [
"# reset_local_context\n",
"def reset_local_context():\n",
"    \"\"\"Resets all the variables used in the local environment.\n",
"\n",
"    Comes in handy while developing and testing the code locally.\n",
"    \"\"\"\n",
"    try:\n",
"        del globals()['local_context']\n",
"    except KeyError as e:\n",
"        print('local_context not found!!!')\n",
"        print(e)\n",
"    globals().get('local_context')  # validate that the local variable is removed\n",
"# reset_local_context() # use before testing a component locally if needed"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "4v6uuCI-wao_",
"colab_type": "code",
"colab": {}
},
"source": [
"# get_local_context\n",
"def get_local_context(init_var=None):\n",
"    \"\"\"Define local rpm_context object\n",
"\n",
"    The function sets the appropriate variables to\n",
"    execute the code in a local environment.\n",
"    local_context contains all the variables used in the local environment.\n",
"    You could check the variable before and after the call to find out the\n",
"    desired result (comes in handy while developing\n",
"    and testing the code locally)\n",
"    Args:\n",
"        init_var (:obj:`dict`, optional): The dict object overrides the existing\n",
"            local context (use sparingly, only when needed)\n",
"    Returns:\n",
"        dict: all python variables used in the pipeline\n",
"    \"\"\"\n",
"    global local_context\n",
"    local_context = globals().get('local_context')\n",
"    if not local_context:\n",
"        local_context = load_params()\n",
"    if init_var:\n",
"        local_context = init_var\n",
"    local_context[\"RPM_PVC_NAME\"] = os.environ[\"HOME\"]\n",
"    if not local_context.get(\"rpm_bq_ds_name\"):\n",
"        local_context[\"rpm_bq_ds_name\"] = f\"{all_vars['RPM_GCP_PROJECT'].replace('-','_')}_rpm_data_set\"\n",
"    if not local_context.get(\"rpm_gcs_rpm_ds_url\"):\n",
"        local_context[\"rpm_gcs_rpm_ds_url\"] = f\"gs://{all_vars['RPM_GCP_PROJECT']}_retail_propensity_model_assets/rpm_data_set/\"\n",
"    if not local_context.get(\"rpm_table_id\"):\n",
"        local_context[\"rpm_table_id\"] = f\"{all_vars['RPM_GCP_PROJECT']}.{all_vars['RPM_GCP_PROJECT'].replace('-','_')}_rpm_data_set.{all_vars['RPM_GCP_PROJECT'].replace('-','_')}_rpm_data_set_tbl\"\n",
"    if not local_context.get(\"RPM_MODEL_NAME\"):\n",
"        local_context[\"rpm_bqml_model\"] = \"rpm_bqml_model\"\n",
"    if not local_context.get(\"rpm_bqml_model_export_path\"):\n",
"        local_context[\"rpm_bqml_model_export_path\"] = \"bqml/model/export/V_1/\"\n",
"    if not local_context.get(\"rpm_model_version\"):\n",
"        local_context[\"rpm_model_version\"] = \"V_1\"\n",
"    if not local_context[\"rpm_model_uri\"]:\n",
"        local_context[\"rpm_model_uri\"] = f\"gs://{all_vars['RPM_GCP_PROJECT']}_retail_propensity_model_assets/rpm_data_set/bqml/model/export/V_1/\"\n",
"    if not local_context[\"rpm_pred_table_id\"]:\n",
"        local_context[\"rpm_pred_table_id\"] = f\"{all_vars['RPM_GCP_PROJECT']}.{all_vars['RPM_GCP_PROJECT'].replace('-','_')}_rpm_data_set.{all_vars['RPM_GCP_PROJECT'].replace('-','_')}_rpm_data_set_pred_tbl\"\n",
"\n",
"    print(local_context)\n",
"    return local_context\n",
"\n",
"def test_comp_local(func):\n",
"    # declare the global so that the updated context is kept after the call\n",
"    global local_context\n",
"    local_context = get_local_context()\n",
"    import json\n",
"    new_local_context_str = func(json.dumps(local_context))\n",
"    print(f'type: {type(new_local_context_str)}; new_local_context_str:{new_local_context_str}')\n",
"    local_context = json.loads(new_local_context_str)\n",
"\n",
"def update_local_context(output):\n",
"    # declare the global so that the updated context is kept after the call\n",
"    global local_context\n",
"    print(f'type: {type(output)}; new_local_context_str:{output}')\n",
"    local_context = json.loads(output[0])"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "a8KutqBIwapB",
"colab_type": "text"
},
"source": [
"# Instantiate the KFP Client"
]
},
{
"cell_type": "code",
"metadata": {
"id": "aL3IakRywapB",
"colab_type": "code",
"colab": {}
},
"source": [
"# Create a KFP Client and validate that you are able to access the KFP Pipelines\n",
"# You will be using the KFP HOST to deploy the KFP pipeline (experiment) and launch the experiment\n",
"import kfp\n",
"kfp_client = kfp.Client(host=RPM_GCP_KFP_HOST)\n",
"kfp_client.LOCAL_KFP_CONTEXT"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "9fXvMko3wapD",
"colab_type": "text"
},
"source": [
"# Create Google Cloud Storage bucket and folder - function"
]
},
{
"cell_type": "code",
"metadata": {
"id": "PEF0LnhUwapD",
"colab_type": "code",
"colab": {}
},
"source": [
"# create_gcs_bucket_folder\n",
"from typing import NamedTuple\n",
"def create_gcs_bucket_folder(ctx: str,\n",
"                             RPM_GCP_STORAGE_BUCKET: str,\n",
"                             RPM_GCP_PROJECT: str,\n",
"                             RPM_DEFAULT_BUCKET_EXT: str,\n",
"                             RPM_GCP_STORAGE_BUCKET_FOLDER: str,\n",
"                             RPM_DEFAULT_BUCKET_FOLDER_NAME: str\n",
"                             ) -> NamedTuple('Outputs', [\n",
"                                 ('rpm_context', str),\n",
"                                 ('rpm_gcs_rpm_ds_url', str),\n",
"                             ]):\n",
"    \"\"\"The function (also used as a base for a KFP Component) creates a\n",
"    Google Cloud Storage bucket and a folder if they don't exist.\n",
"\n",
"    The idea is to create the bucket and folder only on the first\n",
"    run of the pipeline.\n",
"    The pipeline uses the same storage bucket and folder\n",
"    for repeated runs.\n",
"    Args:\n",
"        ctx(:obj:`str`): JSON string of the dict with all the variables\n",
"            used in the pipeline\n",
"        RPM_GCP_STORAGE_BUCKET(:obj:`str`): User supplied Storage Bucket name\n",
"        RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment\n",
"        RPM_DEFAULT_BUCKET_EXT:(:obj:`str`): Suffix appended to the project\n",
"            to name the bucket, when user hasn't supplied a bucket name\n",
"        RPM_GCP_STORAGE_BUCKET_FOLDER:(:obj:`str`): User supplied folder name\n",
"        RPM_DEFAULT_BUCKET_FOLDER_NAME:(:obj:`str`): Name for creating a\n",
"            folder, when user hasn't supplied a folder name\n",
"\n",
"    Returns:\n",
"        Outputs(:obj: `tuple`): Returns the below outputs:\n",
"            rpm_context(:obj:`str`): All variables used in the pipeline\n",
"            rpm_gcs_rpm_ds_url(:obj:`str`): Full Google Cloud Storage path with\n",
"                bucket name and folder name\n",
"    \"\"\"\n",
"    # loading rpm_context string\n",
"    import json\n",
"    rpm_context = json.loads(ctx)\n",
"    print(rpm_context)\n",
"    RPM_GCP_STORAGE_BUCKET = rpm_context['RPM_GCP_STORAGE_BUCKET']\n",
"    RPM_GCP_PROJECT = rpm_context['RPM_GCP_PROJECT']\n",
"    RPM_DEFAULT_BUCKET_EXT = rpm_context['RPM_DEFAULT_BUCKET_EXT']\n",
"    RPM_GCP_STORAGE_BUCKET_FOLDER = rpm_context['RPM_GCP_STORAGE_BUCKET_FOLDER']\n",
"    RPM_DEFAULT_BUCKET_FOLDER_NAME = rpm_context['RPM_DEFAULT_BUCKET_FOLDER_NAME']\n",
"\n",
"    if RPM_GCP_STORAGE_BUCKET:\n",
"        gcs_storage_bucket_name = RPM_GCP_STORAGE_BUCKET\n",
"    else:\n",
"        gcs_storage_bucket_name = RPM_GCP_PROJECT + RPM_DEFAULT_BUCKET_EXT\n",
"\n",
"    if RPM_GCP_STORAGE_BUCKET_FOLDER:\n",
"        gcs_folder_name = RPM_GCP_STORAGE_BUCKET_FOLDER + '/'\n",
"    else:\n",
"        gcs_folder_name = RPM_DEFAULT_BUCKET_FOLDER_NAME + '/'\n",
"    print(f\"{gcs_storage_bucket_name} bucket and {gcs_folder_name} will be used in the project.\")\n",
"\n",
"    rpm_gcs_rpm_ds_url = f\"gs://{gcs_storage_bucket_name}/{gcs_folder_name}\"\n",
"    print(rpm_gcs_rpm_ds_url)\n",
"    rpm_context['rpm_gcs_rpm_ds_url'] = rpm_gcs_rpm_ds_url\n",
"\n",
"    # defining the install function\n",
"    import subprocess\n",
"    def install(name):\n",
"        subprocess.call(['pip', 'install', name])\n",
"    packages_to_install = ['google-cloud', 'google-cloud-storage']\n",
"    for each_package in packages_to_install:\n",
"        install(each_package)\n",
"        print(f\"'{each_package}' package installed :)\")\n",
"\n",
"    cmd = f\"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}\"\n",
"    print(cmd)\n",
"    process = subprocess.run(cmd, shell=True, check=True,\n",
"                             stdout=subprocess.PIPE, universal_newlines=True)\n",
"    print(f\"'output from {cmd}': {process.stdout}\")\n",
"\n",
"    from google.cloud import storage\n",
"    from google.cloud.exceptions import NotFound\n",
"    from google.cloud.exceptions import Forbidden\n",
"    import traceback\n",
"\n",
"    def check_storage_bucket_and_folder(bucket_name, folder_name):\n",
"        if not bucket_name or not folder_name:\n",
"            return (False, False)\n",
"        client = storage.Client()\n",
"        try:  # check if the bucket exists and that we have the proper permission\n",
"            bucket = client.get_bucket(bucket_name)\n",
"            print(f\"Bucket: {bucket_name} exists.\")\n",
"            bucket_exists = True\n",
"            try:\n",
"                blob = bucket.get_blob(folder_name)\n",
"                if blob is None:\n",
"                    print(f\"Folder name {folder_name} does not exist!\")\n",
"                    folder_exists = False\n",
"                else:\n",
"                    print(f\"Folder name {folder_name} exists.\")\n",
"                    folder_exists = True\n",
"            except Exception:\n",
"                print(f\"Folder name {folder_name} does not exist!\")\n",
"                folder_exists = False\n",
"        except Forbidden as e:\n",
"            print(f\"Sorry, you don't have access to the bucket: {bucket_name}!\")\n",
"            print(e)\n",
"            error = traceback.format_exc()\n",
"            print(error)\n",
"            bucket_exists = False\n",
"            folder_exists = False\n",
"        except NotFound as e:\n",
"            print(f\"Sorry, the bucket: {bucket_name} does not exist!\")\n",
"            print(e)\n",
"            error = traceback.format_exc()\n",
"            print(error)\n",
"            bucket_exists = False\n",
"            folder_exists = False\n",
"        return (bucket_exists, folder_exists)\n",
"\n",
"    # Create a bucket if it doesn't exist\n",
"    def create_storage_bucket(bucket_name):\n",
"        if bucket_name:\n",
"            client = storage.Client()\n",
"            try:\n",
"                bucket = client.create_bucket(bucket_name)\n",
"                print(f\"Bucket {bucket.name} created\")\n",
"                return True\n",
"            except Exception as e:\n",
"                print(f\"Bucket {bucket_name} couldn't be created\")\n",
"                print(e)\n",
"                error = traceback.format_exc()\n",
"                print(error)\n",
"                return False\n",
"        else:\n",
"            print(f\"Bucket {bucket_name} couldn't be created. Name is empty.\")\n",
"            return False\n",
"\n",
"    # Create the folder in the bucket\n",
"    def create_storage_folder(bucket_name, folder_name):\n",
"        if len(bucket_name) == 0 or len(folder_name) == 0:\n",
"            print(f\"Folder {folder_name} couldn't be created. Name is empty.\")\n",
"            return False\n",
"        else:\n",
"            client = storage.Client()\n",
"            try:\n",
"                bucket = client.get_bucket(bucket_name)\n",
"                blob = bucket.blob(folder_name)\n",
"                blob.upload_from_string('')\n",
"                print(f\"Folder {blob.name} created\")\n",
"                return True\n",
"            except Exception as e:\n",
"                print(f\"Folder {folder_name} couldn't be created\")\n",
"                print(e)\n",
"                error = traceback.format_exc()\n",
"                print(error)\n",
"                return False\n",
"\n",
"    result = check_storage_bucket_and_folder(gcs_storage_bucket_name,\n",
"                                             gcs_folder_name)\n",
"    if not result[0]:\n",
"        create_storage_bucket(gcs_storage_bucket_name)\n",
"    if not result[1]:\n",
"        create_storage_folder(gcs_storage_bucket_name, gcs_folder_name)\n",
"\n",
"    return (\n",
"        json.dumps(rpm_context),\n",
"        rpm_context['rpm_gcs_rpm_ds_url']\n",
"    )"
],
"execution_count": null,
"outputs": []
},
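{
"cell_type": "markdown",
"metadata": {
"colab_type": "text"
},
"source": [
"The function above follows a pattern that recurs in this notebook: the pipeline variables arrive as a JSON string (`ctx`), are decoded into a dict, updated, and returned as a JSON string together with any extra outputs. The stripped-down sketch below only illustrates that round trip; `example_component` and the value it writes are hypothetical and are not part of the pipeline.\n",
"\n",
"```python\n",
"import json\n",
"from typing import NamedTuple\n",
"\n",
"def example_component(ctx: str) -> NamedTuple('Outputs', [('rpm_context', str)]):\n",
"    # Decode the shared context, change one entry, and serialize it again\n",
"    rpm_context = json.loads(ctx)\n",
"    rpm_context['rpm_model_version'] = 'V_1'  # illustrative value only\n",
"    return (json.dumps(rpm_context),)\n",
"\n",
"outputs = example_component(json.dumps({'rpm_model_version': ''}))\n",
"print(json.loads(outputs[0])['rpm_model_version'])  # V_1\n",
"```\n",
"\n",
"Passing a plain string keeps every component input and output a simple type, at the cost of decoding and encoding the context in each step."
]
},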
{
"cell_type": "markdown",
"metadata": {
"id": "wg5AdXbuwapF",
"colab_type": "text"
},
"source": [
"# *Test locally create_gcs_bucket_folder*"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "kLTlHVH6wapG",
"colab_type": "code",
"colab": {}
},
"source": [
"# test locally create_gcs_bucket_folder\n",
"# You could unit test the above code in your local environment.\n",
"# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)\n",
"local_context = get_local_context()\n",
"import json\n",
"update_local_context(create_gcs_bucket_folder(\n",
" json.dumps(local_context),\n",
" local_context['RPM_GCP_STORAGE_BUCKET'],\n",
" local_context['RPM_GCP_PROJECT'],\n",
" local_context['RPM_DEFAULT_BUCKET_EXT'],\n",
" local_context['RPM_GCP_STORAGE_BUCKET_FOLDER'],\n",
" local_context['RPM_DEFAULT_BUCKET_FOLDER_NAME']\n",
"))\n"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "IEvTkyPhwapH",
"colab_type": "text"
},
"source": [
"# Create BigQuery dataset - function"
]
},
{
"cell_type": "code",
"metadata": {
"id": "A2OwWAnBwapI",
"colab_type": "code",
"colab": {}
},
"source": [
"# create_bq_ds\n",
"from typing import NamedTuple\n",
"def create_bq_ds(ctx: str,\n",
"                 RPM_GCP_PROJECT: str,\n",
"                 RPM_BQ_DATASET_NAME: str,\n",
"                 RPM_LOCATION: str\n",
"                 ) -> NamedTuple('Outputs', [\n",
"                     ('rpm_context', str),\n",
"                     ('rpm_bq_ds_name', str),\n",
"                 ]):\n",
"    \"\"\"The function (also used as a base for a KFP Component) creates a\n",
"    BigQuery dataset if it doesn't exist.\n",
"\n",
"    The idea is to create the dataset only on the first run of the pipeline.\n",
"    The pipeline uses the same dataset for repeated runs.\n",
"    Args:\n",
"        ctx(:obj:`str`): JSON string of the dict with all the variables\n",
"            used in the pipeline\n",
"        RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment\n",
"        RPM_BQ_DATASET_NAME:(:obj:`str`): Name of the dataset.\n",
"        RPM_LOCATION:(:obj:`str`): Zone of the KFP/K8s cluster; its leading\n",
"            part (e.g. US) is used as the location of the BigQuery dataset\n",
"    Returns:\n",
"        Outputs(:obj: `tuple`): Returns the below outputs:\n",
"            rpm_context(:obj:`str`): All variables used in the pipeline\n",
"            rpm_bq_ds_name(:obj:`str`): The dataset name\n",
"                used in the rest of the pipeline\n",
"    \"\"\"\n",
"    import json\n",
"    rpm_context = json.loads(ctx)\n",
"    print(rpm_context)\n",
"\n",
"    rpm_bq_ds_name = rpm_context['RPM_BQ_DATASET_NAME']\n",
"    if not rpm_bq_ds_name:\n",
"        rpm_bq_ds_name = \\\n",
"            f\"{rpm_context['RPM_GCP_PROJECT']}{rpm_context['RPM_DEFAULT_DATA_SET_NAME_EXT']}\"\n",
"    rpm_bq_ds_name = rpm_bq_ds_name.replace('-', '_')\n",
"\n",
"    rpm_context['rpm_bq_ds_name'] = rpm_bq_ds_name\n",
"\n",
"    import subprocess\n",
"    import traceback\n",
"    def exec_cmd(cmd):\n",
"        try:\n",
"            print(cmd)\n",
"            process = subprocess.run(cmd, shell=True, check=True,\n",
"                                     stdout=subprocess.PIPE, universal_newlines=True)\n",
"            print(f\"'output from {cmd}': {process.stdout}\")\n",
"        except subprocess.CalledProcessError as e:\n",
"            error = traceback.format_exc()\n",
"            print(error)\n",
"            print(e.output)\n",
"\n",
"    exec_cmd(f\"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}\")\n",
"\n",
"    def install(name):\n",
"        subprocess.call(['pip', 'install', name])\n",
"    packages_to_install = ['google-cloud', 'google-cloud-bigquery']\n",
"    for each_package in packages_to_install:\n",
"        install(each_package)\n",
"        print(f\"'{each_package}' package installed :)\")\n",
"\n",
"    from google.cloud import bigquery\n",
"    from google.cloud.exceptions import NotFound\n",
"    client = bigquery.Client()\n",
"    dataset_id = f\"{rpm_context['RPM_GCP_PROJECT']}.{rpm_bq_ds_name}\"\n",
"    ds_found = True\n",
"    try:\n",
"        client.get_dataset(dataset_id)  # Make an API request.\n",
"        print('Dataset {} already exists'.format(dataset_id))\n",
"    except NotFound:\n",
"        print('Dataset {} is not found'.format(dataset_id))\n",
"        ds_found = False\n",
"\n",
"    if ds_found is False:\n",
"        try:\n",
"            # Construct a full Dataset object to send to the API.\n",
"            dataset = bigquery.Dataset(dataset_id)\n",
"            dataset.location = rpm_context['RPM_LOCATION'].split('-')[0].upper()\n",
"            dataset = client.create_dataset(dataset)  # Make an API request.\n",
"            print('Created dataset {}.{} in location: {}.'.format(\n",
"                client.project, dataset.dataset_id, dataset.location))\n",
"        except Exception as e:\n",
"            error = traceback.format_exc()\n",
"            print(error)\n",
"            print(e)\n",
"            raise RuntimeError(f\"Can't create the BigQuery DS {dataset_id}\")\n",
"\n",
"    return (\n",
"        json.dumps(rpm_context),\n",
"        rpm_context['rpm_bq_ds_name']\n",
"    )\n"
],
"execution_count": null,
"outputs": []
},
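{
"cell_type": "markdown",
"metadata": {
"colab_type": "text"
},
"source": [
"Two small string manipulations in `create_bq_ds` are easy to miss: the default dataset name is the project id plus a suffix with hyphens replaced by underscores, and the dataset location is the upper-cased first segment of `RPM_LOCATION`. The sketch below isolates both; the helper names and the sample project id are hypothetical.\n",
"\n",
"```python\n",
"def default_bq_dataset_name(project_id, suffix='_rpm_data_set'):\n",
"    # Same fallback as create_bq_ds: project id + suffix, hyphens to underscores\n",
"    return f'{project_id}{suffix}'.replace('-', '_')\n",
"\n",
"def bq_location_from_zone(zone):\n",
"    # Same expression as create_bq_ds: first segment of the zone, upper-cased\n",
"    return zone.split('-')[0].upper()\n",
"\n",
"print(default_bq_dataset_name('my-project'))   # my_project_rpm_data_set\n",
"print(bq_location_from_zone('us-central1-b'))  # US\n",
"```\n",
"\n",
"Note that a zone outside `us-*` would yield a prefix such as `EUROPE`, which may not be a valid BigQuery location, so it is worth checking the result if you change `RPM_LOCATION`."
]
},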
{
"cell_type": "markdown",
"metadata": {
"id": "Depc_dw5wapK",
"colab_type": "text"
},
"source": [
"# *Test locally create_bq_ds*"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "xpJX3dTwwapK",
"colab_type": "code",
"colab": {}
},
"source": [
"# test locally create_bq_ds\n",
"# You could unit test the above code in your local environment.\n",
"# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)\n",
"local_context = get_local_context()\n",
"import json\n",
"update_local_context(create_bq_ds(\n",
" json.dumps(local_context),\n",
" local_context['RPM_GCP_PROJECT'],\n",
" local_context['RPM_BQ_DATASET_NAME'],\n",
" local_context['RPM_LOCATION']))\n"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "3UuyNqskwapN",
"colab_type": "text"
},
"source": [
"# Load the data to BigQuery - function"
]
},
{
"cell_type": "code",
"metadata": {
"id": "wkXoZqDAwapN",
"colab_type": "code",
"colab": {}
},
"source": [
"from typing import NamedTuple\n",
"def load_bq_ds(ctx: str,\n",
"    RPM_GCP_PROJECT: str,\n",
"    RPM_BQ_TABLE_NAME: str,\n",
"    RPM_DEFAULT_BQ_TABLE_NAME_EXT: str,\n",
"    rpm_bq_ds_name: str, ) -> NamedTuple('Outputs', [\n",
"    ('rpm_context', str),\n",
"    ('rpm_table_id', str),\n",
"    ]):\n",
"    \"\"\"The function (also used as a base for a KFP Component)\n",
"    loads the data to a BigQuery table.\n",
"\n",
"    You need to replace the component with your data source,\n",
"    e.g. you might download the data from a different source,\n",
"    in which case you need to code those steps.\n",
"    Decide on your load strategy here, such as add or append.\n",
"    Furthermore, you could cache this KFP component\n",
"    if the load is just a one-time thing.\n",
"    Args:\n",
"        ctx(:obj:`str`): The dict object with all the variables\n",
"            used in the pipeline\n",
"        RPM_GCP_PROJECT(:obj:`str`): Google Cloud project for deployment\n",
"        RPM_BQ_TABLE_NAME(:obj:`str`): Name of the table.\n",
"        RPM_DEFAULT_BQ_TABLE_NAME_EXT(:obj:`str`): Default table name\n",
"            extension, used if the user didn't provide RPM_BQ_TABLE_NAME\n",
"        rpm_bq_ds_name(:obj:`str`): The dataset name\n",
"            used in the rest of the pipeline\n",
"    Returns:\n",
"        Outputs(:obj:`tuple`): Returns the below outputs:\n",
"            rpm_context(:obj:`str`): All variables used in the pipeline\n",
"            rpm_table_id(:obj:`str`): The table name\n",
"                used in the rest of the pipeline\n",
"    \"\"\"\n",
"    import json\n",
"    rpm_context = json.loads(ctx)\n",
"    print(rpm_context)\n",
"\n",
"    rpm_bq_ds_name = rpm_context['rpm_bq_ds_name']\n",
"    dataset_id = f\"{rpm_context['RPM_GCP_PROJECT']}.{rpm_bq_ds_name}\"\n",
"\n",
"    # fall back to '<dataset name><default extension>' if no table name is given\n",
"    if not rpm_context['RPM_BQ_TABLE_NAME']:\n",
"        rpm_table_id = f\"{dataset_id}.{rpm_bq_ds_name}{rpm_context['RPM_DEFAULT_BQ_TABLE_NAME_EXT']}\"\n",
"    else:\n",
"        rpm_table_id = f\"{dataset_id}.{rpm_context['RPM_BQ_TABLE_NAME']}\"\n",
"    rpm_context['rpm_table_id'] = rpm_table_id\n",
"\n",
"    import subprocess\n",
"    def install(name):\n",
"        subprocess.call(['pip', 'install', name])\n",
"    packages_to_install = ['google-cloud', 'google-cloud-bigquery']\n",
"    for each_package in packages_to_install:\n",
"        install(each_package)\n",
"        print(f\"'{each_package}' package installed :)\")\n",
"\n",
"    query = f\"\"\"\n",
"        # select initial features and label to feed into our model\n",
"        CREATE OR REPLACE TABLE {rpm_table_id}\n",
"        OPTIONS(\n",
"            description=\"Google Store curated Data\"\n",
"        ) AS\n",
"        SELECT\n",
"            fullVisitorId,\n",
"            bounces,\n",
"            time_on_site,\n",
"            will_buy_on_return_visit # <--- our label\n",
"        FROM\n",
"            # features\n",
"            (SELECT\n",
"                fullVisitorId,\n",
"                IFNULL(totals.bounces, 0) AS bounces,\n",
"                IFNULL(totals.timeOnSite, 0) AS time_on_site\n",
"            FROM\n",
"                `bigquery-public-data.google_analytics_sample.*`\n",
"            WHERE\n",
"                totals.newVisits = 1\n",
"                AND date BETWEEN '20160801' AND '20170430') # train on first 9 months\n",
"        JOIN\n",
"            (SELECT\n",
"                fullvisitorid,\n",
"                IF(COUNTIF(totals.transactions > 0 AND totals.newVisits IS NULL) > 0, 1, 0) AS will_buy_on_return_visit\n",
"            FROM\n",
"                `bigquery-public-data.google_analytics_sample.*`\n",
"            GROUP BY fullvisitorid)\n",
"        USING (fullVisitorId)\n",
"        ORDER BY time_on_site DESC # order by most time spent first\n",
"    \"\"\"\n",
"    print(query)\n",
"    import traceback\n",
"    from google.cloud import bigquery\n",
"    try:\n",
"        client = bigquery.Client()\n",
"        query_job = client.query(query)  # Make an API request.\n",
"        query_job.result()  # Wait for the job to finish; raises if the query failed.\n",
"        print(f\"Table {rpm_table_id} created.\")\n",
"    except Exception as e:\n",
"        error = traceback.format_exc()\n",
"        print(error)\n",
"        print(e)\n",
"        raise RuntimeError(f\"Can't create the table {rpm_table_id}\")\n",
"    destination_table = rpm_table_id\n",
"    print(f\"{destination_table}\")\n",
"\n",
"    return (\n",
"        json.dumps(rpm_context),\n",
"        rpm_context['rpm_table_id']\n",
"    )"
],
"execution_count": null,
"outputs": []
},
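{
"cell_type": "markdown",
"metadata": {
"colab_type": "text"
},
"source": [
"The table-name fallback in `load_bq_ds` can be checked without touching BigQuery. The block below is a minimal sketch of that rule only; `resolve_table_id` and the sample project, dataset, and extension values are hypothetical names used for illustration and are not part of the pipeline.\n",
"\n",
"```python\n",
"def resolve_table_id(project, dataset, table_name, default_ext):\n",
"    # Fully qualified dataset, e.g. 'my-project.rpm_ds'.\n",
"    dataset_id = f'{project}.{dataset}'\n",
"    # An empty or missing table name falls back to '<dataset><default_ext>'.\n",
"    table = table_name if table_name else f'{dataset}{default_ext}'\n",
"    return f'{dataset_id}.{table}'\n",
"\n",
"\n",
"# Hypothetical values, for illustration only.\n",
"print(resolve_table_id('my-project', 'rpm_ds', '', '_tbl'))\n",
"print(resolve_table_id('my-project', 'rpm_ds', 'visits', '_tbl'))\n",
"```\n",
"\n",
"Keeping the rule in a small pure function like this makes it easy to unit test before the component is built."
]
},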
{
"cell_type": "markdown",
"metadata": {
"id": "PUrom3OYwapP",
"colab_type": "text"
},
"source": [
"# *Test locally load_bq_ds*"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "-EVZQFLEwapQ",
"colab_type": "code",
"colab": {}
},
"source": [
"# test locally load_bq_ds\n",
"# You could unit test the above code in your local environment.\n",
"# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)\n",
"local_context = get_local_context()\n",
"import json\n",
"update_local_context(load_bq_ds(\n",
" json.dumps(local_context),\n",
" local_context['RPM_GCP_PROJECT'],\n",
" local_context['RPM_BQ_TABLE_NAME'],\n",
" local_context['RPM_DEFAULT_BQ_TABLE_NAME_EXT'],\n",
" local_context['rpm_bq_ds_name'],\n",
" ))\n"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "sUsmunohwapS",
"colab_type": "text"
},
"source": [
"# Create the BigQuery ML model - function"
]
},
{
"cell_type": "code",
"metadata": {
"id": "dx31iHRcwapS",
"colab_type": "code",
"colab": {}
},
"source": [
"# create_bq_ml\n",
"from typing import NamedTuple\n",
"def create_bq_ml(ctx: str,\n",
"    RPM_GCP_PROJECT: str,\n",
"    RPM_MODEL_NAME: str,\n",
"    RPM_DEFAULT_MODEL_NAME: str,\n",
"    rpm_bq_ds_name: str,\n",
"    rpm_table_id: str) -> NamedTuple('Outputs', [\n",
"    ('rpm_context', str),\n",
"    ('rpm_bqml_model', str),\n",
"    ]):\n",
"    \"\"\"The function (also used as a base for a KFP Component) creates a model\n",
"    from the data that you have already loaded.\n",
"\n",
"    You need to adjust the model type, model hyperparameters, features,\n",
"    and label depending on your needs.\n",
"    Args:\n",
"        ctx(:obj:`str`): The dict object with all the variables\n",
"            used in the pipeline\n",
"        RPM_GCP_PROJECT(:obj:`str`): Google Cloud project for deployment\n",
"        RPM_MODEL_NAME(:obj:`str`): Name of the model.\n",
"        RPM_DEFAULT_MODEL_NAME(:obj:`str`): Default model name,\n",
"            used if the user didn't provide RPM_MODEL_NAME\n",
"        rpm_bq_ds_name(:obj:`str`): The dataset name\n",
"            used in the rest of the pipeline\n",
"        rpm_table_id(:obj:`str`): The table name\n",
"            used in the rest of the pipeline\n",
"    Returns:\n",
"        Outputs(:obj:`tuple`): Returns the below outputs:\n",
"            rpm_context(:obj:`str`): All variables used in the pipeline\n",
"            rpm_bqml_model(:obj:`str`): The model name\n",
"                used in the rest of the pipeline\n",
"    \"\"\"\n",
"    import json\n",
"    rpm_context = json.loads(ctx)\n",
"    print(rpm_context)\n",
"\n",
"    rpm_bqml_model = rpm_context['RPM_MODEL_NAME']\n",
"    if not rpm_bqml_model:\n",
"        rpm_bqml_model = rpm_context['RPM_DEFAULT_MODEL_NAME']\n",
"    # hyphens are replaced with underscores in the model name\n",
"    rpm_bqml_model = rpm_bqml_model.replace('-', '_')\n",
"    rpm_context['rpm_bqml_model'] = rpm_bqml_model\n",
"\n",
"    import subprocess\n",
"    import traceback\n",
"    def exec_cmd(cmd):\n",
"        try:\n",
"            print(cmd)\n",
"            process = subprocess.run(cmd, shell=True, check=True,\n",
"                stdout=subprocess.PIPE, universal_newlines=True)\n",
"            print(f\"'output from {cmd}': {process.stdout}\")\n",
"            return process.stdout\n",
"        except subprocess.CalledProcessError as e:\n",
"            error = traceback.format_exc()\n",
"            print(error)\n",
"            print(e.output)\n",
"            return e.output\n",
"\n",
"    exec_cmd(f\"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}\")\n",
"\n",
"    bqml_create_sql = f\"\"\"\n",
"        CREATE OR REPLACE MODEL\n",
"            {rpm_context['rpm_bq_ds_name']}.{rpm_bqml_model}\n",
"        OPTIONS\n",
"            ( model_type='LOGISTIC_REG',\n",
"            auto_class_weights=TRUE,\n",
"            input_label_cols=['will_buy_on_return_visit']) AS\n",
"        SELECT\n",
"            * EXCEPT(fullVisitorId)\n",
"        FROM\n",
"            {rpm_context['rpm_table_id'].replace(RPM_GCP_PROJECT+'.', '')}\n",
"    \"\"\"\n",
"    # you can uncomment the below query to try out the XGBoost model\n",
"    # bqml_create_sql = f\"\"\"\n",
"    #     CREATE OR REPLACE MODEL\n",
"    #         \\`{rpm_context['rpm_bq_ds_name']}.{rpm_bqml_model}\\`\n",
"    #     OPTIONS(MODEL_TYPE='BOOSTED_TREE_CLASSIFIER',\n",
"    #         BOOSTER_TYPE = 'GBTREE',\n",
"    #         NUM_PARALLEL_TREE = 1,\n",
"    #         MAX_ITERATIONS = 50,\n",
"    #         TREE_METHOD = 'HIST',\n",
"    #         EARLY_STOP = FALSE,\n",
"    #         SUBSAMPLE = 0.85,\n",
"    #         INPUT_LABEL_COLS = ['will_buy_on_return_visit'])\n",
"    #     AS\n",
"    #     SELECT\n",
"    #         * EXCEPT(fullVisitorId)\n",
"    #     FROM\n",
"    #         \\`{rpm_context['rpm_table_id']}\\`\n",
"    # \"\"\"\n",
"    exec_cmd(f'bq query --use_legacy_sql=false \"{bqml_create_sql}\"')\n",
"    # list the models in the dataset; the output is empty if the model is missing\n",
"    bq_model_created = exec_cmd(f\"bq ls -m --format=pretty {rpm_context['rpm_bq_ds_name']} | grep {rpm_bqml_model}\")\n",
"    if not bq_model_created:\n",
"        raise RuntimeError(f\"Please check if the model {rpm_context['rpm_bq_ds_name']}.{rpm_bqml_model} was created.\")\n",
"\n",
"    return (\n",
"        json.dumps(rpm_context),\n",
"        rpm_context['rpm_bqml_model']\n",
"    )"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "z2Zjf895wapU",
"colab_type": "text"
},
"source": [
"# *Test locally create_bq_ml*"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "4S8dNWQ2wapV",
"colab_type": "code",
"colab": {}
},
"source": [
"# test locally create_bq_ml\n",
"# You could unit test the above code in your local environment.\n",
"# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)\n",
"local_context = get_local_context()\n",
"import json\n",
"update_local_context(create_bq_ml(\n",
" json.dumps(local_context),\n",
" local_context['RPM_GCP_PROJECT'],\n",
" local_context['RPM_MODEL_NAME'],\n",
" local_context['RPM_DEFAULT_MODEL_NAME'],\n",
" local_context['rpm_bq_ds_name'],\n",
" local_context['rpm_table_id']))"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "hiw0V-JQwapW",
"colab_type": "text"
},
"source": [
"# Evaluate the BigQuery ML model - function"
]
},
{
"cell_type": "code",
"metadata": {
"id": "zUD6ucM2wapX",
"colab_type": "code",
"colab": {}
},
"source": [
"# evaluate_ml_model\n",
"from typing import NamedTuple\n",
"def evaluate_ml_model(ctx: str,\n",
"    RPM_GCP_PROJECT: str,\n",
"    rpm_bq_ds_name: str,\n",
"    rpm_bqml_model: str, ) -> NamedTuple('Outputs', [\n",
"    ('rpm_context', str),\n",
"    ('rpm_eval_query', str),\n",
"    ('rpm_eval_result', str),\n",
"    ]):\n",
"    \"\"\"The function (also used as a base for a KFP Component) evaluates\n",
"    the model you created.\n",
"\n",
"    Update your selection criteria and stop the pipeline\n",
"    if the model doesn't meet the criteria.\n",
"    You can raise an exception to stop the pipeline.\n",
"    Args:\n",
"        ctx(:obj:`str`): The dict object with all the variables\n",
"            used in the pipeline\n",
"        RPM_GCP_PROJECT(:obj:`str`): Google Cloud project for deployment\n",
"        rpm_bq_ds_name(:obj:`str`): The dataset name\n",
"            used in the rest of the pipeline\n",
"        rpm_bqml_model(:obj:`str`): The model name\n",
"            used in the rest of the pipeline\n",
"    Returns:\n",
"        Outputs(:obj:`tuple`): Returns the below outputs:\n",
"            rpm_context(:obj:`str`): All variables used in the pipeline\n",
"            rpm_eval_query(:obj:`str`): The evaluation SQL query,\n",
"                which is saved for auditing purposes in the pipeline artifacts\n",
"            rpm_eval_result(:obj:`str`): The result of the evaluation query,\n",
"                which is saved for auditing purposes in the pipeline artifacts\n",
"    \"\"\"\n",
"    import json\n",
"    rpm_context = json.loads(ctx)\n",
"    print(rpm_context)\n",
"\n",
"    import subprocess\n",
"    import traceback\n",
"    def exec_cmd(cmd):\n",
"        # returns a (stdout, status) tuple; status is 0 on success and 1 on failure\n",
"        try:\n",
"            print(cmd)\n",
"            process = subprocess.run(cmd, shell=True, check=True,\n",
"                stdout=subprocess.PIPE, universal_newlines=True)\n",
"            print(f\"'output from {cmd}': {process.stdout}\")\n",
"            return (process.stdout, 0)\n",
"        except subprocess.CalledProcessError as e:\n",
"            error = traceback.format_exc()\n",
"            print(error)\n",
"            print(e.output)\n",
"            return (e.output, 1)\n",
"\n",
"    exec_cmd(f\"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}\")\n",
"\n",
"    bqml_eval_query = f\"\"\"\n",
"        SELECT\n",
"            roc_auc, CASE WHEN roc_auc > .9 THEN 'good'\n",
"            WHEN roc_auc > .8 THEN 'fair' WHEN roc_auc > .7 THEN 'decent'\n",
"            WHEN roc_auc > .6 THEN 'not great' ELSE 'poor' END AS modelquality\n",
"        FROM\n",
"            ML.EVALUATE(MODEL {rpm_bq_ds_name}.{rpm_bqml_model})\n",
"    \"\"\"\n",
"    rpm_eval_result = exec_cmd(f'bq query --use_legacy_sql=false --format=json \"{bqml_eval_query}\"')\n",
"    print(rpm_eval_result)\n",
"\n",
"    rpm_context['bqml_eval_query'] = bqml_eval_query\n",
"    rpm_context['rpm_eval_result'] = rpm_eval_result\n",
"\n",
"    return (\n",
"        json.dumps(rpm_context),\n",
"        f\"\"\"{bqml_eval_query}\"\"\",\n",
"        f\"\"\"{rpm_eval_result}\"\"\",\n",
"    )"
],
"execution_count": null,
"outputs": []
},
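{
"cell_type": "markdown",
"metadata": {
"colab_type": "text"
},
"source": [
"The docstring of `evaluate_ml_model` suggests raising an exception when the model misses your selection criteria. The block below is a minimal sketch of such a gate, assuming the evaluation output is the JSON text printed by `bq query --format=json` (the first element of the tuple returned by `exec_cmd`). The helper name `check_roc_auc`, the `0.7` threshold, and the sample row are illustrative assumptions, not part of the pipeline.\n",
"\n",
"```python\n",
"import json\n",
"\n",
"\n",
"def check_roc_auc(eval_json, min_roc_auc=0.7):\n",
"    # eval_json is expected to be a JSON array with one row per result.\n",
"    rows = json.loads(eval_json)\n",
"    if not rows:\n",
"        raise RuntimeError('ML.EVALUATE returned no rows')\n",
"    # float() accepts both numeric values and numeric strings.\n",
"    roc_auc = float(rows[0]['roc_auc'])\n",
"    if roc_auc < min_roc_auc:\n",
"        raise RuntimeError(\n",
"            f'roc_auc {roc_auc:.3f} is below the threshold {min_roc_auc}')\n",
"    return roc_auc\n",
"\n",
"\n",
"# Hypothetical evaluation row, built here instead of calling BigQuery.\n",
"sample = json.dumps([{'roc_auc': '0.78', 'modelquality': 'decent'}])\n",
"print(check_roc_auc(sample))\n",
"```\n",
"\n",
"Raising inside the component marks the step as failed, which should stop the downstream steps that depend on its outputs."
]
},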
{
"cell_type": "markdown",
"metadata": {
"id": "snaVs3CwwapZ",
"colab_type": "text"
},
"source": [
"# *Test locally evaluate_ml_model*"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "SUFHCPfRwapa",
"colab_type": "code",
"colab": {}
},
"source": [
"# test locally evaluate_ml_model\n",
"# You could unit test the above code in your local environment.\n",
"# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)\n",
"local_context = get_local_context()\n",
"import json\n",
"update_local_context(evaluate_ml_model(\n",
" json.dumps(local_context),\n",
" local_context['RPM_GCP_PROJECT'],\n",
" local_context['rpm_bq_ds_name'],\n",
" local_context['rpm_bqml_model'],\n",
" ))"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "ledhuYLIwapb",
"colab_type": "text"
},
"source": [
"# Prepare dataset for batch prediction with BigQuery ML - function"
]
},
{
"cell_type": "code",
"metadata": {
"id": "btqXpW9ywapc",
"colab_type": "code",
"colab": {}
},
"source": [
"# create_batch_prediction_dataset\n",
"from typing import NamedTuple\n",
"def create_batch_prediction_dataset(ctx: str,\n",
"    RPM_GCP_PROJECT: str,\n",
"    rpm_bq_ds_name: str,\n",
"    rpm_table_id: str) -> NamedTuple('Outputs', [\n",
"    ('rpm_context', str),\n",
"    ('rpm_pred_table_id', str),\n",
"    ]):\n",
"    \"\"\"The function (also used as a base for a KFP Component)\n",
"    creates a BigQuery table which contains the input data for\n",
"    which we want predictions.\n",
"\n",
"    You might not need this component if your input table already exists.\n",
"    You might need some transformation or filtering on your input data,\n",
"    in which case you need to make the appropriate code changes.\n",
"    Args:\n",
"        ctx(:obj:`str`): The dict object with all the variables\n",
"            used in the pipeline\n",
"        RPM_GCP_PROJECT(:obj:`str`): Google Cloud project for deployment\n",
"        rpm_bq_ds_name(:obj:`str`): The dataset name used in the rest of the pipeline\n",
"        rpm_table_id(:obj:`str`): The table name used in the rest of the pipeline\n",
"    Returns:\n",
"        Outputs(:obj:`tuple`): Returns the below outputs:\n",
"            rpm_context(:obj:`str`): All variables used in the pipeline\n",
"            rpm_pred_table_id(:obj:`str`): The table that contains the input data\n",
"                for the batch prediction\n",
"    \"\"\"\n",
"    import json\n",
"    rpm_context = json.loads(ctx)\n",
"    print(rpm_context)\n",
"\n",
"    import subprocess\n",
"    def install(name):\n",
"        subprocess.call(['pip', 'install', name])\n",
"    packages_to_install = ['google-cloud', 'google-cloud-bigquery']\n",
"    for each_package in packages_to_install:\n",
"        install(each_package)\n",
"        print(f\"'{each_package}' package installed :)\")\n",
"\n",
"    rpm_pred_table_id = rpm_table_id.replace('_tbl', '_pred_tbl')\n",
"    # guard against overwriting the source table when its name does not contain '_tbl'\n",
"    if rpm_pred_table_id == rpm_table_id:\n",
"        rpm_pred_table_id = f\"{rpm_table_id}_pred_tbl\"\n",
"\n",
"    query = f\"\"\"\n",
"        # create the input table to conduct batch predict\n",
"        CREATE OR REPLACE TABLE {rpm_pred_table_id}\n",
"        OPTIONS(\n",
"            description=\"Input data for prediction\"\n",
"        ) AS\n",
"        SELECT *\n",
"        FROM {rpm_context['rpm_table_id']}\n",
"        LIMIT 10\n",
"    \"\"\"\n",
"    print(query)\n",
"    import traceback\n",
"    from google.cloud import bigquery\n",
"    try:\n",
"        client = bigquery.Client()\n",
"        query_job = client.query(query)  # Make an API request.\n",
"        query_job.result()  # Wait for the job to finish; raises if the query failed.\n",
"        print(f\"Table {rpm_pred_table_id} created.\")\n",
"    except Exception as e:\n",
"        error = traceback.format_exc()\n",
"        print(error)\n",
"        print(e)\n",
"        raise RuntimeError(f\"Can't create the table {rpm_pred_table_id}\")\n",
"\n",
"    rpm_context['rpm_pred_table_id'] = rpm_pred_table_id\n",
"\n",
"    return (json.dumps(rpm_context),\n",
"        rpm_context['rpm_pred_table_id'],\n",
"    )"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "3qCL6izYwapd",
"colab_type": "text"
},
"source": [
"# *Test locally create batch prediction dataset*"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "AIz9L4p5wape",
"colab_type": "code",
"colab": {}
},
"source": [
"#test locally create batch prediction dataset\n",
"# You could unit test the above code in your local environment.\n",
"# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)\n",
"local_context = get_local_context()\n",
"import json\n",
"update_local_context(create_batch_prediction_dataset(\n",
" json.dumps(local_context),\n",
" local_context['RPM_GCP_PROJECT'],\n",
" local_context['rpm_bq_ds_name'],\n",
" local_context['rpm_table_id'],\n",
" ))"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "TlW2h_ppwapg",
"colab_type": "text"
},
"source": [
"# Make batch prediction - function"
]
},
{
"cell_type": "code",
"metadata": {
"id": "od7AKx6Wwaph",
"colab_type": "code",
"colab": {}
},
"source": [
"# predict_batch_ml_model\n",
"from typing import NamedTuple\n",
"def predict_batch_ml_model(ctx: str,\n",
"    RPM_GCP_PROJECT: str,\n",
"    rpm_bq_ds_name: str,\n",
"    rpm_bqml_model: str,\n",
"    rpm_pred_table_id: str, ) -> NamedTuple('Outputs', [\n",
"    ('rpm_context', str),\n",
"    ('rpm_predict_batch_output', str),\n",
"    ]):\n",
"    \"\"\"\n",
"    The function (also used as a base for a KFP Component) uses the model\n",
"    to predict the data in bulk.\n",
"\n",
"    You might also need to save the predicted values\n",
"    in an appropriate repository of your choice.\n",
"    Currently the predicted values are printed on the console\n",
"    and returned as an output from the function.\n",
"    Args:\n",
"        ctx(:obj:`str`): The dict object with all the variables\n",
"            used in the pipeline\n",
"        RPM_GCP_PROJECT(:obj:`str`): Google Cloud project for deployment\n",
"        rpm_bq_ds_name(:obj:`str`): The dataset name\n",
"            used in the rest of the pipeline\n",
"        rpm_bqml_model(:obj:`str`): The model name\n",
"            used in the rest of the pipeline\n",
"        rpm_pred_table_id(:obj:`str`): The table that contains the input data\n",
"            for the batch prediction\n",
"    Returns:\n",
"        Outputs(:obj:`tuple`): Returns the below outputs:\n",
"            rpm_context(:obj:`str`): All variables used in the pipeline\n",
"            rpm_predict_batch_output(:obj:`str`): The output\n",
"                from the batch prediction\n",
"    \"\"\"\n",
"    import json\n",
"    rpm_context = json.loads(ctx)\n",
"    print(rpm_context)\n",
"\n",
"    import subprocess\n",
"    def install(name):\n",
"        subprocess.call(['pip', 'install', name])\n",
"    packages_to_install = [\"google-cloud\", \"google-cloud-bigquery\"]\n",
"    for each_package in packages_to_install:\n",
"        install(each_package)\n",
"        print(f\"'{each_package}' package installed :)\")\n",
"\n",
"    query = f\"\"\"\n",
"        # predict the inputs (rows) from the input table\n",
"        SELECT\n",
"            fullVisitorId, predicted_will_buy_on_return_visit\n",
"        FROM\n",
"            ML.PREDICT(MODEL {rpm_bq_ds_name}.{rpm_bqml_model},\n",
"            (\n",
"                SELECT\n",
"                    fullVisitorId,\n",
"                    bounces,\n",
"                    time_on_site\n",
"                FROM {rpm_pred_table_id}\n",
"            ))\n",
"    \"\"\"\n",
"    print(query)\n",
"    import traceback\n",
"    from google.cloud import bigquery\n",
"    output = \"\"\n",
"    try:\n",
"        client = bigquery.Client()\n",
"        query_job = client.query(query)  # Make an API request.\n",
"        print(\"The query data:\")\n",
"        # iterating over the job waits for the query to finish\n",
"        for row in query_job:\n",
"            # Row values can be accessed by field name or index.\n",
"            print(f\"row data: {row}\")\n",
"            output += str(row)\n",
"    except Exception as e:\n",
"        error = traceback.format_exc()\n",
"        print(error)\n",
"        print(e)\n",
"        raise RuntimeError(\"Can't batch predict\")\n",
"\n",
"    rpm_context['rpm_predict_output'] = output\n",
"\n",
"    return (\n",
"        json.dumps(rpm_context),\n",
"        rpm_context['rpm_predict_output'],\n",
"    )"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "Qm8PZE-lwapj",
"colab_type": "text"
},
"source": [
"# *Test locally batch prediction*"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "SYo6XsMKwapj",
"colab_type": "code",
"colab": {}
},
"source": [
"#test locally batch prediction\n",
"# You could unit test the above code in your local environment.\n",
"# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)\n",
"local_context = get_local_context()\n",
"import json\n",
"update_local_context(predict_batch_ml_model(\n",
" json.dumps(local_context),\n",
" local_context['RPM_GCP_PROJECT'],\n",
" local_context['rpm_bq_ds_name'],\n",
" local_context['rpm_bqml_model'],\n",
" local_context['rpm_pred_table_id'],\n",
" ))\n"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "sYQBKwJRwapm",
"colab_type": "text"
},
"source": [
"# Determine the new revision of the model - function"
]
},
{
"cell_type": "code",
"metadata": {
"id": "Z7KEFuPawapn",
"colab_type": "code",
"colab": {}
},
"source": [
"# get_bqml_model_version\n",
"from typing import NamedTuple\n",
"def get_bqml_model_version(ctx: str,\n",
"    RPM_GCP_PROJECT: str,\n",
"    RPM_MODEL_EXPORT_PATH: str,\n",
"    RPM_DEFAULT_MODEL_EXPORT_PATH: str,\n",
"    RPM_MODEL_VER_PREFIX: str,\n",
"    rpm_gcs_rpm_ds_url: str) -> NamedTuple('Outputs', [\n",
"    ('rpm_context', str),\n",
"    ('rpm_bqml_model_export_path', str),\n",
"    ('rpm_model_version', str),\n",
"    ]):\n",
"    \"\"\"\n",
"    The function (also used as a base for a KFP Component) determines\n",
"    the revision of the model.\n",
"\n",
"    It checks the current version and increments it by 1.\n",
"    It prepares the folder for BigQuery ML to export the model.\n",
"    Args:\n",
"        ctx(:obj:`str`): The dict object with all the variables\n",
"            used in the pipeline\n",
"        RPM_GCP_PROJECT(:obj:`str`): Google Cloud project for deployment\n",
"        RPM_MODEL_EXPORT_PATH(:obj:`str`): User supplied model export path\n",
"        RPM_DEFAULT_MODEL_EXPORT_PATH(:obj:`str`): Uses the default path,\n",
"            if the user didn't provide a path\n",
"        RPM_MODEL_VER_PREFIX(:obj:`str`): The folder with prefix\n",
"        rpm_gcs_rpm_ds_url(:obj:`str`): Full Google Cloud Storage path\n",
"            with bucket name and folder name\n",
"    Returns:\n",
"        Outputs(:obj:`tuple`): Returns the below outputs:\n",
"            rpm_context(:obj:`str`): All variables used in the pipeline\n",
"            rpm_bqml_model_export_path(:obj:`str`): the path\n",
"                to which we can export the model\n",
"            rpm_model_version(:obj:`str`): the version which we will use when\n",
"                we deploy the model to Cloud AI Prediction\n",
"    \"\"\"\n",
"    import json\n",
"    rpm_context = json.loads(ctx)\n",
"    print(rpm_context)\n",
"\n",
"    # defining the install function\n",
"    import subprocess\n",
"    import os\n",
"    def install(name):\n",
"        subprocess.call(['pip', 'install', name])\n",
"    packages_to_install = ['google-cloud', 'google-cloud-storage']\n",
"    for each_package in packages_to_install:\n",
"        install(each_package)\n",
"        print(f\"'{each_package}' package installed :)\")\n",
"\n",
"    cmd = f\"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}\"\n",
"    print(cmd)\n",
"    process = subprocess.run(cmd, shell=True, check=True,\n",
"        stdout=subprocess.PIPE, universal_newlines=True)\n",
"    print(f\"'output from {cmd}': {process.stdout}\")\n",
"\n",
"    from google.cloud import storage\n",
"    from google.cloud.exceptions import NotFound\n",
"    from google.cloud.exceptions import Forbidden\n",
"    import traceback\n",
"\n",
"    client = storage.Client()\n",
"    try:\n",
"        # the URL has the form gs://<bucket>/<dataset folder>/...\n",
"        bucket_name = rpm_context['rpm_gcs_rpm_ds_url'].split('/')[2]\n",
"        rpm_bq_ds_name = rpm_context['rpm_gcs_rpm_ds_url'].split('/')[3]\n",
"        rpm_bqml_model_export_path = rpm_context['RPM_MODEL_EXPORT_PATH']\n",
"        if not rpm_bqml_model_export_path:\n",
"            rpm_bqml_model_export_path = rpm_context['RPM_DEFAULT_MODEL_EXPORT_PATH']\n",
"        bucket = client.get_bucket(bucket_name)\n",
"        print(f'Details: {bucket}')\n",
"        folder_name = os.path.join(rpm_bq_ds_name, rpm_bqml_model_export_path)\n",
"        blob = bucket.get_blob(folder_name)\n",
"        model_version = 0\n",
"        if blob is None:\n",
"            print(f\"Folder name {folder_name} does not exist!\")\n",
"            print(f\"{bucket_name}, {folder_name}\")\n",
"            blob = bucket.blob(folder_name)\n",
"            blob.upload_from_string('')\n",
"            print(f\"Folder name {folder_name} created.\")\n",
"        else:\n",
"            print(f\"Folder name {folder_name} exists.\")\n",
"            blobs = client.list_blobs(bucket_name, prefix=folder_name)\n",
"            print('Blobs:')\n",
"            for blob in blobs:\n",
"                print(f\"blob name: {blob.name}\")\n",
"                curr_ver = blob.name.replace(folder_name, '')\n",
"                print(f\"folder_name: {folder_name}\")\n",
"                print(f\"after folder_name replace: {curr_ver}\")\n",
"                # only version folders such as '<prefix><number>/' are considered\n",
"                if rpm_context['RPM_MODEL_VER_PREFIX'] in curr_ver \\\n",
"                    and len(curr_ver.split('/')) == 2 and \\\n",
"                    len(curr_ver.split('/')[1]) == 0:\n",
"                    curr_ver = curr_ver.replace(rpm_context['RPM_MODEL_VER_PREFIX'], '').replace('/','').split('/')[0]\n",
"                    model_version = max(model_version, int(curr_ver))\n",
"        # increment the model version\n",
"        model_version += 1\n",
"        model_version_full_name = f\"{rpm_context['RPM_MODEL_VER_PREFIX']}{model_version}/\"\n",
"        folder_name = os.path.join(folder_name, model_version_full_name)\n",
"        print(f\"Going to create folder {folder_name}.\")\n",
"        blob = bucket.blob(folder_name)\n",
"        blob.upload_from_string('')\n",
"        print(f\"Folder name {folder_name} created.\")\n",
"        rpm_context['rpm_bqml_model_export_path'] = os.path.join(rpm_bqml_model_export_path, model_version_full_name)\n",
"        rpm_context['rpm_model_version'] = model_version_full_name.rstrip('/')\n",
"    except Forbidden as e:\n",
"        print(f\"Sorry, you don't have access to the bucket: {bucket_name}!\")\n",
"        print(e)\n",
"        error = traceback.format_exc()\n",
"        print(error)\n",
"        # re-raise: the outputs below can't be produced without the bucket\n",
"        raise\n",
"    except NotFound as e:\n",
"        print(f\"Sorry, the bucket: {bucket_name} does not exist!\")\n",
"        print(e)\n",
"        error = traceback.format_exc()\n",
"        print(error)\n",
"        # re-raise: the outputs below can't be produced without the bucket\n",
"        raise\n",
"\n",
"    return (\n",
"        json.dumps(rpm_context),\n",
"        rpm_context['rpm_bqml_model_export_path'],\n",
"        rpm_context['rpm_model_version']\n",
"    )"
],
"execution_count": null,
"outputs": []
},
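{
"cell_type": "markdown",
"metadata": {
"colab_type": "text"
},
"source": [
"The version lookup in `get_bqml_model_version` boils down to scanning object names under the export folder and taking the highest numbered version folder plus one. The block below is a simplified sketch of that idea on plain strings, so it can be tested without a bucket. The helper `next_model_version`, the `V_` prefix, and the sample object names are assumptions made for illustration; the function above remains the reference for the exact matching rules.\n",
"\n",
"```python\n",
"def next_model_version(blob_names, folder_name, prefix):\n",
"    latest = 0\n",
"    for name in blob_names:\n",
"        if not name.startswith(folder_name):\n",
"            continue\n",
"        rest = name[len(folder_name):]\n",
"        # Only folder marker objects (names ending in '/') are candidates.\n",
"        if not rest.endswith('/'):\n",
"            continue\n",
"        rest = rest.strip('/')\n",
"        # Skip nested folders and names without the version prefix.\n",
"        if '/' in rest or not rest.startswith(prefix):\n",
"            continue\n",
"        suffix = rest[len(prefix):]\n",
"        if suffix.isdigit():\n",
"            latest = max(latest, int(suffix))\n",
"    return latest + 1\n",
"\n",
"\n",
"# Hypothetical object names, for illustration only.\n",
"names = [\n",
"    'rpm_ds/export/',\n",
"    'rpm_ds/export/V_1/',\n",
"    'rpm_ds/export/V_1/saved_model.pb',\n",
"    'rpm_ds/export/V_2/',\n",
"]\n",
"print(next_model_version(names, 'rpm_ds/export/', 'V_'))\n",
"```\n",
"\n",
"With no existing version folders the sketch returns 1, which matches the behaviour of starting from `model_version = 0` and incrementing once."
]
},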
{
"cell_type": "markdown",
"metadata": {
"id": "uoyANmHLwapo",
"colab_type": "text"
},
"source": [
"# *Test locally get_bqml_model_version*"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "D2NAf7qgwapp",
"colab_type": "code",
"colab": {}
},
"source": [
"# test locally get_bqml_model_version\n",
"# You could unit test the above code in your local environment.\n",
"# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)\n",
"local_context = get_local_context()\n",
"import json\n",
"update_local_context(get_bqml_model_version(\n",
" json.dumps(local_context),\n",
" local_context['RPM_GCP_PROJECT'],\n",
" local_context['RPM_MODEL_EXPORT_PATH'],\n",
" local_context['RPM_DEFAULT_MODEL_EXPORT_PATH'],\n",
" local_context['RPM_MODEL_VER_PREFIX'],\n",
" local_context['rpm_gcs_rpm_ds_url']))\n"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "qkX573wzwapq",
"colab_type": "text"
},
"source": [
"# Export the BigQuery ML model to the Google Cloud Storage bucket - function"
]
},
{
"cell_type": "code",
"metadata": {
"id": "yBVWPGXBwapr",
"colab_type": "code",
"colab": {}
},
"source": [
"# export_bqml_model_to_gcs\n",
"from typing import NamedTuple\n",
"def export_bqml_model_to_gcs(ctx: str,\n",
"    RPM_GCP_PROJECT: str,\n",
"    RPM_PVC_NAME: str,\n",
"    RPM_PVC_DATASET_FOLDER_NAME: str,\n",
"    rpm_bqml_model_export_path: str,\n",
"    rpm_gcs_rpm_ds_url: str,\n",
"    rpm_bq_ds_name: str,\n",
"    rpm_bqml_model: str, ) -> NamedTuple('Outputs', [\n",
"    ('rpm_context', str),\n",
"    ('rpm_model_uri', str),\n",
"    ]):\n",
"    \"\"\"The function (also used as a base for a KFP Component) exports\n",
"    the BigQuery ML model.\n",
"\n",
"    It also saves the details used in the model,\n",
"    e.g. losses, learning rate adjustment, number of iterations.\n",
"    It also saves the evaluation details, e.g. roc, accuracy, etc.\n",
"    Args:\n",
"        ctx(:obj:`str`): The dict object with all the variables\n",
"            used in the pipeline\n",
"        RPM_GCP_PROJECT(:obj:`str`): Google Cloud project for deployment\n",
"        RPM_PVC_NAME(:obj:`str`): The persistent volume name\n",
"        RPM_PVC_DATASET_FOLDER_NAME(:obj:`str`): The folder name to store\n",
"            the temporary files before we upload to Google Cloud Storage\n",
"        rpm_bqml_model_export_path(:obj:`str`): The path\n",
"            to which we can export the model\n",
"        rpm_gcs_rpm_ds_url(:obj:`str`): Full Google Cloud Storage path\n",
"            with bucket name and folder name\n",
"        rpm_bq_ds_name(:obj:`str`): The dataset name\n",
"            used in the rest of the pipeline\n",
"        rpm_bqml_model(:obj:`str`): The model name\n",
"            used in the rest of the pipeline\n",
"    Returns:\n",
"        Outputs(:obj:`tuple`): Returns the below outputs:\n",
"            rpm_context(:obj:`str`): All variables used in the pipeline\n",
"            rpm_model_uri(:obj:`str`): The path to where we export the model\n",
"    \"\"\"\n",
"    import json\n",
"    rpm_context = json.loads(ctx)\n",
"    print(rpm_context)\n",
"\n",
"    import os\n",
"    import subprocess\n",
"    import traceback\n",
"    def exec_cmd(cmd):\n",
"        try:\n",
"            print(cmd)\n",
"            process = subprocess.run(cmd, shell=True, check=True,\n",
"                stdout=subprocess.PIPE, universal_newlines=True)\n",
"            print(f\"'output from {cmd}': {process.stdout}\")\n",
"            return process.stdout\n",
"        except subprocess.CalledProcessError as e:\n",
"            error = traceback.format_exc()\n",
"            print(error)\n",
"            print(e.output)\n",
"            return e.output\n",
"\n",
"    if not rpm_context['rpm_bqml_model_export_path']:\n",
"        raise RuntimeError(\"Can't export the BigQuery model: export destination is empty!\")\n",
"\n",
"    path_to_ds = os.path.join(rpm_context['RPM_PVC_NAME'],\n",
"        rpm_context['RPM_PVC_DATASET_FOLDER_NAME'])\n",
"    # check whether the dataset directory already exists\n",
"    exec_cmd(f\"test -d {path_to_ds} && echo 'Exists' || echo 'Does not exist'\")\n",
"    # create the dataset directory\n",
"    exec_cmd(f\"mkdir -p {path_to_ds}\")\n",
"    # validate that the dataset directory has been created\n",
"    exec_cmd(f\"test -d {path_to_ds} && echo 'Exists' || echo 'Does not exist'\")\n",
"\n",
"\n",
"    rpm_bqml_model_export_path = os.path.join(rpm_context['rpm_gcs_rpm_ds_url'],\n",
"        rpm_context['rpm_bqml_model_export_path'])\n",
"    rpm_bqml_model_export_path = rpm_bqml_model_export_path.rstrip('/')\n",
"\n",
"    exec_cmd(f\"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}\")\n",
"    cmd = f\"bq extract -m {rpm_context['rpm_bq_ds_name']}.{rpm_context['rpm_bqml_model']} {rpm_bqml_model_export_path}\"\n",
"    print(cmd)\n",
"    exec_cmd(cmd)\n",
"\n",
"    rpm_context['rpm_model_uri'] = \\\n",
"        os.path.join(rpm_context['rpm_gcs_rpm_ds_url'],\n",
"        rpm_context['rpm_bqml_model_export_path'])\n",
"\n",
"    bqml_eval_query = f\"\"\"\n",
"        SELECT\n",
"            *\n",
"        FROM\n",
"            ML.EVALUATE(MODEL `{rpm_bq_ds_name}.{rpm_bqml_model}`)\n",
"    \"\"\"\n",
"    rpm_eval_output = exec_cmd(f\"bq query --use_legacy_sql=false --format=json '{bqml_eval_query}'\")\n",
"    print(rpm_eval_output)\n",
"\n",
"    bqml_train_detail_query = f\"\"\"\n",
"        SELECT\n",
"            *\n",
"        FROM\n",
"            ML.TRAINING_INFO(MODEL `{rpm_bq_ds_name}.{rpm_bqml_model}`)\n",
"    \"\"\"\n",
"    bqml_train_detail_query_output = exec_cmd(f\"bq query --use_legacy_sql=false --format=json '{bqml_train_detail_query}'\")\n",
"    print(bqml_train_detail_query_output)\n",
"\n",
"    path_to_ds = f\"/{rpm_context['RPM_PVC_NAME']}/{rpm_context['RPM_PVC_DATASET_FOLDER_NAME']}/\"\n",
"    # export the model evaluation output to a file called eval_detail.txt\n",
"\n",
"    rpm_eval_output_filename = os.path.join(path_to_ds, 'eval_detail.txt')\n",
"    with open(rpm_eval_output_filename, 'w') as outfile:\n",
"        outfile.write(rpm_eval_output)\n",
"    exec_cmd(f\"cat {rpm_eval_output_filename}\")\n",
"    exec_cmd(f\"gsutil -m cp {rpm_eval_output_filename} {rpm_context['rpm_model_uri']}\")\n",
"\n",
"    # export the training details to a file called train_detail.txt\n",
"    bqml_train_detail_query_filename = os.path.join(path_to_ds, 'train_detail.txt')\n",
"    with open(bqml_train_detail_query_filename, 'w') as outfile:\n",
"        outfile.write(bqml_train_detail_query_output)\n",
"    exec_cmd(f\"cat {bqml_train_detail_query_filename}\")\n",
"    exec_cmd(f\"gsutil -m cp {bqml_train_detail_query_filename} {rpm_context['rpm_model_uri']}\")\n",
"\n",
"    return (json.dumps(rpm_context),\n",
"        rpm_context['rpm_model_uri'],\n",
"    )"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "sM8KC14uwapu",
"colab_type": "text"
},
"source": [
"# *Test locally export_bqml_model_to_gcs*"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "BNJMaey2wapu",
"colab_type": "code",
"colab": {}
},
"source": [
"# test locally export_bqml_model_to_gcs\n",
"# You could unit test the above code in your local environment.\n",
"# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)\n",
"local_context = get_local_context()\n",
"import json\n",
"update_local_context(export_bqml_model_to_gcs(\n",
" json.dumps(local_context),\n",
" local_context['RPM_GCP_PROJECT'],\n",
" local_context['RPM_PVC_NAME'],\n",
" local_context['RPM_PVC_DATASET_FOLDER_NAME'],\n",
" local_context['rpm_bqml_model_export_path'],\n",
" local_context['rpm_gcs_rpm_ds_url'],\n",
" local_context['rpm_bq_ds_name'],\n",
" local_context['rpm_bqml_model'],\n",
" ))"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "HfVhjsLAwapw",
"colab_type": "text"
},
"source": [
"# Deploy the ML model - function"
]
},
{
"cell_type": "code",
"metadata": {
"id": "UY57cWHcwapx",
"colab_type": "code",
"colab": {}
},
"source": [
"from typing import NamedTuple\n",
"def deploy_ml_model_online_pred(ctx: str,\n",
" RPM_GCP_PROJECT: str,\n",
" RPM_LOCATION: str,\n",
" RPM_RUNTIME_VERSION: str,\n",
" RPM_PYTHON_VERSION: str,\n",
" rpm_model_uri: str,\n",
" rpm_bqml_model: str,\n",
" rpm_model_version: str,\n",
" rpm_gcs_rpm_ds_url: str, ) -> NamedTuple('Outputs', [\n",
" ('rpm_context', str),\n",
" ('rpm_url_to_monitor', str),\n",
" ('rpm_model_region', str),\n",
" ]):\n",
" \"\"\" The function(also used as a base for a KFP Component) deploys\n",
" the model that is exported above to Cloud AI Platform Prediction\n",
" Args:\n",
" ctx(:obj:`str`): The dict object with all the variables\n",
" used in the pipeline\n",
" RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment\n",
" RPM_LOCATION:(:obj:`str`): Google Cloud region\n",
" where we are going to deploy the model\n",
" RPM_RUNTIME_VERSION(:obj:`str`): The runtime version\n",
" of the caip predicted\n",
" RPM_PYTHON_VERSION(:obj:`str`): The python version\n",
" of the caip predicted\n",
" rpm_model_uri(:obj:`str`): The path to where we export the model\n",
" rpm_bqml_model(:obj:`str`): The model name\n",
" used in the rest of the pipeline\n",
" rpm_model_version(:obj:`str`): The version which we will use when\n",
" we deploy the model to caip prediction\n",
" rpm_gcs_rpm_ds_url(:obj:`str`): Full Google Cloud Storage path\n",
" with bucket name and folder name\n",
" Returns:\n",
" Outputs(:obj: `tuple`): Returns the below outputs:\n",
" rpm_context(:obj:`str`): All variables used in the pipeline\n",
" rpm_url_to_monitor(:obj:`str`): The url in the Google Cloud Console\n",
" which you can use to monitor\n",
" rpm_model_region(:obj:`str`): The Google Cloud region\n",
" where you are going to deploy the model\n",
" \"\"\"\n",
" import json\n",
" rpm_context = json.loads(ctx)\n",
" print(rpm_context)\n",
" rpm_context['rpm_url_to_monitor'] = f\"https://console.cloud.google.com/ai-platform/models/{rpm_bqml_model}/versions\"\n",
"\n",
" model_region = RPM_LOCATION[:-2]\n",
" rpm_context['rpm_model_region'] = model_region\n",
"\n",
" import subprocess\n",
" import traceback\n",
" def exec_cmd(cmd):\n",
" try:\n",
" print(cmd)\n",
" process = subprocess.run(cmd, shell=True, check=True,\n",
" stdout=subprocess.PIPE, universal_newlines=True)\n",
" print(f\"'output from {cmd}': {process.stdout}\")\n",
" return(process.stdout, 0)\n",
" except subprocess.CalledProcessError as e:\n",
" error = traceback.format_exc()\n",
" print(error)\n",
" print(e.output)\n",
" return(e.output, 1)\n",
"\n",
" exec_cmd(f\"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}\")\n",
"\n",
" (output, returncode) = \\\n",
" exec_cmd(f\"gcloud ai-platform models list --format='value(name)' | grep {rpm_bqml_model}\")\n",
" print(f'output:{output}, returncode:{returncode}')\n",
" if returncode == 0: #grep returns 1 if nothing is found\n",
" print(f\"{rpm_bqml_model} already exists\")\n",
" else:\n",
" print(f\"{rpm_bqml_model} doesn't exists. Creating...\")\n",
" (output_create, returncode_create) = \\\n",
" exec_cmd(f'gcloud ai-platform models create --regions={model_region} {rpm_bqml_model}')\n",
" print(f\"output:{output_create}, returncode:{returncode_create}\")\n",
" if returncode_create != 0:\n",
" raise RuntimeError(f\"Can't create the ML Model {rpm_bqml_model}\")\n",
" print(f\"{rpm_bqml_model} created.\")\n",
"\n",
" (output, returncode) = \\\n",
" exec_cmd(f\"gcloud ai-platform versions list --model {rpm_bqml_model} --format='value(name)' | grep {rpm_model_version}\")\n",
" if returncode == 0: #grep returns 1 if nothing is found\n",
" print(f\"{rpm_bqml_model} with version {rpm_model_version} already exists\")\n",
" else:\n",
" print(f\"{rpm_bqml_model} with version {rpm_model_version} doesn't exists. Creating...\")\n",
" cmd = f\"\"\"\n",
" gcloud ai-platform versions create --model={rpm_bqml_model} \\\n",
" {rpm_model_version} \\\n",
" --framework=tensorflow --python-version={RPM_PYTHON_VERSION} \\\n",
" --runtime-version={RPM_RUNTIME_VERSION} \\\n",
" --origin={rpm_model_uri} \\\n",
" --staging-bucket=gs://{rpm_gcs_rpm_ds_url.split('/')[2]}\n",
" \"\"\"\n",
" (output_create, returncode_create) = exec_cmd(cmd)\n",
" print(f\"output:{output_create}, returncode:{returncode_create}\")\n",
" if returncode_create != 0:\n",
" raise RuntimeError(f\"Can't create the ML Model {rpm_bqml_model} with version {rpm_model_version}!!!\")\n",
" print(f\"{rpm_bqml_model} with version {rpm_model_version} created.\")\n",
"\n",
" print(f\"Monitor models at {rpm_context['rpm_url_to_monitor']}\")\n",
"\n",
" return (\n",
" json.dumps(rpm_context),\n",
" rpm_context['rpm_url_to_monitor'],\n",
" rpm_context['rpm_model_region']\n",
" )"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "YlIz8PPiwapz",
"colab_type": "text"
},
"source": [
"# *Test locally deploy_ml_model_online_pred*"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "dRbMYIkvwap0",
"colab_type": "code",
"colab": {}
},
"source": [
"# test locally deploy_ml_model_online_pred\n",
"# You could unit test the above code in your local environment.\n",
"# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)\n",
"local_context = get_local_context()\n",
"import json\n",
"update_local_context(deploy_ml_model_online_pred(\n",
" json.dumps(local_context),\n",
" local_context['RPM_GCP_PROJECT'],\n",
" local_context['RPM_LOCATION'],\n",
" local_context['RPM_RUNTIME_VERSION'],\n",
" local_context['RPM_PYTHON_VERSION'],\n",
" local_context['rpm_model_uri'], \n",
" local_context['rpm_bqml_model'],\n",
" local_context['rpm_model_version'],\n",
" local_context['rpm_gcs_rpm_ds_url'],\n",
" ))"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "aAjOpTf6wap2",
"colab_type": "text"
},
"source": [
"# Make online prediction - function"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {},
"colab_type": "code",
"id": "l6KCqVzDwap2"
},
"outputs": [],
"source": [
"# predict_online_ml_model\n",
"from typing import NamedTuple\n",
"def predict_online_ml_model(ctx: str,\n",
" RPM_GCP_PROJECT: str,\n",
" RPM_PVC_NAME: str,\n",
" RPM_PVC_DATASET_FOLDER_NAME: str,\n",
" rpm_bqml_model: str,\n",
" rpm_model_version: str, ) -> NamedTuple('Outputs', [\n",
" ('rpm_context', str),\n",
" ('rpm_predict_online_output', str),\n",
" ]):\n",
" \"\"\" The function(also used as a base for a KFP Component)\n",
" does the online prediction.\n",
"\n",
" This is to confirm that the endpoint is available and ready to serve.\n",
" Args:\n",
" ctx(:obj:`str`): The dict object with all the variables\n",
" used in the pipeline\n",
" RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment\n",
" RPM_PVC_NAME:(:obj:`str`): Ther persitent volume name\n",
" RPM_PVC_DATASET_FOLDER_NAME(:obj:`str`): The folder name to store\n",
" the temporary files before we upload to Google Cloud Storage\n",
" rpm_bqml_model(:obj:`str`): The model name\n",
" used in the rest of the pipeline\n",
" rpm_model_version(:obj:`str`): The version which we will use\n",
" when we deploy the model to Cloud AI Platform Prediction\n",
" Returns:\n",
" Outputs(:obj: `tuple`): Returns the below outputs:\n",
" rpm_context(:obj:`str`): All variables used in the pipeline\n",
" rpm_url_to_monitor(:obj:`str`): The url in the Google Cloud Console\n",
" which you can use to monitor\n",
" rpm_predict_online_output(:obj:`str`): The output\n",
" from the online prediction\n",
" \"\"\"\n",
" import json\n",
" rpm_context = json.loads(ctx)\n",
" print(rpm_context)\n",
"\n",
" import subprocess\n",
" import traceback\n",
" def exec_cmd(cmd):\n",
" try:\n",
" print(cmd)\n",
" process = subprocess.run(cmd, shell=True, check=True,\n",
" stdout=subprocess.PIPE, universal_newlines=True)\n",
" print(f\"'output from {cmd}': {process.stdout}\")\n",
" return(process.stdout, 0)\n",
" except subprocess.CalledProcessError as e:\n",
" error = traceback.format_exc()\n",
" print(error)\n",
" print(e.output)\n",
" return(e.output, 1)\n",
"\n",
" exec_cmd(f\"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}\")\n",
"\n",
" import os\n",
" path_to_ds = os.path.join(rpm_context['RPM_PVC_NAME'],\n",
" rpm_context['RPM_PVC_DATASET_FOLDER_NAME'])\n",
" # check if that the dataset directory already exists\n",
" exec_cmd(f\"test -d {path_to_ds} && echo 'Exists' || echo 'Does not exist'\")\n",
" # create the datset directory\n",
" exec_cmd(f\"mkdir -p {path_to_ds}\")\n",
" # validate that the dataset directory has been created\n",
" exec_cmd(f\"test -d {path_to_ds} && echo 'Exists' || echo 'Does not exist'\")\n",
"\n",
" input_data = \"\"\"{\"bounces\": 0, \"time_on_site\": 7363}\"\"\"\n",
" filename = os.path.join(path_to_ds, 'input.json')\n",
" with open(filename, 'w') as outfile:\n",
" outfile.write(input_data)\n",
"\n",
" cmd = f\"\"\"\n",
" gcloud ai-platform predict --model {rpm_bqml_model} \\\n",
" --version {rpm_model_version} --json-instances {filename}\n",
" \"\"\"\n",
" (output, returncode) = exec_cmd(cmd)\n",
" print(f\"Predicted results for {input_data} is {output} \")\n",
" rpm_context['rpm_predict_online_output'] = output\n",
"\n",
" return (\n",
" json.dumps(rpm_context),\n",
" rpm_context['rpm_predict_online_output'],\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "gQjqEpt5wap5",
"colab_type": "text"
},
"source": [
"# *Test locally predict_online_ml_model*"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "qQfawPl5wap5",
"colab_type": "code",
"colab": {}
},
"source": [
"# test locally predict_online_ml_model\n",
"# You could unit test the above code in your local environment.\n",
"# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)\n",
"local_context = get_local_context()\n",
"import json\n",
"update_local_context(predict_online_ml_model(\n",
" json.dumps(local_context),\n",
" local_context['RPM_GCP_PROJECT'],\n",
" local_context['RPM_PVC_NAME'],\n",
" local_context['RPM_PVC_DATASET_FOLDER_NAME'],\n",
" local_context['rpm_bqml_model'],\n",
" local_context['rpm_model_version'],\n",
" ))"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "TXsOJsaVwap7",
"colab_type": "text"
},
"source": [
"# Define the KubeFlow Pipeline (KFP)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"colab": {},
"colab_type": "code",
"id": "Rc0X4C35wap7"
},
"outputs": [],
"source": [
"# define the pipeline\n",
"import kfp.components as comp\n",
"def create_kfp_comp(rpm_comp):\n",
" \"\"\" Converts a Python function to a component\n",
" and returns a task(ContainerOp) factory\n",
"\n",
" Returns:\n",
" Outputs (:obj: `ContainerOp`): returns the operation\n",
" \"\"\"\n",
" return comp.func_to_container_op(\n",
" func=rpm_comp,\n",
" base_image=\"google/cloud-sdk:latest\"\n",
" )\n",
"\n",
"# reload the properties; undo any properties set to test component locally\n",
"all_vars = load_params()\n",
"\n",
"from kfp.dsl import pipeline, VolumeOp\n",
"import kfp.dsl as dsl\n",
"import json\n",
"# define the pipeline metadata\n",
"@pipeline(\n",
" name='Propensity to purchase using BigQuery ML',\n",
" description='Propensity model if a customer is likely to purchase'\n",
")\n",
"\n",
"# define the pipeline\n",
"def bq_googlestr_dataset_to_bq_to_caip_pipeline(\n",
" data_path = all_vars['RPM_PVC_NAME'] #you can pass input variables\n",
"):\n",
" \"\"\" The function defines the pipeline.\n",
"\n",
" Args:\n",
" data_path:(:obj:`str`): the volume to store the temporary files\n",
" \"\"\"\n",
" rpm_context = json.dumps(all_vars)\n",
" gcs_bucket_folder_op = create_kfp_comp(create_gcs_bucket_folder)(\n",
" rpm_context,\n",
" all_vars['RPM_GCP_STORAGE_BUCKET'],\n",
" all_vars['RPM_GCP_PROJECT'],\n",
" all_vars['RPM_DEFAULT_BUCKET_EXT'],\n",
" all_vars['RPM_GCP_STORAGE_BUCKET_FOLDER'],\n",
" all_vars['RPM_DEFAULT_BUCKET_FOLDER_NAME']\n",
" )\n",
"\n",
" create_bq_ds_op = create_kfp_comp(create_bq_ds)(\n",
" gcs_bucket_folder_op.outputs['rpm_context'],\n",
" all_vars['RPM_GCP_PROJECT'],\n",
" all_vars['RPM_BQ_DATASET_NAME'],\n",
" all_vars['RPM_LOCATION']\n",
" )\n",
"\n",
"\n",
" load_bq_ds_op = create_kfp_comp(load_bq_ds)(\n",
" create_bq_ds_op.outputs['rpm_context'],\n",
" all_vars['RPM_GCP_PROJECT'],\n",
" all_vars['RPM_BQ_TABLE_NAME'],\n",
" all_vars['RPM_DEFAULT_BQ_TABLE_NAME_EXT'],\n",
" create_bq_ds_op.outputs['rpm_bq_ds_name'],\n",
" )\n",
"\n",
" create_bq_ml_op = create_kfp_comp(create_bq_ml)(\n",
" load_bq_ds_op.outputs['rpm_context'],\n",
" all_vars['RPM_GCP_PROJECT'],\n",
" all_vars['RPM_MODEL_NAME'],\n",
" all_vars['RPM_DEFAULT_MODEL_NAME'],\n",
" create_bq_ds_op.outputs['rpm_bq_ds_name'],\n",
" load_bq_ds_op.outputs['rpm_table_id']\n",
" )\n",
"\n",
" evaluate_ml_model_op = create_kfp_comp(evaluate_ml_model)(\n",
" create_bq_ml_op.outputs['rpm_context'],\n",
" all_vars['RPM_GCP_PROJECT'],\n",
" create_bq_ds_op.outputs['rpm_bq_ds_name'],\n",
" create_bq_ml_op.outputs['rpm_bqml_model'],\n",
" )\n",
"\n",
" create_batch_prediction_dataset_op = create_kfp_comp(create_batch_prediction_dataset)(\n",
" evaluate_ml_model_op.outputs['rpm_context'],\n",
" all_vars['RPM_GCP_PROJECT'],\n",
" create_bq_ds_op.outputs['rpm_bq_ds_name'],\n",
" load_bq_ds_op.outputs['rpm_table_id'],\n",
" )\n",
"\n",
" predict_batch_ml_model_op = create_kfp_comp(predict_batch_ml_model)(\n",
" evaluate_ml_model_op.outputs['rpm_context'],\n",
" all_vars['RPM_GCP_PROJECT'],\n",
" create_bq_ds_op.outputs['rpm_bq_ds_name'],\n",
" create_bq_ml_op.outputs['rpm_bqml_model'],\n",
" create_batch_prediction_dataset_op.outputs['rpm_pred_table_id'],\n",
" )\n",
"\n",
" get_versioned_bqml_model_export_path_op = create_kfp_comp(get_bqml_model_version)(\n",
" create_bq_ml_op.outputs['rpm_context'],\n",
" all_vars['RPM_GCP_PROJECT'],\n",
" all_vars['RPM_MODEL_EXPORT_PATH'],\n",
" all_vars['RPM_DEFAULT_MODEL_EXPORT_PATH'],\n",
" all_vars['RPM_MODEL_VER_PREFIX'],\n",
" gcs_bucket_folder_op.outputs['rpm_gcs_rpm_ds_url']\n",
" )\n",
"\n",
" # create a volume where the dataset will be temporarily stored.\n",
" pvc_op = VolumeOp(\n",
" name=all_vars['RPM_PVC_NAME'],\n",
" resource_name=all_vars['RPM_PVC_NAME'],\n",
" size=\"20Gi\",\n",
" modes=dsl.VOLUME_MODE_RWO\n",
" )\n",
"\n",
" export_bqml_model_to_gcs_op = create_kfp_comp(export_bqml_model_to_gcs)(get_versioned_bqml_model_export_path_op.outputs['rpm_context'],\n",
" all_vars['RPM_GCP_PROJECT'],\n",
" all_vars['RPM_PVC_NAME'],\n",
" all_vars['RPM_PVC_DATASET_FOLDER_NAME'],\n",
" get_versioned_bqml_model_export_path_op.outputs['rpm_bqml_model_export_path'],\n",
" gcs_bucket_folder_op.outputs['rpm_gcs_rpm_ds_url'],\n",
" create_bq_ds_op.outputs['rpm_bq_ds_name'],\n",
" create_bq_ml_op.outputs['rpm_bqml_model'],\n",
" )\n",
" export_bqml_model_to_gcs_op.add_pvolumes({data_path: pvc_op.volume})\n",
"\n",
" model_deploy_op = create_kfp_comp(deploy_ml_model_online_pred)(\n",
" export_bqml_model_to_gcs_op.outputs['rpm_context'],\n",
" all_vars['RPM_GCP_PROJECT'],\n",
" all_vars['RPM_LOCATION'],\n",
" all_vars['RPM_RUNTIME_VERSION'],\n",
" all_vars['RPM_PYTHON_VERSION'],\n",
" export_bqml_model_to_gcs_op.outputs['rpm_model_uri'],\n",
" create_bq_ml_op.outputs['rpm_bqml_model'],\n",
" get_versioned_bqml_model_export_path_op.outputs['rpm_model_version'],\n",
" gcs_bucket_folder_op.outputs['rpm_gcs_rpm_ds_url'],\n",
" )\n",
"\n",
" predict_online_ml_model_op = create_kfp_comp(predict_online_ml_model)(\n",
" model_deploy_op.outputs['rpm_context'],\n",
" all_vars['RPM_GCP_PROJECT'],\n",
" all_vars['RPM_PVC_NAME'],\n",
" all_vars['RPM_PVC_DATASET_FOLDER_NAME'],\n",
" create_bq_ml_op.outputs['rpm_bqml_model'],\n",
" get_versioned_bqml_model_export_path_op.outputs['rpm_model_version'],\n",
" )\n",
" predict_online_ml_model_op.add_pvolumes({data_path: pvc_op.volume})\n",
"\n",
" # don't cache the following comps\n",
" # the below is for model versioning only\n",
" get_versioned_bqml_model_export_path_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" export_bqml_model_to_gcs_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" model_deploy_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" predict_online_ml_model_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
"\n",
" # don't cache any comps\n",
" # you don't want to cache any comps when you are repetatively integration testing\n",
" gcs_bucket_folder_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" create_bq_ds_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" load_bq_ds_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" create_bq_ml_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" evaluate_ml_model_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" create_batch_prediction_dataset_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" predict_batch_ml_model_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" get_versioned_bqml_model_export_path_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" export_bqml_model_to_gcs_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" model_deploy_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" predict_online_ml_model_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "RbateePuwap9",
"colab_type": "text"
},
"source": [
"# Compile, watch out for errors in the pipeline composition"
]
},
{
"cell_type": "code",
"metadata": {
"id": "nYXb7UJtwap9",
"colab_type": "code",
"colab": {}
},
"source": [
"#compile the pipeline\n",
"def complie_pipeline(pipeline_func):\n",
" \"\"\" Compile the pipeline, watch out for errors in the pipeline composition.\n",
"\n",
" Args:\n",
" pipeline_func (:obj:`bq_googlestr_dataset_to_bq_to_caip_pipeline`):\n",
" The pipeline definition\n",
" Returns:\n",
" pipeline_filename (:obj:`str`): the compressed file compipled file\n",
" to upload to CloudAI Platform Prediction\n",
" pipeline_func (:obj:`str`): bq_googlestr_dataset_to_bq_to_caip_pipeline,\n",
" name of the the pipeline\n",
" arguments (:obj:`str`): the arguments to pass to the pipeline\n",
" when you launch it\n",
" \"\"\"\n",
" pipeline_func = pipeline_func\n",
" pipeline_filename = pipeline_func.__name__ + '.zip'\n",
"\n",
" import kfp.compiler as compiler\n",
" compiler.Compiler().compile(pipeline_func, pipeline_filename)\n",
"\n",
" arguments = {}\n",
" return (pipeline_filename, pipeline_func, arguments)\n",
"pipeline_filename, pipeline_func, arguments = complie_pipeline(bq_googlestr_dataset_to_bq_to_caip_pipeline)"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "6ACMErXIwap_",
"colab_type": "text"
},
"source": [
"# Create an experiment and run the pipeline immediately\n",
"Please use the links in the output to go directly to the experiment/run launched in the browser"
]
},
{
"cell_type": "code",
"metadata": {
"id": "iwRZDx0zwaqA",
"colab_type": "code",
"colab": {}
},
"source": [
"# create and run an experiment\n",
"def create_experiment_and_run():\n",
" \"\"\" Create an experiment and run the pipeline immediately.\n",
" Please use the links in the output to go directly to the experiment/run launched in the browser\n",
" \"\"\"\n",
" client = kfp.Client(RPM_GCP_KFP_HOST)\n",
" experiment = client.create_experiment(RPM_DS_DOWNLOAD_EXPERIMENT_NAME)\n",
" #Submit a pipeline run\n",
" run_name = pipeline_func.__name__ + ' run'\n",
" run_result = client.run_pipeline(\n",
" experiment_id=experiment.id,\n",
" job_name=run_name,\n",
" pipeline_package_path=pipeline_filename,\n",
" params=arguments)\n",
"create_experiment_and_run()"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "Wzw4YDNs_iA9",
"colab_type": "text"
},
"source": [
"If there is error in the pipeline, you will see that in the KubeFlow Pipelines UI in the Experiments section. If you encounter any errors, identify the issue, fix it in the Python function, unit test the function, update the pipeline defintion, compile, create an experiment, and run the experiment. Iterate through the process until you successfully run a pipeline.\n",
"\n",
"You have run successfully run a KubeFlow Pipelines. The pipeline created a model using BigQuery ML. You could now make use of the model while exploring or analyzing the data. You use SQL to use the model for prediction and then use the result as you wish. You could visualize the result using your favorite visualization package such as matplot lib.\n",
"\n",
"The section, below, demonstrates how you can use the BigQuery library to execute a sql in the BigQuey and collect the result in a pandas data frame. You can use any query you want including that of a query to predict using the model which the pipeline created."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "CGMI9l4qwaqC",
"colab_type": "text"
},
"source": [
"# Data Exploraton"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "JgvYUrK1waqC",
"colab_type": "code",
"colab": {}
},
"source": [
"# Data Exploration !!! BE CAREFUL !!! adjust the query to sample the data.\n",
"# 1. Get pandas df from BigQuery\n",
"# 2. plot histogram using matplotlib\n",
"#########################\n",
"from google.cloud import bigquery as bq\n",
"import pandas as pd\n",
"\n",
"rpm_context = get_local_context()\n",
"client = bq.Client(project=rpm_context[\"RPM_GCP_PROJECT\"])\n",
"\n",
"# adjust the below query to grab only a sample dataset e.g. use a where clause.\n",
"df = client.query('''\n",
" SELECT *\n",
" FROM `%s.%s`\n",
" LIMIT 10\n",
"''' % (rpm_context[\"rpm_bq_ds_name\"], rpm_context[\"rpm_table_id\"].split('.')[2])).to_dataframe()"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "xTFMsyMVwaqE",
"colab_type": "code",
"colab": {}
},
"source": [
"df.head()"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "jTIG7BckwaqG",
"colab_type": "code",
"colab": {}
},
"source": [
"df.tail()"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "UGqVZmILwaqJ",
"colab_type": "code",
"colab": {}
},
"source": [
"df.info()"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "7urbxPZZwaqL",
"colab_type": "code",
"colab": {}
},
"source": [
"df.shape"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "cjoZwvmYwaqM",
"colab_type": "code",
"colab": {}
},
"source": [
"import matplotlib.pyplot as plt\n",
"%matplotlib inline\n",
"plt.close('all') \n",
"df.hist(bins=50, figsize=(20,15))\n",
"plt.show()"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "E75eIJfKwaqO",
"colab_type": "code",
"colab": {}
},
"source": [
"# takes a bit of time...BE CAREFUL!!!\n",
"# works on local Jupyter instance.\n",
"import pandas_profiling as pp\n",
"pp.ProfileReport(df)"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "lzznZNc0_ntW",
"colab_type": "text"
},
"source": [
"The utilities method in the section below provides convinient way to delete the Google Cloud resources. You can use the methods while developing your pipeline components.\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "46nuAibTwaqP",
"colab_type": "text"
},
"source": [
"# Clean up - !!! BE CAREFUL!!!"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "0DOWwgC5waqP",
"colab_type": "text"
},
"source": [
"## Delete the PODs and the PVCs in the KFP (Kubernetes Cluster)"
]
},
{
"cell_type": "code",
"metadata": {
"id": "MR42JjY_waqP",
"colab_type": "code",
"colab": {}
},
"source": [
"# delete_pod_pvc\n",
"def delete_pod_pvc(ctx: str) -> str:\n",
" \"\"\" Removes the Pods and Persistence Volume (PVCs) created in the pipeline,\n",
" This is not recommendated to use it in a production enviornment.\n",
" Comes handy in the iterative development and testing phases of the SDLC.\n",
" !!! BE CAREFUL !!!!\n",
" Args:\n",
" ctx(:obj:`str`): The dict object\n",
" with all the variables in the local context\n",
" \"\"\"\n",
"\n",
" # loading rpm_context string\n",
" import json\n",
" rpm_context = json.loads(ctx)\n",
" print(rpm_context)\n",
" import subprocess\n",
" def exec_cmd (cmd):\n",
" try:\n",
" print(cmd)\n",
" process = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, universal_newlines=True)\n",
" print(f\"'output from {cmd}': {process.stdout}\")\n",
" except subprocess.CalledProcessError as e:\n",
" error = traceback.format_exc()\n",
" print(error)\n",
" print(e)\n",
"\n",
" exec_cmd(f\"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}\")\n",
" exec_cmd(f\"gcloud container clusters get-credentials {rpm_context['RPM_CLUSTER_NAME']} --zone {rpm_context['RPM_LOCATION']} --project {rpm_context['RPM_GCP_PROJECT']}\")\n",
" exec_cmd(''' for pod in `kubectl get pod | grep 'bq-public-google-ds-to-bq-' | awk -F ' ' '{print $1}'`; do echo kubectl delete pod $pod; kubectl delete pod $pod; done ''')\n",
" exec_cmd(''' for pvc in `kubectl get pvc | grep 'bq-public-google-ds-to-bq-' | awk -F ' ' '{print $1}'`; do echo kubectl delete pvc $pvc; kubectl delete pvc $pvc; done ''')\n",
" exec_cmd(''' for pod in `kubectl get pod | grep 'bq-public-google-ds-to-bq-' | awk -F ' ' '{print $1}'`; do echo kubectl patch pod $pod -p '{\"metadata\":{\"finalizers\":null}}'; kubectl patch pod $pod -p '{\"metadata\":{\"finalizers\":null}}'; done ''')\n",
" exec_cmd(''' for pvc in `kubectl get pvc | grep 'bq-public-google-ds-to-bq-' | awk -F ' ' '{print $1}'`; do echo kubectl patch pvc $pvc -p '{\"metadata\":{\"finalizers\":null}}'; kubectl patch pvc $pvc -p '{\"metadata\":{\"finalizers\":null}}'; done ''')\n"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "oXB1DLIKwaqR",
"colab_type": "code",
"colab": {}
},
"source": [
"test_comp_local(delete_pod_pvc)"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "ICHm_tGlwaqS",
"colab_type": "text"
},
"source": [
"## Delete Google Cloud Storage folder"
]
},
{
"cell_type": "code",
"metadata": {
"id": "-NrsZqu5waqT",
"colab_type": "code",
"colab": {}
},
"source": [
"# delete the storage folder\n",
"from google.cloud import storage\n",
"from google.cloud.exceptions import NotFound\n",
"from google.cloud.exceptions import Forbidden\n",
"import traceback\n",
"def delete_storage_folder(bucket_name, folder_name):\n",
" \"\"\"Deletes a folder in the Google Cloust Storage,\n",
" This is not recommendated to use it in a production enviornment.\n",
" Comes handy in the iterative development and testing phases of the SDLC.\n",
" !!! BE CAREFUL !!!!\n",
" Args:\n",
" bucket_name(:obj:`str`): The Cloud Storage bucket name,\n",
" where the folder exists\n",
" folder_name(:obj:`str`): The folder that we want to delete\n",
" Returns:\n",
" (:obj:`boolean`): True if we are able to scucessfully delete the folder\n",
" \"\"\"\n",
" if len(bucket_name) == 0 or len(folder_name) == 0:\n",
" print(f\"Folder {folder_name} couldn't be deleted. Name is empty.\")\n",
" return False\n",
" else:\n",
" client = storage.Client()\n",
" try:\n",
" bucket = client.get_bucket(bucket_name)\n",
" blob = bucket.get_blob(folder_name)\n",
" if blob is None:\n",
" print(f\"Folder name {folder_name} does not exist!\")\n",
" return False\n",
" else:\n",
" bucket.delete_blobs(blobs=bucket.list_blobs(prefix=folder_name))\n",
" print(f\"Folder {folder_name} deleted\")\n",
" return True\n",
" except Exception as e:\n",
" print(f\"Folder {folder_name} couldn't be deleted\")\n",
" print(e)\n",
" error = traceback.format_exc()\n",
" print(error)\n",
" return False"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "EEcjbftawaqU",
"colab_type": "code",
"colab": {}
},
"source": [
"# delete storage folder if desired...!!!BE CAREFUL!!!!\n",
"local_context = get_local_context()\n",
"delete_storage_folder(local_context['rpm_gcs_rpm_ds_url'].split('/')[2],\n",
" local_context['rpm_gcs_rpm_ds_url'].split('/')[3]+'/')"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "o0Bp9ZrGwaqW",
"colab_type": "text"
},
"source": [
"## Delete Google Cloud Storage bucket"
]
},
{
"cell_type": "code",
"metadata": {
"id": "pYNul6qWwaqX",
"colab_type": "code",
"colab": {}
},
"source": [
"#delete the bucket\n",
"from google.cloud import storage\n",
"from google.cloud.exceptions import NotFound\n",
"from google.cloud.exceptions import Forbidden\n",
"import traceback\n",
"def delete_storage_bucket (bucket_name):\n",
" \"\"\"Deletes a folder in the Google Cloust Storage,\n",
" This is not recommendated to use it in a production enviornment.\n",
" Comes handy in the iterative development and testing phases of the SDLC.\n",
" !!! BE CAREFUL !!!!\n",
" Args:\n",
" bucket_name(:obj:`str`): The Cloud Storage bucket name,\n",
" that we want to delete\n",
" Returns:\n",
" (:obj:`boolean`): True if we are able to scucessfully delete the folder\n",
" \"\"\"\n",
" if bucket_name:\n",
" client = storage.Client()\n",
" try:\n",
" bucket = client.get_bucket(bucket_name)\n",
" bucket.delete()\n",
" print(f\"Bucket {bucket.name} deleted\")\n",
" return True\n",
" except Exception as e:\n",
" print(f\"Bucket {bucket_name} couldn't be deleted\")\n",
" print(e)\n",
" error = traceback.format_exc()\n",
" print(error)\n",
" return False\n",
" else:\n",
" print(f\"Bucket {bucket_name} couldn't be deleted. Name is empty.\")\n",
" return False"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "id_bfu_owaqY",
"colab_type": "code",
"colab": {}
},
"source": [
"# delete storage bucket if desired...!!! BE CAREFUL !!!!\n",
"delete_storage_bucket(get_local_context()['rpm_gcs_rpm_ds_url'].split('/')[2])"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "CF2ebPbXwaqb",
"colab_type": "text"
},
"source": [
"## Delete the table in BigQuery"
]
},
{
"cell_type": "code",
"metadata": {
"id": "ipzP_M-5waqb",
"colab_type": "code",
"colab": {}
},
"source": [
"# Delete the BigQuery table if not needed...!!! BE CAREFUL !!!\n",
"def delete_table(table_id):\n",
"    \"\"\"Deletes a BigQuery table.\n",
"    Not recommended for use in a production environment.\n",
"    Comes in handy in the iterative development and testing phases of the SDLC.\n",
"    !!! BE CAREFUL !!!!\n",
"    Args:\n",
"        table_id(:obj:`str`): The BigQuery table name that we want to delete\n",
"    \"\"\"\n",
"    from google.cloud import bigquery\n",
"    # Construct a BigQuery client object.\n",
"    client = bigquery.Client()\n",
"    # client.delete_table(table_id, not_found_ok=True) # Make an API request.\n",
"    client.delete_table(table_id)  # Make an API request.\n",
"    print(\"Deleted table '{}'.\".format(table_id))"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "GGKE8kV9waqd",
"colab_type": "code",
"colab": {}
},
"source": [
"# Delete the table in BigQuery\n",
"delete_table(get_local_context()['rpm_table_id'])"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "W2XMVpwXwaqe",
"colab_type": "text"
},
"source": [
"## Delete the dataset in BigQuery"
]
},
{
"cell_type": "code",
"metadata": {
"id": "VWcKti2dwaqe",
"colab_type": "code",
"colab": {}
},
"source": [
"def delete_dataset(dataset_id):\n",
"    \"\"\"Deletes a BigQuery dataset.\n",
"    Not recommended for use in a production environment.\n",
"    Comes in handy in the iterative development and testing phases of the SDLC.\n",
"    !!! BE CAREFUL !!!!\n",
"    Args:\n",
"        dataset_id(:obj:`str`): The BigQuery dataset name that we want to delete\n",
"    \"\"\"\n",
"    from google.cloud import bigquery\n",
"    # Construct a BigQuery client object.\n",
"    client = bigquery.Client()\n",
"    # dataset_id = 'your-project.your_dataset'\n",
"    # Use the delete_contents parameter to delete a dataset and its contents.\n",
"    # Use the not_found_ok parameter to not receive an error if the\n",
"    # dataset has already been deleted.\n",
"    client.delete_dataset(\n",
"        dataset_id, delete_contents=True, not_found_ok=True\n",
"    )  # Make an API request.\n",
"    print(\"Deleted dataset '{}'.\".format(dataset_id))"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "n5MlUKnPwaqf",
"colab_type": "code",
"colab": {}
},
"source": [
"#delete the BigQuery dataset\n",
"rpm_context = get_local_context()\n",
"delete_dataset(f\"{rpm_context['RPM_GCP_PROJECT']}.{rpm_context['rpm_bq_ds_name']}\")"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "_ULFMAURwaqg",
"colab_type": "text"
},
"source": [
"## Delete the Google Cloud Storage folders that contain the exported model artifacts"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "N-ZSWu60waqh",
"colab_type": "code",
"colab": {}
},
"source": [
"# delete the Cloud Storage folders where the models are saved\n",
"local_context = get_local_context()\n",
"bucket_name= local_context['rpm_gcs_rpm_ds_url'].split('/')[2]\n",
"rpm_bq_ds_name = local_context['rpm_gcs_rpm_ds_url'].split('/')[3]\n",
"rpm_bqml_model_export_path = local_context['RPM_MODEL_EXPORT_PATH']\n",
"if not rpm_bqml_model_export_path:\n",
" rpm_bqml_model_export_path = local_context[\"RPM_DEFAULT_MODEL_EXPORT_PATH\"]\n",
"folder_name = os.path.join(rpm_bq_ds_name, rpm_bqml_model_export_path)\n",
"assert delete_storage_folder(bucket_name, folder_name) == True"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "CEPJZGwzwaqi",
"colab_type": "code",
"colab": {}
},
"source": [
"# Utility function: execute an OS command from Python\n",
"import subprocess\n",
"import traceback\n",
"def exec_cmd(cmd):\n",
"    \"\"\"Executes an OS command.\n",
"    Args:\n",
"        cmd(:obj:`str`): The OS command\n",
"    Returns:\n",
"        (:obj:`str`): The output of the execution of the OS command\n",
"        (:obj:`int`): The return code: 0 on success, 1 on failure\n",
"    \"\"\"\n",
"    try:\n",
"        print(cmd)\n",
"        process = subprocess.run(cmd, shell=True, check=True,\n",
"                                 stdout=subprocess.PIPE, universal_newlines=True)\n",
"        print(f\"'output from {cmd}': {process.stdout}\")\n",
"        return (process.stdout, 0)\n",
"    except subprocess.CalledProcessError as e:\n",
"        error = traceback.format_exc()\n",
"        print(error)\n",
"        print(e.output)\n",
"        return (e.output, 1)"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "00LXbnRRwaqj",
"colab_type": "text"
},
"source": [
"## Delete the Cloud AI Platform Prediction models"
]
},
{
"cell_type": "code",
"metadata": {
"tags": [],
"id": "IJ_QwrYVwaql",
"colab_type": "code",
"colab": {}
},
"source": [
"# Delete the model\n",
"def delete_caip_model():\n",
"    \"\"\"Deletes the model and all of its versions from Cloud AI Platform Prediction\n",
"    \"\"\"\n",
"    local_context = get_local_context()\n",
"    (output, returncode) = exec_cmd(f\"gcloud ai-platform versions list --model {local_context['rpm_bqml_model']} --format='value(name)'\")\n",
"    for each_ver in output.split('\\n'):\n",
"        # Skip blank entries, such as the one after the trailing newline\n",
"        if not each_ver.strip():\n",
"            continue\n",
"        print(each_ver)\n",
"        cmd = f\"gcloud ai-platform versions delete {each_ver} --model={local_context['rpm_bqml_model']}\"\n",
"        exec_cmd(cmd)\n",
"    cmd = f'gcloud ai-platform models delete {local_context[\"rpm_bqml_model\"]}'\n",
"    exec_cmd(cmd)\n",
"delete_caip_model()"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {
"id": "HluVXlKzwaqm",
"colab_type": "text"
},
"source": [
"## Delete the GCP Project\n",
"To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial, **delete the project**.\n",
"\n",
"The easiest way to eliminate billing is to delete the project you created for the tutorial.\n",
"\n",
"**Caution**: Deleting a project has the following effects:\n",
"* **Everything in the project is deleted.** If you used an existing project for this tutorial, when you delete it, you also delete any other work you've done in the project.\n",
"* **Custom project IDs are lost.** When you created this project, you might have created a custom project ID that you want to use in the future. To preserve the URLs that use the project ID, such as an <b>appspot.com</b> URL, delete selected resources inside the project instead of deleting the whole project.\n",
"\n",
"If you plan to explore multiple tutorials and quickstarts, reusing projects can help you avoid exceeding project quota limits.\n",
"<br>\n",
"<ol type=\"1\">\n",
" <li>In the Cloud Console, go to the <a href=\"https://console.cloud.google.com/iam-admin/projects\"><b>Manage resources</b> page</a>.</li>\n",
" <li>In the project list, select the project that you want to delete and then click <b>Delete</b>.</li>\n",
" <li>In the dialog, type the project ID and then click <b>Shut down</b> to delete the project.</li>\n",
"</ol>\n"
]
}
]
}