def _read_pub_sub_topic()

in pipelines/cdp/cdp_pipeline/customer_data_platform.py [0:0]


def _read_pub_sub_topic(p: Pipeline, topic: str) -> PCollection[str]:
  """Build the streaming source: read, decode, key, and window transactions.

  Stages applied to ``p``, in order:

  1. Read messages from Pub/Sub, passing ``topic`` as the ``topic=``
     argument of ``ReadFromPubSub`` (the step label says "subscription",
     but a topic is what is actually supplied).
  2. Decode each payload as UTF-8 and parse it as JSON.
  3. Pair each parsed transaction with the composite key
     ``(transaction_id, household_key)``.
  4. Assign elements to ``FixedWindows(60)`` with an ``AfterWatermark``
     trigger that has ``AfterProcessingTime(10)`` early firings and
     discarding accumulation.

  Args:
    p: Pipeline the transforms are applied to.
    topic: Pub/Sub topic to read from.

  Returns:
    The windowed PCollection whose elements are
    ``((transaction_id, household_key), transaction)`` tuples.

  NOTE(review): the declared return type ``PCollection[str]`` does not match
  the keyed tuples produced by the "Add Transaction Key" step; confirm with
  callers before tightening the annotation.
  """
  raw_messages = (
      p | "Read subscription" >> beam.io.ReadFromPubSub(topic=topic))

  transactions = (
      raw_messages
      | "Decode Transactions" >>
      beam.Map(lambda payload: json.loads(payload.decode("utf-8"))))

  # Both key fields are read with [] indexing, so a message missing either
  # one raises KeyError inside the Map.
  keyed_transactions = (
      transactions
      | "Add Transaction Key" >> beam.Map(
          lambda txn: ((txn["transaction_id"], txn["household_key"]), txn)))

  windowed_transactions = (
      keyed_transactions
      | "Window Transactions" >> beam.WindowInto(
          FixedWindows(60),
          trigger=AfterWatermark(early=AfterProcessingTime(10)),
          accumulation_mode=AccumulationMode.DISCARDING))

  return windowed_transactions