In [None]:
!pip3 install --no-cache-dir --upgrade "kfp>2" google-cloud-aiplatform==1.25.0 # You may need to install kfp or aiplatform

In [None]:
import os
from datetime import datetime
from typing import NamedTuple

import kfp
import matplotlib.pyplot as plt
import pandas as pd
import requests

from google.cloud import aiplatform
from google.cloud import storage
from google.cloud.aiplatform import pipeline_jobs
from kfp import compiler
from kfp import dsl
from kfp.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, ClassificationMetrics, Metrics, component)

In [None]:
# User configuration -- replace the YOUR_* placeholders before running the notebook.
PROJECT_ID = "YOUR_GCP_PROJECT"  # GCP project used by every BigQuery / Vertex AI call below
BUCKET_NAME = "YOUR_GCP_BUCKET"  # GCS bucket holding base_sql.txt; it must already exist
DATA_SET_ID = "propensity"  # BigQuery dataset that contains the input view
VIEW_NAME = "ga_data"  # BigQuery view created as the model input data
# Object name (inside BUCKET_NAME) under which base_sql.txt is uploaded and later read back.
BLOB_PATH = f"{BUCKET_NAME}/base_sql.txt"

In [None]:
# Expose the user-level scripts directory to `!` shell commands run from this notebook.
# The membership check keeps the cell idempotent: re-running it does not append the
# directory to PATH a second time.
LOCAL_BIN_DIR = "/home/jupyter/.local/bin"
PATH = os.environ.get("PATH", "")
if LOCAL_BIN_DIR not in PATH.split(os.pathsep):
    os.environ["PATH"] = f"{PATH}{os.pathsep}{LOCAL_BIN_DIR}" if PATH else LOCAL_BIN_DIR
REGION = "us-central1"

# All pipeline artifacts are written under this GCS location; create the bucket ahead of time.
PIPELINE_ROOT = f'gs://{BUCKET_NAME}'
PIPELINE_ROOT

In [None]:
# In order to build BQ Dataset
!gcloud config set project $PROJECT_ID
REGION = 'US'
!bq mk --location=$REGION --dataset $PROJECT_ID:$DATA_SET_ID

In [None]:
# Upload the local base_sql.txt to GCS under BLOB_PATH; the create_input_view
# component later downloads the SQL from that same bucket / object name.
storage_client = storage.Client()
bucket = storage_client.get_bucket(BUCKET_NAME)

blob = bucket.blob(BLOB_PATH)
blob.upload_from_filename(filename="base_sql.txt")

# Display the URL of the uploaded object as the cell output.
blob.public_url

In [None]:
@component(
    packages_to_install=["google-cloud-bigquery", "google-cloud-storage"],
    base_image="python:3.9",
)
def create_input_view(view_name: str,
                      data_set_id: str,
                      project_id: str,
                      bucket_name: str,
                      blob_path: str
):
    """Create the BigQuery view that is the underlying data source for the models.

    The view's SQL is a template downloaded from GCS. It is filled in with the
    GA sample table reference, the conversion condition and the date range
    defined below. If the view already exists nothing is changed.

    Args:
        view_name: Name of the view to create.
        data_set_id: BigQuery dataset that will contain the view.
        project_id: Project used for the BigQuery client.
        bucket_name: GCS bucket that holds the SQL template.
        blob_path: Object name of the SQL template inside the bucket.

    Raises:
        FileNotFoundError: If the SQL template object does not exist in the bucket.
    """
    from google.cloud import bigquery
    from google.cloud import storage
    from google.cloud.exceptions import NotFound

    client = bigquery.Client(project=project_id)
    table_ref = client.dataset(data_set_id).table(view_name)

    # Values substituted into the SQL template.
    ga_data_ref = 'bigquery-public-data.google_analytics_sample.ga_sessions_*'
    conversion = "hits.page.pageTitle like '%Shopping Cart%'" # this is sql like syntax used to define the conversion in the GA360 raw export
    start_date = '20170101'
    end_date = '20170131'

    def get_sql(bucket_name, blob_path):
        """Download the SQL template from GCS and return it as text."""
        storage_client = storage.Client()
        blob = storage_client.get_bucket(bucket_name).get_blob(blob_path)
        if blob is None:
            # get_blob() returns None for a missing object; fail with a clear message
            # instead of an AttributeError on the download call.
            raise FileNotFoundError(
                f"SQL template gs://{bucket_name}/{blob_path} was not found"
            )
        return blob.download_as_bytes().decode('utf-8')

    def if_tbl_exists(client, table_ref):
        """Return True when the table/view behind table_ref already exists."""
        try:
            client.get_table(table_ref)
            return True
        except NotFound:
            return False

    if if_tbl_exists(client, table_ref):
        print("view already exists")
        return

    # Load the SQL from base_sql.txt. Edit that file if you want to modify the query.
    create_base_feature_set_query = get_sql(bucket_name, blob_path).format(
        start_date=start_date,
        end_date=end_date,
        ga_data_ref=ga_data_ref,
        conversion=conversion,
    )

    base_feature_set_view = bigquery.Table(table_ref)
    # NOTE(review): the second format() pass is kept from the original; presumably it fills
    # a positional placeholder that survives the first pass -- confirm against base_sql.txt.
    base_feature_set_view.view_query = create_base_feature_set_query.format(project_id)
    client.create_table(base_feature_set_view)  # API request


In [None]:
@component(
    packages_to_install=["google-cloud-bigquery"],
    base_image="python:3.9",
)
def build_bqml_logistic(project_id: str,
                         data_set_id: str,
                         model_name: str,
                         training_view: str
):
    """Train a logistic regression model with BigQuery ML.

    Runs a CREATE OR REPLACE MODEL statement over the training view and waits
    for it to finish, so the component only succeeds once the model exists.

    Args:
        project_id: Project used for the BigQuery client and the resource names.
        data_set_id: Dataset that contains the training view and receives the model.
        model_name: Name of the model to create or replace.
        training_view: Name of the view used as training data.
    """
    from google.cloud import bigquery
    client = bigquery.Client(project=project_id)

    model_name = f"{project_id}.{data_set_id}.{model_name}"
    training_set = f"{project_id}.{data_set_id}.{training_view}"
    build_model_query_bqml_logistic = '''
    CREATE OR REPLACE MODEL `{model_name}`
    OPTIONS(model_type='logistic_reg'
    , INPUT_LABEL_COLS = ['label']
    , L1_REG = 1
    , DATA_SPLIT_METHOD = 'RANDOM'
    , DATA_SPLIT_EVAL_FRACTION = 0.20
    ) AS
        SELECT * EXCEPT (fullVisitorId, label),
        CASE WHEN label is null then 0 ELSE label end as label
    FROM `{training_set}`
    '''.format(model_name = model_name, training_set = training_set)

    job_config = bigquery.QueryJobConfig()
    query_job = client.query(build_model_query_bqml_logistic, job_config=job_config)  # Make an API request.
    # Block until training completes; a failed statement raises here instead of
    # being lost when the component exits.
    query_job.result()

In [None]:
@component(
    packages_to_install=["google-cloud-bigquery"],
    base_image="python:3.9",
)
def build_bqml_xgboost(project_id: str,
                         data_set_id: str,
                         model_name: str,
                         training_view: str
):
    """Train a boosted tree (XGBoost) classifier with BigQuery ML.

    Runs a CREATE OR REPLACE MODEL statement over the training view and waits
    for it to finish, so the component only succeeds once the model exists.

    Args:
        project_id: Project used for the BigQuery client and the resource names.
        data_set_id: Dataset that contains the training view and receives the model.
        model_name: Name of the model to create or replace.
        training_view: Name of the view used as training data.
    """
    from google.cloud import bigquery
    client = bigquery.Client(project=project_id)

    model_name = f"{project_id}.{data_set_id}.{model_name}"
    training_set = f"{project_id}.{data_set_id}.{training_view}"
    build_model_query_bqml_xgboost = '''
    CREATE OR REPLACE MODEL `{model_name}`
    OPTIONS(model_type='BOOSTED_TREE_CLASSIFIER'
    , INPUT_LABEL_COLS = ['label']
    , L1_REG = 1
    , DATA_SPLIT_METHOD = 'RANDOM'
    , DATA_SPLIT_EVAL_FRACTION = 0.20
    ) AS
        SELECT * EXCEPT (fullVisitorId, label),
        CASE WHEN label is null then 0 ELSE label end as label
    FROM `{training_set}`
    '''.format(model_name = model_name, training_set = training_set)

    job_config = bigquery.QueryJobConfig()
    query_job = client.query(build_model_query_bqml_xgboost, job_config=job_config)  # Make an API request.
    # Block until training completes; a failed statement raises here instead of
    # being lost when the component exits.
    query_job.result()

In [None]:
@component(
    packages_to_install=["google-cloud-bigquery"],
    base_image="python:3.9",
)
def build_bqml_automl(project_id: str,
                         data_set_id: str,
                         model_name: str,
                         training_view: str
):
    """Train an AutoML classifier with BigQuery ML.

    Runs a CREATE OR REPLACE MODEL statement over the training view and waits
    for it to finish, so the component only succeeds once the model exists.

    Args:
        project_id: Project used for the BigQuery client and the resource names.
        data_set_id: Dataset that contains the training view and receives the model.
        model_name: Name of the model to create or replace.
        training_view: Name of the view used as training data.
    """
    from google.cloud import bigquery
    client = bigquery.Client(project=project_id)

    model_name = f"{project_id}.{data_set_id}.{model_name}"
    training_set = f"{project_id}.{data_set_id}.{training_view}"
    # The model type was previously 'BOOSTED_TREE_CLASSIFIER', which made this component
    # a second boosted-tree build rather than the AutoML model its name describes.
    # NOTE(review): AutoML training is expected to take considerably longer -- confirm budget.
    build_model_query_bqml_automl = '''
    CREATE OR REPLACE MODEL `{model_name}`
    OPTIONS(model_type='AUTOML_CLASSIFIER'
    , INPUT_LABEL_COLS = ['label']
    ) AS
        SELECT * EXCEPT (fullVisitorId, label),
        CASE WHEN label is null then 0 ELSE label end as label
    FROM `{training_set}`
    '''.format(model_name = model_name, training_set = training_set)

    job_config = bigquery.QueryJobConfig()
    query_job = client.query(build_model_query_bqml_automl, job_config=job_config)  # Make an API request.
    # Block until training completes; a failed statement raises here instead of
    # being lost when the component exits.
    query_job.result()

In [None]:
@component(
    packages_to_install=["google-cloud-bigquery", "xgboost==1.6.2", "pandas==1.3.5", "scikit-learn==1.0.2", "joblib==1.1.0","pyarrow", "db-dtypes"],
    base_image="python:3.9",
)
def build_xgb_xgboost(project_id: str,
                            data_set_id: str,
                            training_view: str,
                            metrics: Output[Metrics],
                            model: Output[Model]
):
    """Train an XGBoost classifier with the xgboost library and save it as an artifact.

    Reads the whole training view into a DataFrame, one-hot encodes the
    categorical columns, fits the classifier on a train split, logs the
    accuracy on the held-out split and writes the fitted model as
    ``model.joblib`` inside the model artifact directory.

    Args:
        project_id: Project used for the BigQuery client and the view name.
        data_set_id: Dataset that contains the training view.
        training_view: Name of the view used as training data.
        metrics: Output artifact that receives the hold-out accuracy.
        model: Output artifact whose path receives ``model.joblib``.
    """
    import os

    import pandas as pd
    from google.cloud import bigquery
    from joblib import dump
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from xgboost import XGBClassifier

    client = bigquery.Client(project=project_id)

    data_set = f"{project_id}.{data_set_id}.{training_view}"
    build_df_for_xgboost = '''
    SELECT * FROM `{data_set}`
    '''.format(data_set = data_set)

    job_config = bigquery.QueryJobConfig()
    df = client.query(build_df_for_xgboost, job_config=job_config).to_dataframe()  # Make an API request.
    df = pd.get_dummies(df.drop(['fullVisitorId'], axis=1), prefix=['visited_dma', 'visited_daypart', 'visited_dow'])

    X = df.drop(['label'], axis=1).values
    # The BigQuery ML components in this notebook map a NULL label to 0; do the same here
    # so all models are trained on the same target.
    y = df['label'].fillna(0).astype(int).values

    # Fixed seed so repeated pipeline runs use the same split.
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

    # verbosity / n_jobs replace the deprecated `silent` / `nthread` keywords.
    xgb_model = XGBClassifier(n_estimators=50, objective='binary:hinge',
                              verbosity=0, n_jobs=1,
                              eval_metric="auc")
    xgb_model.fit(X_train, y_train)

    # Populate the declared metrics output from the held-out split, which was
    # previously created but never used.
    accuracy = accuracy_score(y_test, xgb_model.predict(X_test))
    metrics.log_metric("accuracy", float(accuracy))

    os.makedirs(model.path, exist_ok=True)
    dump(xgb_model, os.path.join(model.path, "model.joblib"))




In [None]:
@component(
    packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow", "matplotlib", "db-dtypes"],
    base_image="python:3.9",
)
def evaluate_bqml_logistic(project_id: str,
                            data_set_id: str,
                            model_name: str,
                            training_view: str,
                            logistic_data_path: OutputPath("Dataset")
):
    """Evaluate the BigQuery ML logistic regression model with ML.ROC_CURVE.

    Waits for the model to exist, computes the ROC curve rows plus a precision
    column for thresholds 0..1 in steps of 0.01, writes them as CSV to the
    output path and saves a precision/recall chart next to it.

    Args:
        project_id: Project used for the BigQuery client and the resource names.
        data_set_id: Dataset that contains the model and the view.
        model_name: Name of the model to evaluate.
        training_view: Name of the view passed to ML.ROC_CURVE.
        logistic_data_path: Output path that receives the evaluation CSV.

    Raises:
        google.cloud.exceptions.NotFound: If the model still does not exist
            after the waiting period.
    """
    import time

    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound

    client = bigquery.Client(project=project_id)

    model_name = f"{project_id}.{data_set_id}.{model_name}"
    training_set = f"{project_id}.{data_set_id}.{training_view}"

    # Wait to ensure the model exists: check 5 times with a minute wait after each miss.
    # Only NotFound is retried; any other error (permissions, bad name, ...) surfaces at once.
    for _ in range(5):
        try:
            client.get_model(model_name)  # Make an API request.
            break  # the model exists, stop waiting
        except NotFound:
            time.sleep(60)
    else:
        # Final check after the waiting period; raises NotFound if the model is still missing.
        client.get_model(model_name)

    # SAFE_DIVIDE yields NULL instead of failing the query at thresholds where
    # nothing is predicted positive (true_positives + false_positives = 0).
    evaluate_model_query_bqml_logistic = '''
    SELECT
      round(threshold, 2) as threshold,
      * except(threshold),
      SAFE_DIVIDE(true_positives, true_positives + false_positives) AS precision
    FROM
      ML.ROC_CURVE(MODEL `{model_name}`,
                   TABLE `{table_name}`,
                   GENERATE_ARRAY(0,1, 0.01))

    ORDER BY threshold
    '''.format(model_name = model_name, table_name = training_set)

    job_config = bigquery.QueryJobConfig()
    query_job = client.query(evaluate_model_query_bqml_logistic, job_config=job_config)  # Make an API request.
    df_evaluation_logistic = query_job.result().to_dataframe()
    df_evaluation_logistic.to_csv(logistic_data_path)

    graph = df_evaluation_logistic.plot(x='threshold', y=['precision', 'recall']).get_figure()
    # Explicit .png suffix: the chart is stored beside the CSV and can never overwrite it.
    graph.savefig(f"{logistic_data_path}.png")


In [None]:
@component(
    packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow", "matplotlib", "db-dtypes"],
    base_image="python:3.9",
)
def evaluate_bqml_xgboost(project_id: str,
                            data_set_id: str,
                            model_name: str,
                            training_view: str,
                            xgboost_data_path: OutputPath("Dataset")
):
    """Evaluate the BigQuery ML XGBoost model with ML.ROC_CURVE.

    Waits for the model to exist, computes the ROC curve rows plus a precision
    column for thresholds 0..1 in steps of 0.01, writes them as CSV to the
    output path and saves a precision/recall chart next to it.

    Args:
        project_id: Project used for the BigQuery client and the resource names.
        data_set_id: Dataset that contains the model and the view.
        model_name: Name of the model to evaluate.
        training_view: Name of the view passed to ML.ROC_CURVE.
        xgboost_data_path: Output path that receives the evaluation CSV.

    Raises:
        google.cloud.exceptions.NotFound: If the model still does not exist
            after the waiting period.
    """
    import time

    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound

    client = bigquery.Client(project=project_id)

    model_name = f"{project_id}.{data_set_id}.{model_name}"
    training_set = f"{project_id}.{data_set_id}.{training_view}"

    # Wait to ensure the model exists: check 5 times with a minute wait after each miss.
    # Only NotFound is retried; any other error (permissions, bad name, ...) surfaces at once.
    for _ in range(5):
        try:
            client.get_model(model_name)  # Make an API request.
            break  # the model exists, stop waiting
        except NotFound:
            time.sleep(60)
    else:
        # Final check after the waiting period; raises NotFound if the model is still missing.
        client.get_model(model_name)

    # SAFE_DIVIDE yields NULL instead of failing the query at thresholds where
    # nothing is predicted positive (true_positives + false_positives = 0).
    evaluate_model_query_bqml_xgboost = '''
    SELECT
      round(threshold, 2) as threshold,
      * except(threshold),
      SAFE_DIVIDE(true_positives, true_positives + false_positives) AS precision
    FROM
      ML.ROC_CURVE(MODEL `{model_name}`,
                   TABLE `{table_name}`,
                   GENERATE_ARRAY(0,1, 0.01))

    ORDER BY threshold
    '''.format(model_name = model_name, table_name = training_set)

    job_config = bigquery.QueryJobConfig()
    query_job = client.query(evaluate_model_query_bqml_xgboost, job_config=job_config)  # Make an API request.
    df_evaluation_xgboost = query_job.result().to_dataframe()
    df_evaluation_xgboost.to_csv(xgboost_data_path)

    graph = df_evaluation_xgboost.plot(x='threshold', y=['precision', 'recall']).get_figure()
    # Explicit .png suffix: the chart is stored beside the CSV and can never overwrite it.
    graph.savefig(f"{xgboost_data_path}.png")

In [None]:
@component(
    packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow", "matplotlib", "db-dtypes"],
    base_image="python:3.9",
)
def evaluate_bqml_automl(project_id: str,
                            data_set_id: str,
                            model_name: str,
                            training_view: str,
                            automl_data_path: OutputPath("Dataset")
):
    """Evaluate the BigQuery ML AutoML model with ML.ROC_CURVE.

    Waits for the model to exist, computes the ROC curve rows plus a precision
    column for thresholds 0..1 in steps of 0.01, writes them as CSV to the
    output path and saves a precision/recall chart next to it.

    Args:
        project_id: Project used for the BigQuery client and the resource names.
        data_set_id: Dataset that contains the model and the view.
        model_name: Name of the model to evaluate.
        training_view: Name of the view passed to ML.ROC_CURVE.
        automl_data_path: Output path that receives the evaluation CSV.

    Raises:
        google.cloud.exceptions.NotFound: If the model still does not exist
            after the waiting period.
    """
    import time

    from google.cloud import bigquery
    from google.cloud.exceptions import NotFound

    client = bigquery.Client(project=project_id)

    model_name = f"{project_id}.{data_set_id}.{model_name}"
    training_set = f"{project_id}.{data_set_id}.{training_view}"

    # Wait to ensure the model exists: check 5 times with a minute wait after each miss.
    # Only NotFound is retried; any other error (permissions, bad name, ...) surfaces at once.
    for _ in range(5):
        try:
            client.get_model(model_name)  # Make an API request.
            break  # the model exists, stop waiting
        except NotFound:
            time.sleep(60)
    else:
        # Final check after the waiting period; raises NotFound if the model is still missing.
        client.get_model(model_name)

    # SAFE_DIVIDE yields NULL instead of failing the query at thresholds where
    # nothing is predicted positive (true_positives + false_positives = 0).
    evaluate_model_query_bqml_automl = '''
    SELECT
      round(threshold, 2) as threshold,
      * except(threshold),
      SAFE_DIVIDE(true_positives, true_positives + false_positives) AS precision
    FROM
      ML.ROC_CURVE(MODEL `{model_name}`,
                   TABLE `{table_name}`,
                   GENERATE_ARRAY(0,1, 0.01))

    ORDER BY threshold
    '''.format(model_name = model_name, table_name = training_set)

    job_config = bigquery.QueryJobConfig()
    query_job = client.query(evaluate_model_query_bqml_automl, job_config=job_config)  # Make an API request.
    df_evaluation_automl = query_job.result().to_dataframe()
    df_evaluation_automl.to_csv(automl_data_path)

    graph = df_evaluation_automl.plot(x='threshold', y=['precision', 'recall']).get_figure()
    # Explicit .png suffix: the chart is stored beside the CSV and can never overwrite it.
    graph.savefig(f"{automl_data_path}.png")

In [None]:
@component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-aiplatform==1.25.0"],
)
def deploy_xgb(
    model: Input[Model],
    project_id: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    """Upload the trained XGBoost artifact to Vertex AI and deploy it.

    Args:
        model: Input artifact whose URI holds the saved model.
        project_id: Project used to initialise the Vertex AI SDK.
        vertex_endpoint: Output artifact; its URI is set to the endpoint resource name.
        vertex_model: Output artifact; its URI is set to the uploaded model resource name.
    """
    from google.cloud import aiplatform

    aiplatform.init(project=project_id)

    # Register the artifact as a Vertex AI model served by the XGBoost prediction image.
    uploaded_model = aiplatform.Model.upload(
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-6:latest",
        artifact_uri=model.uri,
        display_name='propensity_demo',
    )
    serving_endpoint = uploaded_model.deploy(machine_type="n1-standard-16")

    # Record the created resources on the output artifacts.
    vertex_model.uri = uploaded_model.resource_name
    vertex_endpoint.uri = serving_endpoint.resource_name


In [None]:
@dsl.pipeline(
    # A name for the pipeline.
    name="pipeline-test",
    description='Propensity BigQuery ML Test',
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
)
def pipeline():
    # Arguments shared by every build / evaluate component.
    common_args = dict(project_id=PROJECT_ID,
                       data_set_id=DATA_SET_ID,
                       training_view=VIEW_NAME)

    # Step 1: create the input view; every build step is ordered after it.
    create_input_view_op = create_input_view(view_name=VIEW_NAME,
                                             data_set_id=DATA_SET_ID,
                                             project_id=PROJECT_ID,
                                             bucket_name=BUCKET_NAME,
                                             blob_path=BLOB_PATH)

    # Step 2: build the four models once the view is in place.
    build_bqml_logistic_op = build_bqml_logistic(
        model_name='bqml_logistic_model', **common_args
    ).after(create_input_view_op)

    build_bqml_xgboost_op = build_bqml_xgboost(
        model_name='bqml_xgboost_model', **common_args
    ).after(create_input_view_op)

    build_bqml_automl_op = build_bqml_automl(
        model_name='bqml_automl_model', **common_args
    ).after(create_input_view_op)

    build_xgb_xgboost_op = build_xgb_xgboost(**common_args).after(create_input_view_op)

    # Step 3: evaluate each BigQuery ML model after its own build step.
    evaluate_bqml_logistic(
        model_name='bqml_logistic_model', **common_args
    ).after(build_bqml_logistic_op)

    evaluate_bqml_xgboost(
        model_name='bqml_xgboost_model', **common_args
    ).after(build_bqml_xgboost_op)

    evaluate_bqml_automl(
        model_name='bqml_automl_model', **common_args
    ).after(build_bqml_automl_op)

    # Step 4: deploy the library-trained XGBoost model; consuming its "model"
    # output is what orders this step after the build.
    deploy_xgb(project_id=PROJECT_ID,
               model=build_xgb_xgboost_op.outputs["model"])

In [None]:
# Compile the pipeline function into a local spec file (pipeline.yaml).
compiler.Compiler().compile(
    package_path="pipeline.yaml",
    pipeline_func=pipeline,
)

In [None]:
# The timestamp suffix gives each job its own job_id.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
run = pipeline_jobs.PipelineJob(
    display_name="test-pipeline",
    template_path="pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    job_id="test-{0}".format(TIMESTAMP),
    enable_caching=True,
    # Use the same project as every pipeline component instead of relying on
    # whatever default project the environment resolves to.
    project=PROJECT_ID,
)

In [None]:
# Execute the PipelineJob configured in the previous cell.
run.run()

In [None]:
# this schedules a cron like job by building an endpoint using cloud functions and then scheduler
# NOTE(review): the install cell requests "kfp>2"; the `kfp.v2.google.client` module looks like
# a KFP 1.x API and is presumably unavailable there -- confirm, and migrate this cell to the
# Vertex AI SDK scheduling API if so.

from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(project_id=PROJECT_ID,
                             region='us-central1'
                             )

# NOTE(review): the compile cell writes "pipeline.yaml", but this call reads 'pipeline.json',
# which no cell in this notebook creates -- confirm the intended spec file.
# The cron expression '0 * * * *' fires at minute 0 of every hour.
api_client.create_schedule_from_job_spec(
    job_spec_path='pipeline.json',
    schedule='0 * * * *',
    enable_caching=False
)