marketing-analytics/predicting/kfp_pipeline/Propensity_Pipeline.ipynb

{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "1PbxHas94vfS", "metadata": { "id": "1PbxHas94vfS" }, "outputs": [], "source": [ "!pip3 install --no-cache-dir --upgrade \"kfp>2\" google-cloud-aiplatform==1.25.0 # You may need to install kfp or aiplatform" ] }, { "cell_type": "code", "execution_count": null, "id": "af318aeb", "metadata": { "id": "af318aeb" }, "outputs": [], "source": [ "import kfp\n", "import matplotlib.pyplot as plt\n", "import pandas as pd\n", "import requests\n", "\n", "from kfp import dsl\n", "from kfp import compiler\n", "from kfp.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,\n", " OutputPath, ClassificationMetrics, Metrics, component)\n", "\n", "from google.cloud import aiplatform\n", "from google.cloud import storage\n", "from google.cloud.aiplatform import pipeline_jobs\n", "from typing import NamedTuple\n", "\n", "from datetime import datetime" ] }, { "cell_type": "code", "execution_count": null, "id": "a77095ea", "metadata": { "id": "a77095ea" }, "outputs": [], "source": [ "VIEW_NAME = 'ga_data' # BigQuery view you create for input data to model\n", "DATA_SET_ID = 'propensity' # The Data Set ID where the view sits\n", "PROJECT_ID = 'YOUR_GCP_PROJECT' # The Project ID\n", "BUCKET_NAME = 'YOUR_GCP_BUCKET' # Bucket where the base_sql.txt file lives. You'll need to make the bucket.\n", "BLOB_PATH = f'{BUCKET_NAME}/base_sql.txt' # The actual path where base_sql will be sent to" ] }, { "cell_type": "code", "execution_count": null, "id": "mYMQx9w35DAa", "metadata": { "id": "mYMQx9w35DAa" }, "outputs": [], "source": [ "PATH=%env PATH\n", "%env PATH={PATH}:/home/jupyter/.local/bin\n", "REGION=\"us-central1\"\n", "\n", "PIPELINE_ROOT = f'gs://{BUCKET_NAME}' # This is where all pipeline artifacts are sent. 
{ "cell_type": "code", "execution_count": null, "id": "idXvSzhu5G3d", "metadata": { "id": "idXvSzhu5G3d" }, "outputs": [], "source": [
"# In order to build the BQ dataset\n",
"!gcloud config set project $PROJECT_ID\n",
"BQ_LOCATION = 'US' # BigQuery location for the dataset (distinct from the Vertex AI REGION above)\n",
"!bq mk --location=$BQ_LOCATION --dataset $PROJECT_ID:$DATA_SET_ID"
] },
{ "cell_type": "code", "execution_count": null, "id": "dd437eda", "metadata": {}, "outputs": [], "source": [
"# Send base_sql.txt to the GCS bucket\n",
"\n",
"storage_client = storage.Client()\n",
"bucket = storage_client.get_bucket(BUCKET_NAME)\n",
"blob = bucket.blob(BLOB_PATH)\n",
"blob.upload_from_filename(\"base_sql.txt\")\n",
"blob.public_url"
] },
{ "cell_type": "code", "execution_count": null, "id": "9f6dc47a-348c-489a-b11b-0d09ff7eb74d", "metadata": { "id": "9f6dc47a-348c-489a-b11b-0d09ff7eb74d" }, "outputs": [], "source": [
"@component(\n",
"    # this component builds a BQ view, which will be the underlying source for the model\n",
"    packages_to_install=[\"google-cloud-bigquery\", \"google-cloud-storage\"],\n",
"    base_image=\"python:3.9\",\n",
")\n",
"def create_input_view(view_name: str,\n",
"                      data_set_id: str,\n",
"                      project_id: str,\n",
"                      bucket_name: str,\n",
"                      blob_path: str\n",
"):\n",
"    from google.cloud import bigquery\n",
"    from google.cloud import storage\n",
"    client = bigquery.Client(project=project_id)\n",
"    dataset = client.dataset(data_set_id)\n",
"    table_ref = dataset.table(view_name)\n",
"    ga_data_ref = 'bigquery-public-data.google_analytics_sample.ga_sessions_*'\n",
"    conversion = \"hits.page.pageTitle like '%Shopping Cart%'\" # SQL LIKE syntax used to define the conversion in the GA360 raw export\n",
"    start_date = '20170101'\n",
"    end_date = '20170131'\n",
"\n",
"    def get_sql(bucket_name, blob_path):\n",
"        from google.cloud import storage\n",
"        storage_client = storage.Client()\n",
"        bucket = storage_client.get_bucket(bucket_name)\n",
"        blob = bucket.get_blob(blob_path)\n",
"        content = blob.download_as_string()\n",
"        return content\n",
"\n",
"    def if_tbl_exists(client, table_ref):\n",
"        from google.cloud.exceptions import NotFound\n",
"        try:\n",
"            client.get_table(table_ref)\n",
"            return True\n",
"        except NotFound:\n",
"            return False\n",
"\n",
"    if if_tbl_exists(client, table_ref):\n",
"        print(\"view already exists\")\n",
"    else:\n",
"        # load sql from base_sql.txt; edit that file if you want to change the query\n",
"        content = get_sql(bucket_name, blob_path)\n",
"        content = str(content, 'utf-8')\n",
"        create_base_feature_set_query = content.format(start_date = start_date,\n",
"                                                       end_date = end_date,\n",
"                                                       ga_data_ref = ga_data_ref,\n",
"                                                       conversion = conversion)\n",
"\n",
"        shared_dataset_ref = client.dataset(data_set_id)\n",
"        base_feature_set_view_ref = shared_dataset_ref.table(view_name)\n",
"        base_feature_set_view = bigquery.Table(base_feature_set_view_ref)\n",
"        base_feature_set_view.view_query = create_base_feature_set_query # the query is already fully formatted above\n",
"        base_feature_set_view = client.create_table(base_feature_set_view) # API request\n"
] },
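{ "cell_type": "markdown", "id": "c0ffee03", "metadata": {}, "source": [
"Optional sanity check: before the pipeline builds the view, you can peek at one day of the public GA360 sample export it reads from. A minimal sketch, assuming the BigQuery API is enabled and the notebook environment has `db-dtypes` installed for `to_dataframe()`:"
] },
{ "cell_type": "code", "execution_count": null, "id": "c0ffee04", "metadata": {}, "outputs": [], "source": [
"# Optional sanity check: peek at one day of the public GA360 sample export\n",
"from google.cloud import bigquery\n",
"\n",
"bq_client = bigquery.Client(project=PROJECT_ID)\n",
"preview = bq_client.query(\n",
"    \"SELECT fullVisitorId, date, totals.visits, totals.pageviews \"\n",
"    \"FROM `bigquery-public-data.google_analytics_sample.ga_sessions_20170101` \"\n",
"    \"LIMIT 5\"\n",
").to_dataframe()\n",
"preview"
] },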
{ "cell_type": "code", "execution_count": null, "id": "0ba6d3be", "metadata": { "id": "0ba6d3be" }, "outputs": [], "source": [
"@component(\n",
"    # this component builds a logistic regression with BigQuery ML\n",
"    packages_to_install=[\"google-cloud-bigquery\"],\n",
"    base_image=\"python:3.9\",\n",
")\n",
"def build_bqml_logistic(project_id: str,\n",
"                        data_set_id: str,\n",
"                        model_name: str,\n",
"                        training_view: str\n",
"):\n",
"    from google.cloud import bigquery\n",
"    client = bigquery.Client(project=project_id)\n",
"\n",
"    model_name = f\"{project_id}.{data_set_id}.{model_name}\"\n",
"    training_set = f\"{project_id}.{data_set_id}.{training_view}\"\n",
"    build_model_query_bqml_logistic = '''\n",
"    CREATE OR REPLACE MODEL `{model_name}`\n",
"    OPTIONS(model_type='logistic_reg'\n",
"    , INPUT_LABEL_COLS = ['label']\n",
"    , L1_REG = 1\n",
"    , DATA_SPLIT_METHOD = 'RANDOM'\n",
"    , DATA_SPLIT_EVAL_FRACTION = 0.20\n",
"    ) AS\n",
"    SELECT * EXCEPT (fullVisitorId, label),\n",
"    CASE WHEN label is null then 0 ELSE label end as label\n",
"    FROM `{training_set}`\n",
"    '''.format(model_name = model_name, training_set = training_set)\n",
"\n",
"    job_config = bigquery.QueryJobConfig()\n",
"    client.query(build_model_query_bqml_logistic, job_config=job_config) # Make an API request."
] },
{ "cell_type": "code", "execution_count": null, "id": "fe603955", "metadata": { "id": "fe603955" }, "outputs": [], "source": [
"@component(\n",
"    # this component builds an xgboost classifier with BigQuery ML\n",
"    packages_to_install=[\"google-cloud-bigquery\"],\n",
"    base_image=\"python:3.9\",\n",
")\n",
"def build_bqml_xgboost(project_id: str,\n",
"                       data_set_id: str,\n",
"                       model_name: str,\n",
"                       training_view: str\n",
"):\n",
"    from google.cloud import bigquery\n",
"    client = bigquery.Client(project=project_id)\n",
"\n",
"    model_name = f\"{project_id}.{data_set_id}.{model_name}\"\n",
"    training_set = f\"{project_id}.{data_set_id}.{training_view}\"\n",
"    build_model_query_bqml_xgboost = '''\n",
"    CREATE OR REPLACE MODEL `{model_name}`\n",
"    OPTIONS(model_type='BOOSTED_TREE_CLASSIFIER'\n",
"    , INPUT_LABEL_COLS = ['label']\n",
"    , L1_REG = 1\n",
"    , DATA_SPLIT_METHOD = 'RANDOM'\n",
"    , DATA_SPLIT_EVAL_FRACTION = 0.20\n",
"    ) AS\n",
"    SELECT * EXCEPT (fullVisitorId, label),\n",
"    CASE WHEN label is null then 0 ELSE label end as label\n",
"    FROM `{training_set}`\n",
"    '''.format(model_name = model_name, training_set = training_set)\n",
"\n",
"    job_config = bigquery.QueryJobConfig()\n",
"    client.query(build_model_query_bqml_xgboost, job_config=job_config) # Make an API request."
] },
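{ "cell_type": "markdown", "id": "c0ffee05", "metadata": {}, "source": [
"Once the pipeline has trained a BigQuery ML model, its aggregate quality metrics can be pulled with `ML.EVALUATE`. A minimal sketch against the `bqml_logistic_model` name used later in the pipeline (run it after the pipeline completes):"
] },
{ "cell_type": "code", "execution_count": null, "id": "c0ffee06", "metadata": {}, "outputs": [], "source": [
"# Optional: aggregate metrics for a trained BigQuery ML model (run after the pipeline has built it)\n",
"from google.cloud import bigquery\n",
"\n",
"bq_client = bigquery.Client(project=PROJECT_ID)\n",
"eval_df = bq_client.query(\n",
"    f\"SELECT * FROM ML.EVALUATE(MODEL `{PROJECT_ID}.{DATA_SET_ID}.bqml_logistic_model`)\"\n",
").to_dataframe()\n",
"eval_df"
] },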
] }, { "cell_type": "code", "execution_count": null, "id": "1e27cdb8", "metadata": { "id": "1e27cdb8" }, "outputs": [], "source": [ "@component(\n", " # this component builds an AutoML classifier with BigQuery ML\n", " packages_to_install=[\"google-cloud-bigquery\"],\n", " base_image=\"python:3.9\",\n", ")\n", "\n", "\n", "def build_bqml_automl(project_id: str,\n", " data_set_id: str,\n", " model_name: str,\n", " training_view: str\n", "):\n", " from google.cloud import bigquery\n", " client = bigquery.Client(project=project_id)\n", "\n", " model_name = f\"{project_id}.{data_set_id}.{model_name}\"\n", " training_set = f\"{project_id}.{data_set_id}.{training_view}\"\n", " build_model_query_bqml_automl = '''\n", " CREATE OR REPLACE MODEL `{model_name}`\n", " OPTIONS(model_type='BOOSTED_TREE_CLASSIFIER'\n", " , INPUT_LABEL_COLS = ['label']\n", " ) AS\n", " SELECT * EXCEPT (fullVisitorId, label),\n", " CASE WHEN label is null then 0 ELSE label end as label\n", " FROM `{training_set}`\n", " '''.format(model_name = model_name, training_set = training_set)\n", "\n", " job_config = bigquery.QueryJobConfig()\n", " client.query(build_model_query_bqml_automl, job_config=job_config) # Make an API request." ] }, { "cell_type": "code", "execution_count": null, "id": "211c652f", "metadata": { "id": "211c652f" }, "outputs": [], "source": [ "@component(\n", " # this component builds an xgboost classifier with xgboost\n", " packages_to_install=[\"google-cloud-bigquery\", \"xgboost==1.6.2\", \"pandas==1.3.5\", \"scikit-learn==1.0.2\", \"joblib==1.1.0\",\"pyarrow\", \"db-dtypes\"],\n", " base_image=\"python:3.9\",\n", ")\n", "\n", "def build_xgb_xgboost(project_id: str,\n", " data_set_id: str,\n", " training_view: str,\n", " metrics: Output[Metrics],\n", " model: Output[Model]\n", "\n", "):\n", " from google.cloud import bigquery\n", " import xgboost as xgb\n", " from xgboost import XGBClassifier\n", " from sklearn.model_selection import train_test_split, StratifiedKFold, RandomizedSearchCV, GridSearchCV\n", " from sklearn.metrics import accuracy_score, roc_auc_score, precision_recall_curve\n", " from joblib import dump\n", " import pandas as pd\n", " import pyarrow\n", " import os\n", "\n", " client = bigquery.Client(project=project_id)\n", "\n", " data_set = f\"{project_id}.{data_set_id}.{training_view}\"\n", " build_df_for_xgboost = '''\n", " SELECT * FROM `{data_set}`\n", " '''.format(data_set = data_set)\n", "\n", " job_config = bigquery.QueryJobConfig()\n", " df = client.query(build_df_for_xgboost, job_config=job_config).to_dataframe() # Make an API request.\n", " df = pd.get_dummies(df.drop(['fullVisitorId'], axis=1), prefix=['visited_dma', 'visited_daypart', 'visited_dow'])\n", "\n", "\n", " X = df.drop(['label'], axis=1).values\n", " y = df['label'].values\n", "\n", " X_train, X_test, y_train, y_test = train_test_split(X,y)\n", " xgb_model = XGBClassifier(n_estimators=50, objective='binary:hinge',\n", " silent=True, nthread=1,\n", " eval_metric=\"auc\")\n", "\n", " xgb_model.fit(X_train, y_train)\n", "\n", "\n", "\n", " os.makedirs(model.path, exist_ok=True)\n", " dump(xgb_model, os.path.join(model.path, \"model.joblib\"))\n", "\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "0bf45265", "metadata": { "id": "0bf45265" }, "outputs": [], "source": [ "@component(\n", " # this component evaluations Logistic Regression\n", " packages_to_install=[\"google-cloud-bigquery\", \"pandas\", \"pyarrow\", \"matplotlib\", \"db-dtypes\"],\n", " base_image=\"python:3.9\",\n", ")\n", "\n", "\n", 
"def evaluate_bqml_logistic(project_id: str,\n", " data_set_id: str,\n", " model_name: str,\n", " training_view: str,\n", " logistic_data_path: OutputPath(\"Dataset\")\n", "):\n", " from google.cloud import bigquery\n", " from google.cloud.exceptions import NotFound\n", " import pandas as pd\n", " import pyarrow\n", " import matplotlib as plt\n", " import time\n", "\n", " client = bigquery.Client(project=project_id)\n", "\n", " # wait to ensure the model exists. check 5 times with a minute wait between.\n", " model_name = project_id+'.'+data_set_id+'.'+model_name\n", "\n", " for i in range(0,5):\n", " try:\n", " client.get_model(model_name) # Make an API request.\n", " # print(f\"Model {model_name} already exists.\")\n", " break # if here, the model exists so we exit the loop\n", " except:\n", " # print(f\"Model {model_name} is not found. Attempt #: {i}\")\n", " time.sleep(60)\n", "\n", " training_set = project_id+'.'+data_set_id+'.'+training_view\n", " evaluate_model_query_bqml_logistic = '''\n", " SELECT\n", " round(threshold, 2) as threshold,\n", " * except(threshold),\n", " true_positives / (true_positives + false_positives) AS precision\n", " FROM\n", " ML.ROC_CURVE(MODEL `{model_name}`,\n", " TABLE `{table_name}`,\n", " GENERATE_ARRAY(0,1, 0.01))\n", "\n", " ORDER BY threshold\n", " '''.format(model_name = model_name, table_name = training_set)\n", "\n", " job_config = bigquery.QueryJobConfig()\n", " query_job = client.query(evaluate_model_query_bqml_logistic, job_config=job_config) # Make an API request.\n", " df_evaluation_logistic = query_job.result()\n", " df_evaluation_logistic = df_evaluation_logistic.to_dataframe()\n", " df_evaluation_logistic.to_csv(logistic_data_path)\n", " graph = df_evaluation_logistic.plot(x='threshold', y=['precision', 'recall']).get_figure()\n", " graph.savefig(logistic_data_path)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "8ee5a851", "metadata": { "id": "8ee5a851" }, "outputs": [], "source": [ "@component(\n", " # this component evaluates BigQuery ML XGBoost\n", " packages_to_install=[\"google-cloud-bigquery\", \"pandas\", \"pyarrow\", \"matplotlib\", \"db-dtypes\"],\n", " base_image=\"python:3.9\",\n", ")\n", "\n", "\n", "def evaluate_bqml_xgboost(project_id: str,\n", " data_set_id: str,\n", " model_name: str,\n", " training_view: str,\n", " xgboost_data_path: OutputPath(\"Dataset\")\n", "):\n", " from google.cloud import bigquery\n", " from google.cloud.exceptions import NotFound\n", " import pandas as pd\n", " import pyarrow\n", " import matplotlib as plt\n", " import time\n", "\n", "\n", " client = bigquery.Client(project=project_id)\n", "\n", " # wait to ensure the model exists. check 5 times with a minute wait between.\n", " model_name = project_id+'.'+data_set_id+'.'+model_name\n", "\n", " for i in range(0,5):\n", " try:\n", " client.get_model(model_name) # Make an API request.\n", " # print(f\"Model {model_name} already exists.\")\n", " break # if here, the model exists so we exit the loop\n", " except:\n", " # print(f\"Model {model_name} is not found. 
Attempt #: {i}\")\n", " time.sleep(60)\n", "\n", " training_set = f\"{project_id}.{data_set_id}.{training_view}\"\n", " evaluate_model_query_bqml_xgboost = '''\n", " SELECT\n", " round(threshold, 2) as threshold,\n", " * except(threshold),\n", " true_positives / (true_positives + false_positives) AS precision\n", " FROM\n", " ML.ROC_CURVE(MODEL `{model_name}`,\n", " TABLE `{table_name}`,\n", " GENERATE_ARRAY(0,1, 0.01))\n", "\n", " ORDER BY threshold\n", " '''.format(model_name = model_name, table_name = training_set)\n", "\n", "\n", " job_config = bigquery.QueryJobConfig()\n", " query_job = client.query(evaluate_model_query_bqml_xgboost, job_config=job_config) # Make an API request.\n", " df_evaluation_xgboost = query_job.result()\n", " df_evaluation_xgboost = df_evaluation_xgboost.to_dataframe()\n", " df_evaluation_xgboost.to_csv(xgboost_data_path)\n", " graph = df_evaluation_xgboost.plot(x='threshold', y=['precision', 'recall']).get_figure()\n", " graph.savefig(xgboost_data_path)" ] }, { "cell_type": "code", "execution_count": null, "id": "2c9a8757", "metadata": { "id": "2c9a8757", "tags": [] }, "outputs": [], "source": [ "@component(\n", " # this component evaluates BigQuery ML autoML\n", " packages_to_install=[\"google-cloud-bigquery\", \"pandas\", \"pyarrow\", \"matplotlib\", \"db-dtypes\"],\n", " base_image=\"python:3.9\",\n", ")\n", "\n", "\n", "def evaluate_bqml_automl(project_id: str,\n", " data_set_id: str,\n", " model_name: str,\n", " training_view: str,\n", " automl_data_path: OutputPath(\"Dataset\")\n", "):\n", " from google.cloud import bigquery\n", " from google.cloud.exceptions import NotFound\n", " import pandas as pd\n", " import pyarrow\n", " import matplotlib as plt\n", " import time\n", "\n", "\n", " client = bigquery.Client(project=project_id)\n", "\n", " # wait to ensure the model exists. check 5 times with a minute wait between.\n", " model_name = project_id+'.'+data_set_id+'.'+model_name\n", "\n", " for i in range(0,5):\n", " try:\n", " client.get_model(model_name) # Make an API request.\n", " # print(f\"Model {model_name} already exists.\")\n", " break # if here, the model exists so we exit the loop\n", " except:\n", " # print(f\"Model {model_name} is not found. 
Attempt #: {i}\")\n", " time.sleep(60)\n", "\n", " training_set = f\"{project_id}.{data_set_id}.{training_view}\"\n", " evaluate_model_query_bqml_automl = '''\n", " SELECT\n", " round(threshold, 2) as threshold,\n", " * except(threshold),\n", " true_positives / (true_positives + false_positives) AS precision\n", " FROM\n", " ML.ROC_CURVE(MODEL `{model_name}`,\n", " TABLE `{table_name}`,\n", " GENERATE_ARRAY(0,1, 0.01))\n", "\n", " ORDER BY threshold\n", " '''.format(model_name = model_name, table_name = training_set)\n", "\n", "\n", " job_config = bigquery.QueryJobConfig()\n", " query_job = client.query(evaluate_model_query_bqml_automl, job_config=job_config) # Make an API request.\n", " df_evaluation_automl = query_job.result()\n", " df_evaluation_automl = df_evaluation_automl.to_dataframe()\n", " df_evaluation_automl.to_csv(automl_data_path)\n", " graph = df_evaluation_automl.plot(x='threshold', y=['precision', 'recall']).get_figure()\n", " graph.savefig(automl_data_path)" ] }, { "cell_type": "code", "execution_count": null, "id": "184373fd", "metadata": { "id": "184373fd" }, "outputs": [], "source": [ "@component(\n", " # Deploys xgboost model\n", " packages_to_install=[\"google-cloud-aiplatform==1.25.0\"],\n", " base_image=\"python:3.9\",\n", ")\n", "def deploy_xgb(\n", " model: Input[Model],\n", " project_id: str,\n", " vertex_endpoint: Output[Artifact],\n", " vertex_model: Output[Model]\n", "):\n", " from google.cloud import aiplatform\n", " # import os\n", " aiplatform.init(project=project_id)\n", " deployed_model = aiplatform.Model.upload(\n", " display_name='propensity_demo',\n", " artifact_uri = model.uri,\n", " serving_container_image_uri=\"us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-6:latest\"\n", " )\n", " endpoint = deployed_model.deploy(machine_type=\"n1-standard-16\")\n", "\n", " # Save data to the output params\n", " vertex_endpoint.uri = endpoint.resource_name\n", " vertex_model.uri = deployed_model.resource_name\n" ] }, { "cell_type": "code", "execution_count": null, "id": "df55e79c", "metadata": { "id": "df55e79c" }, "outputs": [], "source": [ "@dsl.pipeline(\n", " # Default pipeline root. 
{ "cell_type": "code", "execution_count": null, "id": "df55e79c", "metadata": { "id": "df55e79c" }, "outputs": [], "source": [
"@dsl.pipeline(\n",
"    # Default pipeline root. You can override it when submitting the pipeline.\n",
"    pipeline_root=PIPELINE_ROOT,\n",
"    # A name for the pipeline.\n",
"    name=\"pipeline-test\",\n",
"    description='Propensity BigQuery ML Test'\n",
")\n",
"def pipeline():\n",
"\n",
"    create_input_view_op = create_input_view(view_name = VIEW_NAME,\n",
"                                             data_set_id = DATA_SET_ID,\n",
"                                             project_id = PROJECT_ID,\n",
"                                             bucket_name = BUCKET_NAME,\n",
"                                             blob_path = BLOB_PATH\n",
"    )\n",
"\n",
"    build_bqml_logistic_op = build_bqml_logistic(project_id = PROJECT_ID,\n",
"                                                 data_set_id = DATA_SET_ID,\n",
"                                                 model_name = 'bqml_logistic_model',\n",
"                                                 training_view = VIEW_NAME\n",
"    )\n",
"\n",
"    build_bqml_xgboost_op = build_bqml_xgboost(project_id = PROJECT_ID,\n",
"                                               data_set_id = DATA_SET_ID,\n",
"                                               model_name = 'bqml_xgboost_model',\n",
"                                               training_view = VIEW_NAME\n",
"    )\n",
"\n",
"    build_bqml_automl_op = build_bqml_automl(project_id = PROJECT_ID,\n",
"                                             data_set_id = DATA_SET_ID,\n",
"                                             model_name = 'bqml_automl_model',\n",
"                                             training_view = VIEW_NAME\n",
"    )\n",
"\n",
"    build_xgb_xgboost_op = build_xgb_xgboost(project_id = PROJECT_ID,\n",
"                                             data_set_id = DATA_SET_ID,\n",
"                                             training_view = VIEW_NAME\n",
"    )\n",
"\n",
"    evaluate_bqml_logistic_op = evaluate_bqml_logistic(project_id = PROJECT_ID,\n",
"                                                       data_set_id = DATA_SET_ID,\n",
"                                                       model_name = 'bqml_logistic_model',\n",
"                                                       training_view = VIEW_NAME\n",
"    )\n",
"\n",
"    evaluate_bqml_xgboost_op = evaluate_bqml_xgboost(project_id = PROJECT_ID,\n",
"                                                     data_set_id = DATA_SET_ID,\n",
"                                                     model_name = 'bqml_xgboost_model',\n",
"                                                     training_view = VIEW_NAME\n",
"    )\n",
"\n",
"    evaluate_bqml_automl_op = evaluate_bqml_automl(project_id = PROJECT_ID,\n",
"                                                   data_set_id = DATA_SET_ID,\n",
"                                                   model_name = 'bqml_automl_model',\n",
"                                                   training_view = VIEW_NAME\n",
"    )\n",
"\n",
"    deploy_xgb_op = deploy_xgb(project_id = PROJECT_ID,\n",
"                               model=build_xgb_xgboost_op.outputs[\"model\"]\n",
"    )\n",
"\n",
"    # the build steps consume no outputs from the view component, so order them explicitly\n",
"    build_bqml_logistic_op.after(create_input_view_op)\n",
"    build_bqml_xgboost_op.after(create_input_view_op)\n",
"    build_bqml_automl_op.after(create_input_view_op)\n",
"    build_xgb_xgboost_op.after(create_input_view_op)\n",
"\n",
"    evaluate_bqml_logistic_op.after(build_bqml_logistic_op)\n",
"    evaluate_bqml_xgboost_op.after(build_bqml_xgboost_op)\n",
"    evaluate_bqml_automl_op.after(build_bqml_automl_op)"
] },
{ "cell_type": "code", "execution_count": null, "id": "4abfd490", "metadata": { "id": "4abfd490" }, "outputs": [], "source": [
"compiler.Compiler().compile(\n",
"    pipeline_func=pipeline, package_path=\"pipeline.yaml\"\n",
")"
] },
{ "cell_type": "code", "execution_count": null, "id": "7a7a3ec0", "metadata": { "id": "7a7a3ec0" }, "outputs": [], "source": [
"TIMESTAMP = datetime.now().strftime(\"%Y%m%d%H%M%S\")\n",
"run = pipeline_jobs.PipelineJob(\n",
"    display_name=\"test-pipeline\",\n",
"    template_path=\"pipeline.yaml\",\n",
"    pipeline_root=PIPELINE_ROOT,\n",
"    job_id=\"test-{0}\".format(TIMESTAMP),\n",
"    enable_caching=True\n",
")"
] },
{ "cell_type": "code", "execution_count": null, "id": "0be305bd", "metadata": {}, "outputs": [], "source": [
"run.run()"
] },
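{ "cell_type": "markdown", "id": "c0ffee13", "metadata": {}, "source": [
"`run.run()` blocks the notebook until the pipeline finishes. If you would rather fire and forget, a minimal sketch that submits the same job asynchronously and polls its state:"
] },
{ "cell_type": "code", "execution_count": null, "id": "c0ffee14", "metadata": {}, "outputs": [], "source": [
"# Optional: submit without blocking, then poll the job state.\n",
"TIMESTAMP = datetime.now().strftime(\"%Y%m%d%H%M%S\")\n",
"async_run = pipeline_jobs.PipelineJob(\n",
"    display_name=\"test-pipeline\",\n",
"    template_path=\"pipeline.yaml\",\n",
"    pipeline_root=PIPELINE_ROOT,\n",
"    job_id=\"test-async-{0}\".format(TIMESTAMP),\n",
"    enable_caching=True\n",
")\n",
"async_run.submit()\n",
"async_run.state  # e.g. PipelineState.PIPELINE_STATE_RUNNING"
] },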
"api_client.create_schedule_from_job_spec(\n", " job_spec_path='pipeline.json',\n", " schedule='0 * * * *',\n", " enable_caching=False\n", ")" ] } ], "metadata": { "colab": { "provenance": [] }, "environment": { "kernel": "conda-root-py", "name": "workbench-notebooks.m109", "type": "gcloud", "uri": "gcr.io/deeplearning-platform-release/workbench-notebooks:m109" }, "interpreter": { "hash": "3494ecf7585668a5944fbfc2a6c96b24395c92a20dd2d911f61e7d937ec88b5e" }, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "conda-root-py" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.12" } }, "nbformat": 4, "nbformat_minor": 5 }