marketing-analytics/predicting/kfp_pipeline/Propensity_Pipeline.ipynb
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "1PbxHas94vfS",
"metadata": {
"id": "1PbxHas94vfS"
},
"outputs": [],
"source": [
"!pip3 install --no-cache-dir --upgrade \"kfp>2\" google-cloud-aiplatform==1.25.0 # You may need to install kfp or aiplatform"
]
},
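{
"cell_type": "code",
"execution_count": null,
"id": "restart-kernel-01",
"metadata": {},
"outputs": [],
"source": [
"# Optional sketch: restart the kernel so the freshly installed packages are\n",
"# picked up. This is a common Workbench/Colab pattern; skip it if your\n",
"# environment already had compatible versions installed.\n",
"import IPython\n",
"\n",
"IPython.Application.instance().kernel.do_shutdown(True)"
]
},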
{
"cell_type": "code",
"execution_count": null,
"id": "af318aeb",
"metadata": {
"id": "af318aeb"
},
"outputs": [],
"source": [
"import kfp\n",
"import matplotlib.pyplot as plt\n",
"import pandas as pd\n",
"import requests\n",
"\n",
"from kfp import dsl\n",
"from kfp import compiler\n",
"from kfp.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,\n",
" OutputPath, ClassificationMetrics, Metrics, component)\n",
"\n",
"from google.cloud import aiplatform\n",
"from google.cloud import storage\n",
"from google.cloud.aiplatform import pipeline_jobs\n",
"from typing import NamedTuple\n",
"\n",
"from datetime import datetime"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a77095ea",
"metadata": {
"id": "a77095ea"
},
"outputs": [],
"source": [
"VIEW_NAME = 'ga_data' # BigQuery view you create for input data to model\n",
"DATA_SET_ID = 'propensity' # The Data Set ID where the view sits\n",
"PROJECT_ID = 'YOUR_GCP_PROJECT' # The Project ID\n",
"BUCKET_NAME = 'YOUR_GCP_BUCKET' # Bucket where the base_sql.txt file lives. You'll need to make the bucket.\n",
"BLOB_PATH = f'{BUCKET_NAME}/base_sql.txt' # The actual path where base_sql will be sent to"
]
},
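{
"cell_type": "code",
"execution_count": null,
"id": "config-check-01",
"metadata": {},
"outputs": [],
"source": [
"# Quick sanity check (a hedged convenience, not part of the original flow):\n",
"# fail fast if the placeholder values above were left unchanged.\n",
"assert PROJECT_ID != 'YOUR_GCP_PROJECT', 'Set PROJECT_ID to your GCP project ID'\n",
"assert BUCKET_NAME != 'YOUR_GCP_BUCKET', 'Set BUCKET_NAME to an existing GCS bucket'"
]
},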
{
"cell_type": "code",
"execution_count": null,
"id": "mYMQx9w35DAa",
"metadata": {
"id": "mYMQx9w35DAa"
},
"outputs": [],
"source": [
"PATH=%env PATH\n",
"%env PATH={PATH}:/home/jupyter/.local/bin\n",
"REGION=\"us-central1\"\n",
"\n",
"PIPELINE_ROOT = f'gs://{BUCKET_NAME}' # This is where all pipeline artifacts are sent. You'll need to ensure the bucket is created ahead of time\n",
"PIPELINE_ROOT"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "idXvSzhu5G3d",
"metadata": {
"id": "idXvSzhu5G3d"
},
"outputs": [],
"source": [
"# In order to build BQ Dataset\n",
"!gcloud config set project $PROJECT_ID\n",
"REGION = 'US'\n",
"!bq mk --location=$REGION --dataset $PROJECT_ID:$DATA_SET_ID"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dd437eda",
"metadata": {},
"outputs": [],
"source": [
"# Send base_sql.txt to GCS bucket\n",
"\n",
"storage_client = storage.Client()\n",
"bucket = storage_client.get_bucket(BUCKET_NAME)\n",
"blob = bucket.blob(BLOB_PATH)\n",
"blob.upload_from_filename(\"base_sql.txt\")\n",
"blob.public_url"
]
},
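{
"cell_type": "code",
"execution_count": null,
"id": "verify-upload-01",
"metadata": {},
"outputs": [],
"source": [
"# Optional sketch: confirm the template landed in GCS and preview the start of\n",
"# the SQL. It should contain {start_date}, {end_date}, {ga_data_ref} and\n",
"# {conversion} placeholders, which the create_input_view component fills in below.\n",
"sql_template = bucket.get_blob(BLOB_PATH).download_as_text()\n",
"print(sql_template[:500])"
]
},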
{
"cell_type": "code",
"execution_count": null,
"id": "9f6dc47a-348c-489a-b11b-0d09ff7eb74d",
"metadata": {
"id": "9f6dc47a-348c-489a-b11b-0d09ff7eb74d"
},
"outputs": [],
"source": [
"@component(\n",
" # this component builds a BQ view, which will be the underlying source for model\n",
" packages_to_install=[\"google-cloud-bigquery\", \"google-cloud-storage\"],\n",
" base_image=\"python:3.9\",\n",
")\n",
"\n",
"def create_input_view(view_name: str,\n",
" data_set_id: str,\n",
" project_id: str,\n",
" bucket_name: str,\n",
" blob_path: str\n",
"\n",
"):\n",
" from google.cloud import bigquery\n",
" from google.cloud import storage\n",
" client = bigquery.Client(project=project_id)\n",
" dataset = client.dataset(data_set_id)\n",
" table_ref = dataset.table(view_name)\n",
" ga_data_ref = 'bigquery-public-data.google_analytics_sample.ga_sessions_*'\n",
" conversion = \"hits.page.pageTitle like '%Shopping Cart%'\" # this is sql like syntax used to define the conversion in the GA360 raw export\n",
" start_date = '20170101'\n",
" end_date = '20170131'\n",
"\n",
"\n",
" def get_sql(bucket_name, blob_path):\n",
" from google.cloud import storage\n",
" storage_client = storage.Client()\n",
" bucket = storage_client.get_bucket(bucket_name)\n",
" blob = bucket.get_blob(blob_path)\n",
" content = blob.download_as_string()\n",
" return content\n",
"\n",
" def if_tbl_exists(client, table_ref):\n",
" from google.cloud.exceptions import NotFound\n",
" try:\n",
" client.get_table(table_ref)\n",
" return True\n",
" except NotFound:\n",
" return False\n",
"\n",
" if if_tbl_exists(client, table_ref):\n",
" print(\"view already exists\")\n",
"\n",
" else:\n",
" #load sql from base_sql.txt. This can be modified if you want to modify your query\n",
" content = get_sql(bucket_name, blob_path)\n",
" content = str(content, 'utf-8')\n",
" create_base_feature_set_query = content.format(start_date = start_date,\n",
" end_date = end_date,\n",
" ga_data_ref = ga_data_ref,\n",
" conversion = conversion)\n",
"\n",
" shared_dataset_ref = client.dataset(data_set_id)\n",
" base_feature_set_view_ref = shared_dataset_ref.table(view_name)\n",
" base_feature_set_view = bigquery.Table(base_feature_set_view_ref)\n",
" base_feature_set_view.view_query = create_base_feature_set_query.format(project_id)\n",
" base_feature_set_view = client.create_table(base_feature_set_view) # API request\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0ba6d3be",
"metadata": {
"id": "0ba6d3be"
},
"outputs": [],
"source": [
"@component(\n",
" # this component builds a logistic regression with BigQuery ML\n",
" packages_to_install=[\"google-cloud-bigquery\"],\n",
" base_image=\"python:3.9\",\n",
")\n",
"\n",
"\n",
"def build_bqml_logistic(project_id: str,\n",
" data_set_id: str,\n",
" model_name: str,\n",
" training_view: str\n",
"):\n",
" from google.cloud import bigquery\n",
" client = bigquery.Client(project=project_id)\n",
"\n",
" model_name = f\"{project_id}.{data_set_id}.{model_name}\"\n",
" training_set = f\"{project_id}.{data_set_id}.{training_view}\"\n",
" build_model_query_bqml_logistic = '''\n",
" CREATE OR REPLACE MODEL `{model_name}`\n",
" OPTIONS(model_type='logistic_reg'\n",
" , INPUT_LABEL_COLS = ['label']\n",
" , L1_REG = 1\n",
" , DATA_SPLIT_METHOD = 'RANDOM'\n",
" , DATA_SPLIT_EVAL_FRACTION = 0.20\n",
" ) AS\n",
" SELECT * EXCEPT (fullVisitorId, label),\n",
" CASE WHEN label is null then 0 ELSE label end as label\n",
" FROM `{training_set}`\n",
" '''.format(model_name = model_name, training_set = training_set)\n",
"\n",
" job_config = bigquery.QueryJobConfig()\n",
" client.query(build_model_query_bqml_logistic, job_config=job_config) # Make an API request."
]
},
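{
"cell_type": "code",
"execution_count": null,
"id": "bqml-inspect-01",
"metadata": {},
"outputs": [],
"source": [
"# Optional sketch for interactive inspection once the pipeline has trained the\n",
"# model: BigQuery ML exposes holdout metrics via ML.EVALUATE (and training\n",
"# iterations via ML.TRAINING_INFO). Run this only after a pipeline run\n",
"# completes; it assumes google-cloud-bigquery is installed in the notebook.\n",
"from google.cloud import bigquery\n",
"\n",
"bq_client = bigquery.Client(project=PROJECT_ID)\n",
"eval_sql = f'''\n",
"SELECT * FROM ML.EVALUATE(MODEL `{PROJECT_ID}.{DATA_SET_ID}.bqml_logistic_model`)\n",
"'''\n",
"bq_client.query(eval_sql).result().to_dataframe()"
]
},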
{
"cell_type": "code",
"execution_count": null,
"id": "fe603955",
"metadata": {
"id": "fe603955"
},
"outputs": [],
"source": [
"@component(\n",
" # this component builds an xgboost classifier with BigQuery ML\n",
" packages_to_install=[\"google-cloud-bigquery\"],\n",
" base_image=\"python:3.9\",\n",
")\n",
"\n",
"\n",
"def build_bqml_xgboost(project_id: str,\n",
" data_set_id: str,\n",
" model_name: str,\n",
" training_view: str\n",
"):\n",
" from google.cloud import bigquery\n",
" client = bigquery.Client(project=project_id)\n",
"\n",
" model_name = f\"{project_id}.{data_set_id}.{model_name}\"\n",
" training_set = f\"{project_id}.{data_set_id}.{training_view}\"\n",
" build_model_query_bqml_xgboost = '''\n",
" CREATE OR REPLACE MODEL `{model_name}`\n",
" OPTIONS(model_type='BOOSTED_TREE_CLASSIFIER'\n",
" , INPUT_LABEL_COLS = ['label']\n",
" , L1_REG = 1\n",
" , DATA_SPLIT_METHOD = 'RANDOM'\n",
" , DATA_SPLIT_EVAL_FRACTION = 0.20\n",
" ) AS\n",
" SELECT * EXCEPT (fullVisitorId, label),\n",
" CASE WHEN label is null then 0 ELSE label end as label\n",
" FROM `{training_set}`\n",
" '''.format(model_name = model_name, training_set = training_set)\n",
"\n",
" job_config = bigquery.QueryJobConfig()\n",
" client.query(build_model_query_bqml_xgboost, job_config=job_config) # Make an API request."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1e27cdb8",
"metadata": {
"id": "1e27cdb8"
},
"outputs": [],
"source": [
"@component(\n",
" # this component builds an AutoML classifier with BigQuery ML\n",
" packages_to_install=[\"google-cloud-bigquery\"],\n",
" base_image=\"python:3.9\",\n",
")\n",
"\n",
"\n",
"def build_bqml_automl(project_id: str,\n",
" data_set_id: str,\n",
" model_name: str,\n",
" training_view: str\n",
"):\n",
" from google.cloud import bigquery\n",
" client = bigquery.Client(project=project_id)\n",
"\n",
" model_name = f\"{project_id}.{data_set_id}.{model_name}\"\n",
" training_set = f\"{project_id}.{data_set_id}.{training_view}\"\n",
" build_model_query_bqml_automl = '''\n",
" CREATE OR REPLACE MODEL `{model_name}`\n",
" OPTIONS(model_type='BOOSTED_TREE_CLASSIFIER'\n",
" , INPUT_LABEL_COLS = ['label']\n",
" ) AS\n",
" SELECT * EXCEPT (fullVisitorId, label),\n",
" CASE WHEN label is null then 0 ELSE label end as label\n",
" FROM `{training_set}`\n",
" '''.format(model_name = model_name, training_set = training_set)\n",
"\n",
" job_config = bigquery.QueryJobConfig()\n",
" client.query(build_model_query_bqml_automl, job_config=job_config) # Make an API request."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "211c652f",
"metadata": {
"id": "211c652f"
},
"outputs": [],
"source": [
"@component(\n",
" # this component builds an xgboost classifier with xgboost\n",
" packages_to_install=[\"google-cloud-bigquery\", \"xgboost==1.6.2\", \"pandas==1.3.5\", \"scikit-learn==1.0.2\", \"joblib==1.1.0\",\"pyarrow\", \"db-dtypes\"],\n",
" base_image=\"python:3.9\",\n",
")\n",
"\n",
"def build_xgb_xgboost(project_id: str,\n",
" data_set_id: str,\n",
" training_view: str,\n",
" metrics: Output[Metrics],\n",
" model: Output[Model]\n",
"\n",
"):\n",
" from google.cloud import bigquery\n",
" import xgboost as xgb\n",
" from xgboost import XGBClassifier\n",
" from sklearn.model_selection import train_test_split, StratifiedKFold, RandomizedSearchCV, GridSearchCV\n",
" from sklearn.metrics import accuracy_score, roc_auc_score, precision_recall_curve\n",
" from joblib import dump\n",
" import pandas as pd\n",
" import pyarrow\n",
" import os\n",
"\n",
" client = bigquery.Client(project=project_id)\n",
"\n",
" data_set = f\"{project_id}.{data_set_id}.{training_view}\"\n",
" build_df_for_xgboost = '''\n",
" SELECT * FROM `{data_set}`\n",
" '''.format(data_set = data_set)\n",
"\n",
" job_config = bigquery.QueryJobConfig()\n",
" df = client.query(build_df_for_xgboost, job_config=job_config).to_dataframe() # Make an API request.\n",
" df = pd.get_dummies(df.drop(['fullVisitorId'], axis=1), prefix=['visited_dma', 'visited_daypart', 'visited_dow'])\n",
"\n",
"\n",
" X = df.drop(['label'], axis=1).values\n",
" y = df['label'].values\n",
"\n",
" X_train, X_test, y_train, y_test = train_test_split(X,y)\n",
" xgb_model = XGBClassifier(n_estimators=50, objective='binary:hinge',\n",
" silent=True, nthread=1,\n",
" eval_metric=\"auc\")\n",
"\n",
" xgb_model.fit(X_train, y_train)\n",
"\n",
"\n",
"\n",
" os.makedirs(model.path, exist_ok=True)\n",
" dump(xgb_model, os.path.join(model.path, \"model.joblib\"))\n",
"\n",
"\n"
]
},
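{
"cell_type": "code",
"execution_count": null,
"id": "xgb-smoke-01",
"metadata": {},
"outputs": [],
"source": [
"# Hedged local sketch: exercise the same XGBClassifier configuration the\n",
"# component uses, but on synthetic data, so you can iterate on hyperparameters\n",
"# without launching the pipeline. Requires xgboost and scikit-learn in the\n",
"# notebook environment; the synthetic dataset is illustrative only.\n",
"from sklearn.datasets import make_classification\n",
"from sklearn.metrics import accuracy_score\n",
"from sklearn.model_selection import train_test_split\n",
"from xgboost import XGBClassifier\n",
"\n",
"X_demo, y_demo = make_classification(n_samples=1000, n_features=10, random_state=42)\n",
"X_tr, X_te, y_tr, y_te = train_test_split(X_demo, y_demo, random_state=42)\n",
"demo_model = XGBClassifier(n_estimators=50, objective='binary:hinge',\n",
"                           verbosity=0, n_jobs=1, eval_metric=\"auc\")\n",
"demo_model.fit(X_tr, y_tr)\n",
"print('holdout accuracy:', accuracy_score(y_te, demo_model.predict(X_te)))"
]
},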
{
"cell_type": "code",
"execution_count": null,
"id": "0bf45265",
"metadata": {
"id": "0bf45265"
},
"outputs": [],
"source": [
"@component(\n",
" # this component evaluations Logistic Regression\n",
" packages_to_install=[\"google-cloud-bigquery\", \"pandas\", \"pyarrow\", \"matplotlib\", \"db-dtypes\"],\n",
" base_image=\"python:3.9\",\n",
")\n",
"\n",
"\n",
"def evaluate_bqml_logistic(project_id: str,\n",
" data_set_id: str,\n",
" model_name: str,\n",
" training_view: str,\n",
" logistic_data_path: OutputPath(\"Dataset\")\n",
"):\n",
" from google.cloud import bigquery\n",
" from google.cloud.exceptions import NotFound\n",
" import pandas as pd\n",
" import pyarrow\n",
" import matplotlib as plt\n",
" import time\n",
"\n",
" client = bigquery.Client(project=project_id)\n",
"\n",
" # wait to ensure the model exists. check 5 times with a minute wait between.\n",
" model_name = project_id+'.'+data_set_id+'.'+model_name\n",
"\n",
" for i in range(0,5):\n",
" try:\n",
" client.get_model(model_name) # Make an API request.\n",
" # print(f\"Model {model_name} already exists.\")\n",
" break # if here, the model exists so we exit the loop\n",
" except:\n",
" # print(f\"Model {model_name} is not found. Attempt #: {i}\")\n",
" time.sleep(60)\n",
"\n",
" training_set = project_id+'.'+data_set_id+'.'+training_view\n",
" evaluate_model_query_bqml_logistic = '''\n",
" SELECT\n",
" round(threshold, 2) as threshold,\n",
" * except(threshold),\n",
" true_positives / (true_positives + false_positives) AS precision\n",
" FROM\n",
" ML.ROC_CURVE(MODEL `{model_name}`,\n",
" TABLE `{table_name}`,\n",
" GENERATE_ARRAY(0,1, 0.01))\n",
"\n",
" ORDER BY threshold\n",
" '''.format(model_name = model_name, table_name = training_set)\n",
"\n",
" job_config = bigquery.QueryJobConfig()\n",
" query_job = client.query(evaluate_model_query_bqml_logistic, job_config=job_config) # Make an API request.\n",
" df_evaluation_logistic = query_job.result()\n",
" df_evaluation_logistic = df_evaluation_logistic.to_dataframe()\n",
" df_evaluation_logistic.to_csv(logistic_data_path)\n",
" graph = df_evaluation_logistic.plot(x='threshold', y=['precision', 'recall']).get_figure()\n",
" graph.savefig(logistic_data_path)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8ee5a851",
"metadata": {
"id": "8ee5a851"
},
"outputs": [],
"source": [
"@component(\n",
" # this component evaluates BigQuery ML XGBoost\n",
" packages_to_install=[\"google-cloud-bigquery\", \"pandas\", \"pyarrow\", \"matplotlib\", \"db-dtypes\"],\n",
" base_image=\"python:3.9\",\n",
")\n",
"\n",
"\n",
"def evaluate_bqml_xgboost(project_id: str,\n",
" data_set_id: str,\n",
" model_name: str,\n",
" training_view: str,\n",
" xgboost_data_path: OutputPath(\"Dataset\")\n",
"):\n",
" from google.cloud import bigquery\n",
" from google.cloud.exceptions import NotFound\n",
" import pandas as pd\n",
" import pyarrow\n",
" import matplotlib as plt\n",
" import time\n",
"\n",
"\n",
" client = bigquery.Client(project=project_id)\n",
"\n",
" # wait to ensure the model exists. check 5 times with a minute wait between.\n",
" model_name = project_id+'.'+data_set_id+'.'+model_name\n",
"\n",
" for i in range(0,5):\n",
" try:\n",
" client.get_model(model_name) # Make an API request.\n",
" # print(f\"Model {model_name} already exists.\")\n",
" break # if here, the model exists so we exit the loop\n",
" except:\n",
" # print(f\"Model {model_name} is not found. Attempt #: {i}\")\n",
" time.sleep(60)\n",
"\n",
" training_set = f\"{project_id}.{data_set_id}.{training_view}\"\n",
" evaluate_model_query_bqml_xgboost = '''\n",
" SELECT\n",
" round(threshold, 2) as threshold,\n",
" * except(threshold),\n",
" true_positives / (true_positives + false_positives) AS precision\n",
" FROM\n",
" ML.ROC_CURVE(MODEL `{model_name}`,\n",
" TABLE `{table_name}`,\n",
" GENERATE_ARRAY(0,1, 0.01))\n",
"\n",
" ORDER BY threshold\n",
" '''.format(model_name = model_name, table_name = training_set)\n",
"\n",
"\n",
" job_config = bigquery.QueryJobConfig()\n",
" query_job = client.query(evaluate_model_query_bqml_xgboost, job_config=job_config) # Make an API request.\n",
" df_evaluation_xgboost = query_job.result()\n",
" df_evaluation_xgboost = df_evaluation_xgboost.to_dataframe()\n",
" df_evaluation_xgboost.to_csv(xgboost_data_path)\n",
" graph = df_evaluation_xgboost.plot(x='threshold', y=['precision', 'recall']).get_figure()\n",
" graph.savefig(xgboost_data_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2c9a8757",
"metadata": {
"id": "2c9a8757",
"tags": []
},
"outputs": [],
"source": [
"@component(\n",
" # this component evaluates BigQuery ML autoML\n",
" packages_to_install=[\"google-cloud-bigquery\", \"pandas\", \"pyarrow\", \"matplotlib\", \"db-dtypes\"],\n",
" base_image=\"python:3.9\",\n",
")\n",
"\n",
"\n",
"def evaluate_bqml_automl(project_id: str,\n",
" data_set_id: str,\n",
" model_name: str,\n",
" training_view: str,\n",
" automl_data_path: OutputPath(\"Dataset\")\n",
"):\n",
" from google.cloud import bigquery\n",
" from google.cloud.exceptions import NotFound\n",
" import pandas as pd\n",
" import pyarrow\n",
" import matplotlib as plt\n",
" import time\n",
"\n",
"\n",
" client = bigquery.Client(project=project_id)\n",
"\n",
" # wait to ensure the model exists. check 5 times with a minute wait between.\n",
" model_name = project_id+'.'+data_set_id+'.'+model_name\n",
"\n",
" for i in range(0,5):\n",
" try:\n",
" client.get_model(model_name) # Make an API request.\n",
" # print(f\"Model {model_name} already exists.\")\n",
" break # if here, the model exists so we exit the loop\n",
" except:\n",
" # print(f\"Model {model_name} is not found. Attempt #: {i}\")\n",
" time.sleep(60)\n",
"\n",
" training_set = f\"{project_id}.{data_set_id}.{training_view}\"\n",
" evaluate_model_query_bqml_automl = '''\n",
" SELECT\n",
" round(threshold, 2) as threshold,\n",
" * except(threshold),\n",
" true_positives / (true_positives + false_positives) AS precision\n",
" FROM\n",
" ML.ROC_CURVE(MODEL `{model_name}`,\n",
" TABLE `{table_name}`,\n",
" GENERATE_ARRAY(0,1, 0.01))\n",
"\n",
" ORDER BY threshold\n",
" '''.format(model_name = model_name, table_name = training_set)\n",
"\n",
"\n",
" job_config = bigquery.QueryJobConfig()\n",
" query_job = client.query(evaluate_model_query_bqml_automl, job_config=job_config) # Make an API request.\n",
" df_evaluation_automl = query_job.result()\n",
" df_evaluation_automl = df_evaluation_automl.to_dataframe()\n",
" df_evaluation_automl.to_csv(automl_data_path)\n",
" graph = df_evaluation_automl.plot(x='threshold', y=['precision', 'recall']).get_figure()\n",
" graph.savefig(automl_data_path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "184373fd",
"metadata": {
"id": "184373fd"
},
"outputs": [],
"source": [
"@component(\n",
" # Deploys xgboost model\n",
" packages_to_install=[\"google-cloud-aiplatform==1.25.0\"],\n",
" base_image=\"python:3.9\",\n",
")\n",
"def deploy_xgb(\n",
" model: Input[Model],\n",
" project_id: str,\n",
" vertex_endpoint: Output[Artifact],\n",
" vertex_model: Output[Model]\n",
"):\n",
" from google.cloud import aiplatform\n",
" # import os\n",
" aiplatform.init(project=project_id)\n",
" deployed_model = aiplatform.Model.upload(\n",
" display_name='propensity_demo',\n",
" artifact_uri = model.uri,\n",
" serving_container_image_uri=\"us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-6:latest\"\n",
" )\n",
" endpoint = deployed_model.deploy(machine_type=\"n1-standard-16\")\n",
"\n",
" # Save data to the output params\n",
" vertex_endpoint.uri = endpoint.resource_name\n",
" vertex_model.uri = deployed_model.resource_name\n"
]
},
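{
"cell_type": "code",
"execution_count": null,
"id": "endpoint-predict-01",
"metadata": {},
"outputs": [],
"source": [
"# Hedged sketch: once deploy_xgb has run, you can send online predictions to\n",
"# the endpoint. The feature row is left commented out as a placeholder --\n",
"# replace it with values matching the one-hot encoded training columns.\n",
"aiplatform.init(project=PROJECT_ID, location=REGION)\n",
"endpoints = aiplatform.Endpoint.list(order_by='create_time desc')\n",
"if endpoints:\n",
"    endpoint = endpoints[0]  # assumes the most recent endpoint is the one deployed above\n",
"    # example_row = [0.0] * NUM_FEATURES  # placeholder feature vector\n",
"    # print(endpoint.predict(instances=[example_row]))\n",
"    print(endpoint.resource_name)"
]
},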
{
"cell_type": "code",
"execution_count": null,
"id": "df55e79c",
"metadata": {
"id": "df55e79c"
},
"outputs": [],
"source": [
"@dsl.pipeline(\n",
" # Default pipeline root. You can override it when submitting the pipeline.\n",
" pipeline_root=PIPELINE_ROOT,\n",
" # A name for the pipeline.\n",
" name=\"pipeline-test\",\n",
" description='Propensity BigQuery ML Test'\n",
")\n",
"def pipeline():\n",
"\n",
" create_input_view_op = create_input_view(view_name = VIEW_NAME,\n",
" data_set_id = DATA_SET_ID,\n",
" project_id = PROJECT_ID,\n",
" bucket_name = BUCKET_NAME,\n",
" blob_path = BLOB_PATH\n",
" )\n",
"\n",
"\n",
" build_bqml_logistic_op = build_bqml_logistic(project_id = PROJECT_ID,\n",
" data_set_id = DATA_SET_ID,\n",
" model_name = 'bqml_logistic_model',\n",
" training_view = VIEW_NAME\n",
" )\n",
"\n",
" build_bqml_xgboost_op = build_bqml_xgboost(project_id = PROJECT_ID,\n",
" data_set_id = DATA_SET_ID,\n",
" model_name = 'bqml_xgboost_model',\n",
" training_view = VIEW_NAME\n",
" )\n",
"\n",
" build_bqml_automl_op = build_bqml_automl (project_id = PROJECT_ID,\n",
" data_set_id = DATA_SET_ID,\n",
" model_name = 'bqml_automl_model',\n",
" training_view = VIEW_NAME\n",
" )\n",
"\n",
"\n",
"\n",
" build_xgb_xgboost_op = build_xgb_xgboost(project_id = PROJECT_ID,\n",
" data_set_id = DATA_SET_ID,\n",
" training_view = VIEW_NAME\n",
" )\n",
"\n",
"\n",
" evaluate_bqml_logistic_op = evaluate_bqml_logistic(project_id = PROJECT_ID,\n",
" data_set_id = DATA_SET_ID,\n",
" model_name = 'bqml_logistic_model',\n",
" training_view = VIEW_NAME\n",
" )\n",
"\n",
" evaluate_bqml_xgboost_op = evaluate_bqml_xgboost(project_id = PROJECT_ID,\n",
" data_set_id = DATA_SET_ID,\n",
" model_name = 'bqml_xgboost_model',\n",
" training_view = VIEW_NAME\n",
" )\n",
"\n",
" evaluate_bqml_automl_op = evaluate_bqml_automl(project_id = PROJECT_ID,\n",
" data_set_id = DATA_SET_ID,\n",
" model_name = 'bqml_automl_model',\n",
" training_view = VIEW_NAME\n",
" )\n",
"\n",
"\n",
" deploy_xgb_op = deploy_xgb(project_id = PROJECT_ID,\n",
" model=build_xgb_xgboost_op.outputs[\"model\"]\n",
" )\n",
"\n",
"\n",
" build_bqml_logistic_op.after(create_input_view_op)\n",
" build_bqml_xgboost_op.after(create_input_view_op)\n",
" build_bqml_automl_op.after(create_input_view_op)\n",
" build_xgb_xgboost_op.after(create_input_view_op)\n",
"\n",
" evaluate_bqml_logistic_op.after(build_bqml_logistic_op)\n",
" evaluate_bqml_xgboost_op.after(build_bqml_xgboost_op)\n",
" evaluate_bqml_automl_op.after(build_bqml_automl_op)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4abfd490",
"metadata": {
"id": "4abfd490"
},
"outputs": [],
"source": [
"compiler.Compiler().compile(\n",
" pipeline_func=pipeline, package_path=\"pipeline.yaml\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7a7a3ec0",
"metadata": {
"id": "7a7a3ec0"
},
"outputs": [],
"source": [
"TIMESTAMP = datetime.now().strftime(\"%Y%m%d%H%M%S\")\n",
"run = pipeline_jobs.PipelineJob(\n",
" display_name=\"test-pipeine\",\n",
" template_path=\"pipeline.yaml\",\n",
" pipeline_root=PIPELINE_ROOT,\n",
"\n",
" job_id=\"test-{0}\".format(TIMESTAMP),\n",
" enable_caching=True\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0be305bd",
"metadata": {},
"outputs": [],
"source": [
"run.run()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "79c9316c",
"metadata": {
"id": "79c9316c"
},
"outputs": [],
"source": [
"# this schedules a cron like job by building an endpoint using cloud functions and then scheduler\n",
"\n",
"from kfp.v2.google.client import AIPlatformClient\n",
"\n",
"api_client = AIPlatformClient(project_id=PROJECT_ID,\n",
" region='us-central1'\n",
" )\n",
"\n",
"api_client.create_schedule_from_job_spec(\n",
" job_spec_path='pipeline.json',\n",
" schedule='0 * * * *',\n",
" enable_caching=False\n",
")"
]
}
],
"metadata": {
"colab": {
"provenance": []
},
"environment": {
"kernel": "conda-root-py",
"name": "workbench-notebooks.m109",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/workbench-notebooks:m109"
},
"interpreter": {
"hash": "3494ecf7585668a5944fbfc2a6c96b24395c92a20dd2d911f61e7d937ec88b5e"
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "conda-root-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}