marketing-analytics/predicting/future-customer-value-segments/focvs_automation.ipynb
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "FoCVS Automation Colab (go/focvs-colab)",
"provenance": [],
"collapsed_sections": [
"KsbpSqDIaq7A"
],
"toc_visible": true
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"language_info": {
"name": "python"
}
},
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "P9eCcmbJUiIB"
},
"source": [
"# Make a copy of this notebook! "
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "KsbpSqDIaq7A"
},
"source": [
"# Intro to Colab"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "uuo6MXz7KXpM"
},
"source": [
"**60 second crash course in Colab notebooks**\n",
"\n",
"A notebook is a list of cells. Cells contain either **explanatory text** or **executable code** and its output. This is a **text cell**. You can double-click to edit this cell.\n",
"\n",
"Once the toolbar button indicates CONNECTED, click in the cell to select it and execute the contents in the following ways:\n",
"\n",
"* Click the **Play icon** in the left gutter of the cell; or\n",
"* Type **Cmd/Ctrl + Enter** to run the cell in place.\n",
"\n",
"Good to know\n",
"* **Hashtags (#)** are Python comments (they're ignored during code execution)\n",
"* Use **Cmd/Ctrl + / ** to comment out a line of code (helpful during debugging)\n",
"* When you execute a code block, anything within that code block can be referenced elsewhere in the notebook"
]
},
{
"cell_type": "code",
"metadata": {
"id": "psHZilLIH3ww"
},
"source": [
"# Printing to screen\n",
"print(\"I'm a code block\")\n",
"\n",
"# Defining variables\n",
"a = 2\n",
"b = 5\n",
"c = a + b\n",
"print(f\"a equals {a}\")\n",
"print(f\"b equals {b}\")\n",
"print(f\"a plus b equals {c}\")\n",
"\n",
"# Proper indentation is essential in Python\n",
"for x in range(1,6):\n",
" print(x)"
],
"execution_count": null,
"outputs": []
},
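{
"cell_type": "markdown",
"metadata": {},
"source": [
"For example, after running the cell above, the variables `a`, `b`, and `c` it defined remain available to every later cell:"
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"# `c` was defined in the previous code cell and can be reused here\n",
"print(f\"c is still {c}\")"
],
"execution_count": null,
"outputs": []
},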
{
"cell_type": "markdown",
"metadata": {
"id": "0WwBV_BrUeq-"
},
"source": [
"# Future Customer Value Segments (FoCVS) - Automation Notebook"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ZqfyCrSBy8Um"
},
"source": [
"This notebook can be used to automate runs for the Customer Lifetime Value (CLV) prediction and segmentation data processing pipleine known as [FoCVS](https://github.com/GoogleCloudPlatform/cloud-for-marketing/tree/main/marketing-analytics/predicting/future-customer-value-segments).\n",
"\n",
"Please follow the [GCP installation steps](https://github.com/GoogleCloudPlatform/cloud-for-marketing/tree/main/marketing-analytics/predicting/future-customer-value-segments#gcp-steps) to install FoCVS within your Google Cloud Project **before** using this Colab notebook. "
]
},
{
"cell_type": "code",
"metadata": {
"id": "EG-cH7w95p3w"
},
"source": [
"#@title Authenticate your user for this Colab notebook\n",
"#@markdown This allows the Colab notebook to access GCP resources owned by you.\n",
"from google.colab import auth\n",
"auth.authenticate_user()"
],
"execution_count": null,
"outputs": []
},
{
"cell_type": "code",
"metadata": {
"id": "z_csS3aYW12s"
},
"source": [
"#@title Specify FoCVS pipeline parameters\n",
"#@markdown Use the form below to define all the desired parameters, including Dataflow environment settings.\n",
"GCP_PROJECT_ID = '' #@param {type:\"string\"}\n",
"GCS_BUCKET_ID = '' #@param {type:\"string\"}\n",
"\n",
"JOB_NAME = '' #@param {type:\"string\"}\n",
"BIGQUERY_INPUT_QUERY = '' #@param {type:\"string\"}\n",
"INPUT_CUSTOMERID_COLUMN = '' #@param {type:\"string\"}\n",
"INPUT_TRANSACTIONVALUE_COLUMN = '' #@param {type:\"string\"}\n",
"INPUT_TRANSACTIONDATE_COLUMN = '' #@param {type:\"string\"}\n",
"INPUT_TRANSACTIONDATE_FORMAT = 'YYYY-MM-DD' #@param [\"YYYY-MM-DD\", \"MM/DD/YY\", \"MM/DD/YYYY\", \"DD/MM/YY\", \"DD/MM/YYYY\", \"YYYYMMDD\"]\n",
"\n",
"#@markdown #### Extra dimensions\n",
"#@markdown > Insert space-separated extra dimensions you would like to test.\n",
"#@markdown A job will be created per extra dimension. For example:<br>\n",
"#@markdown <br>JOB_NAME = my_job<br>INPUT_EXTRA_DIMENSIONS = dim1 dim2 dim3<br>**Resulting jobs**: *my_job_dim1, my_job_dim2, my_job_dim3*\n",
"#@markdown <br><br>Leave empty if running without any extra dimensions.\n",
"INPUT_EXTRA_DIMENSIONS = '' #@param {type:\"string\"}\n",
"\n",
"#@markdown #### Optional pipeline parameters\n",
"MODEL_TYPE = 'MBGNBD' #@param [\"BGNBD\", \"MBGNBD\", \"PNBD\", \"BGBB\"]\n",
"MODEL_TIME_GRANULARITY = 'Weekly' #@param [\"Daily\", \"Weekly\", \"Monthly\"]\n",
"MODEL_CALIBRATION_START_DATE = '' #@param {type:\"string\"}\n",
"MODEL_CALIBRATION_END_DATE = '' #@param {type:\"string\"}\n",
"MODEL_COHORT_START_DATE = '' #@param {type:\"string\"}\n",
"MODEL_COHORT_END_DATE = '' #@param {type:\"string\"}\n",
"MODEL_HOLDOUT_END_DATE = '' #@param {type:\"string\"}\n",
"MODEL_PREDICTION_PERIOD = 52 #@param {type:\"integer\"}\n",
"MODEL_PENALIZER_COEFFICIENT = 0.0 #@param {type:\"number\"}\n",
"MODEL_VALIDATION_ERROR_THRESHOLD = 15 #@param {type:\"number\"}\n",
"OUTPUT_NUM_SEGMENTS = 5 #@param {type:\"integer\"}\n",
"OUTPUT_ROUND_NUMBERS = False #@param {type:\"boolean\"}\n",
"\n",
"#@markdown #### Optional Dataflow and BigQuery environment parameters\n",
"DATAFLOW_SERVICE_ACCOUNT_EMAIL = '' #@param {type:\"string\"}\n",
"DATAFLOW_NUM_WORKERS = '' #@param {type:\"string\"}\n",
"DATAFLOW_MACHINE_TYPE = '' #@param {type:\"string\"}\n",
"#@markdown > View all machine types at https://cloud.google.com/compute/docs/machine-types.\n",
"DATAFLOW_RUN_LOCATION = '' #@param {type:\"string\"}\n",
"#@markdown > See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints for more information.\n",
"BIGQUERY_DATASET_LOCATION = '' #@param {type:\"string\"}\n",
"#@markdown > See https://cloud.google.com/bigquery/docs/locations for supported locations.\n",
"\n",
"def get_runtime_environment(temp_location):\n",
" runtime_environment = {\n",
" 'tempLocation': temp_location\n",
" }\n",
" if DATAFLOW_SERVICE_ACCOUNT_EMAIL:\n",
" runtime_environment['serviceAccountEmail'] = DATAFLOW_SERVICE_ACCOUNT_EMAIL\n",
"\n",
" if DATAFLOW_NUM_WORKERS:\n",
" runtime_environment['numWorkers'] = int(DATAFLOW_NUM_WORKERS)\n",
"\n",
" if DATAFLOW_MACHINE_TYPE:\n",
" runtime_environment['machineType'] = DATAFLOW_MACHINE_TYPE\n",
"\n",
" return runtime_environment\n",
"\n",
"def create_pipeline_request(extra_dimension=None):\n",
" job_name = JOB_NAME.replace(' ', '_')\n",
"\n",
" if extra_dimension:\n",
" job_name = f'{job_name}_{extra_dimension}'\n",
"\n",
" gcs_temp_location = f'gs://{GCS_BUCKET_ID}/temp/{job_name}/'\n",
" env_temp_location = gcs_temp_location\n",
" gcs_output_folder = f'gs://{GCS_BUCKET_ID}/output/{job_name}/'\n",
" bq_output_dataset = job_name\n",
"\n",
" focvs_request = {\n",
" 'jobName': job_name,\n",
" 'parameters': {\n",
" 'input_bq_query': BIGQUERY_INPUT_QUERY,\n",
" 'input_bq_project': GCP_PROJECT_ID,\n",
" 'temp_gcs_location': gcs_temp_location,\n",
" 'output_folder': gcs_output_folder,\n",
" 'output_bq_project': GCP_PROJECT_ID,\n",
" 'output_bq_dataset': bq_output_dataset,\n",
" 'customer_id_column_name': INPUT_CUSTOMERID_COLUMN,\n",
" 'sales_column_name': INPUT_TRANSACTIONVALUE_COLUMN,\n",
" 'transaction_date_column_name': INPUT_TRANSACTIONDATE_COLUMN,\n",
" 'date_parsing_pattern': INPUT_TRANSACTIONDATE_FORMAT,\n",
" 'frequency_model_type': MODEL_TYPE,\n",
" 'model_time_granularity': MODEL_TIME_GRANULARITY,\n",
" 'prediction_period': str(MODEL_PREDICTION_PERIOD),\n",
" 'penalizer_coef': str(MODEL_PENALIZER_COEFFICIENT),\n",
" 'transaction_frequency_threshold': str(MODEL_VALIDATION_ERROR_THRESHOLD),\n",
" 'output_segments': str(OUTPUT_NUM_SEGMENTS),\n",
" 'round_numbers': 'true' if OUTPUT_ROUND_NUMBERS else 'false'\n",
" },\n",
" 'environment': get_runtime_environment(env_temp_location)\n",
" }\n",
" if extra_dimension:\n",
" focvs_request['parameters']['extra_dimension_column_name'] = extra_dimension\n",
"\n",
" if MODEL_CALIBRATION_START_DATE:\n",
" focvs_request['parameters']['calibration_start_date'] = MODEL_CALIBRATION_START_DATE\n",
"\n",
" if MODEL_CALIBRATION_END_DATE:\n",
" focvs_request['parameters']['calibration_end_date'] = MODEL_CALIBRATION_END_DATE\n",
"\n",
" if MODEL_COHORT_START_DATE:\n",
" focvs_request['parameters']['cohort_start_date'] = MODEL_COHORT_START_DATE\n",
"\n",
" if MODEL_COHORT_END_DATE:\n",
" focvs_request['parameters']['cohort_end_date'] = MODEL_COHORT_END_DATE\n",
"\n",
" if MODEL_HOLDOUT_END_DATE:\n",
" focvs_request['parameters']['holdout_end_date'] = MODEL_HOLDOUT_END_DATE\n",
" \n",
" return focvs_request\n",
"\n",
"focvs_requests = []\n",
"\n",
"if INPUT_EXTRA_DIMENSIONS:\n",
" for dimension in INPUT_EXTRA_DIMENSIONS.split():\n",
" focvs_requests.append(create_pipeline_request(dimension))\n",
"else:\n",
" focvs_requests.append(create_pipeline_request())\n",
"\n",
"import json\n",
"print(json.dumps(focvs_requests, indent=2))"
],
"execution_count": null,
"outputs": []
},
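{
"cell_type": "markdown",
"metadata": {},
"source": [
"The optional cell below is a convenience check added to this notebook (it is not part of the FoCVS pipeline itself): it verifies that the required form fields above are non-empty before any Dataflow jobs are launched."
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"#@title (Optional) Sanity-check required parameters\n",
"# NOTE: Helper added for convenience; it only checks that the required form\n",
"# fields above were filled in, so that jobs do not fail on empty values.\n",
"_REQUIRED_FIELDS = {\n",
"    'GCP_PROJECT_ID': GCP_PROJECT_ID,\n",
"    'GCS_BUCKET_ID': GCS_BUCKET_ID,\n",
"    'JOB_NAME': JOB_NAME,\n",
"    'BIGQUERY_INPUT_QUERY': BIGQUERY_INPUT_QUERY,\n",
"    'INPUT_CUSTOMERID_COLUMN': INPUT_CUSTOMERID_COLUMN,\n",
"    'INPUT_TRANSACTIONVALUE_COLUMN': INPUT_TRANSACTIONVALUE_COLUMN,\n",
"    'INPUT_TRANSACTIONDATE_COLUMN': INPUT_TRANSACTIONDATE_COLUMN\n",
"}\n",
"\n",
"missing = [name for name, value in _REQUIRED_FIELDS.items() if not value.strip()]\n",
"if missing:\n",
"  raise ValueError(f\"Please fill in the following form fields: {', '.join(missing)}\")\n",
"print('All required parameters are set.')"
],
"execution_count": null,
"outputs": []
},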
{
"cell_type": "code",
"metadata": {
"id": "DDqegpJjl8wf"
},
"source": [
"#@title Run FoCVS jobs\n",
"#@markdown Executing this cell will create Dataflow jobs for all requests generated by the previous cell.\n",
"#@markdown It is important to mention that a BigQuery dataset per Dataflow job - named after the job itself -\n",
"#@markdown will be created before the job is executed. Existing datasets with the same name will be deleted first.\n",
"#@markdown > **WARNING**: This process is not idempotent; Dataflow jobs will be created and executed each time this cell is run.\n",
"from googleapiclient import discovery\n",
"\n",
"bigquery_client = discovery.build('bigquery', 'v2')\n",
"dataflow_client = discovery.build('dataflow', 'v1b3')\n",
"\n",
"def create_bigquery_dataset_idempotent(focvs_request):\n",
" dataset_id = focvs_request['parameters']['output_bq_dataset']\n",
"\n",
" try:\n",
" bigquery_client.datasets().delete(\n",
" projectId=GCP_PROJECT_ID,\n",
" datasetId=dataset_id,\n",
" deleteContents=True).execute()\n",
" except:\n",
" pass\n",
"\n",
" dataset_request_payload = {\n",
" 'datasetReference': {\n",
" 'projectId': GCP_PROJECT_ID,\n",
" 'datasetId': dataset_id\n",
" }\n",
" }\n",
" if BIGQUERY_DATASET_LOCATION:\n",
" dataset_request_payload['location'] = BIGQUERY_DATASET_LOCATION\n",
"\n",
" bigquery_client.datasets().insert(\n",
" projectId=GCP_PROJECT_ID,\n",
" body=dataset_request_payload).execute()\n",
"\n",
"def run_dataflow_pipeline(focvs_request):\n",
" gcs_path = f'gs://{GCS_BUCKET_ID}/templates/FoCVS-bq'\n",
"\n",
" if DATAFLOW_RUN_LOCATION:\n",
" return dataflow_client.projects().locations().templates().launch(\n",
" projectId=GCP_PROJECT_ID,\n",
" location=DATAFLOW_RUN_LOCATION,\n",
" gcsPath=gcs_path,\n",
" body=focvs_request).execute()\n",
" else:\n",
" return dataflow_client.projects().templates().launch(\n",
" projectId=GCP_PROJECT_ID,\n",
" gcsPath=gcs_path,\n",
" body=focvs_request).execute()\n",
"\n",
"responses = []\n",
"\n",
"for focvs_request in focvs_requests:\n",
" create_bigquery_dataset_idempotent(focvs_request)\n",
" responses.append(run_dataflow_pipeline(focvs_request))\n",
"\n",
"import json\n",
"print(json.dumps(responses, indent=2))"
],
"execution_count": null,
"outputs": []
},
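{
"cell_type": "markdown",
"metadata": {},
"source": [
"The optional cell below is a minimal monitoring sketch added to this notebook (not part of the original FoCVS setup). It assumes the `responses` list produced by the previous cell and polls the Dataflow API once per launched job to print its current state; re-run it until each job reports `JOB_STATE_DONE` or a failure state."
]
},
{
"cell_type": "code",
"metadata": {},
"source": [
"#@title (Optional) Check the status of the launched jobs\n",
"#@markdown Re-run this cell to refresh the job states.\n",
"# NOTE: Optional helper added to this notebook; it assumes `responses` from\n",
"# the previous cell and fetches each job's current state from the Dataflow API.\n",
"def get_job_state(response):\n",
"  \"\"\"Fetches the current state of a job from a template launch response.\"\"\"\n",
"  job_id = response['job']['id']\n",
"  if DATAFLOW_RUN_LOCATION:\n",
"    job = dataflow_client.projects().locations().jobs().get(\n",
"        projectId=GCP_PROJECT_ID,\n",
"        location=DATAFLOW_RUN_LOCATION,\n",
"        jobId=job_id).execute()\n",
"  else:\n",
"    job = dataflow_client.projects().jobs().get(\n",
"        projectId=GCP_PROJECT_ID,\n",
"        jobId=job_id).execute()\n",
"  return job.get('currentState', 'UNKNOWN')\n",
"\n",
"for response in responses:\n",
"  print(f\"{response['job']['name']}: {get_job_state(response)}\")"
],
"execution_count": null,
"outputs": []
}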
]
}