# Make a copy of this notebook (File → Save a copy in Drive) before using it!

# Intro to Colab

**60 second crash course in Colab notebooks**

A notebook is a list of cells. Cells contain either **explanatory text** or **executable code** and its output. This is a **text cell**. You can double-click to edit this cell.

Once the toolbar button indicates CONNECTED, click in the cell to select it and execute the contents in the following ways:

* Click the **Play icon** in the left gutter of the cell; or
* Type **Cmd/Ctrl + Enter** to run the cell in place.

Good to know
* A **hash sign (#)** starts a Python comment; everything after it on that line is ignored during code execution
* Use **Cmd/Ctrl + /** to comment out a line of code (helpful during debugging)
* Once a code cell has been executed, the variables and functions it defines can be used in any cell you run afterwards

In [None]:
# Printing to screen
print("I'm a code block")

# Defining variables (tuple unpacking assigns both in one statement)
a, b = 2, 5
c = a + b

# f-strings replace each {name} with the value of that variable
for line in (f"a equals {a}", f"b equals {b}", f"a plus b equals {c}"):
  print(line)

# Proper indentation is essential in Python: the indented line below is the
# loop body and runs once for each x from 1 to 5
for x in range(1, 6):
  print(x)

# Future Customer Value Segments (FoCVS) - Automation Notebook

This notebook can be used to automate runs of the Customer Lifetime Value (CLV) prediction and segmentation data processing pipeline known as [FoCVS](https://github.com/GoogleCloudPlatform/cloud-for-marketing/tree/main/marketing-analytics/predicting/future-customer-value-segments).

Please follow the [GCP installation steps](https://github.com/GoogleCloudPlatform/cloud-for-marketing/tree/main/marketing-analytics/predicting/future-customer-value-segments#gcp-steps) to install FoCVS within your Google Cloud Project **before** using this Colab notebook. 

In [None]:
#@title Authenticate your user for this Colab notebook
#@markdown This allows the Colab notebook to access GCP resources owned by you.
# Authenticates this Colab session as your Google user. As the form text above
# says, this is what lets the notebook access GCP resources owned by you.
from google.colab import auth
auth.authenticate_user()

In [None]:
#@title Specify FoCVS pipeline parameters
#@markdown Use the form below to define all the desired parameters, including Dataflow environment settings.
# GCP_PROJECT_ID is used for the BigQuery input/output and for the Dataflow launch.
# GCS_BUCKET_ID must contain the FoCVS template at templates/FoCVS-bq; per-job
# temp/ and output/ folders are also placed under this bucket.
GCP_PROJECT_ID = '' #@param {type:"string"}
GCS_BUCKET_ID = '' #@param {type:"string"}

# JOB_NAME is the base name for each job, its BigQuery output dataset and its
# GCS folders (spaces are replaced with underscores when requests are built).
JOB_NAME = '' #@param {type:"string"}
BIGQUERY_INPUT_QUERY = '' #@param {type:"string"}
# Customer ID, transaction value and transaction date column names. These are
# presumably columns returned by BIGQUERY_INPUT_QUERY; verify against the FoCVS docs.
INPUT_CUSTOMERID_COLUMN = '' #@param {type:"string"}
INPUT_TRANSACTIONVALUE_COLUMN = '' #@param {type:"string"}
INPUT_TRANSACTIONDATE_COLUMN = '' #@param {type:"string"}
# Sent to the pipeline as `date_parsing_pattern`.
INPUT_TRANSACTIONDATE_FORMAT = 'YYYY-MM-DD' #@param ["YYYY-MM-DD", "MM/DD/YY", "MM/DD/YYYY", "DD/MM/YY", "DD/MM/YYYY", "YYYYMMDD"]

#@markdown #### Extra dimensions
#@markdown > Insert space-separated extra dimensions you would like to test.
#@markdown A job will be created per extra dimension. For example:<br>
#@markdown <br>JOB_NAME = my_job<br>INPUT_EXTRA_DIMENSIONS = dim1 dim2 dim3<br>**Resulting jobs**: *my_job_dim1, my_job_dim2, my_job_dim3*
#@markdown <br><br>Leave empty if running without any extra dimensions.
# Split on whitespace; one launch request is built per entry.
INPUT_EXTRA_DIMENSIONS = '' #@param {type:"string"}

#@markdown #### Optional pipeline parameters
# Numeric values are sent to the pipeline as strings, and empty date fields are
# left out of the request entirely. NOTE(review): the expected date format is
# not validated in this notebook; confirm it against the FoCVS documentation.
# MODEL_VALIDATION_ERROR_THRESHOLD is sent as `transaction_frequency_threshold`.
MODEL_TYPE = 'MBGNBD' #@param ["BGNBD", "MBGNBD", "PNBD", "BGBB"]
MODEL_TIME_GRANULARITY = 'Weekly' #@param ["Daily", "Weekly", "Monthly"]
MODEL_CALIBRATION_START_DATE = '' #@param {type:"string"}
MODEL_CALIBRATION_END_DATE = '' #@param {type:"string"}
MODEL_COHORT_START_DATE = '' #@param {type:"string"}
MODEL_COHORT_END_DATE = '' #@param {type:"string"}
MODEL_HOLDOUT_END_DATE = '' #@param {type:"string"}
MODEL_PREDICTION_PERIOD = 52 #@param {type:"integer"}
MODEL_PENALIZER_COEFFICIENT = 0.0 #@param {type:"number"}
MODEL_VALIDATION_ERROR_THRESHOLD = 15 #@param {type:"number"}
OUTPUT_NUM_SEGMENTS = 5 #@param {type:"integer"}
OUTPUT_ROUND_NUMBERS = False #@param {type:"boolean"}

#@markdown #### Optional Dataflow and BigQuery environment parameters
# Empty values are left out of the Dataflow runtime environment.
# DATAFLOW_NUM_WORKERS is a text field; it is converted with int() when set.
DATAFLOW_SERVICE_ACCOUNT_EMAIL = '' #@param {type:"string"}
DATAFLOW_NUM_WORKERS = '' #@param {type:"string"}
DATAFLOW_MACHINE_TYPE = '' #@param {type:"string"}
#@markdown > View all machine types at https://cloud.google.com/compute/docs/machine-types.
# When set, jobs are launched through the regional (projects.locations)
# templates endpoint; otherwise the non-regional endpoint is used.
DATAFLOW_RUN_LOCATION = '' #@param {type:"string"}
#@markdown > See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints for more information.
# When set, this becomes the `location` of each BigQuery output dataset.
BIGQUERY_DATASET_LOCATION = '' #@param {type:"string"}
#@markdown > See https://cloud.google.com/bigquery/docs/locations for supported locations.

def get_runtime_environment(temp_location):
  """Returns the Dataflow `environment` block for a template launch request.

  `tempLocation` is always present. The service account, worker count and
  machine type are added only when the matching form field is non-empty.

  Args:
    temp_location: Location for Dataflow temporary files (callers in this
      notebook pass a gs:// path).

  Returns:
    dict of Dataflow runtime environment fields.
  """
  # (request field, form value, conversion applied when the value is set)
  optional_fields = (
    ('serviceAccountEmail', DATAFLOW_SERVICE_ACCOUNT_EMAIL, lambda value: value),
    ('numWorkers', DATAFLOW_NUM_WORKERS, int),
    ('machineType', DATAFLOW_MACHINE_TYPE, lambda value: value),
  )

  environment = {'tempLocation': temp_location}
  for field, raw_value, convert in optional_fields:
    if raw_value:
      environment[field] = convert(raw_value)
  return environment

def create_pipeline_request(extra_dimension=None):
  """Builds the Dataflow template launch body for one FoCVS run.

  The base name is JOB_NAME with spaces replaced by underscores. When an extra
  dimension is given, `_<extra_dimension>` is appended. Three things are
  derived from the base name:
    * the BigQuery output dataset ID (letters, digits and underscores only),
    * the Dataflow job name (lowercase letters, digits and hyphens only),
    * the GCS temp and output folders under GCS_BUCKET_ID.

  Args:
    extra_dimension: Optional extra dimension column name. When set, it is
      appended to the base name and sent as `extra_dimension_column_name`.

  Returns:
    dict with `jobName`, `parameters` and `environment` keys, used as the body
    of a Dataflow `templates.launch` call.
  """
  import re

  base_name = JOB_NAME.replace(' ', '_')
  if extra_dimension:
    base_name = f'{base_name}_{extra_dimension}'

  # BigQuery dataset IDs may only contain letters, digits and underscores.
  bq_output_dataset = re.sub(r'[^A-Za-z0-9_]', '_', base_name)
  # Dataflow job names must match [a-z]([-a-z0-9]*[a-z0-9])?. That rejects the
  # underscores and capitals a dataset ID may contain, so the job name is
  # derived separately instead of reusing the dataset ID.
  dataflow_job_name = re.sub(r'[^a-z0-9-]', '-', base_name.lower()).strip('-')

  gcs_temp_location = f'gs://{GCS_BUCKET_ID}/temp/{base_name}/'
  gcs_output_folder = f'gs://{GCS_BUCKET_ID}/output/{base_name}/'

  parameters = {
    'input_bq_query': BIGQUERY_INPUT_QUERY,
    'input_bq_project': GCP_PROJECT_ID,
    'temp_gcs_location': gcs_temp_location,
    'output_folder': gcs_output_folder,
    'output_bq_project': GCP_PROJECT_ID,
    'output_bq_dataset': bq_output_dataset,
    'customer_id_column_name': INPUT_CUSTOMERID_COLUMN,
    'sales_column_name': INPUT_TRANSACTIONVALUE_COLUMN,
    'transaction_date_column_name': INPUT_TRANSACTIONDATE_COLUMN,
    'date_parsing_pattern': INPUT_TRANSACTIONDATE_FORMAT,
    'frequency_model_type': MODEL_TYPE,
    'model_time_granularity': MODEL_TIME_GRANULARITY,
    # Template parameters are sent as strings.
    'prediction_period': str(MODEL_PREDICTION_PERIOD),
    'penalizer_coef': str(MODEL_PENALIZER_COEFFICIENT),
    'transaction_frequency_threshold': str(MODEL_VALIDATION_ERROR_THRESHOLD),
    'output_segments': str(OUTPUT_NUM_SEGMENTS),
    'round_numbers': 'true' if OUTPUT_ROUND_NUMBERS else 'false',
  }

  # Optional parameters are sent only when set, presumably so the pipeline
  # can apply its own defaults for the rest.
  optional_parameters = (
    ('extra_dimension_column_name', extra_dimension),
    ('calibration_start_date', MODEL_CALIBRATION_START_DATE),
    ('calibration_end_date', MODEL_CALIBRATION_END_DATE),
    ('cohort_start_date', MODEL_COHORT_START_DATE),
    ('cohort_end_date', MODEL_COHORT_END_DATE),
    ('holdout_end_date', MODEL_HOLDOUT_END_DATE),
  )
  for key, value in optional_parameters:
    if value:
      parameters[key] = value

  return {
    'jobName': dataflow_job_name,
    'parameters': parameters,
    'environment': get_runtime_environment(gcs_temp_location),
  }

import json

# Fail fast on empty required form fields. Otherwise they would only surface
# later as API errors, or as jobs and datasets with empty names.
_required_params = {
  'GCP_PROJECT_ID': GCP_PROJECT_ID,
  'GCS_BUCKET_ID': GCS_BUCKET_ID,
  'JOB_NAME': JOB_NAME,
  'BIGQUERY_INPUT_QUERY': BIGQUERY_INPUT_QUERY,
  'INPUT_CUSTOMERID_COLUMN': INPUT_CUSTOMERID_COLUMN,
  'INPUT_TRANSACTIONVALUE_COLUMN': INPUT_TRANSACTIONVALUE_COLUMN,
  'INPUT_TRANSACTIONDATE_COLUMN': INPUT_TRANSACTIONDATE_COLUMN,
}
_missing_params = [
    name for name, value in _required_params.items() if not str(value).strip()]
if _missing_params:
  raise ValueError(
      'Missing required FoCVS parameters: ' + ', '.join(_missing_params))

# dict.fromkeys removes duplicates while keeping the order the user typed.
# A repeated dimension would otherwise yield two jobs sharing one output
# dataset, and launching the second would delete the first job's dataset.
# Splitting before the emptiness check also means whitespace-only input falls
# back to a single run instead of producing no requests at all.
extra_dimensions = list(dict.fromkeys(INPUT_EXTRA_DIMENSIONS.split()))

if extra_dimensions:
  focvs_requests = [
      create_pipeline_request(dimension) for dimension in extra_dimensions]
else:
  focvs_requests = [create_pipeline_request()]

print(json.dumps(focvs_requests, indent=2))

In [None]:
#@title Run FoCVS jobs
#@markdown Executing this cell will create Dataflow jobs for all requests generated by the previous cell.
#@markdown Before each Dataflow job is launched, a BigQuery dataset named after that job is created to hold its output.
#@markdown Any existing dataset with the same name is deleted first, together with all of its tables.
#@markdown > **WARNING**: This process is not idempotent; Dataflow jobs will be created and executed each time this cell is run.
from googleapiclient import discovery

# Discovery-based REST clients for the BigQuery (v2) and Dataflow (v1b3) APIs
# used by the helpers below.
bigquery_client, dataflow_client = (
    discovery.build('bigquery', 'v2'),
    discovery.build('dataflow', 'v1b3'),
)

def create_bigquery_dataset_idempotent(focvs_request):
  """(Re)creates the BigQuery output dataset for one FoCVS launch request.

  Any existing dataset with the same ID is first deleted together with its
  tables (`deleteContents=True`), then a fresh empty dataset is inserted.
  BIGQUERY_DATASET_LOCATION is applied to it when set.

  Args:
    focvs_request: A launch request dict. Only
      `focvs_request['parameters']['output_bq_dataset']` is read.

  Raises:
    googleapiclient.errors.HttpError: if the delete fails for any reason
      other than the dataset not existing (HTTP 404), or if the insert fails.
  """
  from googleapiclient.errors import HttpError

  dataset_id = focvs_request['parameters']['output_bq_dataset']

  try:
    bigquery_client.datasets().delete(
        projectId=GCP_PROJECT_ID,
        datasetId=dataset_id,
        deleteContents=True).execute()
  except HttpError as err:
    # A missing dataset is the expected case on a first run. Other failures
    # (permissions, wrong project, quota) must not be hidden; they would
    # otherwise likely resurface as a less clear error from the insert below.
    if err.resp.status != 404:
      raise

  dataset_request_payload = {
    'datasetReference': {
      'projectId': GCP_PROJECT_ID,
      'datasetId': dataset_id
    }
  }
  if BIGQUERY_DATASET_LOCATION:
    dataset_request_payload['location'] = BIGQUERY_DATASET_LOCATION

  bigquery_client.datasets().insert(
      projectId=GCP_PROJECT_ID,
      body=dataset_request_payload).execute()

def run_dataflow_pipeline(focvs_request):
  """Launches the FoCVS-bq classic template for one request.

  The template is read from templates/FoCVS-bq in GCS_BUCKET_ID. When
  DATAFLOW_RUN_LOCATION is set, the regional `projects.locations.templates`
  endpoint is used; otherwise the `projects.templates` endpoint is used.

  Args:
    focvs_request: The launch body (`jobName`, `parameters`, `environment`).

  Returns:
    The executed `templates.launch` API response.
  """
  launch_kwargs = {
    'projectId': GCP_PROJECT_ID,
    'gcsPath': f'gs://{GCS_BUCKET_ID}/templates/FoCVS-bq',
    'body': focvs_request,
  }

  projects = dataflow_client.projects()
  if DATAFLOW_RUN_LOCATION:
    templates = projects.locations().templates()
    launch_kwargs['location'] = DATAFLOW_RUN_LOCATION
  else:
    templates = projects.templates()

  return templates.launch(**launch_kwargs).execute()

import json

# Launch one Dataflow job per request, recreating its output dataset first.
responses = []
for focvs_request in focvs_requests:
  create_bigquery_dataset_idempotent(focvs_request)
  launch_response = run_dataflow_pipeline(focvs_request)
  responses.append(launch_response)

print(json.dumps(responses, indent=2))