in python/activation/main.py [0:0]
# Imports assumed at the top of main.py; this excerpt omits the module header.
# Helpers such as ActivationOptions, load_activation_type_configuration,
# build_query, TransformToPayload, CallMeasurementProtocolAPI and ToLogFormat
# are defined elsewhere in this module.
import datetime
import logging
import uuid

import apache_beam as beam
import requests

from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import GoogleCloudOptions


def run(argv=None):
  """Runs the activation application.

  Args:
    argv: Command-line arguments used to build the pipeline options.
  """
  # Create the pipeline options, parsing any command-line arguments in argv.
  pipeline_options = GoogleCloudOptions(
      flags=argv,
      job_name="activation-processing",
      save_main_session=True)
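  # save_main_session=True pickles the main module's global state so that
  # top-level imports (logging, requests, ...) remain available on remote
  # Dataflow workers.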
  # Get the activation options.
  activation_options = pipeline_options.view_as(ActivationOptions)
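  # ActivationOptions is a PipelineOptions subclass defined elsewhere in this
  # module; it is assumed to register the custom flags read below (project,
  # log_db_dataset, ga4_measurement_id, ga4_api_secret, use_api_validation,
  # ...) via Beam's standard _add_argparse_args hook.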
  # Load the activation type configuration.
  logging.info(f"Loading activation type configuration with options: {activation_options}")
  activation_type_configuration = load_activation_type_configuration(activation_options)
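  # The configuration is assumed to be a dict; the only key used directly
  # here is 'activation_event_name' (see the ParDo below), while build_query
  # consumes the rest.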
  # Build the query used to retrieve data from the source table.
  logging.info(f"Building source query from configuration: {activation_type_configuration}")
  load_from_source_query = build_query(activation_options, activation_type_configuration)
  logging.info(load_from_source_query)
  # Create a unique table suffix for the log tables.
  table_suffix = f"{datetime.datetime.today().strftime('%Y_%m_%d')}_{str(uuid.uuid4())[:8]}"
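  # e.g. "2024_05_01_1a2b3c4d": today's date plus a short random slice of a
  # UUID, so tables from separate runs on the same day do not collide.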
  # Create the log table names.
  log_table_names = [f'activation_log_{table_suffix}', f'activation_retry_{table_suffix}']
  # Create the log table schema.
  table_schema = {
      'fields': [
          {'name': 'id', 'type': 'STRING', 'mode': 'REQUIRED'},
          {'name': 'activation_id', 'type': 'STRING', 'mode': 'REQUIRED'},
          {'name': 'payload', 'type': 'STRING', 'mode': 'REQUIRED'},
          {'name': 'latest_state', 'type': 'STRING', 'mode': 'REQUIRED'},
          {'name': 'updated_at', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
      ]
  }
  # Create the BigQuery table references for the log tables.
  success_log_table_spec = bigquery.TableReference(
      projectId=activation_options.project,
      datasetId=activation_options.log_db_dataset,
      tableId=log_table_names[0])
  failure_log_table_spec = bigquery.TableReference(
      projectId=activation_options.project,
      datasetId=activation_options.log_db_dataset,
      tableId=log_table_names[1])
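  # Each TableReference resolves to `<project>.<log_db_dataset>.<table>` in
  # BigQuery; the WriteToBigQuery sinks below create the tables on first
  # write (CREATE_IF_NEEDED).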
  # Create the pipeline.
  with beam.Pipeline(options=pipeline_options) as p:
    # Read the data from the source table and send each row to the
    # GA4 Measurement Protocol API.
    measurement_api_responses = (p
        | 'Read from source table' >> beam.io.gcp.bigquery.ReadFromBigQuery(
            project=activation_options.project,
            query=load_from_source_query,
            use_json_exports=True,
            use_standard_sql=True)
        | 'Prepare Measurement Protocol API payload' >> beam.ParDo(
            TransformToPayload(activation_type_configuration['activation_event_name']))
        | 'POST event to Measurement Protocol API' >> beam.ParDo(
            CallMeasurementProtocolAPI(
                activation_options.ga4_measurement_id,
                activation_options.ga4_api_secret,
                debug=activation_options.use_api_validation))
    )
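    # Each element emitted by CallMeasurementProtocolAPI is assumed to be a
    # tuple whose second field is the HTTP status code (hence the element[1]
    # checks below). The GA4 Measurement Protocol replies 204 No Content when
    # it accepts a request, which is what the filters key on.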
    # Filter the successful responses.
    success_responses = (measurement_api_responses
        | 'Get the successful responses' >> beam.Filter(
            lambda element: element[1] == requests.status_codes.codes.NO_CONTENT)
    )
    # Filter the failed responses.
    failed_responses = (measurement_api_responses
        | 'Get the failed responses' >> beam.Filter(
            lambda element: element[1] != requests.status_codes.codes.NO_CONTENT)
    )
    # Store the successful responses in the success log table.
    _ = (success_responses
        | 'Transform log format' >> beam.ParDo(ToLogFormat())
        | 'Store to log BQ table' >> beam.io.WriteToBigQuery(
            success_log_table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )
    # Store the failed responses in the failure (retry) log table.
    _ = (failed_responses
        | 'Transform failure log format' >> beam.ParDo(ToLogFormat())
        | 'Store to failure log BQ table' >> beam.io.WriteToBigQuery(
            failure_log_table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )
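

# A minimal entry-point sketch (an assumption; not part of this excerpt),
# showing how run() is typically invoked for a Beam pipeline module:
if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()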