# pipelines/cdp/cdp_pipeline/customer_data_platform.py
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Customer Data Platform analytics pipeline for the Dataflow Solution Guides.
"""

import json
import logging

import apache_beam as beam
from apache_beam import Pipeline, PCollection
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.transforms.trigger import (AccumulationMode,
                                            AfterProcessingTime,
                                            AfterWatermark)
from apache_beam.transforms.window import FixedWindows

from cdp_pipeline.options import MyPipelineOptions

# FIXME: This will not work in Dataflow; the schema should be passed as an
# option to the PTransform.
# Load the schema of the BigQuery output table.
with open("./schema/unified_table.json", encoding="utf-8") as schema_file:
  output_schema = json.load(schema_file)
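
# A possible direction for the FIXME above (a sketch, not wired in:
# `schema_path` is a hypothetical option that MyPipelineOptions does not
# currently define). Loading the file in create_and_run_pipeline and passing
# the parsed schema down to _write_to_bq would avoid reading a local path on
# Dataflow workers:
#
#   parser.add_argument("--schema_path", default="./schema/unified_table.json")
#   ...
#   with open(options.schema_path, encoding="utf-8") as f:
#     schema = json.load(f)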


def left_join(key_value_pair):
  """Left-joins co-grouped transactions with coupon redemptions.

  Yields one unified record per (transaction, coupon redemption) pair for a
  given (transaction_id, household_key) key; coupon fields are None when the
  transaction has no matching redemption.
  """
  _, values = key_value_pair
  trans_values, coupon_redempt_values = values
  if not coupon_redempt_values:
    coupon_redempt_values = [None]  # Fill missing values with None
  for trans_value in trans_values:
    if trans_value is not None:
      for coupon_redempt_value in coupon_redempt_values:
        coupon_redempt_value: dict
        unified_data = {
            "transaction_id": trans_value["transaction_id"],
            "household_key": trans_value["household_key"],
            # FIXME: Is this a dictionary?
            "coupon_upc": (coupon_redempt_value["coupon_upc"]
                           if coupon_redempt_value is not None else None),
            "product_id": trans_value["product_id"],
            "coupon_discount": trans_value["coupon_disc"],
        }
        yield unified_data
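
# Example of what left_join yields for one co-grouped key (illustrative
# values only):
#
#   key = ("T1", "H1")
#   trans = [{"transaction_id": "T1", "household_key": "H1",
#             "product_id": "P1", "coupon_disc": 0.5}]
#   redempt = []  # no matching coupon redemption
#   list(left_join((key, (trans, redempt))))
#   # -> [{"transaction_id": "T1", "household_key": "H1", "coupon_upc": None,
#   #      "product_id": "P1", "coupon_discount": 0.5}]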


@beam.ptransform_fn
def _read_pub_sub_topic(p: Pipeline, topic: str) -> PCollection[tuple]:
  """Reads JSON messages from a Pub/Sub topic, then keys and windows them."""
  msgs: PCollection[tuple] = (
      p
      | "Read topic" >> beam.io.ReadFromPubSub(topic=topic)
      | "Decode Messages" >>
      beam.Map(lambda msg: json.loads(msg.decode("utf-8")))
      | "Add Join Key" >> beam.Map(lambda record: (
          (record["transaction_id"], record["household_key"]), record))
      | "Window Messages" >> beam.WindowInto(
          FixedWindows(60),
          trigger=AfterWatermark(early=AfterProcessingTime(10)),
          accumulation_mode=AccumulationMode.DISCARDING))
  return msgs
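
# Each element leaving _read_pub_sub_topic is keyed for CoGroupByKey and
# (assuming every message carries transaction_id and household_key fields)
# looks like:
#   (("T1", "H1"), {"transaction_id": "T1", "household_key": "H1", ...})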


@beam.ptransform_fn
def _unify_data(pcolls: tuple[PCollection, PCollection]) -> PCollection[dict]:
  """Left-joins the keyed transaction and coupon redemption streams."""
  transactions_pcoll, coupons_redempt_pcoll = pcolls
  unified_data = (
      (transactions_pcoll, coupons_redempt_pcoll)
      | "Combine Transactions and Coupons" >> beam.CoGroupByKey()
      | "Left Join" >> beam.FlatMap(left_join))
  return unified_data


@beam.ptransform_fn
def _write_to_bq(unified_pcoll: PCollection, project_id: str,
                 output_dataset: str, output_table: str, unified_schema: dict):
  """Appends the unified records to the BigQuery output table."""
  return unified_pcoll | "Write to BigQuery" >> WriteToBigQuery(
      project=project_id,
      dataset=output_dataset,
      table=output_table,
      schema=unified_schema,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
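
# Note: WriteToBigQuery also accepts the schema as a dict; this pipeline
# assumes ./schema/unified_table.json holds a TableSchema-style JSON object,
# i.e. {"fields": [{"name": ..., "type": ..., "mode": ...}, ...]}.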


def create_and_run_pipeline(pipeline_options: MyPipelineOptions):
  logging.info(pipeline_options)
  with Pipeline(options=pipeline_options) as p:
    # Read the transactions Pub/Sub topic.
    transactions_pcoll = p | "Read transactions topic" >> _read_pub_sub_topic(
        topic=pipeline_options.transactions_topic)
    # Read the coupon redemption Pub/Sub topic.
    coupons_redempt_pcoll = p | "Read coupon redemption topic" >> _read_pub_sub_topic(
        topic=pipeline_options.coupons_redemption_topic)
    # Unify the data from the two streaming sources.
    unified_data: PCollection = (
        (transactions_pcoll, coupons_redempt_pcoll)
        | "Transform" >> _unify_data())
    # Write to BigQuery, passing the output table schema as output_schema.
    unified_data | "Write to BigQuery" >> _write_to_bq(
        pipeline_options.project_id, pipeline_options.output_dataset,
        pipeline_options.output_table, output_schema)
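
# Hypothetical entry point (a sketch; the actual runner wiring may live in a
# separate main module):
#
#   if __name__ == "__main__":
#     logging.getLogger().setLevel(logging.INFO)
#     # MyPipelineOptions parses flags such as --transactions_topic from argv.
#     create_and_run_pipeline(MyPipelineOptions())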