in pipelines/cdp/cdp_pipeline/customer_data_platform.py [0:0]
def create_and_run_pipeline(pipeline_options: MyPipelineOptions):
    """Build the customer-data-platform pipeline and run it.

    Each of two Pub/Sub topics (transactions and coupon redemptions) is read
    into its own PCollection. Both are handed together to ``_unify_data``,
    and the result is written to BigQuery via ``_write_to_bq``. The table
    schema is taken from the module-level ``output_schema``.

    Args:
        pipeline_options: Pipeline options. Besides being passed to
            ``Pipeline``, the following attributes are read from it:
            ``transactions_topic``, ``coupons_redemption_topic``,
            ``project_id``, ``output_dataset`` and ``output_table``.
    """
    opts = pipeline_options
    logging.info(opts)

    # Beam's Pipeline context manager runs the pipeline when the block exits.
    with Pipeline(options=opts) as pipeline:
        # Sources: one PCollection per Pub/Sub topic.
        read_transactions = "Read transactions topic" >> _read_pub_sub_topic(
            topic=opts.transactions_topic)
        transactions = pipeline | read_transactions

        read_redemptions = "Read coupon redemption topic" >> _read_pub_sub_topic(
            topic=opts.coupons_redemption_topic)
        redemptions = pipeline | read_redemptions

        # Transform: unify the two streaming sources into one PCollection.
        sources = (transactions, redemptions)
        unified_data: PCollection = sources | "Transform" >> _unify_data()

        # Sink: BigQuery table named by the options, using output_schema.
        write_step = "Write to bigquery" >> _write_to_bq(
            opts.project_id,
            opts.output_dataset,
            opts.output_table,
            output_schema,
        )
        unified_data | write_step