python/function/trigger_activation/main.py (48 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import base64 import functions_framework import json import os from datetime import datetime from google.cloud import dataflow_v1beta3 from google.api_core.gapic_v1.client_info import ClientInfo USER_AGENT_ACTIVATION = 'cloud-solutions/marketing-analytics-jumpstart-activation-v1' @functions_framework.cloud_event def subscribe(cloud_event): """ This function is triggered by a Pub/Sub message. The message contains the activation type and the source table. The function then launches a Dataflow Flex Template to process the data and send the activation events to GA4. This function demonstrates how to use Cloud Functions to trigger a Dataflow Flex Template based on a Pub/Sub message. This allows for automated processing of data and sending activation events to GA4. Args: cloud_event: The CloudEvent message. Returns: None. """ # ACTIVATION_PROJECT: The Google Cloud project ID. project_id = os.environ.get('ACTIVATION_PROJECT') # ACTIVATION_REGION: The Google Cloud region where the Dataflow Flex Template will be launched. region = os.environ.get('ACTIVATION_REGION') # TEMPLATE_FILE_GCS_LOCATION: The Google Cloud Storage location of the Dataflow Flex Template file. template_file_gcs_location = os.environ.get('TEMPLATE_FILE_GCS_LOCATION') # GA4_MEASUREMENT_ID: The Google Analytics 4 measurement ID. ga4_measurement_id = os.environ.get('GA4_MEASUREMENT_ID') # GA4_MEASUREMENT_SECRET: The Google Analytics 4 measurement secret. ga4_measurement_secret = os.environ.get('GA4_MEASUREMENT_SECRET') # ACTIVATION_TYPE_CONFIGURATION: The path to a JSON file containing the configuration for the activation type. activation_type_configuration = os.environ.get('ACTIVATION_TYPE_CONFIGURATION') # PIPELINE_TEMP_LOCATION: The Google Cloud Storage location for temporary files used by the Dataflow Flex Template. temp_location = os.environ.get('PIPELINE_TEMP_LOCATION') # LOG_DATA_SET: The BigQuery dataset where the logs of the Dataflow Flex Template will be stored. log_db_dataset = os.environ.get('LOG_DATA_SET') # PIPELINE_WORKER_EMAIL: The service account email used by the Dataflow Flex Template workers. service_account_email = os.environ.get('PIPELINE_WORKER_EMAIL') # Decodes the base64 encoded data in the message and parses it as JSON. # It then extracts the activation_type and source_table values from the JSON object. message_data = base64.b64decode(cloud_event.data["message"]["data"]).decode() message_obj = json.loads(message_data) activation_type = message_obj['activation_type'] source_table = message_obj['source_table'] # Creates a FlexTemplateRuntimeEnvironment object with the service account email. environment_param = dataflow_v1beta3.FlexTemplateRuntimeEnvironment(service_account_email=service_account_email) # It then creates a dictionary of parameters for the Dataflow Flex Template, including the project ID, activation type, # activation type configuration, source table, temporary location, GA4 measurement ID, GA4 measurement secret, and log dataset. # Finally, it creates a LaunchFlexTemplateParameter object with the job name, container spec GCS path, environment, and parameters. parameters = { 'project': project_id, 'activation_type': activation_type, 'activation_type_configuration': activation_type_configuration, 'source_table': source_table, 'temp_location': temp_location, 'ga4_measurement_id': ga4_measurement_id, 'ga4_api_secret': ga4_measurement_secret, 'log_db_dataset': log_db_dataset } flex_template_param = dataflow_v1beta3.LaunchFlexTemplateParameter( job_name=f"activation-pipeline-{activation_type.replace('_','-')}-{datetime.now().strftime('%Y%m%d-%H%M%S')}", container_spec_gcs_path=template_file_gcs_location, environment=environment_param, parameters=parameters ) # Creates a LaunchFlexTemplateRequest object with the project ID, region, and launch parameter. # It then uses the FlexTemplatesServiceClient to launch the Dataflow Flex Template. request = dataflow_v1beta3.LaunchFlexTemplateRequest( project_id=project_id, location=region, launch_parameter=flex_template_param ) client = dataflow_v1beta3.FlexTemplatesServiceClient(client_info=ClientInfo(user_agent=USER_AGENT_ACTIVATION)) response = client.launch_flex_template(request=request) print(response)