pipelines/cdp/cdp_pipeline/generate_transaction_data.py (46 lines of code) (raw):
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A data generator for the Customer Data Platform analytics pipeline.
"""
from google.cloud import pubsub_v1
import json
import pandas as pd
import asyncio
async def publish_coupons_to_pubsub():
bucket_name = "<bucket_name>"
project_id = "<project_id>"
# Example: ["27601281299","27757099033","28235291311","27021203242","27101290145","27853175697"]
transactions_id = [
"<List of sample transaction IDs to test the pipeline on.>"
]
transactions_topic_name = "transactions"
# Reference example - "dataflow-solution-guide-cdp/input_data/transaction_data.csv"
transactions_data = "<path to transactions data in gcs bucket>"
coupons_topic_name = "coupon_redemption"
# reference example - "dataflow-solution-guide-cdp/input_data/coupon_redempt.csv"
coupons_data = "<path to coupon redemption data in gcs bucket>"
transactions_df = pd.read_csv(
f"gs://{bucket_name}/{transactions_data}", dtype=str)
coupons_df = pd.read_csv(f"gs://{bucket_name}/{coupons_data}", dtype=str)
publisher = pubsub_v1.PublisherClient()
transactions_topic_path = publisher.topic_path(project_id,
transactions_topic_name)
coupons_topic_path = publisher.topic_path(project_id, coupons_topic_name)
filtered_trans_df = transactions_df[transactions_df["transaction_id"].isin(
transactions_id)]
filtered_coupons_df = coupons_df[coupons_df["transaction_id"].isin(
transactions_id)]
await asyncio.gather(
publish_coupons(filtered_coupons_df, publisher, coupons_topic_path),
publish_transactions(filtered_trans_df, publisher,
transactions_topic_path))
async def publish_coupons(filtered_coupons_df, publisher, coupons_topic_path):
for _, row in filtered_coupons_df.iterrows():
coupon_message = json.dumps(row.to_dict()).encode("utf-8")
print(coupon_message)
future = publisher.publish(coupons_topic_path, coupon_message)
print(f"Published coupon message ID: {future.result()}")
await asyncio.sleep(3)
async def publish_transactions(filtered_trans_df, publisher,
transactions_topic_path):
for _, row in filtered_trans_df.iterrows():
transaction_message = json.dumps(row.to_dict()).encode("utf-8")
print(transaction_message)
future = publisher.publish(transactions_topic_path, transaction_message)
print(f"Published transaction message ID: {future.result()}")
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(publish_coupons_to_pubsub())