marketing-analytics/predicting/future-customer-value-segments/focvs_automation.ipynb

{ "nbformat": 4, "nbformat_minor": 0, "metadata": { "colab": { "name": "FoCVS Automation Colab (go/focvs-colab)", "provenance": [], "collapsed_sections": [ "KsbpSqDIaq7A" ], "toc_visible": true }, "kernelspec": { "name": "python3", "display_name": "Python 3" }, "language_info": { "name": "python" } }, "cells": [ { "cell_type": "markdown", "metadata": { "id": "P9eCcmbJUiIB" }, "source": [ "# Make a copy of this notebook! " ] }, { "cell_type": "markdown", "metadata": { "id": "KsbpSqDIaq7A" }, "source": [ "# Intro to Colab" ] }, { "cell_type": "markdown", "metadata": { "id": "uuo6MXz7KXpM" }, "source": [ "**60 second crash course in Colab notebooks**\n", "\n", "A notebook is a list of cells. Cells contain either **explanatory text** or **executable code** and its output. This is a **text cell**. You can double-click to edit this cell.\n", "\n", "Once the toolbar button indicates CONNECTED, click in the cell to select it and execute the contents in the following ways:\n", "\n", "* Click the **Play icon** in the left gutter of the cell; or\n", "* Type **Cmd/Ctrl + Enter** to run the cell in place.\n", "\n", "Good to know\n", "* **Hashtags (#)** are Python comments (they're ignored during code execution)\n", "* Use **Cmd/Ctrl + / ** to comment out a line of code (helpful during debugging)\n", "* When you execute a code block, anything within that code block can be referenced elsewhere in the notebook" ] }, { "cell_type": "code", "metadata": { "id": "psHZilLIH3ww" }, "source": [ "# Printing to screen\n", "print(\"I'm a code block\")\n", "\n", "# Defining variables\n", "a = 2\n", "b = 5\n", "c = a + b\n", "print(f\"a equals {a}\")\n", "print(f\"b equals {b}\")\n", "print(f\"a plus b equals {c}\")\n", "\n", "# Proper indentation is essential in Python\n", "for x in range(1,6):\n", " print(x)" ], "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "metadata": { "id": "0WwBV_BrUeq-" }, "source": [ "# Future Customer Value Segments (FoCVS) - Automation Notebook" ] }, { "cell_type": "markdown", "metadata": { "id": "ZqfyCrSBy8Um" }, "source": [ "This notebook can be used to automate runs for the Customer Lifetime Value (CLV) prediction and segmentation data processing pipleine known as [FoCVS](https://github.com/GoogleCloudPlatform/cloud-for-marketing/tree/main/marketing-analytics/predicting/future-customer-value-segments).\n", "\n", "Please follow the [GCP installation steps](https://github.com/GoogleCloudPlatform/cloud-for-marketing/tree/main/marketing-analytics/predicting/future-customer-value-segments#gcp-steps) to install FoCVS within your Google Cloud Project **before** using this Colab notebook. 
" ] }, { "cell_type": "code", "metadata": { "id": "EG-cH7w95p3w" }, "source": [ "#@title Authenticate your user for this Colab notebook\n", "#@markdown This allows the Colab notebook to access GCP resources owned by you.\n", "from google.colab import auth\n", "auth.authenticate_user()" ], "execution_count": null, "outputs": [] }, { "cell_type": "code", "metadata": { "id": "z_csS3aYW12s" }, "source": [ "#@title Specify FoCVS pipeline parameters\n", "#@markdown Use the form below to define all the desired parameters, including Dataflow environment settings.\n", "GCP_PROJECT_ID = '' #@param {type:\"string\"}\n", "GCS_BUCKET_ID = '' #@param {type:\"string\"}\n", "\n", "JOB_NAME = '' #@param {type:\"string\"}\n", "BIGQUERY_INPUT_QUERY = '' #@param {type:\"string\"}\n", "INPUT_CUSTOMERID_COLUMN = '' #@param {type:\"string\"}\n", "INPUT_TRANSACTIONVALUE_COLUMN = '' #@param {type:\"string\"}\n", "INPUT_TRANSACTIONDATE_COLUMN = '' #@param {type:\"string\"}\n", "INPUT_TRANSACTIONDATE_FORMAT = 'YYYY-MM-DD' #@param [\"YYYY-MM-DD\", \"MM/DD/YY\", \"MM/DD/YYYY\", \"DD/MM/YY\", \"DD/MM/YYYY\", \"YYYYMMDD\"]\n", "\n", "#@markdown #### Extra dimensions\n", "#@markdown > Insert space-separated extra dimensions you would like to test.\n", "#@markdown A job will be created per extra dimension. For example:<br>\n", "#@markdown <br>JOB_NAME = my_job<br>INPUT_EXTRA_DIMENSIONS = dim1 dim2 dim3<br>**Resulting jobs**: *my_job_dim1, my_job_dim2, my_job_dim3*\n", "#@markdown <br><br>Leave empty if running without any extra dimensions.\n", "INPUT_EXTRA_DIMENSIONS = '' #@param {type:\"string\"}\n", "\n", "#@markdown #### Optional pipeline parameters\n", "MODEL_TYPE = 'MBGNBD' #@param [\"BGNBD\", \"MBGNBD\", \"PNBD\", \"BGBB\"]\n", "MODEL_TIME_GRANULARITY = 'Weekly' #@param [\"Daily\", \"Weekly\", \"Monthly\"]\n", "MODEL_CALIBRATION_START_DATE = '' #@param {type:\"string\"}\n", "MODEL_CALIBRATION_END_DATE = '' #@param {type:\"string\"}\n", "MODEL_COHORT_START_DATE = '' #@param {type:\"string\"}\n", "MODEL_COHORT_END_DATE = '' #@param {type:\"string\"}\n", "MODEL_HOLDOUT_END_DATE = '' #@param {type:\"string\"}\n", "MODEL_PREDICTION_PERIOD = 52 #@param {type:\"integer\"}\n", "MODEL_PENALIZER_COEFFICIENT = 0.0 #@param {type:\"number\"}\n", "MODEL_VALIDATION_ERROR_THRESHOLD = 15 #@param {type:\"number\"}\n", "OUTPUT_NUM_SEGMENTS = 5 #@param {type:\"integer\"}\n", "OUTPUT_ROUND_NUMBERS = False #@param {type:\"boolean\"}\n", "\n", "#@markdown #### Optional Dataflow and BigQuery environment parameters\n", "DATAFLOW_SERVICE_ACCOUNT_EMAIL = '' #@param {type:\"string\"}\n", "DATAFLOW_NUM_WORKERS = '' #@param {type:\"string\"}\n", "DATAFLOW_MACHINE_TYPE = '' #@param {type:\"string\"}\n", "#@markdown > View all machine types at https://cloud.google.com/compute/docs/machine-types.\n", "DATAFLOW_RUN_LOCATION = '' #@param {type:\"string\"}\n", "#@markdown > See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints for more information.\n", "BIGQUERY_DATASET_LOCATION = '' #@param {type:\"string\"}\n", "#@markdown > See https://cloud.google.com/bigquery/docs/locations for supported locations.\n", "\n", "def get_runtime_environment(temp_location):\n", " runtime_environment = {\n", " 'tempLocation': temp_location\n", " }\n", " if DATAFLOW_SERVICE_ACCOUNT_EMAIL:\n", " runtime_environment['serviceAccountEmail'] = DATAFLOW_SERVICE_ACCOUNT_EMAIL\n", "\n", " if DATAFLOW_NUM_WORKERS:\n", " runtime_environment['numWorkers'] = int(DATAFLOW_NUM_WORKERS)\n", "\n", " if DATAFLOW_MACHINE_TYPE:\n", " runtime_environment['machineType'] = 
DATAFLOW_MACHINE_TYPE\n", "\n", " return runtime_environment\n", "\n", "def create_pipeline_request(extra_dimension=None):\n", " job_name = JOB_NAME.replace(' ', '_')\n", "\n", " if extra_dimension:\n", " job_name = f'{job_name}_{extra_dimension}'\n", "\n", " gcs_temp_location = f'gs://{GCS_BUCKET_ID}/temp/{job_name}/'\n", " env_temp_location = gcs_temp_location\n", " gcs_output_folder = f'gs://{GCS_BUCKET_ID}/output/{job_name}/'\n", " bq_output_dataset = job_name\n", "\n", " focvs_request = {\n", " 'jobName': job_name,\n", " 'parameters': {\n", " 'input_bq_query': BIGQUERY_INPUT_QUERY,\n", " 'input_bq_project': GCP_PROJECT_ID,\n", " 'temp_gcs_location': gcs_temp_location,\n", " 'output_folder': gcs_output_folder,\n", " 'output_bq_project': GCP_PROJECT_ID,\n", " 'output_bq_dataset': bq_output_dataset,\n", " 'customer_id_column_name': INPUT_CUSTOMERID_COLUMN,\n", " 'sales_column_name': INPUT_TRANSACTIONVALUE_COLUMN,\n", " 'transaction_date_column_name': INPUT_TRANSACTIONDATE_COLUMN,\n", " 'date_parsing_pattern': INPUT_TRANSACTIONDATE_FORMAT,\n", " 'frequency_model_type': MODEL_TYPE,\n", " 'model_time_granularity': MODEL_TIME_GRANULARITY,\n", " 'prediction_period': str(MODEL_PREDICTION_PERIOD),\n", " 'penalizer_coef': str(MODEL_PENALIZER_COEFFICIENT),\n", " 'transaction_frequency_threshold': str(MODEL_VALIDATION_ERROR_THRESHOLD),\n", " 'output_segments': str(OUTPUT_NUM_SEGMENTS),\n", " 'round_numbers': 'true' if OUTPUT_ROUND_NUMBERS else 'false'\n", " },\n", " 'environment': get_runtime_environment(env_temp_location)\n", " }\n", " if extra_dimension:\n", " focvs_request['parameters']['extra_dimension_column_name'] = extra_dimension\n", "\n", " if MODEL_CALIBRATION_START_DATE:\n", " focvs_request['parameters']['calibration_start_date'] = MODEL_CALIBRATION_START_DATE\n", "\n", " if MODEL_CALIBRATION_END_DATE:\n", " focvs_request['parameters']['calibration_end_date'] = MODEL_CALIBRATION_END_DATE\n", "\n", " if MODEL_COHORT_START_DATE:\n", " focvs_request['parameters']['cohort_start_date'] = MODEL_COHORT_START_DATE\n", "\n", " if MODEL_COHORT_END_DATE:\n", " focvs_request['parameters']['cohort_end_date'] = MODEL_COHORT_END_DATE\n", "\n", " if MODEL_HOLDOUT_END_DATE:\n", " focvs_request['parameters']['holdout_end_date'] = MODEL_HOLDOUT_END_DATE\n", "\n", " return focvs_request\n", "\n", "focvs_requests = []\n", "\n", "if INPUT_EXTRA_DIMENSIONS:\n", " for dimension in INPUT_EXTRA_DIMENSIONS.split():\n", " focvs_requests.append(create_pipeline_request(dimension))\n", "else:\n", " focvs_requests.append(create_pipeline_request())\n", "\n", "import json\n", "print(json.dumps(focvs_requests, indent=2))" ], "execution_count": null, "outputs": [] }
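, { "cell_type": "markdown", "metadata": { "id": "VerifyTemplateMd01" }, "source": [ "The next cell is an optional, illustrative addition (a sketch, not part of the original FoCVS pipeline): before launching anything, it checks that the Dataflow template the launch cell expects at `gs://<GCS_BUCKET_ID>/templates/FoCVS-bq` actually exists, using the same Google API discovery-client style as the rest of the notebook. It assumes the [GCP installation steps](https://github.com/GoogleCloudPlatform/cloud-for-marketing/tree/main/marketing-analytics/predicting/future-customer-value-segments#gcp-steps) have already been completed; the cell IDs and variable names here are not from the original notebook." ] }, { "cell_type": "code", "metadata": { "id": "VerifyTemplateCode01" }, "source": [ "#@title (Optional) Verify the FoCVS template exists in your bucket\n", "#@markdown Illustrative sanity check: confirms the template object is present so the\n", "#@markdown launch cell below does not fail with a missing-template error.\n", "from googleapiclient import discovery\n", "from googleapiclient.errors import HttpError\n", "\n", "storage_client = discovery.build('storage', 'v1')\n", "\n", "# Template path used by the launch cell below.\n", "template_object = 'templates/FoCVS-bq'\n", "try:\n", "  template_metadata = storage_client.objects().get(\n", "      bucket=GCS_BUCKET_ID,\n", "      object=template_object).execute()\n", "  print(f\"Found gs://{GCS_BUCKET_ID}/{template_object} \"\n", "        f\"(last updated: {template_metadata.get('updated')})\")\n", "except HttpError as error:\n", "  print(f'Template not found or inaccessible: {error}')\n", "  print('Re-run the FoCVS GCP installation steps before launching jobs.')" ], "execution_count": null, "outputs": [] }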
, { "cell_type": "code", "metadata": { "id": "DDqegpJjl8wf" }, "source": [ "#@title Run FoCVS jobs\n", "#@markdown Executing this cell creates a Dataflow job for every request generated by the previous cell.\n", "#@markdown Note that a BigQuery dataset per Dataflow job - named after the job itself -\n", "#@markdown is created before each job is launched; any existing dataset with the same name is deleted first.\n", "#@markdown > **WARNING**: This process is not idempotent; Dataflow jobs will be created and executed each time this cell is run.\n", "from googleapiclient import discovery\n", "from googleapiclient.errors import HttpError\n", "\n", "bigquery_client = discovery.build('bigquery', 'v2')\n", "dataflow_client = discovery.build('dataflow', 'v1b3')\n", "\n", "def create_bigquery_dataset_idempotent(focvs_request):\n", " \"\"\"Deletes the job's output dataset if it exists, then recreates it.\"\"\"\n", " dataset_id = focvs_request['parameters']['output_bq_dataset']\n", "\n", " try:\n", " bigquery_client.datasets().delete(\n", " projectId=GCP_PROJECT_ID,\n", " datasetId=dataset_id,\n", " deleteContents=True).execute()\n", " except HttpError:\n", " pass  # The dataset may not exist yet; nothing to delete.\n", "\n", " dataset_request_payload = {\n", " 'datasetReference': {\n", " 'projectId': GCP_PROJECT_ID,\n", " 'datasetId': dataset_id\n", " }\n", " }\n", " if BIGQUERY_DATASET_LOCATION:\n", " dataset_request_payload['location'] = BIGQUERY_DATASET_LOCATION\n", "\n", " bigquery_client.datasets().insert(\n", " projectId=GCP_PROJECT_ID,\n", " body=dataset_request_payload).execute()\n", "\n", "def run_dataflow_pipeline(focvs_request):\n", " \"\"\"Launches the FoCVS Dataflow template with the given request.\"\"\"\n", " gcs_path = f'gs://{GCS_BUCKET_ID}/templates/FoCVS-bq'\n", "\n", " if DATAFLOW_RUN_LOCATION:\n", " return dataflow_client.projects().locations().templates().launch(\n", " projectId=GCP_PROJECT_ID,\n", " location=DATAFLOW_RUN_LOCATION,\n", " gcsPath=gcs_path,\n", " body=focvs_request).execute()\n", " else:\n", " return dataflow_client.projects().templates().launch(\n", " projectId=GCP_PROJECT_ID,\n", " gcsPath=gcs_path,\n", " body=focvs_request).execute()\n", "\n", "responses = []\n", "\n", "for focvs_request in focvs_requests:\n", " create_bigquery_dataset_idempotent(focvs_request)\n", " responses.append(run_dataflow_pipeline(focvs_request))\n", "\n", "import json\n", "print(json.dumps(responses, indent=2))" ], "execution_count": null, "outputs": [] }
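, { "cell_type": "markdown", "metadata": { "id": "JobStatusMd01" }, "source": [ "The final cell below is another optional, illustrative addition (a sketch, not part of the original pipeline): it looks up the current state of each launched job through the Dataflow API, reading job IDs from the `responses` collected by the previous cell and mirroring its regional-endpoint handling. Re-run it to poll for progress until jobs reach `JOB_STATE_DONE` or `JOB_STATE_FAILED`." ] }, { "cell_type": "code", "metadata": { "id": "JobStatusCode01" }, "source": [ "#@title (Optional) Check the status of the launched Dataflow jobs\n", "#@markdown Illustrative sketch: assumes the previous cell has populated `responses`.\n", "for response in responses:\n", "  job = response.get('job', {})\n", "  job_id = job.get('id')\n", "  if not job_id:\n", "    print(f'No job found in launch response: {response}')\n", "    continue\n", "  # Mirror the launch cell: use the regional endpoint when one was specified.\n", "  if DATAFLOW_RUN_LOCATION:\n", "    status = dataflow_client.projects().locations().jobs().get(\n", "        projectId=GCP_PROJECT_ID,\n", "        location=DATAFLOW_RUN_LOCATION,\n", "        jobId=job_id).execute()\n", "  else:\n", "    status = dataflow_client.projects().jobs().get(\n", "        projectId=GCP_PROJECT_ID,\n", "        jobId=job_id).execute()\n", "  print(f\"{status.get('name')}: {status.get('currentState')}\")" ], "execution_count": null, "outputs": [] } ] }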