def create_and_run_pipeline()

in pipelines/cdp/cdp_pipeline/customer_data_platform.py
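
The function below takes a MyPipelineOptions instance. That class is defined elsewhere in the module and is not shown here; the following is a minimal, hypothetical sketch of what it might look like, assuming it only declares the option names accessed in the function body (transactions_topic, coupons_redemption_topic, project_id, output_dataset, output_table):

from apache_beam.options.pipeline_options import PipelineOptions


class MyPipelineOptions(PipelineOptions):
  """Hypothetical sketch of the custom options used by create_and_run_pipeline."""

  @classmethod
  def _add_argparse_args(cls, parser):
    # Option names are inferred from the attributes read in the function below;
    # the real class may declare additional options or defaults.
    parser.add_argument("--transactions_topic", required=True)
    parser.add_argument("--coupons_redemption_topic", required=True)
    parser.add_argument("--project_id", required=True)
    parser.add_argument("--output_dataset", required=True)
    parser.add_argument("--output_table", required=True)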


def create_and_run_pipeline(pipeline_options: MyPipelineOptions):
  logging.info(pipeline_options)

  with Pipeline(options=pipeline_options) as p:

    # Read the transactions Pub/Sub topic
    transactions_pcoll = p | "Read transactions topic" >> _read_pub_sub_topic(
        topic=pipeline_options.transactions_topic)
    # Read the coupon redemption Pub/Sub topic
    coupons_redempt_pcoll = p | "Read coupon redemption topic" >> _read_pub_sub_topic(
        topic=pipeline_options.coupons_redemption_topic)

    # Unify the data from the two streaming sources via _unify_data
    unified_data: PCollection = (
        (transactions_pcoll, coupons_redempt_pcoll)
        | "Transform" >> _unify_data())

    # Write the unified data to BigQuery, using output_schema as the schema of the output table
    unified_data | "Write to bigquery" >> _write_to_bq(
        pipeline_options.project_id, pipeline_options.output_dataset,
        pipeline_options.output_table, output_schema)
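
The private helpers _read_pub_sub_topic, _unify_data, and _write_to_bq, as well as output_schema, are defined elsewhere in customer_data_platform.py and are not shown here. As a rough sketch only, such helpers could be built from standard Apache Beam transforms; the real implementations in the repo may differ, in particular _unify_data, which presumably keys and joins the two streams rather than merely flattening them:

import apache_beam as beam


def _read_pub_sub_topic(topic: str) -> beam.PTransform:
  # Read raw message payloads from the given Pub/Sub topic. The real helper
  # presumably also parses each payload into a dict matching the output schema.
  return beam.io.ReadFromPubSub(topic=topic)


def _unify_data() -> beam.PTransform:
  # Placeholder: merge the two streams into a single PCollection.
  # The real transform presumably joins/enriches the records instead.
  return beam.Flatten()


def _write_to_bq(project_id: str, dataset: str, table: str,
                 schema: str) -> beam.PTransform:
  # Stream the unified records into the configured BigQuery table.
  return beam.io.WriteToBigQuery(
      table=f"{project_id}:{dataset}.{table}",
      schema=schema,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)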