courses/DSL/pub-sub-examples/pub-sub-simulated-clicks-feed.ipynb (247 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"id": "e1405808-2f7d-4d80-aa94-ca41a378f9fb",
"metadata": {},
"source": [
"# Pub/Sub Simulated Click Stream"
]
},
{
"cell_type": "markdown",
"id": "71697b95-f75e-492d-a48b-e2f311f57a63",
"metadata": {},
"source": [
"## Make up a random Click event. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3b513fe1-8022-49f8-a83f-0ae7a87c9ccc",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import json\n",
"import time\n",
"import random\n",
"from datetime import datetime\n",
"\n",
"# Arrays of usernames and products\n",
"usernames = [\n",
" \"tech_savvy\", \"gamer123\", \"pc_builder\", \"keyboard_warrior\", \"mouse_master\", \"laptop_lover\",\n",
" \"coder_jane\", \"geek_guy\", \"hardware_harry\", \"pc_pro\", \"desktop_dude\", \"monitor_maven\",\n",
" \"peripheral_queen\", \"it_guru\", \"ssd_speedster\", \"ram_rocker\", \"techie_tom\", \"gadget_girl\",\n",
" \"gpu_guru\", \"motherboard_mike\", \"keyboard_king\", \"tech_traveler\", \"screen_savvy\", \"build_master\",\n",
" \"device_diva\"\n",
"]\n",
"\n",
"products = [\n",
" {\"id\": \"P001\", \"name\": \"Gaming Laptop\"}, {\"id\": \"P002\", \"name\": \"Mechanical Keyboard\"},\n",
" {\"id\": \"P003\", \"name\": \"Wireless Mouse\"}, {\"id\": \"P004\", \"name\": \"4K Monitor\"},\n",
" {\"id\": \"P005\", \"name\": \"Gaming Chair\"}, {\"id\": \"P006\", \"name\": \"Graphics Card\"},\n",
" {\"id\": \"P007\", \"name\": \"External SSD\"}, {\"id\": \"P008\", \"name\": \"Desktop PC\"},\n",
" {\"id\": \"P009\", \"name\": \"Laptop Stand\"}, {\"id\": \"P010\", \"name\": \"USB-C Hub\"},\n",
" {\"id\": \"P011\", \"name\": \"Noise Cancelling Headphones\"}, {\"id\": \"P012\", \"name\": \"Gaming Mouse Pad\"},\n",
" {\"id\": \"P013\", \"name\": \"Webcam\"}, {\"id\": \"P014\", \"name\": \"Bluetooth Speakers\"},\n",
" {\"id\": \"P015\", \"name\": \"VR Headset\"}, {\"id\": \"P016\", \"name\": \"CPU Cooler\"},\n",
" {\"id\": \"P017\", \"name\": \"RAM Kit\"}, {\"id\": \"P018\", \"name\": \"Power Supply Unit\"},\n",
" {\"id\": \"P019\", \"name\": \"Motherboard\"}, {\"id\": \"P020\", \"name\": \"Portable Hard Drive\"},\n",
" {\"id\": \"P021\", \"name\": \"Surge Protector\"}, {\"id\": \"P022\", \"name\": \"Ethernet Cable\"},\n",
" {\"id\": \"P023\", \"name\": \"Laptop Bag\"}, {\"id\": \"P024\", \"name\": \"Smartphone Holder\"},\n",
" {\"id\": \"P025\", \"name\": \"USB Flash Drive\"}\n",
"]\n",
"\n",
"# Function to generate random click data\n",
"def generate_click_data():\n",
" username = random.choice(usernames)\n",
" route = random.choice(['/home', '/about', '/contact', '/products', '/cart'])\n",
" parameters = []\n",
" \n",
" # Add a product if the route is /products\n",
" if route == '/products':\n",
" product = random.choice(products)\n",
" parameters.append({\"name\": \"product_id\", \"value\": product[\"id\"]})\n",
" parameters.append({\"name\": \"product_name\", \"value\": product[\"name\"]})\n",
" \n",
" \n",
" if route == '/cart':\n",
" product = random.choice(products)\n",
" parameters.append({\"name\": \"product_id\", \"value\": product[\"id\"]})\n",
" parameters.append({\"name\": \"product_name\", \"value\": product[\"name\"]})\n",
" parameters.append({\"name\": \"quantity\", \"value\": str(random.randint(1, 3))})\n",
" \n",
" \n",
" # Set the method based on the route\n",
" if route == '/cart':\n",
" method = 'POST'\n",
" else:\n",
" method = 'GET'\n",
" \n",
" click_data = {\n",
" 'timestamp': datetime.utcnow().isoformat() + 'Z',\n",
" 'username': username,\n",
" 'route': route,\n",
" 'method': method,\n",
" 'parameters': parameters\n",
" }\n",
" return click_data\n",
"\n",
"\n",
"print(\"Done\")"
]
},
{
"cell_type": "markdown",
"id": "e53afab3-d3e6-41e7-a25b-bc1e440a4d31",
"metadata": {},
"source": [
"## Print some example click events"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "40f2e0e8-725d-4bec-b88b-a19b40f80235",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Call the generate_click_data function 50 times and print the results\n",
"for _ in range(50):\n",
" click_data = generate_click_data()\n",
" if click_data[\"method\"] == \"POST\":\n",
" print(click_data)\n"
]
},
{
"cell_type": "markdown",
"id": "b967ccd6-5d9d-4dbb-a5e2-7c407c3675de",
"metadata": {},
"source": [
"# Send a Click event to Pub/Sub"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e3544d3c-b646-4d8b-8595-341cb1bab2e4",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from google.cloud import pubsub_v1\n",
"\n",
"# Example usage\n",
"project_id = 'dsl-dar'\n",
"topic_id = 'clicks'\n",
"\n",
"def publish_click_data(project_id, topic_id):\n",
" publisher = pubsub_v1.PublisherClient()\n",
" topic_path = publisher.topic_path(project_id, topic_id)\n",
"\n",
" click_data = generate_click_data()\n",
" data_str = json.dumps(click_data)\n",
" data_bytes = data_str.encode('utf-8')\n",
"\n",
" future = publisher.publish(topic_path, data_bytes)\n",
" print(f'Published message ID: {future.result()}')\n"
]
},
{
"cell_type": "markdown",
"id": "fc064e87-8c20-410c-9aee-a706473a041d",
"metadata": {},
"source": [
"# Send a message"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "87bea16a-96f6-4387-baa2-b9998ce5cf68",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Call the publish_click_data function\n",
"publish_click_data(project_id, topic_id)"
]
},
{
"cell_type": "markdown",
"id": "122d5525-6d8f-4e9d-94ae-ee4c798711df",
"metadata": {},
"source": [
"# Send some number of messages for a specified duration. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ace31f61-858d-4dfc-acf9-61096d884a1c",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"times_per_minute = 120\n",
"duration_minutes = 10 # Specify the duration in minutes\n",
"interval = 60 / times_per_minute\n",
"\n",
"# Loop to call the publish_click_data function a specified number of times per minute for a specified duration\n",
"end_time = time.time() + duration_minutes * 60\n",
"count = 0\n",
"while time.time() < end_time:\n",
" for _ in range(times_per_minute):\n",
" publish_click_data(project_id, topic_id)\n",
" count = count + 1\n",
" time.sleep(interval)\n",
"\n",
"print(\"Done\")\n",
"print(count)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cf39e295-d670-478a-abe7-3048b93958ca",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"environment": {
"kernel": "apache-beam-2.56.0",
"name": ".m116",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/:m116"
},
"kernelspec": {
"display_name": "Apache Beam 2.56.0 (Local)",
"language": "python",
"name": "apache-beam-2.56.0"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}