# License

In [None]:
# Copyright 2020 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#      https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Overview
Propensity-to-purchase use cases apply across many industry verticals, such as retail and finance. In this article, we show you how to build an end-to-end solution with [BigQuery ML](https://cloud.google.com/bigquery-ml/docs) and [Kubeflow Pipelines (KFP)](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/), using a [Google Analytics dataset](https://console.cloud.google.com/marketplace/details/obfuscated-ga360-data/obfuscated-ga360-data?filter=solution-type:dataset) to determine which customers have a propensity to purchase. You could use the solution to reach targeted customers in an offline campaign via email or postal channels. You could also use it in an online campaign to make on-the-spot decisions while a customer is browsing products on your website, for example to recommend products or trigger a personalized email.

Propensity to purchase is a subset of the personalization use case, which is a key driver of how many organizations do marketing today. You need to ensure that you are targeting the right messages to the right customers at the right time. “Personalization at scale has the potential to create $1.7 trillion to $3 trillion in new value” \([McKinsey study](https://www.mckinsey.com/business-functions/marketing-and-sales/our-insights/a-technology-blueprint-for-personalization-at-scale)\). Propensity modeling helps companies identify these "right" customers and prospects: those with a high likelihood of purchasing a particular product or service.

Propensity models matter because they provide a mechanism for targeting sales outreach with personalized messages, which are key to getting customers' attention. By using a propensity-to-purchase model, you can more effectively target the customers who are most likely to purchase certain products.

# What is BigQuery ML?
[BigQuery ML](https://cloud.google.com/bigquery-ml/docs/bigqueryml-intro) enables users to create and execute machine learning models in BigQuery by using standard SQL queries. This means, if your data is already in BigQuery, you don’t need to export your data to train and deploy machine learning models — by training, you’re also deploying in the same step. Combined with BigQuery’s auto-scaling of compute resources, you won’t have to worry about spinning up a cluster or building a model training and deployment pipeline. This means you’ll be saving time building your machine learning pipeline, enabling your business to focus more on the value of machine learning instead of spending time setting up the infrastructure.
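
To make the "models via standard SQL" idea concrete, here is a minimal sketch that only composes a BigQuery ML `CREATE MODEL` statement as a string; it does not contact BigQuery. The helper name `build_create_model_sql` and the dataset, model, and table names are hypothetical placeholders. The label and excluded column mirror the ones used later in this notebook.

```python
def build_create_model_sql(dataset: str, model: str, table: str, label: str) -> str:
    """Compose a BigQuery ML statement that trains a logistic regression model."""
    return (
        f"CREATE OR REPLACE MODEL `{dataset}.{model}`\n"
        f"OPTIONS (model_type='LOGISTIC_REG', input_label_cols=['{label}']) AS\n"
        "SELECT * EXCEPT(fullVisitorId)\n"
        f"FROM `{table}`"
    )


sql = build_create_model_sql(
    dataset="my_dataset",          # hypothetical dataset name
    model="my_model",              # hypothetical model name
    table="my_dataset.my_table",   # hypothetical training table
    label="will_buy_on_return_visit",
)
print(sql)
```

Running the resulting statement in BigQuery is what trains the model; the sketch stops at string construction so that it can be tried without a project.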

## Scope of this notebook
### Dataset
The Google Analytics dataset is hosted publicly on BigQuery and is a dataset that provides 12 months (August 2016 to August 2017) of obfuscated Google Analytics 360 data from the [Google Merchandise Store](https://www.googlemerchandisestore.com/), a real e-commerce store that sells Google-branded merchandise.

### Objective
This notebook aims to familiarize you with the following:
1. Environment Setup
1. KFP Setup
1. Data Exploration using BigQuery, Pandas, matplotlib
1. SDLC methodologies Adherence (opinionated)
1. KFP knowledge share (demonstration)

### Costs
This tutorial uses billable components of Google Cloud:
* [BigQuery](https://cloud.google.com/bigquery)
* [BigQuery ML](https://cloud.google.com/bigquery-ml/docs)
* [Cloud Storage](https://cloud.google.com/storage)
* [Cloud AI Platform Pipelines](https://cloud.google.com/ai-platform/pipelines/docs/) \(uses [Google Cloud Kubernetes Engine (GKE)](https://cloud.google.com/kubernetes-engine)\)
* [Cloud AI Prediction](https://cloud.google.com/ai-platform/prediction/docs)

Use the [Pricing Calculator](https://cloud.google.com/products/calculator/) to generate a cost estimate based on your projected usage.

## Before you begin
For this reference guide, you need a [Google Cloud project](https://console.cloud.google.com/cloud-resource-manager).

You can create a new one, or select a project you already created.
The following steps are required, regardless of where you run your notebook (locally or in a Cloud AI Platform Notebook).
* [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.
* [Make sure that billing is enabled for your project](https://cloud.google.com/billing/docs/how-to/modify-project). 
* (When using a non-Google Cloud local environment) Install the [Google Cloud SDK](https://cloud.google.com/sdk/).

### Mandatory variables
You must set the following variables:
* RPM_GCP_PROJECT to [Your Google Cloud Project]
* RPM_GCP_KFP_HOST to [Your KFP pipeline host]: See the instructions later in this notebook to find out how to get it
* RPM_GCP_APPLICATION_CREDENTIALS to [Full path with the file name to the Service Account json file]
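
Before running the rest of the notebook, it can help to check that none of these variables is still empty or set to its angle-bracket placeholder. The following is a minimal sketch of such a check; the helper name `find_unset_variables` and the example values are hypothetical and not part of the notebook.

```python
def find_unset_variables(variables: dict) -> list:
    """Return the names whose value is empty or still a <placeholder>."""
    unset = []
    for name, value in variables.items():
        text = (value or "").strip()
        if not text or (text.startswith("<") and text.endswith(">")):
            unset.append(name)
    return unset


example = {
    "RPM_GCP_PROJECT": "my-project",                 # hypothetical project id
    "RPM_GCP_KFP_HOST": "<Your KFP pipeline host>",  # placeholder left unchanged
    "RPM_GCP_APPLICATION_CREDENTIALS": "",           # not set yet
}
missing = find_unset_variables(example)
print(missing)
```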

# Setup local environment

## PIP install appropriate packages

In [None]:
!pip install google-cloud-storage #for Storage Account
!pip install google-cloud #for cloud sdk
!pip install google-cloud-bigquery #for BigQuery
!pip install google-cloud-bigquery-storage #for BigQuery Storage client
!pip install kfp # Install the KFP AI SDK

## Setup KFP Host
Prepare the Cloud AI Platform Pipelines in Google Cloud:
* [Setting up AI Platform Pipelines](https://cloud.google.com/ai-platform/pipelines/docs/getting-started#set_up_your_instance).
* [Optional] Read the 'Introducing Cloud AI Platform Pipelines' [blogpost](https://cloud.google.com/blog/products/ai-machine-learning/introducing-cloud-ai-platform-pipelines)
* Follow the [instructions](https://cloud.google.com/ai-platform/pipelines/docs/connecting-with-sdk#using_the_to_connect_to_an_cluster) to collect the Kubeflow Pipelines host:
1. Copy the host from the code snippet in the pop-up dialog box.
1. Find the hostname in the URL of the Kubeflow Pipelines dashboard. The hostname is the portion of the URL between https:// and /#/start, and should match the pattern *.pipelines.googleusercontent.com.
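
The hostname rule in the last step can be sketched with the standard library. The helper name `kfp_host_from_dashboard_url` and the example URL are hypothetical; only the `*.pipelines.googleusercontent.com` pattern comes from the step above.

```python
from fnmatch import fnmatch
from urllib.parse import urlparse


def kfp_host_from_dashboard_url(url: str) -> str:
    """Extract the host part of a Kubeflow Pipelines dashboard URL."""
    host = urlparse(url).netloc
    # Reject anything that does not look like an AI Platform Pipelines host.
    if not fnmatch(host, "*.pipelines.googleusercontent.com"):
        raise ValueError(f"Unexpected KFP host: {host!r}")
    return host


# Hypothetical dashboard URL, for illustration only.
example_url = "https://1234abcd-dot-us-central1.pipelines.googleusercontent.com/#/start"
host = kfp_host_from_dashboard_url(example_url)
print(host)
```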


 

In [None]:
RPM_GCP_KFP_HOST = "<Your KFP pipeline host>"

## Setup Google Cloud Project

In [None]:
# set the Google Cloud project id
RPM_GCP_PROJECT = "<Your Google Cloud project>" #for local !bash

## Setup Google Cloud Credentials

### Specify the location of the ServiceAccount Key file

In [None]:
# download the ServiceAccount key and provide the path to the file below
RPM_GCP_APPLICATION_CREDENTIALS = "<Full path with the file name to the above downloaded json file>"

### Upload your service account file (colab specific code block)

In [None]:
# uncomment the below code in a Colab environment
# from google.colab import files
# # Upload service account key
# keyfile_upload = files.upload()
# RPM_GCP_APPLICATION_CREDENTIALS = list(keyfile_upload.keys())[0]

## Setup your local runtime environment


In [None]:
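import os
# `!export` only affects a short-lived subshell, so set the variable on the kernel process instead
os.environ['RPM_GCP_PROJECT'] = RPM_GCP_PROJECT
!echo $RPM_GCP_PROJECT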

In [None]:
# set the desired Google Cloud project
!gcloud config set project $RPM_GCP_PROJECT

In [None]:
# validate that the Google Cloud project has been set properly.
!gcloud info --format='value(config.project)'

In [None]:
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = RPM_GCP_APPLICATION_CREDENTIALS

In [None]:
# set the account
!gcloud auth activate-service-account --key-file=$RPM_GCP_APPLICATION_CREDENTIALS

# Enable the below Google Cloud Services for the solution

In [None]:
# enable the required Google Cloud services
!gcloud services enable \
    storage-component.googleapis.com \
    bigquery.googleapis.com \
    ml.googleapis.com \
    notebooks.googleapis.com

In [None]:
# validate that all required services have been enabled properly.
!gcloud services list | grep 'storage-component.googleapis.com\|bigquery.googleapis.com\|ml.googleapis.com\|notebooks.googleapis.com'

# Adjust other variables

In [None]:
# load_params
import json
def load_params():
  """The variables are used in the pipeline.

    Provide appropriate variables for your environments
    Set appropriate variables with the pattern RPM_*
      (these are IMMUTABLE variables that act as defaults)
    You could print all the variables used in your environment
      (e.g. local environment) which start with RPM_* or rpm_*
        (comes in handy while troubleshooting)

  Returns:
    dict: all python variables used in the pipeline
  """
  return {
      'RPM_GCP_PROJECT': RPM_GCP_PROJECT,
      'RPM_LOCATION': 'us-central1-b',  # KFP/K8s cluster zone
      'RPM_PVC_DATASET_FOLDER_NAME': 'rpm_ds',
      'RPM_PVC_NAME': 'rpm-vol',
      # create the bucket if it doesn't exist
      'RPM_GCP_STORAGE_BUCKET': '',
      # create the folder if it doesn't exist
      'RPM_GCP_STORAGE_BUCKET_FOLDER': '',
      'RPM_DEFAULT_BUCKET_EXT': '_retail_propensity_model_assets',
      'RPM_DEFAULT_BUCKET_FOLDER_NAME': 'rpm_data_set',
      'RPM_BQ_DATASET_NAME': '',  # create the dataset if it doesn't exist
      'RPM_BQ_TABLE_NAME': '',
      'RPM_DEFAULT_BQ_TABLE_NAME_EXT': '_tbl',
      'RPM_DEFAULT_DATA_SET_NAME_EXT': '_rpm_data_set',
      'RPM_MODEL_NAME': '',
      'RPM_DEFAULT_MODEL_NAME': 'rpm_bqml_model',
      'RPM_MODEL_EXPORT_PATH': '',
      'RPM_DEFAULT_MODEL_EXPORT_PATH': 'bqml/model/export/',
      'RPM_MODEL_VER_PREFIX': 'V_',
      'RPM_RUNTIME_VERSION': '1.15',  # do not change
      'RPM_PYTHON_VERSION': '3.7',  # do not change
      'RPM_CLUSTER_NAME': 'cluster-1',  # KFP/K8s cluster name
      # variables created by the program
      # from user supplied set or from the program defaults
      'rpm_bq_ds_name': '',
      'rpm_gcs_rpm_ds_url': '',
      'rpm_file_name': '',
      'rpm_table_id': '',
      'rpm_bqml_model': '',
      'rpm_bqml_model_export_path': '',
      'rpm_model_version': '',
      'rpm_model_uri': '',
      'rpm_pred_table_id': '',
  }
all_vars = load_params()
RPM_DS_DOWNLOAD_EXPERIMENT_NAME = 'GoogleStore Retail Pipeline'

# Reset the local context for local development if needed

In [None]:
# reset_local_context
def reset_local_context():
  """Resets all the variables used in the local environment.

    Comes in handy while developing and testing the code locally.
  """
  try:
    del globals()['local_context']
  except KeyError as e:
    print('local_context not found!!!')
    print(e)
  globals().get('local_context')  # validate that the local variable is removed
# reset_local_context()  # use before testing a component locally if needed

In [None]:
# get_local_context
def get_local_context(init_var=None):
  """Define local rpm_context object

      The function sets the appropriate variables to
        execute the code in a local environment.
      local_context contains all the variables used in the local environment.
      You could check the variable before and after the call to find out the
        desired result (comes in handy while developing
        and testing the code locally)
    Args:
        init_var (:obj:`dict`, optional): The dict object overrides the existing
          local context (use sparingly only, when needed)
    Returns:
      dict: all python variables used in the pipeline
  """
  global local_context
  local_context = globals().get('local_context')
  if not local_context:
      local_context = load_params()
  if init_var:
      local_context = init_var
  local_context["RPM_PVC_NAME"] = os.environ["HOME"]
  if not local_context.get("rpm_bq_ds_name"):
      local_context["rpm_bq_ds_name"] = f"{all_vars['RPM_GCP_PROJECT'].replace('-','_')}_rpm_data_set"
  if not local_context.get("rpm_gcs_rpm_ds_url"):
      local_context["rpm_gcs_rpm_ds_url"] = f"gs://{all_vars['RPM_GCP_PROJECT']}_retail_propensity_model_assets/rpm_data_set/"
  if not local_context.get("rpm_table_id"):
      local_context["rpm_table_id"] = f"{all_vars['RPM_GCP_PROJECT']}.{all_vars['RPM_GCP_PROJECT'].replace('-','_')}_rpm_data_set.{all_vars['RPM_GCP_PROJECT'].replace('-','_')}_rpm_data_set_tbl"
  if not local_context.get("RPM_MODEL_NAME"):
      local_context["rpm_bqml_model"] = "rpm_bqml_model"
  if not local_context.get("rpm_bqml_model_export_path"):
      local_context["rpm_bqml_model_export_path"] = "bqml/model/export/V_1/"
  if not local_context.get("rpm_model_version"):
      local_context["rpm_model_version"] = "V_1"
  if not local_context["rpm_model_uri"]:
      local_context["rpm_model_uri"] = f"gs://{all_vars['RPM_GCP_PROJECT']}_retail_propensity_model_assets/rpm_data_set/bqml/model/export/V_1/"
  if not local_context["rpm_pred_table_id"]:
      local_context["rpm_pred_table_id"] = f"{all_vars['RPM_GCP_PROJECT']}.{all_vars['RPM_GCP_PROJECT'].replace('-','_')}_rpm_data_set.{all_vars['RPM_GCP_PROJECT'].replace('-','_')}_rpm_data_set_pred_tbl"

  print (local_context)
  return local_context

def test_comp_local(func):
  # update the module-level local_context so the result persists after the call
  global local_context
  local_context = get_local_context()
  import json
  new_local_context_str = func(json.dumps(local_context))
  print(f'type: {type(new_local_context_str)}; new_local_context_str:{new_local_context_str}')
  local_context = json.loads(new_local_context_str)

def update_local_context(output):
  # update the module-level local_context so the result persists after the call
  global local_context
  print(f'type: {type(output)}; new_local_context_str:{output}')
  local_context = json.loads(output[0])

# Instantiate the KFP Client

In [None]:
# Create a KFP Client and Validate that you are able to access the KFP Pipelines
# You will be using the KFP HOST to deploy the KFP pipeline (experiment) and launch the experiment
import kfp
kfp_client = kfp.Client(host=RPM_GCP_KFP_HOST)
kfp_client.LOCAL_KFP_CONTEXT

# Create Google Cloud Storage bucket and folder - function
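
The function in this section, like the other component functions in this notebook, receives the pipeline context as a JSON string (`ctx`), updates it, and returns it as the first element of a named tuple. The following is a minimal sketch of that pattern using a toy component; `toy_component` and the `greeting` field are hypothetical, and the class-style `NamedTuple` is used here only for readability (the notebook uses the functional form).

```python
import json
from typing import NamedTuple


class Outputs(NamedTuple):
    rpm_context: str  # the whole context, serialized as JSON
    greeting: str     # a single value exposed as its own output


def toy_component(ctx: str) -> Outputs:
    """Deserialize the context, add one value, and serialize it again."""
    rpm_context = json.loads(ctx)
    greeting = f"hello {rpm_context['RPM_GCP_PROJECT']}"
    rpm_context["greeting"] = greeting
    return Outputs(json.dumps(rpm_context), greeting)


ctx_in = json.dumps({"RPM_GCP_PROJECT": "my-project"})  # hypothetical project id
out = toy_component(ctx_in)
ctx_out = json.loads(out.rpm_context)
print(ctx_out)
```

Passing the context as a string keeps every input and output a simple type, and the local test helpers in this notebook read the updated context back from the first element of the returned tuple.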

In [None]:
# create_gcs_bucket_folder
from typing import NamedTuple
def create_gcs_bucket_folder(ctx: str,
                             RPM_GCP_STORAGE_BUCKET: str,
                             RPM_GCP_PROJECT: str,
                             RPM_DEFAULT_BUCKET_EXT: str,
                             RPM_GCP_STORAGE_BUCKET_FOLDER: str,
                             RPM_DEFAULT_BUCKET_FOLDER_NAME: str
                            ) -> NamedTuple('Outputs', [
       ('rpm_context', str),
       ('rpm_gcs_rpm_ds_url', str),
    ]):
  """The function (also used as a base for a KFP Component) creates a
    Google Cloud Storage bucket and a folder if they don't exist.

    The idea is to create the bucket and folder only on the first
      run of the pipeline.
    The pipeline uses the same storage account and the folder
      for repeated runs.
    Args:
      ctx(:obj:`str`): The dict object with all the variables
        used in the pipeline
      RPM_GCP_STORAGE_BUCKET(:obj:`str`): User supplied Storage Bucket name
      RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment
      RPM_DEFAULT_BUCKET_EXT:(:obj:`str`): Name of the bucket,
        when user hasn't supplied a bucket name
      RPM_GCP_STORAGE_BUCKET_FOLDER:(:obj:`str`): User supplied folder name
      RPM_DEFAULT_BUCKET_FOLDER_NAME:(:obj:`str`): Name for creating a
        bucket, when User hasn't supplied a folder name

    Returns:
      Outputs(:obj: `tuple`): Returns the below outputs:
        rpm_context(:obj:`str`): All variables used in the pipeline
        rpm_gcs_rpm_ds_url(:obj:`str`): Full Google Cloud Storage path with
          bucket name and folder name
  """
  # loading rpm_context string
  import json
  rpm_context = json.loads(ctx)
  print(rpm_context)
  RPM_GCP_STORAGE_BUCKET = rpm_context['RPM_GCP_STORAGE_BUCKET']
  RPM_GCP_PROJECT = rpm_context['RPM_GCP_PROJECT']
  RPM_DEFAULT_BUCKET_EXT = rpm_context['RPM_DEFAULT_BUCKET_EXT']
  RPM_GCP_STORAGE_BUCKET_FOLDER = rpm_context['RPM_GCP_STORAGE_BUCKET_FOLDER']
  RPM_DEFAULT_BUCKET_FOLDER_NAME = rpm_context['RPM_DEFAULT_BUCKET_FOLDER_NAME']

  if RPM_GCP_STORAGE_BUCKET:
    gcs_storage_bucket_name = RPM_GCP_STORAGE_BUCKET
  else:
    gcs_storage_bucket_name = RPM_GCP_PROJECT + RPM_DEFAULT_BUCKET_EXT

  if RPM_GCP_STORAGE_BUCKET_FOLDER:
    gcs_folder_name = RPM_GCP_STORAGE_BUCKET_FOLDER + '/'
  else:
    gcs_folder_name = RPM_DEFAULT_BUCKET_FOLDER_NAME + '/'
  print(f"{gcs_storage_bucket_name} bucket and {gcs_folder_name} will be used in the project.")

  rpm_gcs_rpm_ds_url = f"gs://{gcs_storage_bucket_name}/{gcs_folder_name}"
  print(rpm_gcs_rpm_ds_url)
  rpm_context['rpm_gcs_rpm_ds_url'] = rpm_gcs_rpm_ds_url

  # defining the install function
  import subprocess
  def install(name):
    subprocess.call(['pip', 'install', name])
  packages_to_install = ['google-cloud', 'google-cloud-storage']
  for each_package in packages_to_install:
    install(each_package)
    print(f"'{each_package}' package installed :)")

  cmd = f"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}"
  print(cmd)
  process = subprocess.run(cmd, shell=True, check=True,
                           stdout=subprocess.PIPE, universal_newlines=True)
  print(f"'output from {cmd}': {process.stdout}")

  from google.cloud import storage
  from google.cloud.exceptions import NotFound
  from google.cloud.exceptions import Forbidden
  import traceback

  def check_storage_bucket_and_folder(bucket_name, folder_name):
    if not bucket_name or not folder_name:
      return(False, False)
    client = storage.Client()
    try:  # check if the bucket exists and that we have the proper permission
      bucket = client.get_bucket(bucket_name)
      print(f"Bucket: {bucket_name} exists.")
      bucket_exists = True
      try:
        blob = bucket.get_blob(folder_name)
        if blob is None:
          print(f"Folder name {folder_name} does not exist!")
          folder_exists = False
        else:
          print(f"Folder name {folder_name} exists.")
          folder_exists = True
      except Exception:
          print(f"Folder name {folder_name} does not exist!")
          folder_exists = False
    except Forbidden as e:
      print(f"Sorry, you don't have access to the bucket: {bucket_name}!")
      print(e)
      error = traceback.format_exc()
      print(error)
      bucket_exists = False
      folder_exists = False
    except NotFound as e:
      print(f"Sorry, the bucket: {bucket_name} does not exist!")
      print(e)
      error = traceback.format_exc()
      print(error)
      bucket_exists = False
      folder_exists = False
    return(bucket_exists, folder_exists)

  # Create a bucket if it doesn't exist
  def create_storage_bucket(bucket_name):
    if bucket_name:
      client = storage.Client()
      try:
        bucket = client.create_bucket(bucket_name)
        print(f"Bucket {bucket.name} created")
        return True
      except Exception as e:
        print(f"Bucket {bucket_name} couldn't be created")
        print(e)
        error = traceback.format_exc()
        print(error)
        return False
    else:
      print(f"Bucket {bucket_name} couldn't be created. Name is empty.")
      return False

  # Create the folder in the bucket
  def create_storage_folder(bucket_name, folder_name):
    if len(bucket_name) == 0 or len(folder_name) == 0:
      print(f"Folder {folder_name} couldn't be created. Name is empty.")
      return False
    else:
      client = storage.Client()
      try:
        bucket = client.get_bucket(bucket_name)
        blob = bucket.blob(folder_name)
        blob.upload_from_string('')
        print(f"Folder {blob.name} created")
        return True
      except Exception as e:
        print(f"Folder {folder_name} couldn't be created")
        print(e)
        error = traceback.format_exc()
        print(error)
        return False

  result = check_storage_bucket_and_folder(gcs_storage_bucket_name,
                                           gcs_folder_name)
  bucket_exists, folder_exists = result
  if not bucket_exists:
    create_storage_bucket(gcs_storage_bucket_name)
  if not folder_exists:
    create_storage_folder(gcs_storage_bucket_name, gcs_folder_name)

  return (
      json.dumps(rpm_context),
      rpm_context['rpm_gcs_rpm_ds_url']
      )

# *Test locally create_gcs_bucket_folder*
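
The naming logic of `create_gcs_bucket_folder` can also be checked offline, without touching Cloud Storage. The sketch below re-implements only the URL resolution under that assumption; the helper name `resolve_gcs_url` and the example project id are hypothetical.

```python
def resolve_gcs_url(ctx: dict) -> str:
    """Mirror how the bucket and folder names fall back to their defaults."""
    bucket = ctx["RPM_GCP_STORAGE_BUCKET"] or (
        ctx["RPM_GCP_PROJECT"] + ctx["RPM_DEFAULT_BUCKET_EXT"])
    folder = (ctx["RPM_GCP_STORAGE_BUCKET_FOLDER"]
              or ctx["RPM_DEFAULT_BUCKET_FOLDER_NAME"])
    return f"gs://{bucket}/{folder}/"


defaults = {
    "RPM_GCP_PROJECT": "my-project",  # hypothetical project id
    "RPM_GCP_STORAGE_BUCKET": "",
    "RPM_GCP_STORAGE_BUCKET_FOLDER": "",
    "RPM_DEFAULT_BUCKET_EXT": "_retail_propensity_model_assets",
    "RPM_DEFAULT_BUCKET_FOLDER_NAME": "rpm_data_set",
}
default_url = resolve_gcs_url(defaults)

# User-supplied names take precedence over the defaults.
custom = dict(defaults, RPM_GCP_STORAGE_BUCKET="custom-bucket",
              RPM_GCP_STORAGE_BUCKET_FOLDER="data")
custom_url = resolve_gcs_url(custom)
print(default_url, custom_url)
```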

In [None]:
# test locally create_gcs_bucket_folder
# You could unit test the above code in your local environment.
# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)
local_context = get_local_context()
import json
update_local_context(create_gcs_bucket_folder(
    json.dumps(local_context),
    local_context['RPM_GCP_STORAGE_BUCKET'],
    local_context['RPM_GCP_PROJECT'],
    local_context['RPM_DEFAULT_BUCKET_EXT'],
    local_context['RPM_GCP_STORAGE_BUCKET_FOLDER'],
    local_context['RPM_DEFAULT_BUCKET_FOLDER_NAME']
))


# Create BigQuery dataset - function

In [None]:
# create_bq_ds
from typing import NamedTuple
def create_bq_ds(ctx: str,
                 RPM_GCP_PROJECT: str,
                 RPM_BQ_DATASET_NAME: str,
                 RPM_LOCATION: str
                 ) -> NamedTuple('Outputs', [
       ('rpm_context', str),
       ('rpm_bq_ds_name', str),
    ]):
  """The function(also used as a base for a KFP Component) creates a
        BigQuery dataset if it doesn't exist.

      The idea is to create DataSet only on the first run of the pipeline.
      The pipeline uses the same DataSet for repeated runs.
      Args:
        ctx(:obj:`str`): The dict object with all the variables
          used in the pipeline
        RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment
        RPM_BQ_DATASET_NAME:(:obj:`str`): Name of the dataset.
        RPM_LOCATION:(:obj:`str`): Location of the Google Cloud region
          of the BigQuery dataset
      Returns:
        Outputs(:obj: `tuple`): Returns the below outputs:
          rpm_context(:obj:`str`): All variables used in the pipeline
          rpm_bq_ds_name(:obj:`str`): The dataset name
            used in the rest of the pipeline
  """
  import json
  rpm_context = json.loads(ctx)
  print(rpm_context)

  rpm_bq_ds_name = rpm_context['RPM_BQ_DATASET_NAME']
  if not rpm_bq_ds_name:
    rpm_bq_ds_name = \
        f"{rpm_context['RPM_GCP_PROJECT']}{rpm_context['RPM_DEFAULT_DATA_SET_NAME_EXT']}"
  rpm_bq_ds_name = rpm_bq_ds_name.replace('-', '_')

  rpm_context['rpm_bq_ds_name'] = rpm_bq_ds_name

  import subprocess
  import traceback
  def exec_cmd(cmd):
    try:
      print(cmd)
      process = subprocess.run(cmd, shell=True, check=True,
                               stdout=subprocess.PIPE, universal_newlines=True)
      print(f"'output from {cmd}': {process.stdout}")
    except subprocess.CalledProcessError as e:
      error = traceback.format_exc()
      print(error)
      print(e.output)

  exec_cmd(f"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}")

  def install(name):
    subprocess.call(['pip', 'install', name])
  packages_to_install = ['google-cloud', 'google-cloud-bigquery']
  for each_package in packages_to_install:
    install(each_package)
    print(f"'{each_package}' package installed :)")

  from google.cloud import bigquery
  from google.cloud.exceptions import NotFound
  client = bigquery.Client()
  dataset_id = f"{rpm_context['RPM_GCP_PROJECT']}.{rpm_bq_ds_name}"
  ds_found = True
  try:
    client.get_dataset(dataset_id)  # Make an API request.
    print('Dataset {} already exists'.format(dataset_id))
  except NotFound:
    print('Dataset {} is not found'.format(dataset_id))
    ds_found = False

  if ds_found is False:
    try:
      # Construct a full Dataset object to send to the API.
      dataset = bigquery.Dataset(dataset_id)
      dataset.location = rpm_context['RPM_LOCATION'].split('-')[0].upper()
      dataset = client.create_dataset(dataset)  # Make an API request.
      print('Created dataset {}.{} in location: {}.'.\
            format(client.project, dataset.dataset_id, dataset.location))
    except Exception as e:
      error = traceback.format_exc()
      print(error)
      print(e)
      raise RuntimeError(f"Can't create the BigQuery DS {dataset_id}")

  return (
      json.dumps(rpm_context),
      rpm_context['rpm_bq_ds_name']
      )


# *Test locally create_bq_ds*
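
As a quick offline check, the dataset name and location that `create_bq_ds` derives can be reproduced without calling BigQuery. This is a minimal sketch; the helper name `resolve_dataset` and the example project id are hypothetical.

```python
def resolve_dataset(ctx: dict) -> tuple:
    """Mirror how the dataset name and BigQuery location are derived."""
    name = ctx["RPM_BQ_DATASET_NAME"] or (
        ctx["RPM_GCP_PROJECT"] + ctx["RPM_DEFAULT_DATA_SET_NAME_EXT"])
    name = name.replace("-", "_")  # the notebook replaces hyphens in dataset names
    # The first segment of the zone, upper-cased: 'us-central1-b' -> 'US'
    location = ctx["RPM_LOCATION"].split("-")[0].upper()
    return name, location


ctx = {
    "RPM_GCP_PROJECT": "my-project",  # hypothetical project id
    "RPM_BQ_DATASET_NAME": "",
    "RPM_DEFAULT_DATA_SET_NAME_EXT": "_rpm_data_set",
    "RPM_LOCATION": "us-central1-b",
}
dataset_name, dataset_location = resolve_dataset(ctx)
print(dataset_name, dataset_location)
```

Note that this derivation depends on the zone prefix: a zone such as `europe-west1-b` would yield `EUROPE`, which may not be a valid BigQuery location, so it is worth checking the result if you change `RPM_LOCATION`.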

In [None]:
# test locally create_bq_ds
# You could unit test the above code in your local environment.
# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)
local_context = get_local_context()
import json
update_local_context(create_bq_ds(
    json.dumps(local_context),
    local_context['RPM_GCP_PROJECT'],
    local_context['RPM_BQ_DATASET_NAME'],
    local_context['RPM_LOCATION']))


# Load the data to BigQuery - function

In [None]:
from typing import NamedTuple
def load_bq_ds(ctx: str,
               RPM_GCP_PROJECT: str,
               RPM_BQ_TABLE_NAME: str,
               RPM_DEFAULT_BQ_TABLE_NAME_EXT: str,
               rpm_bq_ds_name: str, ) -> NamedTuple('Outputs', [
       ('rpm_context', str),
       ('rpm_table_id', str),
    ]):
  """The function(also used as a base for a KFP Component)
        loads the data to a BigQuery table.

    You need to replace the component with your data source
    e.g. you might download the data from a different source,
      in which case you need to code those steps
    Decide on your load strategy here, such as add or append.
    Furthermore you could cache this KFP component
      if the load is just a one-time thing.
    Args:
      ctx(:obj:`str`): The dict object with all the variables
        used in the pipeline
      RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment
      RPM_BQ_TABLE_NAME:(:obj:`str`): Name of the table.
      RPM_DEFAULT_BQ_TABLE_NAME_EXT:(:obj:`str`): Default table name
        if the user didn't provide one(RPM_BQ_TABLE_NAME)
      rpm_bq_ds_name(:obj:`str`): The dataset name
        used in the rest of the pipeline
    Returns:
      Outputs(:obj: `tuple`): Returns the below outputs:
        rpm_context(:obj:`str`): All variables used in the pipeline
        rpm_table_id(:obj:`str`): The table name
          used in the rest of the pipeline
  """
  import json
  rpm_context = json.loads(ctx)
  print(rpm_context)

  rpm_bq_ds_name = rpm_context['rpm_bq_ds_name']
  dataset_id = f"{rpm_context['RPM_GCP_PROJECT']}.{rpm_bq_ds_name}"

  if not rpm_context['RPM_BQ_TABLE_NAME']:
    rpm_table_id = f"{dataset_id}.{rpm_bq_ds_name}{rpm_context['RPM_DEFAULT_BQ_TABLE_NAME_EXT']}"
  else:
    rpm_table_id = f"{dataset_id}.{rpm_context['RPM_BQ_TABLE_NAME']}"
  rpm_context['rpm_table_id'] = rpm_table_id

  import subprocess
  def install(name):
    subprocess.call(['pip', 'install', name])
  packages_to_install = ['google-cloud', 'google-cloud-bigquery']
  for each_package in packages_to_install:
    install(each_package)
    print(f"'{each_package}' package installed :)")

  query = f"""
    # select initial features and label to feed into our model
    CREATE OR REPLACE TABLE {rpm_table_id}
    OPTIONS(
    description="Google Store curated Data"
    ) AS 
    SELECT
    fullVisitorId,
    bounces,
    time_on_site,
    will_buy_on_return_visit # <--- our label
    FROM
    # features
    (SELECT
        fullVisitorId,
        IFNULL(totals.bounces, 0) AS bounces,
        IFNULL(totals.timeOnSite, 0) AS time_on_site
    FROM
        `bigquery-public-data.google_analytics_sample.*`
    WHERE
        totals.newVisits = 1
        AND date BETWEEN '20160801' AND '20170430') # train on first 9 months
    JOIN
    (SELECT
        fullvisitorid,
        IF(COUNTIF(totals.transactions > 0 AND totals.newVisits IS NULL) > 0, 1, 0) AS will_buy_on_return_visit
    FROM
        `bigquery-public-data.google_analytics_sample.*`
    GROUP BY fullvisitorid)
    USING (fullVisitorId)
    ORDER BY time_on_site DESC # order by most time spent first
  """
  print(query)
  import traceback
  from google.cloud import bigquery
  try:
    client = bigquery.Client()
    print(query)
    query_job = client.query(query)  # Make an API request.
    query_job.result()  # Wait for the job to finish before reporting success.
    print(f"Table {rpm_table_id} created.")
  except Exception as e:
    error = traceback.format_exc()
    print(error)
    print(e)
    raise RuntimeError(f"Can't create the table {rpm_table_id}")
  destination_table = rpm_table_id
  print(f"{destination_table}")

  return (
      json.dumps(rpm_context),
      rpm_context['rpm_table_id']
      )

# *Test locally load_bq_ds*
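
The label in the query, `will_buy_on_return_visit`, is 1 for a visitor who has at least one session with a transaction that is not their first visit (`totals.newVisits IS NULL`). The sketch below reproduces only that labeling rule in plain Python on a few made-up sessions; the helper name `label_return_buyers` and the flattened field names are hypothetical simplifications of the Google Analytics schema.

```python
def label_return_buyers(sessions: list) -> dict:
    """Label each visitor 1 if they bought on any return visit, else 0."""
    labels = {}
    for session in sessions:
        visitor = session["fullVisitorId"]
        labels.setdefault(visitor, 0)
        bought = (session.get("transactions") or 0) > 0  # None counts as no purchase
        is_return_visit = session.get("newVisits") is None
        if bought and is_return_visit:
            labels[visitor] = 1
    return labels


# Made-up sessions, for illustration only.
sessions = [
    {"fullVisitorId": "A", "newVisits": 1, "transactions": None},
    {"fullVisitorId": "A", "newVisits": None, "transactions": 1},     # bought on return
    {"fullVisitorId": "B", "newVisits": 1, "transactions": 1},        # bought on first visit only
    {"fullVisitorId": "C", "newVisits": 1, "transactions": None},
    {"fullVisitorId": "C", "newVisits": None, "transactions": None},  # returned, no purchase
]
labels = label_return_buyers(sessions)
print(labels)
```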

In [None]:
# test locally load_bq_ds
# You could unit test the above code in your local environment.
# You don't need to execute the code, when simply building or running the KFP pipeline (experiment)
local_context = get_local_context()
import json
update_local_context(load_bq_ds(
    json.dumps(local_context),
    local_context['RPM_GCP_PROJECT'],
    local_context['RPM_BQ_TABLE_NAME'],
    local_context['RPM_DEFAULT_BQ_TABLE_NAME_EXT'],
    local_context['rpm_bq_ds_name'],
    ))


# Create the BigQuery ML model - function

In [None]:
# create_bq_ml
from typing import NamedTuple
def create_bq_ml(ctx: str,
                  RPM_GCP_PROJECT: str,
                  RPM_MODEL_NAME: str,
                  RPM_DEFAULT_MODEL_NAME: str,
                  rpm_bq_ds_name: str,
                  rpm_table_id: str ) -> NamedTuple('Outputs', [
       ('rpm_context', str),
       ('rpm_bqml_model', str),
    ]):
  """ The function(also used as a base for a KFP Component) creates a model
      from the data that you have already loaded previously.

    You need to adjust the model type, model hyperparameters, features,
      and label depending on your need.
    Args:
      ctx(:obj:`str`): The dict object with all the variables
        used in the pipeline
      RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment
      RPM_MODEL_NAME:(:obj:`str`): Name of the model.
      RPM_DEFAULT_MODEL_NAME:(:obj:`str`): Default model name
        if the user didn't provide one(RPM_MODEL_NAME)
      rpm_bq_ds_name(:obj:`str`): The dataset name
        used in the rest of the pipeline
      rpm_table_id(:obj:`str`): The table name
        used in the rest of the pipeline
    Returns:
      Outputs(:obj: `tuple`): Returns the below outputs:
        rpm_context(:obj:`str`): All variables used in the pipeline
        rpm_bqml_model(:obj:`str`): The model name
          used in the rest of the pipeline
  """
  import json
  rpm_context = json.loads(ctx)
  print(rpm_context)

  rpm_bqml_model = rpm_context['RPM_MODEL_NAME']
  if not rpm_bqml_model:
    rpm_bqml_model = rpm_context['RPM_DEFAULT_MODEL_NAME']
  rpm_bqml_model = rpm_bqml_model.replace('-', '_')
  rpm_context['rpm_bqml_model'] = rpm_bqml_model

  import subprocess
  import traceback
  def exec_cmd(cmd):
    try:
      print(cmd)
      process = subprocess.run(cmd, shell=True, check=True,
                               stdout=subprocess.PIPE, universal_newlines=True)
      print(f"'output from {cmd}': {process.stdout}")
      return process.stdout
    except subprocess.CalledProcessError as e:
      error = traceback.format_exc()
      print(error)
      print(e.output)
      return e.output

  exec_cmd(f"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}")

  bqml_create_sql = f"""
        CREATE OR REPLACE MODEL
        {rpm_context['rpm_bq_ds_name']}.{rpm_bqml_model}
        OPTIONS
        ( model_type='LOGISTIC_REG',
          auto_class_weights=TRUE,
          input_label_cols=['will_buy_on_return_visit']) AS
        SELECT
        * EXCEPT(fullVisitorId)
        FROM
        {rpm_context['rpm_table_id'].replace(RPM_GCP_PROJECT+'.', '')}
  """
  # you can uncomment the below query to try out the XGBoost model
  # bqml_create_sql = f"""
  #     CREATE OR REPLACE MODEL
  #     \`{rpm_context['rpm_bq_ds_name']}.{rpm_bqml_model}\`
  #     OPTIONS(MODEL_TYPE='BOOSTED_TREE_CLASSIFIER',
  #             BOOSTER_TYPE = 'GBTREE',
  #             NUM_PARALLEL_TREE = 1,
  #             MAX_ITERATIONS = 50,
  #             TREE_METHOD = 'HIST',
  #             EARLY_STOP = FALSE,
  #             SUBSAMPLE = 0.85,
  #             INPUT_LABEL_COLS = ['will_buy_on_return_visit'])
  #     AS
  #     SELECT
  #     * EXCEPT(fullVisitorId)
  #     FROM
  #     \`{rpm_context['rpm_table_id']}\`
  # """
  exec_cmd(f'bq query --use_legacy_sql=false "{bqml_create_sql}"')
  bq_model_created = exec_cmd(f"bq ls -m --format=pretty {rpm_context['rpm_bq_ds_name']} | grep {rpm_bqml_model}")
  if not bq_model_created:
    raise RuntimeError(f"Please check if the model {rpm_context['rpm_bq_ds_name']}.{rpm_bqml_model} was created.")

  return (
      json.dumps(rpm_context),
      rpm_context['rpm_bqml_model']
      )

# *Test locally create_bq_ml*

In [None]:
# test locally create_bq_ml
# You can unit test the code above in your local environment.
# You don't need to run this cell when simply building or running the KFP pipeline (experiment).
local_context = get_local_context()
import json
update_local_context(create_bq_ml(
    json.dumps(local_context),
    local_context['RPM_GCP_PROJECT'],
    local_context['RPM_MODEL_NAME'],
    local_context['RPM_DEFAULT_MODEL_NAME'],
    local_context['rpm_bq_ds_name'],
    local_context['rpm_table_id']))

# Evaluate the BigQuery ML model - function
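
The component in the next cell returns the raw output of `bq query --format=json`, and its docstring suggests raising an exception when the model does not meet your criteria. The sketch below shows one way such a gate might look. The helper name `check_model_quality`, the `min_roc_auc` threshold, and the sample output are hypothetical; it assumes the output is a JSON array of row objects and that numeric values may arrive as strings.

```python
import json

def check_model_quality(eval_json: str, min_roc_auc: float = 0.7) -> float:
    """Parses JSON evaluation output and enforces a minimum roc_auc.

    Hypothetical helper: raises RuntimeError to stop the pipeline.
    """
    rows = json.loads(eval_json)
    if not rows:
        raise RuntimeError("The evaluation query returned no rows.")
    # Numeric columns may be serialized as strings, so convert explicitly.
    roc_auc = float(rows[0]["roc_auc"])
    if roc_auc < min_roc_auc:
        raise RuntimeError(
            f"roc_auc {roc_auc:.3f} is below the threshold {min_roc_auc}.")
    return roc_auc

# Assumed sample output, for illustration only.
sample_output = '[{"modelquality": "decent", "roc_auc": "0.78"}]'
print(check_model_quality(sample_output))
```

Raising inside a component fails that step, which in turn stops the downstream steps that depend on it.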

In [None]:
# evaluate_ml_model
from typing import NamedTuple
def evaluate_ml_model(ctx: str,
                       RPM_GCP_PROJECT: str,
                       rpm_bq_ds_name: str,
                       rpm_bqml_model: str, ) -> NamedTuple('Outputs', [
       ('rpm_context', str),
       ('rpm_eval_query', str),
       ('rpm_eval_result', str),
    ]):
  """ The function(also used as a base for a KFP Component) evaluates
    the model you created.

    Adapt the selection criteria to your needs.
    To stop the pipeline when the model doesn't meet the criteria,
      raise an exception.
    Args:
      ctx(:obj:`str`): The dict object with all the variables
        used in the pipeline
      RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment
      rpm_bq_ds_name(:obj:`str`): The dataset name
        used in the rest of the pipeline
      rpm_bqml_model(:obj:`str`): The model name
        used in the rest of the pipeline
    Returns:
      Outputs(:obj: `tuple`): Returns the below outputs:
        rpm_context(:obj:`str`): All variables used in the pipeline
        rpm_eval_query(:obj:`str`): The evaluate sql query,
          which is saved for auditing purpose in the pipeline artifacts
        rpm_eval_result(:obj:`str`): The result of the evaluated query,
          which is saved for auditing purpose in the pipeline artifacts
  """
  import json
  rpm_context = json.loads(ctx)
  print(rpm_context)

  import subprocess
  import traceback
  def exec_cmd(cmd):
    try:
      print(cmd)
      process = subprocess.run(cmd, shell=True, check=True,
                               stdout=subprocess.PIPE, universal_newlines=True)
      print(f"'output from {cmd}': {process.stdout}")
      return(process.stdout, 0)
    except subprocess.CalledProcessError as e:
      error = traceback.format_exc()
      print(error)
      print(e.output)
      return(e.output, 1)

  exec_cmd(f"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}")

  bqml_eval_query = f"""
    SELECT
    roc_auc, CASE WHEN roc_auc > .9 THEN 'good'
    WHEN roc_auc > .8 THEN 'fair' WHEN roc_auc > .7 THEN 'decent'
    WHEN roc_auc > .6 THEN 'not great' ELSE 'poor' END AS modelquality
    FROM
    ML.EVALUATE(MODEL {rpm_bq_ds_name}.{rpm_bqml_model})
  """
  # exec_cmd returns (output, returncode); keep only the query output as a string
  (rpm_eval_result, returncode) = exec_cmd(f'bq query --use_legacy_sql=false --format=json "{bqml_eval_query}"')
  print(rpm_eval_result)

  rpm_context['bqml_eval_query'] = bqml_eval_query
  rpm_context['rpm_eval_result'] = rpm_eval_result

  return (
      json.dumps(rpm_context),
      f"""{bqml_eval_query}""",
      f"""{rpm_eval_result}""",
      )

# *Test locally evaluate_ml_model*

In [None]:
# test locally evaluate_ml_model
# You can unit test the code above in your local environment.
# You don't need to run this cell when simply building or running the KFP pipeline (experiment).
local_context = get_local_context()
import json
update_local_context(evaluate_ml_model(
    json.dumps(local_context),
    local_context['RPM_GCP_PROJECT'],
    local_context['rpm_bq_ds_name'],
    local_context['rpm_bqml_model'],
    ))

# Prepare dataset for batch prediction with BigQuery ML - function

In [None]:
# create_batch_prediction_dataset
from typing import NamedTuple
def create_batch_prediction_dataset(ctx: str,
                                    RPM_GCP_PROJECT: str,
                                    rpm_bq_ds_name: str,
                                    rpm_table_id: str ) -> NamedTuple('Outputs', [
       ('rpm_context', str),
       ('rpm_pred_table_id', str),
    ]):
  """ The function(also used as a base for a KFP Component)
        creates a BigQuery table which contains the input data for
        which we want predictions.

    You might not need this component
      if your input table already exists.
    You might need some transformation or filtering on your input data,
    in which case you need to make the appropriate code changes.
    Args:
      ctx(:obj:`str`): The dict object with all the variables
        used in the pipeline
      RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment
      rpm_bq_ds_name(:obj:`str`): The dataset name used in the rest of the pipeline
      rpm_table_id(:obj:`str`): The table name used in the rest of the pipeline
    Returns:
      Outputs(:obj: `tuple`): Returns the below outputs:
        rpm_context(:obj:`str`): All variables used in the pipeline
        rpm_pred_table_id(:obj:`str`): The table that contains the input data
          for the batch prediction
  """
  import json
  rpm_context = json.loads(ctx)
  print(rpm_context)

  import subprocess
  def install(name):
    subprocess.call(['pip', 'install', name])
  packages_to_install = ['google-cloud', 'google-cloud-bigquery']
  for each_package in packages_to_install:
    install(each_package)
    print(f"'{each_package}' package installed :)")

  rpm_pred_table_id = rpm_table_id.replace('_tbl', '_pred_tbl')

  query = f"""
      # create the input table to conduct batch predict
      CREATE OR REPLACE TABLE {rpm_pred_table_id}
      OPTIONS(
      description="Input data for prediction"
      ) AS
      SELECT *
      FROM {rpm_context['rpm_table_id']}
      LIMIT 10
  """
  print(query)
  import traceback
  from google.cloud import bigquery
  try:
    client = bigquery.Client()
    query_job = client.query(query)  # Make an API request.
    query_job.result()  # Wait for the job to finish before reporting success.
    print(f"Table {rpm_pred_table_id} created.")
  except Exception as e:
    error = traceback.format_exc()
    print(error)
    print(e)
    raise RuntimeError(f"Can't create the table {rpm_pred_table_id}")

  rpm_context['rpm_pred_table_id'] = rpm_pred_table_id

  return (json.dumps(rpm_context),
         rpm_context['rpm_pred_table_id'],
         )

# *Test locally create batch prediction dataset*

In [None]:
# test locally create batch prediction dataset
# You can unit test the code above in your local environment.
# You don't need to run this cell when simply building or running the KFP pipeline (experiment).
local_context = get_local_context()
import json
update_local_context(create_batch_prediction_dataset(
    json.dumps(local_context),
    local_context['RPM_GCP_PROJECT'],
    local_context['rpm_bq_ds_name'],
    local_context['rpm_table_id'],
    ))

# Make batch prediction - function
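
The component in the next cell prints the predictions and returns them as a string, and its docstring notes that you might want to save them somewhere else. One option is to write them to a BigQuery table by wrapping the `ML.PREDICT` call in a `CREATE OR REPLACE TABLE ... AS` statement. The sketch below only builds the query string. The helper name `build_predict_to_table_query` and the dataset, model, and table names are hypothetical placeholders.

```python
def build_predict_to_table_query(model: str,
                                 input_table: str,
                                 output_table: str) -> str:
    """Builds a query that stores ML.PREDICT results in a table.

    Hypothetical helper: all arguments are dataset-qualified names.
    """
    return f"""
      CREATE OR REPLACE TABLE {output_table} AS
      SELECT
        fullVisitorId, predicted_will_buy_on_return_visit
      FROM
        ML.PREDICT(MODEL {model},
          (SELECT fullVisitorId, bounces, time_on_site
           FROM {input_table}))
    """

# Placeholder names, for illustration only.
query = build_predict_to_table_query(
    model="rpm_ds.rpm_model",
    input_table="rpm_ds.rpm_pred_tbl",
    output_table="rpm_ds.rpm_pred_results_tbl")
print(query)
```

The resulting string could be passed to `client.query(query).result()` in the same way the component runs its own query.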

In [None]:
# predict_batch_ml_model
from typing import NamedTuple
def predict_batch_ml_model(ctx: str,
                           RPM_GCP_PROJECT: str,
                           rpm_bq_ds_name: str,
                           rpm_bqml_model: str,
                           rpm_pred_table_id: str, ) -> NamedTuple('Outputs', [
      ('rpm_context', str),
      ('rpm_predict_batch_output', str),
    ]):
  """
    The function(also used as a base for a KFP Component) uses the model
      to run batch predictions.

    You might also need to save the predicted values
      in an appropriate repository of your choice.
    Currently the predicted values are printed on the console
      and returned as an output from the function.
    Args:
      ctx(:obj:`str`): The dict object with all the variables
        used in the pipeline
      RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment
      rpm_bq_ds_name(:obj:`str`): The dataset name
        used in the rest of the pipeline
      rpm_bqml_model(:obj:`str`): The model name
        used in the rest of the pipeline
      rpm_pred_table_id(:obj:`str`): The table that contains the input data
        for the batch prediction
    Returns:
      Outputs(:obj: `tuple`): Returns the below outputs:
        rpm_context(:obj:`str`): All variables used in the pipeline
        rpm_predict_batch_output(:obj:`str`): The output
          from the batch prediction
  """
  import json
  rpm_context = json.loads(ctx)
  print(rpm_context)

  import subprocess
  def install(name):
    subprocess.call(['pip', 'install', name])
  packages_to_install = ["google-cloud", "google-cloud-bigquery"]
  for each_package in packages_to_install:
    install(each_package)
    print(f"'{each_package}' package installed :)")

  query = f"""
      # predict the inputs (rows) from the input table
      SELECT
      fullVisitorId, predicted_will_buy_on_return_visit
      FROM
      ML.PREDICT(MODEL {rpm_bq_ds_name}.{rpm_bqml_model},
      (
          SELECT
          fullVisitorId,
          bounces,
          time_on_site
          from {rpm_pred_table_id}
      ))
  """
  print(query)
  import traceback
  from google.cloud import bigquery
  output = ""
  try:
    client = bigquery.Client()
    query_job = client.query(query)  # Make an API request.
    print("The query data:")
    for row in query_job:
      # Row values can be accessed by field name or index.
      print(f"row data: {row}")
      output += str(row)
  except Exception as e:
    error = traceback.format_exc()
    print(error)
    print(e)
    raise RuntimeError("Can't run the batch prediction")

  rpm_context['rpm_predict_output'] = output

  return (
      json.dumps(rpm_context),
      rpm_context['rpm_predict_output'],
      )

# *Test locally batch prediction*

In [None]:
# test locally batch prediction
# You can unit test the code above in your local environment.
# You don't need to run this cell when simply building or running the KFP pipeline (experiment).
local_context = get_local_context()
import json
update_local_context(predict_batch_ml_model(
    json.dumps(local_context),
    local_context['RPM_GCP_PROJECT'],
    local_context['rpm_bq_ds_name'],
    local_context['rpm_bqml_model'],
    local_context['rpm_pred_table_id'],
    ))


# Determine the new revision of the model - function
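
The component in the next cell scans the export folder in Cloud Storage, finds the highest existing version folder, and increments it by 1. The core of that idea can be sketched without any Cloud Storage access, as below. The helper name `next_model_version`, the `V_` prefix, and the blob names are hypothetical; the sketch assumes that version folders are stored as `<folder_name><prefix><number>/` and that `folder_name` ends with a slash.

```python
import re

def next_model_version(blob_names, folder_name, prefix):
    """Returns the next version number given the existing blob names.

    Hypothetical helper: only names of the form
    '<folder_name><prefix><number>/' count as version folders.
    """
    pattern = re.compile(
        rf"^{re.escape(folder_name)}{re.escape(prefix)}(\d+)/$")
    versions = [int(m.group(1))
                for m in map(pattern.match, blob_names) if m]
    # Start at 1 when no version folder exists yet.
    return max(versions, default=0) + 1

# Assumed blob names, for illustration only.
names = [
    "rpm_ds/export/",
    "rpm_ds/export/V_1/",
    "rpm_ds/export/V_1/saved_model.pb",
    "rpm_ds/export/V_2/",
]
print(next_model_version(names, "rpm_ds/export/", "V_"))
```

Files inside a version folder, such as `saved_model.pb`, are ignored because only names that end right after the version number and a slash are matched.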

In [None]:
# get_bqml_model_version
from typing import NamedTuple
def get_bqml_model_version(ctx: str,
                           RPM_GCP_PROJECT: str,
                           RPM_MODEL_EXPORT_PATH: str,
                           RPM_DEFAULT_MODEL_EXPORT_PATH: str,
                           RPM_MODEL_VER_PREFIX: str,
                           rpm_gcs_rpm_ds_url: str ) -> NamedTuple('Outputs', [
        ('rpm_context', str),
        ('rpm_bqml_model_export_path', str),
        ('rpm_model_version', str),
    ]):
  """
    The function(also used as a base for a KFP Component) determines
      the revision of the model.

    It checks the current version and increments it by 1.
    It prepares the folder to which BigQuery ML exports the model.
    Args:
      ctx(:obj:`str`): The dict object with all the variables
        used in the pipeline
      RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment
      RPM_MODEL_EXPORT_PATH(:obj:`str`): User supplied model export path
      RPM_DEFAULT_MODEL_EXPORT_PATH(:obj:`str`): Uses the default path,
        if the user didn't provide a path
      RPM_MODEL_VER_PREFIX(:obj:`str`): The folder with prefix
      rpm_gcs_rpm_ds_url(:obj:`str`): Full Google Cloud Storage path
        with bucket name and folder name
    Returns:
      Outputs(:obj: `tuple`): Returns the below outputs:
        rpm_context(:obj:`str`): All variables used in the pipeline
        rpm_bqml_model_export_path(:obj:`str`): the path
          to which we can export the model
        rpm_model_version(:obj:`str`): the version which we will use when
          we deploy the model to Cloud AI Prediction
  """
  import json
  rpm_context = json.loads(ctx)
  print(rpm_context)

  # defining the install function
  import subprocess
  import os
  def install(name):
    subprocess.call(['pip', 'install', name])
  packages_to_install = ['google-cloud', 'google-cloud-storage']
  for each_package in packages_to_install:
    install(each_package)
    print(f"'{each_package}' package installed :)")

  cmd = f"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}"
  print(cmd)
  process = subprocess.run(cmd, shell=True, check=True,
                           stdout=subprocess.PIPE, universal_newlines=True)
  print(f"'output from {cmd}': {process.stdout}")

  from google.cloud import storage
  from google.cloud.exceptions import NotFound
  from google.cloud.exceptions import Forbidden
  import traceback

  client = storage.Client()
  try:
    bucket_name= rpm_context['rpm_gcs_rpm_ds_url'].split('/')[2]
    rpm_bq_ds_name = rpm_context['rpm_gcs_rpm_ds_url'].split('/')[3]
    rpm_bqml_model_export_path = rpm_context['RPM_MODEL_EXPORT_PATH']
    if not rpm_bqml_model_export_path:
      rpm_bqml_model_export_path = rpm_context['RPM_DEFAULT_MODEL_EXPORT_PATH']
    bucket = client.get_bucket(bucket_name)
    print(f'Details: {bucket}')
    folder_name = os.path.join(rpm_bq_ds_name, rpm_bqml_model_export_path)
    blob = bucket.get_blob(folder_name)
    model_version = 0
    if blob is None:
      print(f"Folder name {folder_name} does not exist!")
      print(f"{bucket_name}, {folder_name}")
      blob = bucket.blob(folder_name)
      blob.upload_from_string('')
      print(f"Folder name {folder_name} created.")
    else:
      print(f"Folder name {folder_name} exists.")
      blobs = client.list_blobs(bucket_name, prefix=folder_name)
      print('Blobs:')
      for blob in blobs:
        print(f"blob name: {blob.name}")
        curr_ver = blob.name.replace(folder_name, '')
        print(f"folder_name: {folder_name}")
        print(f"after folder_name replace: {curr_ver}")
        if rpm_context['RPM_MODEL_VER_PREFIX'] in curr_ver \
            and len(curr_ver.split('/')) == 2 and  \
            len(curr_ver.split('/')[1]) == 0:
          curr_ver = curr_ver.replace(rpm_context['RPM_MODEL_VER_PREFIX'], '').replace('/','').split('/')[0]
          model_version = max(model_version, int(curr_ver))
    # increment the model version
    model_version += 1
    model_version_full_name = f"{rpm_context['RPM_MODEL_VER_PREFIX']}{model_version}/"
    folder_name = os.path.join(folder_name, model_version_full_name)
    print(f"Going to create folder {folder_name}.")
    blob = bucket.blob(folder_name)
    blob.upload_from_string('')
    print(f"Folder name {folder_name} created.")
    rpm_context['rpm_bqml_model_export_path'] = os.path.join(rpm_bqml_model_export_path, model_version_full_name)
    rpm_context['rpm_model_version'] = model_version_full_name.rstrip('/')
  except Forbidden as e:
    print(f"Sorry, you don't have access to the bucket: {bucket_name}!")
    print(e)
    error = traceback.format_exc()
    print(error)
    raise  # the outputs below can't be computed without bucket access
  except NotFound as e:
    print(f"Sorry, the bucket: {bucket_name} does not exist!")
    print(e)
    error = traceback.format_exc()
    print(error)
    raise  # the outputs below can't be computed without the bucket

  return (
      json.dumps(rpm_context),
      rpm_context['rpm_bqml_model_export_path'],
      rpm_context['rpm_model_version']
      )

# *Test locally get_bqml_model_version*

In [None]:
# test locally get_bqml_model_version
# You can unit test the code above in your local environment.
# You don't need to run this cell when simply building or running the KFP pipeline (experiment).
local_context = get_local_context()
import json
update_local_context(get_bqml_model_version(
    json.dumps(local_context),
    local_context['RPM_GCP_PROJECT'],
    local_context['RPM_MODEL_EXPORT_PATH'],
    local_context['RPM_DEFAULT_MODEL_EXPORT_PATH'],
    local_context['RPM_MODEL_VER_PREFIX'],
    local_context['rpm_gcs_rpm_ds_url']))


# Export the BigQuery ML model to the Google Cloud Storage bucket - function

In [None]:
# export_bqml_model_to_gcs
from typing import NamedTuple
def export_bqml_model_to_gcs(ctx: str,
                             RPM_GCP_PROJECT: str,
                             RPM_PVC_NAME: str,
                             RPM_PVC_DATASET_FOLDER_NAME: str,
                             rpm_bqml_model_export_path: str,
                             rpm_gcs_rpm_ds_url: str,
                             rpm_bq_ds_name: str,
                             rpm_bqml_model: str, ) -> NamedTuple('Outputs', [
        ('rpm_context', str),
        ('rpm_model_uri', str),
    ]):
  """ The function(also used as a base for a KFP Component) exports
      the BigQuery ML model.

    It also saves the details used in the model
      e.g. losses, learning rate adjustment, number of iterations.
    It also saves the evaluation details e.g. roc, accuracy, etc.
    Args:
      ctx(:obj:`str`): The dict object with all the variables
        used in the pipeline
      RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment
      RPM_PVC_NAME:(:obj:`str`): The persistent volume name
      RPM_PVC_DATASET_FOLDER_NAME(:obj:`str`): The folder name to store
        the temporary files before we upload to Google Cloud Storage
      rpm_bqml_model_export_path(:obj:`str`): The path
        to which we can export the model
      rpm_gcs_rpm_ds_url(:obj:`str`): Full Google Cloud Storage path
        with bucket name and folder name
      rpm_bq_ds_name(:obj:`str`): The dataset name
        used in the rest of the pipeline
      rpm_bqml_model(:obj:`str`): The model name
        used in the rest of the pipeline
    Returns:
      Outputs(:obj: `tuple`): Returns the below outputs:
        rpm_context(:obj:`str`): All variables used in the pipeline
        rpm_model_uri(:obj:`str`): The path to where we export the model
  """
  import json
  rpm_context = json.loads(ctx)
  print(rpm_context)

  import subprocess
  import traceback
  def exec_cmd(cmd):
    try:
      print(cmd)
      process = subprocess.run(cmd, shell=True, check=True,
                               stdout=subprocess.PIPE, universal_newlines=True)
      print(f"'output from {cmd}': {process.stdout}")
      return process.stdout
    except subprocess.CalledProcessError as e:
      error = traceback.format_exc()
      print(error)
      print(e.output)
      return e.output
  import os
  if not rpm_context['rpm_bqml_model_export_path']:
    raise RuntimeError("Can't export the BigQuery model: export destination is empty!")

  path_to_ds = os.path.join(rpm_context['RPM_PVC_NAME'],
                            rpm_context['RPM_PVC_DATASET_FOLDER_NAME'])
  # check whether the dataset directory already exists
  exec_cmd(f"test -d {path_to_ds} && echo 'Exists' || echo 'Does not exist'")
  # create the dataset directory
  exec_cmd(f"mkdir -p {path_to_ds}")
  # validate that the dataset directory has been created
  exec_cmd(f"test -d {path_to_ds} && echo 'Exists' || echo 'Does not exist'")


  rpm_bqml_model_export_path = os.path.join(rpm_context['rpm_gcs_rpm_ds_url'],
  rpm_context['rpm_bqml_model_export_path'])
  rpm_bqml_model_export_path=rpm_bqml_model_export_path.rstrip('/')

  exec_cmd(f"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}")
  cmd = f"bq extract -m {rpm_context['rpm_bq_ds_name']}.{rpm_context['rpm_bqml_model']} {rpm_bqml_model_export_path}"
  print(cmd)
  exec_cmd(cmd)

  rpm_context['rpm_model_uri'] = \
      os.path.join(rpm_context['rpm_gcs_rpm_ds_url'],
                   rpm_context['rpm_bqml_model_export_path'])

  bqml_eval_query = f"""
      SELECT
      *
      FROM
      ML.EVALUATE(MODEL `{rpm_bq_ds_name}.{rpm_bqml_model}`)
  """
  rpm_eval_output = exec_cmd(f"bq query --use_legacy_sql=false --format=json '{bqml_eval_query}'")
  print(rpm_eval_output)

  bqml_train_detail_query = f"""
      SELECT
      *
      FROM
      ML.TRAINING_INFO(MODEL `{rpm_bq_ds_name}.{rpm_bqml_model}`)
  """
  bqml_train_detail_query_output = exec_cmd(f"bq query --use_legacy_sql=false --format=json '{bqml_train_detail_query}'")
  print(bqml_train_detail_query_output)

  path_to_ds = f"/{rpm_context['RPM_PVC_NAME']}/{rpm_context['RPM_PVC_DATASET_FOLDER_NAME']}/"
  # export the model evaluation output to a file called eval_detail.txt

  rpm_eval_output_filename = os.path.join(path_to_ds, 'eval_detail.txt')
  with open(rpm_eval_output_filename, 'w') as outfile:
    outfile.write(rpm_eval_output)
  exec_cmd(f"cat {rpm_eval_output_filename}")
  exec_cmd(f"gsutil -m cp {rpm_eval_output_filename} {rpm_context['rpm_model_uri']}")

  # export the training details to a file called train_detail.txt
  bqml_train_detail_query_filename = os.path.join(path_to_ds, 'train_detail.txt')
  with open(bqml_train_detail_query_filename, 'w') as outfile:
    outfile.write(bqml_train_detail_query_output)
  exec_cmd(f"cat {bqml_train_detail_query_filename}")
  exec_cmd(f"gsutil -m cp {bqml_train_detail_query_filename} {rpm_context['rpm_model_uri']}")

  return (json.dumps(rpm_context),
         rpm_context['rpm_model_uri'],
         )

# *Test locally export_bqml_model_to_gcs*

In [None]:
# test locally export_bqml_model_to_gcs
# You can unit test the code above in your local environment.
# You don't need to run this cell when simply building or running the KFP pipeline (experiment).
local_context = get_local_context()
import json
update_local_context(export_bqml_model_to_gcs(
    json.dumps(local_context),
    local_context['RPM_GCP_PROJECT'],
    local_context['RPM_PVC_NAME'],
    local_context['RPM_PVC_DATASET_FOLDER_NAME'],
    local_context['rpm_bqml_model_export_path'],
    local_context['rpm_gcs_rpm_ds_url'],
    local_context['rpm_bq_ds_name'],
    local_context['rpm_bqml_model'],
    ))

# Deploy the ML model - function
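
The component in the next cell checks whether a model or version already exists by piping the `gcloud` listing through `grep`. Because `grep` matches substrings, a model named `rpm_model` would also be reported as existing when only `rpm_model_v2` is deployed. The sketch below shows an exact-match check on the listing text. The helper name `name_in_listing` and the sample listing are hypothetical, and it assumes the listing contains one name per line, as `--format='value(name)'` is expected to produce.

```python
def name_in_listing(listing: str, name: str) -> bool:
    """Returns True only if `name` matches a whole line of the listing.

    Hypothetical helper for exact-match existence checks.
    """
    return name in (line.strip() for line in listing.splitlines())

# Assumed listing output, for illustration only.
listing = "rpm_model_v2\nanother_model\n"
print(name_in_listing(listing, "rpm_model"))
print(name_in_listing(listing, "rpm_model_v2"))
```

A shell-only alternative would be `grep -Fx`, which matches fixed strings against whole lines.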

In [None]:
from typing import NamedTuple
def deploy_ml_model_online_pred(ctx: str,
                                RPM_GCP_PROJECT: str,
                                RPM_LOCATION: str,
                                RPM_RUNTIME_VERSION: str,
                                RPM_PYTHON_VERSION: str,
                                rpm_model_uri: str,
                                rpm_bqml_model: str,
                                rpm_model_version: str,
                                rpm_gcs_rpm_ds_url: str, ) -> NamedTuple('Outputs', [
        ('rpm_context', str),
        ('rpm_url_to_monitor', str),
        ('rpm_model_region', str),
    ]):
  """ The function(also used as a base for a KFP Component) deploys
    the model that is exported above to Cloud AI Platform Prediction
    Args:
      ctx(:obj:`str`): The dict object with all the variables
        used in the pipeline
      RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment
      RPM_LOCATION:(:obj:`str`): Google Cloud region
        where we are going to deploy the model
      RPM_RUNTIME_VERSION(:obj:`str`): The runtime version
        used by Cloud AI Platform Prediction
      RPM_PYTHON_VERSION(:obj:`str`): The Python version
        used by Cloud AI Platform Prediction
      rpm_model_uri(:obj:`str`): The path to where we export the model
      rpm_bqml_model(:obj:`str`): The model name
        used in the rest of the pipeline
      rpm_model_version(:obj:`str`): The version which we will use when
        we deploy the model to Cloud AI Platform Prediction
      rpm_gcs_rpm_ds_url(:obj:`str`): Full Google Cloud Storage path
        with bucket name and folder name
    Returns:
      Outputs(:obj: `tuple`): Returns the below outputs:
        rpm_context(:obj:`str`): All variables used in the pipeline
        rpm_url_to_monitor(:obj:`str`): The url in the Google Cloud Console
          which you can use to monitor
        rpm_model_region(:obj:`str`): The Google Cloud region
          where you are going to deploy the model
  """
  import json
  rpm_context = json.loads(ctx)
  print(rpm_context)
  rpm_context['rpm_url_to_monitor'] = f"https://console.cloud.google.com/ai-platform/models/{rpm_bqml_model}/versions"

  model_region = RPM_LOCATION[:-2]
  rpm_context['rpm_model_region'] = model_region

  import subprocess
  import traceback
  def exec_cmd(cmd):
    try:
      print(cmd)
      process = subprocess.run(cmd, shell=True, check=True,
                               stdout=subprocess.PIPE, universal_newlines=True)
      print(f"'output from {cmd}': {process.stdout}")
      return(process.stdout, 0)
    except subprocess.CalledProcessError as e:
      error = traceback.format_exc()
      print(error)
      print(e.output)
      return(e.output, 1)

  exec_cmd(f"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}")

  (output, returncode) = \
      exec_cmd(f"gcloud ai-platform models list --format='value(name)' | grep {rpm_bqml_model}")
  print(f'output:{output}, returncode:{returncode}')
  if returncode == 0: #grep returns 1 if nothing is found
    print(f"{rpm_bqml_model} already exists")
  else:
    print(f"{rpm_bqml_model} doesn't exist. Creating...")
    (output_create, returncode_create) = \
        exec_cmd(f'gcloud ai-platform models create --regions={model_region} {rpm_bqml_model}')
    print(f"output:{output_create}, returncode:{returncode_create}")
    if returncode_create != 0:
      raise RuntimeError(f"Can't create the ML Model {rpm_bqml_model}")
    print(f"{rpm_bqml_model} created.")

  (output, returncode) = \
      exec_cmd(f"gcloud ai-platform versions list --model {rpm_bqml_model} --format='value(name)' | grep {rpm_model_version}")
  if returncode == 0: #grep returns 1 if nothing is found
    print(f"{rpm_bqml_model} with version {rpm_model_version} already exists")
  else:
    print(f"{rpm_bqml_model} with version {rpm_model_version} doesn't exist. Creating...")
    cmd = f"""
    gcloud ai-platform versions create --model={rpm_bqml_model} \
        {rpm_model_version} \
     --framework=tensorflow --python-version={RPM_PYTHON_VERSION} \
     --runtime-version={RPM_RUNTIME_VERSION} \
     --origin={rpm_model_uri} \
     --staging-bucket=gs://{rpm_gcs_rpm_ds_url.split('/')[2]}
    """
    (output_create, returncode_create) = exec_cmd(cmd)
    print(f"output:{output_create}, returncode:{returncode_create}")
    if returncode_create != 0:
      raise RuntimeError(f"Can't create the ML Model {rpm_bqml_model} with version {rpm_model_version}!!!")
    print(f"{rpm_bqml_model} with version {rpm_model_version} created.")

  print(f"Monitor models at {rpm_context['rpm_url_to_monitor']}")

  return (
      json.dumps(rpm_context),
      rpm_context['rpm_url_to_monitor'],
      rpm_context['rpm_model_region']
      )

# *Test locally deploy_ml_model_online_pred*

In [None]:
# test locally deploy_ml_model_online_pred
# You can unit test the code above in your local environment.
# You don't need to run this cell when simply building or running the KFP pipeline (experiment).
local_context = get_local_context()
import json
update_local_context(deploy_ml_model_online_pred(
    json.dumps(local_context),
    local_context['RPM_GCP_PROJECT'],
    local_context['RPM_LOCATION'],
    local_context['RPM_RUNTIME_VERSION'],
    local_context['RPM_PYTHON_VERSION'],
    local_context['rpm_model_uri'], 
    local_context['rpm_bqml_model'],
    local_context['rpm_model_version'],
    local_context['rpm_gcs_rpm_ds_url'],
    ))

# Make online prediction - function
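
The component in the next cell writes a single instance to `input.json` and passes it to `gcloud ai-platform predict --json-instances`. That flag reads newline-delimited JSON, so several instances can be sent by writing one JSON object per line. The sketch below shows a possible helper for this. The name `write_json_instances`, the second instance, and the temporary path are hypothetical.

```python
import json
import os
import tempfile

def write_json_instances(instances, path):
    """Writes one JSON object per line (newline-delimited JSON).

    Hypothetical helper: `instances` is a list of feature dicts.
    """
    with open(path, "w") as outfile:
        for instance in instances:
            outfile.write(json.dumps(instance) + "\n")
    return path

# Assumed instances, for illustration only.
instances = [
    {"bounces": 0, "time_on_site": 7363},
    {"bounces": 1, "time_on_site": 0},
]
path = write_json_instances(
    instances, os.path.join(tempfile.mkdtemp(), "input.json"))
with open(path) as infile:
    print(infile.read())
```

Using `json.dumps` rather than a hand-written string avoids quoting mistakes when the feature values come from variables.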

In [None]:
# predict_online_ml_model
from typing import NamedTuple
def predict_online_ml_model(ctx: str,
                            RPM_GCP_PROJECT: str,
                            RPM_PVC_NAME: str,
                            RPM_PVC_DATASET_FOLDER_NAME: str,
                            rpm_bqml_model: str,
                            rpm_model_version: str, ) -> NamedTuple('Outputs', [
        ('rpm_context', str),
        ('rpm_predict_online_output', str),
    ]):
  """ The function(also used as a base for a KFP Component)
    does the online prediction.

    This is to confirm that the endpoint is available and ready to serve.
    Args:
      ctx(:obj:`str`): The dict object with all the variables
        used in the pipeline
      RPM_GCP_PROJECT:(:obj:`str`): Google Cloud project for deployment
      RPM_PVC_NAME:(:obj:`str`): The persistent volume name
      RPM_PVC_DATASET_FOLDER_NAME(:obj:`str`): The folder name to store
        the temporary files before we upload to Google Cloud Storage
      rpm_bqml_model(:obj:`str`): The model name
        used in the rest of the pipeline
      rpm_model_version(:obj:`str`): The version which we will use
        when we deploy the model to Cloud AI Platform Prediction
    Returns:
      Outputs(:obj: `tuple`): Returns the below outputs:
        rpm_context(:obj:`str`): All variables used in the pipeline
        rpm_predict_online_output(:obj:`str`): The output
          from the online prediction
  """
  import json
  rpm_context = json.loads(ctx)
  print(rpm_context)

  import subprocess
  import traceback
  def exec_cmd(cmd):
    try:
      print(cmd)
      process = subprocess.run(cmd, shell=True, check=True,
                               stdout=subprocess.PIPE, universal_newlines=True)
      print(f"'output from {cmd}': {process.stdout}")
      return(process.stdout, 0)
    except subprocess.CalledProcessError as e:
      error = traceback.format_exc()
      print(error)
      print(e.output)
      return(e.output, 1)

  exec_cmd(f"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}")

  import os
  path_to_ds = os.path.join(rpm_context['RPM_PVC_NAME'],
                            rpm_context['RPM_PVC_DATASET_FOLDER_NAME'])
  # check whether the dataset directory already exists
  exec_cmd(f"test -d {path_to_ds} && echo 'Exists' || echo 'Does not exist'")
  # create the dataset directory
  exec_cmd(f"mkdir -p {path_to_ds}")
  # validate that the dataset directory has been created
  exec_cmd(f"test -d {path_to_ds} && echo 'Exists' || echo 'Does not exist'")

  input_data = """{"bounces": 0, "time_on_site": 7363}"""
  filename = os.path.join(path_to_ds, 'input.json')
  with open(filename, 'w') as outfile:
    outfile.write(input_data)

  cmd = f"""
  gcloud ai-platform predict --model {rpm_bqml_model} \
     --version {rpm_model_version} --json-instances {filename}
  """
  (output, returncode) = exec_cmd(cmd)
  print(f"Prediction result for {input_data}: {output}")
  rpm_context['rpm_predict_online_output'] = output

  return (
      json.dumps(rpm_context),
      rpm_context['rpm_predict_online_output'],
      )

# *Test locally predict_online_ml_model*

In [None]:
# test locally predict_online_ml_model
# You can unit test the code above in your local environment.
# You don't need to run this cell when simply building or running the KFP pipeline (experiment).
local_context = get_local_context()
import json
update_local_context(predict_online_ml_model(
    json.dumps(local_context),
    local_context['RPM_GCP_PROJECT'],
    local_context['RPM_PVC_NAME'],
    local_context['RPM_PVC_DATASET_FOLDER_NAME'],
    local_context['rpm_bqml_model'],
    local_context['rpm_model_version'],
    ))
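The online prediction component writes a single JSON instance to a file and passes it to `gcloud ai-platform predict --json-instances`. That flag reads newline-delimited JSON, one instance per line, so several instances can share one file. The sketch below builds such a file with `json.dumps`; the helper name `write_json_instances` and the second sample row are illustrative assumptions, not part of the pipeline.

```python
import json
import os
import tempfile

def write_json_instances(instances, filename):
  """Writes one JSON object per line (hypothetical helper, not part of the pipeline)."""
  with open(filename, 'w') as outfile:
    for instance in instances:
      outfile.write(json.dumps(instance) + '\n')
  return filename

# illustrative feature values; the first row mirrors the input used in the component
sample_instances = [
    {"bounces": 0, "time_on_site": 7363},
    {"bounces": 1, "time_on_site": 0},
]
sample_file = write_json_instances(
    sample_instances, os.path.join(tempfile.mkdtemp(), 'input.json'))
with open(sample_file) as infile:
  written_lines = infile.read().splitlines()
print(written_lines)
```

Serializing with `json.dumps` rather than a hand-written string keeps each line valid JSON even when the feature values change.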

# Define the Kubeflow Pipeline (KFP)

In [None]:
# define the pipeline
import kfp.components as comp
def create_kfp_comp(rpm_comp):
  """ Converts a Python function to a component
    and returns a task (ContainerOp) factory.

    Args:
      rpm_comp (:obj:`function`): the Python function to convert
    Returns:
      (:obj:`function`): a factory that creates a ContainerOp task when called
  """
  return comp.func_to_container_op(
      func=rpm_comp,
      base_image="google/cloud-sdk:latest"
      )

# reload the properties; undo any properties set to test component locally
all_vars = load_params()

from kfp.dsl import pipeline, VolumeOp
import kfp.dsl as dsl
import json
# define the pipeline metadata and the pipeline function
@pipeline(
  name='Propensity to purchase using BigQuery ML',
  description='Propensity model if a customer is likely to purchase'
)
def bq_googlestr_dataset_to_bq_to_caip_pipeline(
  data_path = all_vars['RPM_PVC_NAME'] #you can pass input variables
):
  """ The function defines the pipeline.

    Args:
        data_path:(:obj:`str`): the volume to store the temporary files
  """
  rpm_context = json.dumps(all_vars)
  gcs_bucket_folder_op = create_kfp_comp(create_gcs_bucket_folder)(
      rpm_context,
      all_vars['RPM_GCP_STORAGE_BUCKET'],
      all_vars['RPM_GCP_PROJECT'],
      all_vars['RPM_DEFAULT_BUCKET_EXT'],
      all_vars['RPM_GCP_STORAGE_BUCKET_FOLDER'],
      all_vars['RPM_DEFAULT_BUCKET_FOLDER_NAME']
      )

  create_bq_ds_op = create_kfp_comp(create_bq_ds)(
      gcs_bucket_folder_op.outputs['rpm_context'],
      all_vars['RPM_GCP_PROJECT'],
      all_vars['RPM_BQ_DATASET_NAME'],
      all_vars['RPM_LOCATION']
      )


  load_bq_ds_op = create_kfp_comp(load_bq_ds)(
      create_bq_ds_op.outputs['rpm_context'],
      all_vars['RPM_GCP_PROJECT'],
      all_vars['RPM_BQ_TABLE_NAME'],
      all_vars['RPM_DEFAULT_BQ_TABLE_NAME_EXT'],
      create_bq_ds_op.outputs['rpm_bq_ds_name'],
      )

  create_bq_ml_op = create_kfp_comp(create_bq_ml)(
      load_bq_ds_op.outputs['rpm_context'],
      all_vars['RPM_GCP_PROJECT'],
      all_vars['RPM_MODEL_NAME'],
      all_vars['RPM_DEFAULT_MODEL_NAME'],
      create_bq_ds_op.outputs['rpm_bq_ds_name'],
      load_bq_ds_op.outputs['rpm_table_id']
      )

  evaluate_ml_model_op = create_kfp_comp(evaluate_ml_model)(
      create_bq_ml_op.outputs['rpm_context'],
      all_vars['RPM_GCP_PROJECT'],
      create_bq_ds_op.outputs['rpm_bq_ds_name'],
      create_bq_ml_op.outputs['rpm_bqml_model'],
      )

  create_batch_prediction_dataset_op = create_kfp_comp(create_batch_prediction_dataset)(
      evaluate_ml_model_op.outputs['rpm_context'],
      all_vars['RPM_GCP_PROJECT'],
      create_bq_ds_op.outputs['rpm_bq_ds_name'],
      load_bq_ds_op.outputs['rpm_table_id'],
      )

  predict_batch_ml_model_op = create_kfp_comp(predict_batch_ml_model)(
    evaluate_ml_model_op.outputs['rpm_context'],
    all_vars['RPM_GCP_PROJECT'],
    create_bq_ds_op.outputs['rpm_bq_ds_name'],
    create_bq_ml_op.outputs['rpm_bqml_model'],
    create_batch_prediction_dataset_op.outputs['rpm_pred_table_id'],
    )

  get_versioned_bqml_model_export_path_op = create_kfp_comp(get_bqml_model_version)(
      create_bq_ml_op.outputs['rpm_context'],
      all_vars['RPM_GCP_PROJECT'],
      all_vars['RPM_MODEL_EXPORT_PATH'],
      all_vars['RPM_DEFAULT_MODEL_EXPORT_PATH'],
      all_vars['RPM_MODEL_VER_PREFIX'],
      gcs_bucket_folder_op.outputs['rpm_gcs_rpm_ds_url']
      )

  # create a volume where the dataset will be temporarily stored.
  pvc_op = VolumeOp(
      name=all_vars['RPM_PVC_NAME'],
      resource_name=all_vars['RPM_PVC_NAME'],
      size="20Gi",
      modes=dsl.VOLUME_MODE_RWO
      )

  export_bqml_model_to_gcs_op = create_kfp_comp(export_bqml_model_to_gcs)(
      get_versioned_bqml_model_export_path_op.outputs['rpm_context'],
      all_vars['RPM_GCP_PROJECT'],
      all_vars['RPM_PVC_NAME'],
      all_vars['RPM_PVC_DATASET_FOLDER_NAME'],
      get_versioned_bqml_model_export_path_op.outputs['rpm_bqml_model_export_path'],
      gcs_bucket_folder_op.outputs['rpm_gcs_rpm_ds_url'],
      create_bq_ds_op.outputs['rpm_bq_ds_name'],
      create_bq_ml_op.outputs['rpm_bqml_model'],
  )
  export_bqml_model_to_gcs_op.add_pvolumes({data_path: pvc_op.volume})

  model_deploy_op = create_kfp_comp(deploy_ml_model_online_pred)(
      export_bqml_model_to_gcs_op.outputs['rpm_context'],
      all_vars['RPM_GCP_PROJECT'],
      all_vars['RPM_LOCATION'],
      all_vars['RPM_RUNTIME_VERSION'],
      all_vars['RPM_PYTHON_VERSION'],
      export_bqml_model_to_gcs_op.outputs['rpm_model_uri'],
      create_bq_ml_op.outputs['rpm_bqml_model'],
      get_versioned_bqml_model_export_path_op.outputs['rpm_model_version'],
      gcs_bucket_folder_op.outputs['rpm_gcs_rpm_ds_url'],
  )

  predict_online_ml_model_op = create_kfp_comp(predict_online_ml_model)(
      model_deploy_op.outputs['rpm_context'],
      all_vars['RPM_GCP_PROJECT'],
      all_vars['RPM_PVC_NAME'],
      all_vars['RPM_PVC_DATASET_FOLDER_NAME'],
      create_bq_ml_op.outputs['rpm_bqml_model'],
      get_versioned_bqml_model_export_path_op.outputs['rpm_model_version'],
  )
  predict_online_ml_model_op.add_pvolumes({data_path: pvc_op.volume})

  # don't cache the following comps
  # the below is for model versioning only
  get_versioned_bqml_model_export_path_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  export_bqml_model_to_gcs_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  model_deploy_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  predict_online_ml_model_op.execution_options.caching_strategy.max_cache_staleness = "P0D"

  # don't cache any comps
  # you don't want to cache any comps when you are repeatedly running integration tests
  gcs_bucket_folder_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  create_bq_ds_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  load_bq_ds_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  create_bq_ml_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  evaluate_ml_model_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  create_batch_prediction_dataset_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  predict_batch_ml_model_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  get_versioned_bqml_model_export_path_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  export_bqml_model_to_gcs_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  model_deploy_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  predict_online_ml_model_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
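The pipeline sets `max_cache_staleness = "P0D"` on each task one line at a time. `"P0D"` is a duration of zero days, so a cached result is never treated as fresh and the step always reruns. A minimal sketch of a helper that applies the same setting to any number of tasks follows. The name `disable_caching` is hypothetical, and the stand-in objects only mimic the attribute path used above so the sketch runs without a KFP installation.

```python
from types import SimpleNamespace

def disable_caching(*ops):
  """Sets max_cache_staleness to "P0D" on every op passed in (hypothetical helper)."""
  for op in ops:
    op.execution_options.caching_strategy.max_cache_staleness = "P0D"
  return ops

def make_stand_in_op():
  # stand-in exposing only the attribute path used in the pipeline;
  # inside a real pipeline you would pass the ContainerOp tasks instead
  return SimpleNamespace(
      execution_options=SimpleNamespace(
          caching_strategy=SimpleNamespace(max_cache_staleness=None)))

stand_in_ops = [make_stand_in_op() for _ in range(3)]
disable_caching(*stand_in_ops)
staleness_values = [op.execution_options.caching_strategy.max_cache_staleness
                    for op in stand_in_ops]
print(staleness_values)
```

Inside the pipeline function, a call such as `disable_caching(gcs_bucket_folder_op, create_bq_ds_op)` would replace the corresponding one-line assignments.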

# Compile, watch out for errors in the pipeline composition

In [None]:
# compile the pipeline
def compile_pipeline(pipeline_func):
  """ Compile the pipeline, watch out for errors in the pipeline composition.

    Args:
      pipeline_func (:obj:`function`): the pipeline definition,
        e.g. bq_googlestr_dataset_to_bq_to_caip_pipeline
    Returns:
      pipeline_filename (:obj:`str`): the compiled, compressed file
        to upload to Kubeflow Pipelines
      pipeline_func (:obj:`function`): the pipeline function that was compiled
      arguments (:obj:`dict`): the arguments to pass to the pipeline
        when you launch it
  """
  pipeline_filename = pipeline_func.__name__ + '.zip'

  import kfp.compiler as compiler
  compiler.Compiler().compile(pipeline_func, pipeline_filename)

  arguments = {}
  return (pipeline_filename, pipeline_func, arguments)
pipeline_filename, pipeline_func, arguments = compile_pipeline(bq_googlestr_dataset_to_bq_to_caip_pipeline)

# Create an experiment and run the pipeline immediately
Use the links in the output to open the launched experiment or run in the browser.

In [None]:
# create and run an experiment
import kfp
def create_experiment_and_run():
  """ Create an experiment and run the pipeline immediately.
    Use the links in the output to open the launched experiment or run in the browser.
  """
  # RPM_GCP_KFP_HOST and RPM_DS_DOWNLOAD_EXPERIMENT_NAME are expected to be
  # defined earlier in the notebook
  client = kfp.Client(RPM_GCP_KFP_HOST)
  experiment = client.create_experiment(RPM_DS_DOWNLOAD_EXPERIMENT_NAME)
  #Submit a pipeline run
  run_name = pipeline_func.__name__ + ' run'
  run_result = client.run_pipeline(
      experiment_id=experiment.id,
      job_name=run_name,
      pipeline_package_path=pipeline_filename,
      params=arguments)
create_experiment_and_run()

If there is an error in the pipeline, you will see it in the Kubeflow Pipelines UI in the Experiments section. If you encounter any errors, identify the issue, fix it in the Python function, unit test the function, update the pipeline definition, compile, create an experiment, and run the experiment. Iterate through the process until the pipeline runs successfully.

You have successfully run a Kubeflow pipeline. The pipeline created a model using BigQuery ML. You can now use the model while exploring or analyzing the data: run a SQL query that predicts with the model, then use the result as you wish. You could visualize the result using your favorite visualization package, such as Matplotlib.

The section below demonstrates how you can use the BigQuery client library to execute a SQL query in BigQuery and collect the result in a pandas DataFrame. You can use any query you want, including a query that predicts using the model the pipeline created.
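A prediction query of that kind could be assembled as in the sketch below, which wraps a table in BigQuery ML's `ML.PREDICT` function. The helper name `build_predict_query` and the identifiers passed to it are placeholders, not values produced by the pipeline; the input table is assumed to contain the feature columns the model was trained on.

```python
def build_predict_query(model_id, table_id, limit=10):
  """Returns an ML.PREDICT query string (hypothetical helper)."""
  return f'''
  SELECT *
  FROM ML.PREDICT(MODEL `{model_id}`,
    (SELECT * FROM `{table_id}`))
  LIMIT {int(limit)}
  '''

# placeholder identifiers; substitute the dataset, model, and table the pipeline created
predict_query = build_predict_query(
    'my_dataset.my_model', 'my_project.my_dataset.my_table', limit=5)
print(predict_query)
# with a BigQuery client: client.query(predict_query).to_dataframe()
```

Keeping the `LIMIT` small while exploring avoids pulling a large result set into the notebook.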

# Data Exploration

In [None]:
# Data Exploration !!! BE CAREFUL !!! adjust the query to sample the data.
# 1. Get pandas df from BigQuery
# 2. plot histogram using matplotlib
#########################
from google.cloud import bigquery as bq
import pandas as pd

rpm_context = get_local_context()
client = bq.Client(project=rpm_context["RPM_GCP_PROJECT"])

# adjust the below query to grab only a sample dataset e.g. use a where clause.
df = client.query('''
  SELECT *
  FROM `%s.%s`
  LIMIT 10
''' % (rpm_context["rpm_bq_ds_name"], rpm_context["rpm_table_id"].split('.')[2])).to_dataframe()

In [None]:
df.head()

In [None]:
df.tail()

In [None]:
df.info()

In [None]:
df.shape

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
plt.close('all') 
df.hist(bins=50, figsize=(20,15))
plt.show()

In [None]:
# takes a bit of time...BE CAREFUL!!!
# works on local Jupyter instance.
import pandas_profiling as pp
pp.ProfileReport(df)

The utility functions in the section below provide a convenient way to delete the Google Cloud resources. You can use them while developing your pipeline components.


# Clean up - !!! BE CAREFUL!!!

## Delete the PODs and the PVCs in the KFP (Kubernetes Cluster)

In [None]:
# delete_pod_pvc
def delete_pod_pvc(ctx: str) -> str:
  """ Removes the Pods and Persistent Volume Claims (PVCs) created in the pipeline.
    Not recommended for use in a production environment.
    Comes in handy in the iterative development and testing phases of the SDLC.
    !!! BE CAREFUL !!!!
    Args:
      ctx(:obj:`str`): The JSON string
      with all the variables in the local context
  """

  # loading rpm_context string
  import json
  rpm_context = json.loads(ctx)
  print(rpm_context)
  import subprocess
  import traceback  # used by exec_cmd when a command fails
  def exec_cmd (cmd):
    try:
      print(cmd)
      process = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, universal_newlines=True)
      print(f"'output from {cmd}': {process.stdout}")
    except subprocess.CalledProcessError as e:
      error = traceback.format_exc()
      print(error)
      print(e)

  exec_cmd(f"gcloud config set project {rpm_context['RPM_GCP_PROJECT']}")
  exec_cmd(f"gcloud container clusters get-credentials {rpm_context['RPM_CLUSTER_NAME']} --zone {rpm_context['RPM_LOCATION']} --project {rpm_context['RPM_GCP_PROJECT']}")
  exec_cmd(''' for pod in `kubectl get pod | grep 'bq-public-google-ds-to-bq-' | awk -F ' ' '{print $1}'`; do echo kubectl delete pod $pod; kubectl delete pod $pod; done ''')
  exec_cmd(''' for pvc in `kubectl get pvc | grep 'bq-public-google-ds-to-bq-' | awk -F ' ' '{print $1}'`; do echo kubectl delete pvc $pvc; kubectl delete pvc $pvc; done ''')
  exec_cmd(''' for pod in `kubectl get pod | grep 'bq-public-google-ds-to-bq-' | awk -F ' ' '{print $1}'`; do echo kubectl patch pod $pod -p '{"metadata":{"finalizers":null}}'; kubectl patch pod $pod -p '{"metadata":{"finalizers":null}}'; done ''')
  exec_cmd(''' for pvc in `kubectl get pvc | grep 'bq-public-google-ds-to-bq-' | awk -F ' ' '{print $1}'`; do echo kubectl patch pvc $pvc -p '{"metadata":{"finalizers":null}}'; kubectl patch pvc $pvc -p '{"metadata":{"finalizers":null}}'; done ''')


In [None]:
test_comp_local(delete_pod_pvc)

## Delete Google Cloud Storage folder

In [None]:
# delete the storage folder
from google.cloud import storage
from google.cloud.exceptions import NotFound
from google.cloud.exceptions import Forbidden
import traceback
def delete_storage_folder(bucket_name, folder_name):
  """Deletes a folder in Google Cloud Storage.
    Not recommended for use in a production environment.
    Comes in handy in the iterative development and testing phases of the SDLC.
    !!! BE CAREFUL !!!!
    Args:
      bucket_name(:obj:`str`): The Cloud Storage bucket name,
        where the folder exists
      folder_name(:obj:`str`): The folder that we want to delete
    Returns:
      (:obj:`boolean`): True if we are able to successfully delete the folder
  """
  if len(bucket_name) == 0 or len(folder_name) == 0:
    print(f"Folder {folder_name} couldn't be deleted. Name is empty.")
    return False
  else:
    client = storage.Client()
    try:
      bucket = client.get_bucket(bucket_name)
      blob = bucket.get_blob(folder_name)
      if blob is None:
        print(f"Folder name {folder_name} does not exist!")
        return False
      else:
        bucket.delete_blobs(blobs=bucket.list_blobs(prefix=folder_name))
        print(f"Folder {folder_name} deleted")
      return True
    except Exception as e:
      print(f"Folder {folder_name} couldn't be deleted")
      print(e)
      error = traceback.format_exc()
      print(error)
      return False

In [None]:
# delete storage folder if desired...!!!BE CAREFUL!!!!
local_context = get_local_context()
delete_storage_folder(local_context['rpm_gcs_rpm_ds_url'].split('/')[2],
                      local_context['rpm_gcs_rpm_ds_url'].split('/')[3]+'/')
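The cleanup cells derive the bucket and folder by indexing into `rpm_gcs_rpm_ds_url.split('/')`, which relies on the URL having the form `gs://bucket/folder/...`. A minimal sketch of the same parsing with an explicit check is shown below; the helper name `split_gcs_url` and the example URL are assumptions for illustration only.

```python
def split_gcs_url(gcs_url):
  """Splits gs://bucket/folder/... into (bucket, top-level folder with trailing slash)."""
  if not gcs_url.startswith('gs://'):
    raise ValueError(f"Not a Cloud Storage URL: {gcs_url}")
  parts = gcs_url[len('gs://'):].split('/')
  bucket_name = parts[0]
  # an empty folder name means the URL points at the bucket itself
  folder_name = parts[1] + '/' if len(parts) > 1 and parts[1] else ''
  return bucket_name, folder_name

bucket_name, folder_name = split_gcs_url('gs://example-bucket/example-folder/data.csv')
print(bucket_name, folder_name)
```

Because `delete_storage_folder` returns `False` for an empty folder name, passing the two values from this helper would not delete anything when the URL has no folder component.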

## Delete Google Cloud Storage bucket

In [None]:
#delete the bucket
from google.cloud import storage
from google.cloud.exceptions import NotFound
from google.cloud.exceptions import Forbidden
import traceback
def delete_storage_bucket (bucket_name):
  """Deletes a bucket in Google Cloud Storage.
    Not recommended for use in a production environment.
    Comes in handy in the iterative development and testing phases of the SDLC.
    !!! BE CAREFUL !!!!
    Args:
      bucket_name(:obj:`str`): The Cloud Storage bucket name,
        that we want to delete
    Returns:
      (:obj:`boolean`): True if we are able to successfully delete the bucket
  """
  if bucket_name:
    client = storage.Client()
    try:
      bucket = client.get_bucket(bucket_name)
      bucket.delete()
      print(f"Bucket {bucket.name} deleted")
      return True
    except Exception as e:
      print(f"Bucket {bucket_name} couldn't be deleted")
      print(e)
      error = traceback.format_exc()
      print(error)
      return False
  else:
    print(f"Bucket {bucket_name} couldn't be deleted. Name is empty.")
    return False

In [None]:
# delete storage bucket if desired...!!! BE CAREFUL !!!!
delete_storage_bucket(get_local_context()['rpm_gcs_rpm_ds_url'].split('/')[2])

## Delete the table in BigQuery

In [None]:
#delete BigQuery table if not needed...!!! BE CAREFUL !!!
def delete_table(table_id):
  """Deletes a BigQuery table.
    Not recommended for use in a production environment.
    Comes in handy in the iterative development and testing phases of the SDLC.
    !!! BE CAREFUL !!!!
    Args:
      table_id(:obj:`str`): The BigQuery table name that we want to delete
  """
  from google.cloud import bigquery
  # Construct a BigQuery client object.
  client = bigquery.Client()
  # client.delete_table(table_id, not_found_ok=True)  # Make an API request.
  client.delete_table(table_id)  # Make an API request.
  print("Deleted table '{}'.".format(table_id))

In [None]:
#delete the table in the BigQuery
delete_table(get_local_context()['rpm_table_id'])

## Delete the dataset in BigQuery

In [None]:
def delete_dataset(dataset_id):
  """Deletes a BigQuery dataset.
    Not recommended for use in a production environment.
    Comes in handy in the iterative development and testing phases of the SDLC.
    !!! BE CAREFUL !!!!
    Args:
      dataset_id(:obj:`str`): The BigQuery dataset ID that we want to delete,
        in the form 'your-project.your_dataset'
  """
  from google.cloud import bigquery
  # Construct a BigQuery client object.
  client = bigquery.Client()
  # Use the delete_contents parameter to delete a dataset and its contents.
  # Use the not_found_ok parameter to not receive an error if the
  #     dataset has already been deleted.
  client.delete_dataset(
      dataset_id, delete_contents=True, not_found_ok=True
  )  # Make an API request.
  print("Deleted dataset '{}'.".format(dataset_id))

In [None]:
#delete the BigQuery dataset
rpm_context = get_local_context()
delete_dataset(f"{rpm_context['RPM_GCP_PROJECT']}.{rpm_context['rpm_bq_ds_name']}")

## Delete the Google Cloud Storage folder that contains the exported model artifacts

In [None]:
# delete the Cloud Storage folders where the models are saved
import os
local_context = get_local_context()
bucket_name = local_context['rpm_gcs_rpm_ds_url'].split('/')[2]
# the top-level folder in the bucket
rpm_bq_ds_name = local_context['rpm_gcs_rpm_ds_url'].split('/')[3]
rpm_bqml_model_export_path = local_context['RPM_MODEL_EXPORT_PATH']
if not rpm_bqml_model_export_path:
  rpm_bqml_model_export_path = local_context["RPM_DEFAULT_MODEL_EXPORT_PATH"]
folder_name = os.path.join(rpm_bq_ds_name, rpm_bqml_model_export_path)
assert delete_storage_folder(bucket_name, folder_name)

In [None]:
#exec a cmd in python; a utility func
import subprocess
import traceback
def exec_cmd(cmd):
  """Executes an OS command.
    Args:
      cmd(:obj:`str`): The OS command
    Returns:
      (:obj:`str`): The output of the execution of the OS command
      (:obj:`int`): The return code: 0 on success, 1 on failure
  """
  try:
    print(cmd)
    process = subprocess.run(cmd, shell=True, check=True, 
                             stdout=subprocess.PIPE, universal_newlines=True)
    print(f"'output from {cmd}': {process.stdout}")
    return (process.stdout, 0)
  except subprocess.CalledProcessError as e:
    error = traceback.format_exc()
    print(error)
    print(e.output)
    return (e.output, 1)

## Delete the Cloud AI Platform Prediction models

In [None]:
#delete the model
def delete_caip_model():
  """Deletes the models from the Cloud AI Platform Prediction
  """
  local_context = get_local_context()
  (output, returncode) = exec_cmd(f"gcloud ai-platform versions list --model {local_context['rpm_bqml_model']} --format='value(name)'")
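  for each_ver in output.splitlines():
    # skip blank lines so that no delete command is issued without a version name
    if not each_ver.strip():
      continue
    print(each_ver)
    cmd = f"gcloud ai-platform versions delete {each_ver} --model={local_context['rpm_bqml_model']}"
    exec_cmd(cmd)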
  cmd = f'gcloud ai-platform models delete {local_context["rpm_bqml_model"]}'
  exec_cmd(cmd)
delete_caip_model()

## Delete the GCP Project
To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial, **delete the project**.

The easiest way to eliminate billing is to delete the project you created for the tutorial.

**Caution**: Deleting a project has the following effects:
* **Everything in the project is deleted.** If you used an existing project for this tutorial, when you delete it, you also delete any other work you've done in the project.
* **Custom project IDs are lost.** When you created this project, you might have created a custom project ID that you want to use in the future. To preserve the URLs that use the project ID, such as an **appspot.com** URL, delete selected resources inside the project instead of deleting the whole project.

If you plan to explore multiple tutorials and quickstarts, reusing projects can help you avoid exceeding project quota limits.
<br>
<ol type="1">
    <li>In the Cloud Console, go to the <b>Manage resources</b> page: <a href="https://console.cloud.google.com/iam-admin/projects">Go to the Manage resources page</a>.</li>
    <li>In the project list, select the project that you want to delete and then click <b>Delete</b>.</li>
    <li>In the dialog, type the project ID and then click <b>Shut down</b> to delete the project. </li>
</ol>
