data-analytics/beam_multimodal_streaming/data_generator.ipynb (615 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"id": "FrEvLbIKex0b",
"metadata": {
"id": "FrEvLbIKex0b"
},
"source": [
"Fill these in first"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "Zr1cCN59e1Q0",
"metadata": {
"id": "Zr1cCN59e1Q0"
},
"outputs": [],
"source": [
"import os\n",
"\n",
"# Fill In\n",
"os.environ['GOOGLE_CLOUD_PROJECT'] = \"\""
]
},
{
"cell_type": "markdown",
"id": "U0mBaA_PduKp",
"metadata": {
"id": "U0mBaA_PduKp"
},
"source": [
"# Generator Setup"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "mThoyj9he54R",
"metadata": {
"id": "mThoyj9he54R"
},
"outputs": [],
"source": [
"\n",
"google_cloud_project = os.environ.get(\"GOOGLE_CLOUD_PROJECT\")\n",
"# Location where you put your two files in setup \n",
"image_with_people = f\"gs://{google_cloud_project}-gcs/entrance with people.jpg\"\n",
"image_empty = f\"gs://{google_cloud_project}-gcs/entrance_empty.jpg\" \n",
"user_file = f\"gs://{google_cloud_project}-gcs/members.txt\""
]
},
{
"cell_type": "markdown",
"id": "gDj0PT8tdrF2",
"metadata": {
"id": "gDj0PT8tdrF2"
},
"source": [
"## Pip installs & Imports"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "PXNBgIqIazf2",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"executionInfo": {
"elapsed": 11235,
"status": "ok",
"timestamp": 1724343940479,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "PXNBgIqIazf2",
"outputId": "7cec3b5e-d785-4a9e-affa-d4a0f61d8f52"
},
"outputs": [],
"source": [
"%pip install google-cloud-storage\n",
"%pip install --upgrade google-cloud-pubsub"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ZW4SPa0V52a4",
"metadata": {
"id": "ZW4SPa0V52a4"
},
"outputs": [],
"source": [
"import time\n",
"import uuid\n",
"import random\n",
"from google.cloud import storage\n",
"from urllib.parse import urlparse\n",
"from copy import deepcopy\n",
"import os\n",
"from google.cloud import pubsub_v1\n",
"import json"
]
},
{
"cell_type": "markdown",
"id": "F7xa8LiUuIXQ",
"metadata": {
"id": "F7xa8LiUuIXQ"
},
"source": [
"## Members Lists"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "QfLKVtbJYegt",
"metadata": {
"id": "QfLKVtbJYegt"
},
"outputs": [],
"source": [
"# Helper function to split apart the GCS URI\n",
"def decode_gcs_url(url):\n",
" # Read the URI and parse it\n",
" p = urlparse(url)\n",
" bucket = p.netloc\n",
" file_path = p.path[0:].split('/', 1)\n",
" # Return the relevant objects (bucket, path to object)\n",
" return bucket, file_path[1]\n",
"\n",
"# We can't use the image load from local file since it expects a local path\n",
"# We use a GCS URL and get the bytes of the image\n",
"def read_file(object_path):\n",
" # Parse the path\n",
" bucket, file_path = decode_gcs_url(object_path)\n",
" storage_client = storage.Client()\n",
" bucket = storage_client.bucket(bucket)\n",
" blob = bucket.blob(file_path)\n",
" # Return the object as bytes\n",
" return blob.download_as_text()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "37kDGvc6bOqtRHvO4hy09HDE",
"metadata": {
"id": "37kDGvc6bOqtRHvO4hy09HDE",
"tags": []
},
"outputs": [],
"source": [
"\n",
"\n",
"class member():\n",
" def __init__(self,first_name,last_name,parking_benefits,tier):\n",
" self.first_name = first_name\n",
" self.last_name = last_name\n",
" self.parking_benefits = parking_benefits\n",
" self.tier = tier\n",
"\n",
" def to_dict(self):\n",
" return {\"first_name\":self.first_name, \"last_name\":self.last_name, \"parking_benefits\":self.parking_benefits, \"tier\":self.tier}\n",
"\n",
"class member_list():\n",
" def __init__(self):\n",
" self.members = {}\n",
"\n",
" def load(self):\n",
" lines = read_file(user_file)\n",
" for line in lines.split('\\n')[1:]:\n",
" member_id,first_name,last_name,parking_benefits,tier = line.split('|')\n",
" self.members[member_id] = member(first_name,last_name,parking_benefits,tier)\n",
"\n",
" def get_random_member(self):\n",
" return random.choice(list(self.members.keys()))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "MGan0UCSappB",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"executionInfo": {
"elapsed": 335,
"status": "ok",
"timestamp": 1724343941338,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "MGan0UCSappB",
"outputId": "14b31d86-51ea-4d0d-843c-501682191f92"
},
"outputs": [],
"source": [
"new_list = member_list()\n",
"new_list.load()\n",
"for k,v in new_list.members.items():\n",
" print (k,v.to_dict())\n",
"\n",
"for i in range(10):\n",
" print(new_list.get_random_member())"
]
},
{
"cell_type": "markdown",
"id": "9Dm7dbn9uLi5",
"metadata": {
"id": "9Dm7dbn9uLi5"
},
"source": [
"## Parking"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "UYje-0BFuMze",
"metadata": {
"id": "UYje-0BFuMze"
},
"outputs": [],
"source": [
"def gen_parking_log ():\n",
" transaction_id = str(uuid.uuid4())\n",
" member_flag = random.choice([True, False])\n",
" member_id = None\n",
" entry_time = int(time.time())\n",
" if member_flag:\n",
" member_id = new_list.get_random_member()\n",
"\n",
" return {\"transaction_id\":transaction_id, \"member_id\": member_id, \"entry_time\":entry_time}\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "Imvk_rZQxmot",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"executionInfo": {
"elapsed": 5,
"status": "ok",
"timestamp": 1724343941566,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "Imvk_rZQxmot",
"outputId": "ff7a0142-c03f-4a08-fb57-2d21b27d89d7"
},
"outputs": [],
"source": [
"for i in range(10):\n",
" print(gen_parking_log())"
]
},
{
"cell_type": "markdown",
"id": "tSzKdPjDyZ_3",
"metadata": {
"id": "tSzKdPjDyZ_3"
},
"source": [
"## Area of interest"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "kszar1PhycYn",
"metadata": {
"id": "kszar1PhycYn"
},
"outputs": [],
"source": [
"def area_check ():\n",
" transaction_id = str(uuid.uuid4())\n",
" area = random.choice([\"pos\",\"entrance\"])\n",
" return {\"transaction_id\":transaction_id, \"area\": area}\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "Gtm0DRRLy-H2",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"executionInfo": {
"elapsed": 4,
"status": "ok",
"timestamp": 1724366199180,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "Gtm0DRRLy-H2",
"outputId": "1d86c9fb-ae6a-4b57-f5c0-2b0af5c6ccbd"
},
"outputs": [],
"source": [
"for i in range(10):\n",
" print(area_check())"
]
},
{
"cell_type": "markdown",
"id": "7iX1vnyI2Pt4",
"metadata": {
"id": "7iX1vnyI2Pt4"
},
"source": [
"## Line is busy"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "-WGcKSrO2SyF",
"metadata": {
"id": "-WGcKSrO2SyF"
},
"outputs": [],
"source": [
"def gen_line_status ():\n",
" is_busy = random.choice([True, False])\n",
" area = random.choice([\"pos\",\"entrance\"])\n",
" if is_busy:\n",
" return {\"area\":area, \"image\":image_with_people}\n",
" else:\n",
" return {\"area\":area, \"image\":image_empty}"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "EamakgEJ3Kej",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"executionInfo": {
"elapsed": 143,
"status": "ok",
"timestamp": 1724344737661,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "EamakgEJ3Kej",
"outputId": "3806af9e-431c-4018-8626-3eedb86a5ed9"
},
"outputs": [],
"source": [
"for i in range(10):\n",
" print(gen_line_status())"
]
},
{
"cell_type": "markdown",
"id": "j-WyjeoBaMO-",
"metadata": {
"id": "j-WyjeoBaMO-"
},
"source": [
"## Check In"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "N6OZnwwdaOOb",
"metadata": {
"id": "N6OZnwwdaOOb"
},
"outputs": [],
"source": [
"def gen_checkin ():\n",
" time_start = int(time.time())\n",
" check_in_list = deepcopy(new_list.members)\n",
" check_in_list = list(check_in_list.keys())\n",
" return_check_ins = []\n",
" while len(check_in_list) > 0:\n",
" member_id = None\n",
" transaction_id = str(uuid.uuid4())\n",
" member_flag = random.choice([True, False])\n",
" if member_flag:\n",
" member_id = random.choice(check_in_list)\n",
" check_in_list.remove(member_id)\n",
" else:\n",
" member_id = None\n",
" check_in_time = time_start + random.randint(1,1000)\n",
" check_in = {\"transaction_id\":transaction_id,\"check_in_time\":check_in_time,\"member_id\": member_id}\n",
" return_check_ins.append(check_in)\n",
"\n",
"\n",
" return return_check_ins\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1dAoFuajcE6g",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"executionInfo": {
"elapsed": 3,
"status": "ok",
"timestamp": 1724343941736,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "1dAoFuajcE6g",
"outputId": "ef5d3ea7-904b-4b48-c49d-9f399fe3bbf5"
},
"outputs": [],
"source": [
"check_ins = gen_checkin()\n",
"for check_in in check_ins:\n",
" print(check_in)"
]
},
{
"cell_type": "markdown",
"id": "U7kgGburjgmy",
"metadata": {
"id": "U7kgGburjgmy"
},
"source": [
"## PubSub Write"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "iA12o3k3rq7b",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"executionInfo": {
"elapsed": 36566,
"status": "ok",
"timestamp": 1724366985629,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "iA12o3k3rq7b",
"outputId": "ae7f9975-81f2-47b3-ef1b-1fbf67fd4c0e"
},
"outputs": [],
"source": [
"pubsub_topics = {\n",
" 'parking' : {\n",
" 'beam24-workshop-parking-input-topic':\n",
" 'beam24-workshop-parking-input-sub'},\n",
" 'checkin' : {\n",
" 'beam24-workshop-checkin-input-topic':\n",
" 'beam24-workshop-checkin-input-sub'},\n",
" 'area' : {\n",
" 'beam24-workshop-area-input-topic':\n",
" 'beam24-workshop-area-input-sub'},\n",
" 'parking_output' : {\n",
" 'beam24-workshop-parking-output-topic':\n",
" 'beam24-workshop-parking-output-sub'},\n",
" 'discount_output' : {\n",
" 'beam24-workshop-discount-output-topic':\n",
" 'beam24-workshop-discount-output-sub'},\n",
" 'inventory_output' : {\n",
" 'beam24-workshop-inventory-output-topic':\n",
" 'beam24-workshop-inventory-output-sub'},\n",
" 'line_status': {\n",
" 'beam24-workshop-line-input-topic':\n",
" 'beam24-workshop-line-input-sub'}}\n",
"\n",
"for area in pubsub_topics.keys():\n",
" for topic, sub in pubsub_topics[area].items():\n",
" os.environ['current_topic'] = topic\n",
" os.environ['current_sub'] = sub\n",
" !gcloud pubsub topics create $current_topic\n",
" !gcloud pubsub subscriptions create $current_sub --topic projects/$GOOGLE_CLOUD_PROJECT/topics/$current_topic\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "KUx25vnsji3c",
"metadata": {
"id": "KUx25vnsji3c"
},
"outputs": [],
"source": [
"def pubsub_write(messages, topic_id):\n",
" publisher = pubsub_v1.PublisherClient()\n",
" # The `topic_path` method creates a fully qualified identifier\n",
" # in the form `projects/{project_id}/topics/{topic_id}`\n",
" topic_path = publisher.topic_path(google_cloud_project, topic_id)\n",
"\n",
" for message in messages:\n",
" # Data must be a bytestring\n",
" message = json.dumps(message)\n",
" data = message.encode(\"utf-8\")\n",
" # When you publish a message, the client returns a future.\n",
" future = publisher.publish(topic_path, data)\n",
" print(future.result())\n"
]
},
{
"cell_type": "markdown",
"id": "9b4d64f8",
"metadata": {},
"source": [
"# Run Generator"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "HMpeTkxU_I-M",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"executionInfo": {
"elapsed": 34285,
"status": "ok",
"timestamp": 1724368769142,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "HMpeTkxU_I-M",
"outputId": "098b8365-fc93-45ad-f6a3-e2dd2151e38b"
},
"outputs": [],
"source": [
"for i in range(100):\n",
" messages = []\n",
" if i%25 == 0:\n",
" pubsub_write([gen_line_status()], \"beam24-workshop-line-input-topic\")\n",
" if i%5 == 0:\n",
" pubsub_write([area_check()], \"beam24-workshop-area-input-topic\")\n",
" if i%2 == 0:\n",
" pubsub_write([gen_parking_log()], \"beam24-workshop-parking-input-topic\")\n",
" if i == 50:\n",
" pubsub_write(gen_checkin(), \"beam24-workshop-checkin-input-topic\")\n",
" time.sleep(0.25)\n",
"\n"
]
}
],
"metadata": {
"colab": {
"name": "2024_beam_workshop_gen",
"provenance": [],
"toc_visible": true
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.10"
}
},
"nbformat": 4,
"nbformat_minor": 5
}