def run()

in python/activation/main.py [0:0]


def run(argv=None):
  """
  Runs the activation application.

  Builds and runs a Beam pipeline that:
    1. reads rows from BigQuery with the query built for the configured
       activation type,
    2. passes them through the 'Prepare Measurement Protocol API payload' and
       'POST event to Measurement Protocol API' steps,
    3. splits the resulting elements on whether their second item equals the
       HTTP NO_CONTENT status code, and
    4. appends each group to its own per-run BigQuery log table
       (`activation_log_<suffix>` and `activation_retry_<suffix>`).

  Args:
    argv: The command-line arguments used to build the pipeline options.
      When None, argument parsing is left to Beam's default behavior.
  """
  # Create the pipeline options. `argv` is forwarded as the flags to parse so
  # that callers can supply their own arguments; the keyword arguments are
  # applied on top of whatever the flags contain.
  pipeline_options = GoogleCloudOptions(
    flags=argv,
    job_name="activation-processing",
    save_main_session=True)

  # Get the activation options.
  activation_options = pipeline_options.view_as(ActivationOptions)

  # Load the activation type configuration.
  # NOTE(review): this logs the whole options object, which also carries
  # `ga4_api_secret` -- confirm its string form does not expose the secret.
  logging.info(
    "Loading activation type configuration from %s", activation_options)
  activation_type_configuration = load_activation_type_configuration(
    activation_options)

  # Build the query to be used to retrieve data from the source table.
  logging.info(
    "Building query to retrieve data from %s", activation_type_configuration)
  load_from_source_query = build_query(
    activation_options, activation_type_configuration)
  logging.info(load_from_source_query)

  # Create a unique table suffix for the log tables: the current date plus
  # eight random hex characters so that each run writes to its own tables.
  run_date = datetime.datetime.today().strftime('%Y_%m_%d')
  table_suffix = f"{run_date}_{uuid.uuid4().hex[:8]}"

  # Create the log table schema (shared by the success and failure tables).
  required_columns = (
    ('id', 'STRING'),
    ('activation_id', 'STRING'),
    ('payload', 'STRING'),
    ('latest_state', 'STRING'),
    ('updated_at', 'TIMESTAMP'),
  )
  table_schema = {
    'fields': [
      {'name': column, 'type': column_type, 'mode': 'REQUIRED'}
      for column, column_type in required_columns
    ]
  }

  def log_table_reference(table_id):
    # Both log tables live in the same project and log dataset.
    return bigquery.TableReference(
      projectId=activation_options.project,
      datasetId=activation_options.log_db_dataset,
      tableId=table_id)

  # Create the BigQuery table references for the log tables.
  success_log_table_spec = log_table_reference(
    f'activation_log_{table_suffix}')
  failure_log_table_spec = log_table_reference(
    f'activation_retry_{table_suffix}')

  # Status code that marks a response as successful. Resolved once here so the
  # filter predicates below only capture a plain value.
  success_status = requests.status_codes.codes.NO_CONTENT

  # Create the pipeline.
  with beam.Pipeline(options=pipeline_options) as p:
    # Read the data from the source table and send each row to the API.
    measurement_api_responses = (
      p
      | beam.io.gcp.bigquery.ReadFromBigQuery(
          project=activation_options.project,
          query=load_from_source_query,
          use_json_exports=True,
          use_standard_sql=True)
      | 'Prepare Measurement Protocol API payload' >> beam.ParDo(
          TransformToPayload(
            activation_type_configuration['activation_event_name']))
      | 'POST event to Measurement Protocol API' >> beam.ParDo(
          CallMeasurementProtocolAPI(
            activation_options.ga4_measurement_id,
            activation_options.ga4_api_secret,
            debug=activation_options.use_api_validation))
    )

    # Filter the successful responses (second item is the success status).
    success_responses = (
      measurement_api_responses
      | 'Get the successful responses' >> beam.Filter(
          lambda element: element[1] == success_status)
    )

    # Filter the failed responses (every other status).
    failed_responses = (
      measurement_api_responses
      | 'Get the failed responses' >> beam.Filter(
          lambda element: element[1] != success_status)
    )

    # Store the successful responses in the log tables
    _ = (
      success_responses
      | 'Transform log format' >> beam.ParDo(ToLogFormat())
      | 'Store to log BQ table' >> beam.io.WriteToBigQuery(
          success_log_table_spec,
          schema=table_schema,
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )

    # Store the failed responses in the log tables
    _ = (
      failed_responses
      | 'Transform failure log format' >> beam.ParDo(ToLogFormat())
      | 'Store to failure log BQ table' >> beam.io.WriteToBigQuery(
          failure_log_table_spec,
          schema=table_schema,
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    )