in pipelines/cdp/cdp_pipeline/customer_data_platform.py [0:0]
def _read_pub_sub_topic(p: Pipeline, topic: str) -> PCollection[tuple]:
    """Read transactions from a Pub/Sub topic, key them, and window them.

    Every message payload is decoded as UTF-8 JSON, then paired with a
    ``(transaction_id, household_key)`` key taken from the decoded object.
    The keyed elements are placed into 60-second fixed windows that emit
    speculative early panes from a 10-second processing-time trigger, fire
    at the watermark, and discard already-emitted elements between panes.

    Args:
        p: Pipeline the read is attached to.
        topic: Pub/Sub topic handed to ``ReadFromPubSub(topic=...)``.

    Returns:
        A windowed PCollection whose elements are
        ``((transaction_id, household_key), transaction)`` tuples, where
        ``transaction`` is the decoded JSON value.

    Note:
        There is no dead-letter handling here: a payload that is not valid
        UTF-8 JSON, or a decoded object lacking ``transaction_id`` or
        ``household_key``, raises inside the corresponding Map step.
    """
    # Step labels are kept verbatim; they identify the transforms at runtime.
    keyed_transactions: PCollection[tuple] = (
        p
        | "Read subscription" >> beam.io.ReadFromPubSub(topic=topic)
        | "Decode Transactions" >> beam.Map(
            lambda payload: json.loads(payload.decode("utf-8")))
        | "Add Transaction Key" >> beam.Map(
            lambda txn: ((txn["transaction_id"], txn["household_key"]), txn))
    )

    # DISCARDING means each pane carries only the elements that arrived since
    # the previous firing, so early panes are not repeated in the final pane.
    windowing = beam.WindowInto(
        FixedWindows(60),
        trigger=AfterWatermark(early=AfterProcessingTime(10)),
        accumulation_mode=AccumulationMode.DISCARDING,
    )
    return keyed_transactions | "Window Transactions" >> windowing