data-analytics/beam_multimodal_streaming/data_generator.ipynb (615 lines of code) (raw):

{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "FrEvLbIKex0b",
   "metadata": {
    "id": "FrEvLbIKex0b"
   },
   "source": [
    "# Beam multimodal streaming workshop: data generator\n",
    "\n",
    "Publishes synthetic parking, area-check, line-status and check-in events to the workshop's Pub/Sub topics.\n",
    "\n",
    "**Fill in your Google Cloud project ID in the next cell before running anything else.**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "Zr1cCN59e1Q0",
   "metadata": {
    "id": "Zr1cCN59e1Q0"
   },
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "# Fill in your Google Cloud project ID.\n",
    "# If left empty, an already-set GOOGLE_CLOUD_PROJECT environment variable is kept\n",
    "# instead of being overwritten with an empty string.\n",
    "PROJECT_ID = \"\"\n",
    "\n",
    "if PROJECT_ID:\n",
    "    os.environ[\"GOOGLE_CLOUD_PROJECT\"] = PROJECT_ID"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "U0mBaA_PduKp",
   "metadata": {
    "id": "U0mBaA_PduKp"
   },
   "source": [
    "# Generator Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "mThoyj9he54R",
   "metadata": {
    "id": "mThoyj9he54R"
   },
   "outputs": [],
   "source": [
    "google_cloud_project = os.environ.get(\"GOOGLE_CLOUD_PROJECT\")\n",
    "if not google_cloud_project:\n",
    "    raise ValueError(\n",
    "        \"GOOGLE_CLOUD_PROJECT is not set; fill in PROJECT_ID in the configuration cell.\"\n",
    "    )\n",
    "\n",
    "# Objects uploaded to the `<project>-gcs` bucket during setup:\n",
    "# two sample entrance images and the pipe-delimited members list.\n",
    "# NOTE: the \"with people\" object name contains spaces (unlike entrance_empty.jpg);\n",
    "# it must match the uploaded object name exactly.\n",
    "image_with_people = f\"gs://{google_cloud_project}-gcs/entrance with people.jpg\"\n",
    "image_empty = f\"gs://{google_cloud_project}-gcs/entrance_empty.jpg\"\n",
    "user_file = f\"gs://{google_cloud_project}-gcs/members.txt\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "gDj0PT8tdrF2",
   "metadata": {
    "id": "gDj0PT8tdrF2"
   },
   "source": [
    "## Pip installs & imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "PXNBgIqIazf2",
   "metadata": {
    "id": "PXNBgIqIazf2"
   },
   "outputs": [],
   "source": [
    "%pip install google-cloud-storage\n",
    "%pip install --upgrade google-cloud-pubsub"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ZW4SPa0V52a4",
   "metadata": {
    "id": "ZW4SPa0V52a4"
   },
   "outputs": [],
   "source": [
    "import json\n",
    "import os\n",
    "import random\n",
    "import time\n",
    "import uuid\n",
    "from copy import deepcopy\n",
    "from urllib.parse import urlparse\n",
    "\n",
    "from google.cloud import pubsub_v1\n",
    "from google.cloud import storage"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "F7xa8LiUuIXQ",
   "metadata": {
    "id": "F7xa8LiUuIXQ"
   },
   "source": [
    "## Members list"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "QfLKVtbJYegt",
   "metadata": {
    "id": "QfLKVtbJYegt"
   },
   "outputs": [],
   "source": [
    "def decode_gcs_url(url):\n",
    "    \"\"\"Split a `gs://bucket/path/to/object` URI into `(bucket, object_path)`.\n",
    "\n",
    "    Raises:\n",
    "        ValueError: if `url` is not a gs:// URI that names an object.\n",
    "    \"\"\"\n",
    "    parsed = urlparse(url)\n",
    "    # urlparse keeps the leading '/' on the path; GCS object names do not have it.\n",
    "    object_path = parsed.path.lstrip(\"/\")\n",
    "    if parsed.scheme != \"gs\" or not parsed.netloc or not object_path:\n",
    "        raise ValueError(f\"Not a GCS object URI: {url!r}\")\n",
    "    return parsed.netloc, object_path\n",
    "\n",
    "\n",
    "def read_file(object_path):\n",
    "    \"\"\"Download a GCS object and return its contents decoded as text.\n",
    "\n",
    "    Args:\n",
    "        object_path: `gs://bucket/path` URI of the object to read.\n",
    "    \"\"\"\n",
    "    bucket_name, blob_name = decode_gcs_url(object_path)\n",
    "    storage_client = storage.Client()\n",
    "    blob = storage_client.bucket(bucket_name).blob(blob_name)\n",
    "    return blob.download_as_text()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "37kDGvc6bOqtRHvO4hy09HDE",
   "metadata": {
    "id": "37kDGvc6bOqtRHvO4hy09HDE",
    "tags": []
   },
   "outputs": [],
   "source": [
    "class member():\n",
    "    \"\"\"A single member record loaded from the members file.\"\"\"\n",
    "\n",
    "    def __init__(self, first_name, last_name, parking_benefits, tier):\n",
    "        self.first_name = first_name\n",
    "        self.last_name = last_name\n",
    "        # Stored exactly as read from the file (a string, not a parsed bool).\n",
    "        self.parking_benefits = parking_benefits\n",
    "        self.tier = tier\n",
    "\n",
    "    def to_dict(self):\n",
    "        \"\"\"Return the member's fields as a plain dict.\"\"\"\n",
    "        return {\n",
    "            \"first_name\": self.first_name,\n",
    "            \"last_name\": self.last_name,\n",
    "            \"parking_benefits\": self.parking_benefits,\n",
    "            \"tier\": self.tier,\n",
    "        }\n",
    "\n",
    "\n",
    "class member_list():\n",
    "    \"\"\"Members keyed by member ID, loaded from a pipe-delimited file in GCS.\"\"\"\n",
    "\n",
    "    def __init__(self):\n",
    "        self.members = {}\n",
    "\n",
    "    def load(self, path=None):\n",
    "        \"\"\"Load members from `path` (defaults to the global `user_file`).\n",
    "\n",
    "        The first line is treated as a header and skipped; each following line\n",
    "        must be `member_id|first_name|last_name|parking_benefits|tier`.\n",
    "        Blank lines (such as the one after a trailing newline at end of file)\n",
    "        are ignored, and Windows (CRLF) line endings are handled.\n",
    "        \"\"\"\n",
    "        text = read_file(user_file if path is None else path)\n",
    "        for line in text.splitlines()[1:]:\n",
    "            if not line.strip():\n",
    "                continue\n",
    "            member_id, first_name, last_name, parking_benefits, tier = line.split(\"|\")\n",
    "            self.members[member_id] = member(first_name, last_name, parking_benefits, tier)\n",
    "\n",
    "    def get_random_member(self):\n",
    "        \"\"\"Return a uniformly random member ID; raises IndexError if none are loaded.\"\"\"\n",
    "        return random.choice(list(self.members))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "MGan0UCSappB",
   "metadata": {
    "id": "MGan0UCSappB"
   },
   "outputs": [],
   "source": [
    "new_list = member_list()\n",
    "new_list.load()\n",
    "for member_id, record in new_list.members.items():\n",
    "    print(member_id, record.to_dict())\n",
    "\n",
    "# Sample a few random member IDs as a sanity check.\n",
    "for _ in range(10):\n",
    "    print(new_list.get_random_member())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9Dm7dbn9uLi5",
   "metadata": {
    "id": "9Dm7dbn9uLi5"
   },
   "source": [
    "## Parking"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "UYje-0BFuMze",
   "metadata": {
    "id": "UYje-0BFuMze"
   },
   "outputs": [],
   "source": [
    "def gen_parking_log():\n",
    "    \"\"\"Generate one parking-entry event.\n",
    "\n",
    "    On average half of the events carry a random member ID from `new_list`;\n",
    "    the rest have `member_id` set to None.\n",
    "    \"\"\"\n",
    "    is_member = random.choice([True, False])\n",
    "    member_id = new_list.get_random_member() if is_member else None\n",
    "    return {\n",
    "        \"transaction_id\": str(uuid.uuid4()),\n",
    "        \"member_id\": member_id,\n",
    "        # Unix timestamp truncated to whole seconds.\n",
    "        \"entry_time\": int(time.time()),\n",
    "    }"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "Imvk_rZQxmot",
   "metadata": {
    "id": "Imvk_rZQxmot"
   },
   "outputs": [],
   "source": [
    "for _ in range(10):\n",
    "    print(gen_parking_log())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "tSzKdPjDyZ_3",
   "metadata": {
    "id": "tSzKdPjDyZ_3"
   },
   "source": [
    "## Area of interest"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "kszar1PhycYn",
   "metadata": {
    "id": "kszar1PhycYn"
   },
   "outputs": [],
   "source": [
    "def area_check():\n",
    "    \"\"\"Generate one area-check event with a fresh transaction ID and a random area.\"\"\"\n",
    "    return {\n",
    "        \"transaction_id\": str(uuid.uuid4()),\n",
    "        \"area\": random.choice([\"pos\", \"entrance\"]),\n",
    "    }"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "Gtm0DRRLy-H2",
   "metadata": {
    "id": "Gtm0DRRLy-H2"
   },
   "outputs": [],
   "source": [
    "for _ in range(10):\n",
    "    print(area_check())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7iX1vnyI2Pt4",
   "metadata": {
    "id": "7iX1vnyI2Pt4"
   },
   "source": [
    "## Line status (busy or empty)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "-WGcKSrO2SyF",
   "metadata": {
    "id": "-WGcKSrO2SyF"
   },
   "outputs": [],
   "source": [
    "def gen_line_status():\n",
    "    \"\"\"Generate one line-status event for a random area.\n",
    "\n",
    "    The event carries a GCS image URI: `image_with_people` when the line is\n",
    "    busy, `image_empty` otherwise, chosen 50/50.\n",
    "    \"\"\"\n",
    "    is_busy = random.choice([True, False])\n",
    "    area = random.choice([\"pos\", \"entrance\"])\n",
    "    image = image_with_people if is_busy else image_empty\n",
    "    return {\"area\": area, \"image\": image}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "EamakgEJ3Kej",
   "metadata": {
    "id": "EamakgEJ3Kej"
   },
   "outputs": [],
   "source": [
    "for _ in range(10):\n",
    "    print(gen_line_status())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "j-WyjeoBaMO-",
   "metadata": {
    "id": "j-WyjeoBaMO-"
   },
   "source": [
    "## Check-in"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "N6OZnwwdaOOb",
   "metadata": {
    "id": "N6OZnwwdaOOb"
   },
   "outputs": [],
   "source": [
    "def gen_checkin():\n",
    "    \"\"\"Generate a batch of check-in events in which every loaded member checks in once.\n",
    "\n",
    "    Each iteration flips a coin: heads checks in a member who has not checked in\n",
    "    yet, tails emits a check-in with `member_id` None. Check-in times are\n",
    "    1-1000 seconds after the call. Events are returned in generation order,\n",
    "    not sorted by `check_in_time`. Returns [] when no members are loaded.\n",
    "    \"\"\"\n",
    "    time_start = int(time.time())\n",
    "    # Only the IDs are needed, so a list of the keys suffices (no deep copy of members).\n",
    "    remaining_ids = list(new_list.members)\n",
    "    check_ins = []\n",
    "    while remaining_ids:\n",
    "        member_id = None\n",
    "        if random.choice([True, False]):\n",
    "            member_id = random.choice(remaining_ids)\n",
    "            remaining_ids.remove(member_id)\n",
    "        check_ins.append({\n",
    "            \"transaction_id\": str(uuid.uuid4()),\n",
    "            \"check_in_time\": time_start + random.randint(1, 1000),\n",
    "            \"member_id\": member_id,\n",
    "        })\n",
    "    return check_ins"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1dAoFuajcE6g",
   "metadata": {
    "id": "1dAoFuajcE6g"
   },
   "outputs": [],
   "source": [
    "check_ins = gen_checkin()\n",
    "for check_in in check_ins:\n",
    "    print(check_in)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "U7kgGburjgmy",
   "metadata": {
    "id": "U7kgGburjgmy"
   },
   "source": [
    "## Pub/Sub topics and writer"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "iA12o3k3rq7b",
   "metadata": {
    "id": "iA12o3k3rq7b"
   },
   "outputs": [],
   "source": [
    "# Stream name -> {topic ID: subscription ID}.\n",
    "pubsub_topics = {\n",
    "    'parking' : {\n",
    "        'beam24-workshop-parking-input-topic':\n",
    "        'beam24-workshop-parking-input-sub'},\n",
    "    'checkin' : {\n",
    "        'beam24-workshop-checkin-input-topic':\n",
    "        'beam24-workshop-checkin-input-sub'},\n",
    "    'area' : {\n",
    "        'beam24-workshop-area-input-topic':\n",
    "        'beam24-workshop-area-input-sub'},\n",
    "    'parking_output' : {\n",
    "        'beam24-workshop-parking-output-topic':\n",
    "        'beam24-workshop-parking-output-sub'},\n",
    "    'discount_output' : {\n",
    "        'beam24-workshop-discount-output-topic':\n",
    "        'beam24-workshop-discount-output-sub'},\n",
    "    'inventory_output' : {\n",
    "        'beam24-workshop-inventory-output-topic':\n",
    "        'beam24-workshop-inventory-output-sub'},\n",
    "    'line_status': {\n",
    "        'beam24-workshop-line-input-topic':\n",
    "        'beam24-workshop-line-input-sub'}}\n",
    "\n",
    "# Create every topic and a subscription on it. The names are passed through\n",
    "# environment variables so the shell can expand them in the `!gcloud` lines.\n",
    "# Resources that already exist (e.g. when this cell is re-run) make gcloud\n",
    "# print an error; the loop simply continues.\n",
    "for stream in pubsub_topics.keys():\n",
    "    for topic, sub in pubsub_topics[stream].items():\n",
    "        os.environ['current_topic'] = topic\n",
    "        os.environ['current_sub'] = sub\n",
    "        !gcloud pubsub topics create $current_topic\n",
    "        !gcloud pubsub subscriptions create $current_sub --topic projects/$GOOGLE_CLOUD_PROJECT/topics/$current_topic\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "KUx25vnsji3c",
   "metadata": {
    "id": "KUx25vnsji3c"
   },
   "outputs": [],
   "source": [
    "def pubsub_write(messages, topic_id, publisher=None):\n",
    "    \"\"\"Publish each item of `messages` as one JSON-encoded Pub/Sub message.\n",
    "\n",
    "    Args:\n",
    "        messages: iterable of JSON-serializable objects.\n",
    "        topic_id: short topic ID inside `google_cloud_project`.\n",
    "        publisher: optional `pubsub_v1.PublisherClient` to reuse across calls;\n",
    "            a new client is created when omitted.\n",
    "\n",
    "    Blocks until every message is published and prints each message ID.\n",
    "    \"\"\"\n",
    "    if publisher is None:\n",
    "        publisher = pubsub_v1.PublisherClient()\n",
    "    # `topic_path` builds `projects/{project_id}/topics/{topic_id}`.\n",
    "    topic_path = publisher.topic_path(google_cloud_project, topic_id)\n",
    "\n",
    "    # Publish everything first so the client can batch requests, then wait on\n",
    "    # the futures; waiting after each publish would serialize the round trips.\n",
    "    futures = [\n",
    "        publisher.publish(topic_path, json.dumps(message).encode(\"utf-8\"))\n",
    "        for message in messages\n",
    "    ]\n",
    "    for future in futures:\n",
    "        print(future.result())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9b4d64f8",
   "metadata": {},
   "source": [
    "# Run Generator\n",
    "\n",
    "Runs 100 ticks, 0.25 s apart: a line-status event every 25 ticks, an area check every 5, a parking entry every 2, and one full check-in batch at tick 50."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "HMpeTkxU_I-M",
   "metadata": {
    "id": "HMpeTkxU_I-M"
   },
   "outputs": [],
   "source": [
    "# Reuse one publisher client for the whole run instead of creating one per call.\n",
    "shared_publisher = pubsub_v1.PublisherClient()\n",
    "\n",
    "for i in range(100):\n",
    "    if i % 25 == 0:\n",
    "        pubsub_write([gen_line_status()], \"beam24-workshop-line-input-topic\", publisher=shared_publisher)\n",
    "    if i % 5 == 0:\n",
    "        pubsub_write([area_check()], \"beam24-workshop-area-input-topic\", publisher=shared_publisher)\n",
    "    if i % 2 == 0:\n",
    "        pubsub_write([gen_parking_log()], \"beam24-workshop-parking-input-topic\", publisher=shared_publisher)\n",
    "    if i == 50:\n",
    "        pubsub_write(gen_checkin(), \"beam24-workshop-checkin-input-topic\", publisher=shared_publisher)\n",
    "    time.sleep(0.25)"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "name": "2024_beam_workshop_gen",
   "provenance": [],
   "toc_visible": true
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}