data-analytics/beam_multimodal_streaming/full/streaming_pipeline.ipynb (609 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "id": "1z4ltDfJ00Sp", "metadata": { "id": "1z4ltDfJ00Sp" }, "source": [ "# Streaming Multi-Modal, Multi-Input, Multi-Output with Apache Beam" ] }, { "cell_type": "markdown", "id": "GqIf8pJ806Cl", "metadata": { "id": "GqIf8pJ806Cl" }, "source": [ "## Setup" ] }, { "cell_type": "markdown", "id": "8esns_iU01cb", "metadata": { "id": "8esns_iU01cb" }, "source": [ "### Library installs" ] }, { "cell_type": "code", "execution_count": null, "id": "rw1J3mIe4jPL4BesHfDIAgaG", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "executionInfo": { "elapsed": 54654, "status": "ok", "timestamp": 1724343050445, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "rw1J3mIe4jPL4BesHfDIAgaG", "outputId": "f5351818-1713-4af7-8c01-3b42d7cc3dfb", "tags": [] }, "outputs": [], "source": [ "%pip install apache-beam[gcp]\n", "%pip install google-cloud-aiplatform" ] }, { "cell_type": "markdown", "id": "H7DMWLDH3i5W", "metadata": { "id": "H7DMWLDH3i5W" }, "source": [ "### Authentication" ] }, { "cell_type": "code", "execution_count": null, "id": "mZBPPJWY1Bec", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "executionInfo": { "elapsed": 5, "status": "ok", "timestamp": 1724343050445, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "mZBPPJWY1Bec", "outputId": "41d8e6a7-ceec-48cf-aba7-9c1972b897a3" }, "outputs": [], "source": [ "# If using colab\n", "from google.colab import auth\n", "auth.authenticate_user()\n", "\n", "# else authenticate using \n", "# https://cloud.google.com/docs/authentication/client-libraries#python" ] }, { "cell_type": "code", "execution_count": null, "id": "ly1NcQxNUXpl", "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 228 }, "executionInfo": { "elapsed": 1385, "status": "error", "timestamp": 1724343051827, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "ly1NcQxNUXpl", "outputId": 
"07b384b6-fe7b-4993-a707-02cb4e1997aa" }, "outputs": [], "source": [ "# This is not necessary\n", "# This will just let you know who you're authenticated as\n", "import requests\n", "gcloud_token = !gcloud auth print-access-token\n", "gcloud_tokeninfo = requests.get('https://www.googleapis.com/oauth2/v3/tokeninfo?access_token=' + gcloud_token[0]).json()\n", "print(gcloud_tokeninfo['email'])" ] }, { "cell_type": "markdown", "id": "e94ad7e8", "metadata": {}, "source": [ "### Imports" ] }, { "cell_type": "code", "execution_count": null, "id": "J3gxOg2E13Ga", "metadata": { "executionInfo": { "elapsed": 4672, "status": "ok", "timestamp": 1724368704250, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "J3gxOg2E13Ga" }, "outputs": [], "source": [ "import os\n", "from typing import Dict\n", "import argparse\n", "import logging\n", "import json\n", "import apache_beam as beam\n", "from apache_beam.io import PubsubMessage\n", "from apache_beam.io import WriteToPubSub\n", "from apache_beam.ml.inference.base import RunInference, PredictionResult, KeyedModelHandler, ModelHandler\n", "from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON\n", "from apache_beam.options.pipeline_options import PipelineOptions\n", "from apache_beam.options.pipeline_options import SetupOptions\n", "from apache_beam.options.pipeline_options import StandardOptions\n", "from apache_beam.transforms import window\n", "from apache_beam.coders import Coder\n", "from apache_beam.coders import StrUtf8Coder\n", "from apache_beam.transforms.userstate import BagStateSpec\n", "from apache_beam.transforms.userstate import ReadModifyWriteStateSpec\n", "from google.cloud import storage\n", "import vertexai\n", "from vertexai.vision_models import Image, ImageQnAModel, Video\n", "from vertexai.vision_models import VideoSegmentConfig\n", "from urllib.parse import urlparse" ] }, { "cell_type": "markdown", "id": "u1bESm45z-eH", "metadata": { "id": "u1bESm45z-eH" }, 
"source": [ "## Pipeline" ] }, { "cell_type": "markdown", "id": "7df29b64", "metadata": {}, "source": [ "### Global Variables" ] }, { "cell_type": "code", "execution_count": null, "id": "3e104fcd", "metadata": {}, "outputs": [], "source": [ "# Task 1. Setup your environment\n", "# Step 3 \n", "\n", "# Fill in below\n", "os.environ['GOOGLE_CLOUD_PROJECT'] = \"\"" ] }, { "cell_type": "markdown", "id": "076979bb", "metadata": {}, "source": [ "### Upload the photos to your Google Cloud Storage Bucket" ] }, { "cell_type": "code", "execution_count": null, "id": "513e079c", "metadata": {}, "outputs": [], "source": [ "# Task 1. Setup your environment\n", "# Step 4\n", "# Skip this if you've already done it \n", "!gcloud storage buckets create gs://${GOOGLE_CLOUD_PROJECT}-gcs/ --location=us-central1\n", "!gcloud storage cp ../*jpg gs://${GOOGLE_CLOUD_PROJECT}-gcs/\n", "!gcloud storage cp ../members.txt gs://${GOOGLE_CLOUD_PROJECT}-gcs/" ] }, { "cell_type": "code", "execution_count": null, "id": "2d9f2bc1", "metadata": {}, "outputs": [], "source": [ "# Don't need to alter\n", "google_cloud_project = os.environ.get(\"GOOGLE_CLOUD_PROJECT\")\n", "user_file = f\"gs://{google_cloud_project}-gcs/members.txt\"\n", "pubsub_topics = {\n", " 'parking' : ('beam24-workshop-parking-input-topic',\n", " 'beam24-workshop-parking-input-sub'),\n", " 'checkin' : ('beam24-workshop-checkin-input-topic',\n", " 'beam24-workshop-checkin-input-sub'),\n", " 'area' : ('beam24-workshop-area-input-topic',\n", " 'beam24-workshop-area-input-sub'),\n", " 'parking_output' : ('beam24-workshop-parking-output-topic',\n", " 'beam24-workshop-parking-output-sub'),\n", " 'discount_output' : ('beam24-workshop-discount-output-topic',\n", " 'beam24-workshop-discount-output-sub'),\n", " 'inventory_output' : ('beam24-workshop-inventory-output-topic',\n", " 'beam24-workshop-inventory-output-sub'),\n", " 'line_status': ('beam24-workshop-line-input-topic',\n", " 'beam24-workshop-line-input-sub')\n", "}\n", "\n", "def 
format_subscription(subscription):\n", " return 'projects/{}/subscriptions/{}'.format(google_cloud_project, subscription)" ] }, { "cell_type": "markdown", "id": "8193a82e", "metadata": {}, "source": [ "### Storage Helper Functions" ] }, { "cell_type": "code", "execution_count": null, "id": "NS5rYHE0QY_9", "metadata": { "executionInfo": { "elapsed": 3, "status": "ok", "timestamp": 1724368704250, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "NS5rYHE0QY_9" }, "outputs": [], "source": [ "# Task 3. Create side input and read\n", "\n", "# Helper function to split apart the GCS URI\n", "def decode_gcs_url(url):\n", " # Read the URI and parse it\n", " p = urlparse(url)\n", " bucket = p.netloc\n", " file_path = p.path[0:].split('/', 1)\n", " # Return the relevant objects (bucket, path to object)\n", " return bucket, file_path[1]\n", "\n", "# We can't use the image load from local file since it expects a local path\n", "# We use a GCS URL and get the bytes of the image\n", "def read_file(object_path):\n", " # Parse the path\n", " bucket, file_path = decode_gcs_url(object_path)\n", " storage_client = storage.Client()\n", " bucket = storage_client.bucket(bucket)\n", " blob = bucket.blob(file_path)\n", " # Return the object as bytes\n", " return blob.download_as_bytes()" ] }, { "cell_type": "code", "execution_count": null, "id": "urjOgdnkEJsJ", "metadata": { "executionInfo": { "elapsed": 2, "status": "ok", "timestamp": 1724368704250, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "urjOgdnkEJsJ" }, "outputs": [], "source": [ "# Task 3. 
Create side input and read\n", "# Due to https://github.com/apache/beam/issues/21103\n", "# We will simulate this for direct running\n", "\n", "def create_side_input():\n", " all_data = read_file(user_file).decode('utf-8')\n", " lines = all_data.splitlines()\n", " user_dict = {}\n", " for line in lines[1:]:\n", " user = {}\n", " member_id,first_name,last_name,parking_benefits,tier = line.split(\"|\")\n", " user[\"first_name\"] = first_name\n", " user[\"last_name\"] = last_name\n", " user[\"parking_benefits\"] = parking_benefits\n", " user[\"tier\"] = tier\n", " user_dict[member_id] = user\n", "\n", " return user_dict\n", "\n", "class member_lookup(beam.DoFn):\n", " def __init__(self):\n", " self.user_dict = None\n", "\n", " def setup(self):\n", " self.user_dict = create_side_input()\n", "\n", " def teardown(self):\n", " self.user_dict = None\n", "\n", " def process(self, element):\n", " lookup = json.loads(element)\n", " member_id = lookup['member_id']\n", " if member_id is None:\n", " return [(None,(None,element))]\n", " return [(member_id,(self.user_dict[member_id],element))]" ] }, { "cell_type": "code", "execution_count": null, "id": "jZ6p6c7RGKd1", "metadata": { "executionInfo": { "elapsed": 2, "status": "ok", "timestamp": 1724368704250, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "jZ6p6c7RGKd1" }, "outputs": [], "source": [ "# Task 9. 
State updates\n", "class busy_check(beam.DoFn):\n", " STATUS_STATE = ReadModifyWriteStateSpec('previous_status_state', StrUtf8Coder())\n", "\n", " def process(self, element, previous_status_state=beam.DoFn.StateParam(STATUS_STATE),):\n", " key = element[0]\n", " transaction_id = element[1][0]\n", " incoming_state = element[1][1][0]\n", " previous_status = previous_status_state.read()\n", " output = None\n", " if incoming_state != None:\n", " output = (\"discard {}\".format(key), (previous_status,incoming_state))\n", " previous_status_state.write(incoming_state)\n", " elif previous_status is None:\n", " output = (key, (transaction_id,\"Unknown\"))\n", " else:\n", " output = (key, (transaction_id, previous_status))\n", " return [output]\n" ] }, { "cell_type": "code", "execution_count": null, "id": "4041YAKyz8uy", "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 880 }, "executionInfo": { "elapsed": 163917, "status": "error", "timestamp": 1724368886549, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "4041YAKyz8uy", "outputId": "bf992510-cf44-496b-e9f3-93e7b25c3a05" }, "outputs": [], "source": [ "# Task 7. Create a custom model handler\n", "# We reuse the decode_gcs_url_function from Task 3\n", "class get_image_bytes(beam.DoFn):\n", " def setup(self):\n", " self.client = storage.Client()\n", "\n", " def process(self, element):\n", " key, image_url = element[0], element[1]\n", " bucket, file_path = decode_gcs_url(image_url)\n", " bucket = self.client.bucket(bucket)\n", " blob = bucket.blob(file_path)\n", " # Return the object as bytes\n", " return [(key,(image_url,blob.download_as_bytes()))]\n", "\n", "# Task 7. 
Create a custom model handler\n", "# Multi Modal Custom Handler\n", "class Cloud_Multi_Modal_ModelHandler(ModelHandler):\n", " def load_model(self):\n", " \"\"\"Initiate the Google Vision API client.\"\"\"\n", " vertexai.init(project=google_cloud_project, location=\"us-central1\")\n", " client = ImageQnAModel.from_pretrained(\"imagetext@001\")\n", " return client\n", "\n", " def run_inference(self, batch, model,inference):\n", " image_url = batch[0][0]\n", " image_bytes = batch[0][1]\n", " image = Image(image_bytes)\n", "\n", " results = model.ask_question(\n", " image=image,\n", " question=\"Are there any people in this picture\",\n", " number_of_results=1\n", " )\n", "\n", " return [(image_url, results)]\n", "\n", "# Task 2. Read from the various sources\n", "def format_to_tuple(element):\n", " incoming = json.loads(element)\n", " return [(incoming[\"area\"],incoming[\"image\"])]\n", "\n", "# Task 2. Read from the various sources\n", "def format_area(element):\n", " incoming = json.loads(element)\n", " return (incoming['area'],(incoming['transaction_id'],[None]))\n", "\n", "def run(argv=None, save_main_session=True):\n", " parser = argparse.ArgumentParser()\n", " known_args, pipeline_args = parser.parse_known_args(argv)\n", " pipeline_options = PipelineOptions(pipeline_args,experiments=['pickle_library=cloudpickle'])\n", " pipeline_options.view_as(StandardOptions).streaming = True\n", " pipeline_options.view_as(SetupOptions).save_main_session = save_main_session\n", "\n", " # Task 8. RunInference\n", " keyed_custom_model_handler = KeyedModelHandler(Cloud_Multi_Modal_ModelHandler())\n", "\n", " # Put everything together in a streaming pipeline.\n", " with beam.Pipeline(options=pipeline_options) as p:\n", " # Task 2. 
Read from the various sources\n", " # Reading area check logs\n", " area_check_logs = (\n", " p\n", " | \"read area logs\" >> beam.io.ReadFromPubSub(subscription=format_subscription(pubsub_topics['area'][1]))\n", " | 'decode pos' >> beam.Map(lambda x: x.decode('utf-8')))\n", "\n", " area_check_formatted = area_check_logs | beam.Map(format_area)\n", "\n", " # Task 2. Read from the various sources\n", " # Reading parking logs\n", " parking_logs = (\n", " p\n", " | beam.io.ReadFromPubSub(subscription=format_subscription(pubsub_topics['parking'][1]))\n", " | 'decode parking' >> beam.Map(lambda x: x.decode('utf-8'))\n", " | 'window parking' >> beam.WindowInto(window.FixedWindows(30, 0)))\n", "\n", " # Task 4. Use the side input to key parking and check-in logs\n", " parking_member_lookup = (\n", " parking_logs\n", " | 'lookup member parking' >> beam.ParDo(member_lookup())\n", " )\n", "\n", " # Task 2. Read from the various sources\n", " # Reading check-in logs\n", " checkin_logs = (\n", " p\n", " | \"read checkin\" >> beam.io.ReadFromPubSub(subscription=format_subscription(pubsub_topics['checkin'][1]))\n", " | 'decode checkin' >> beam.Map(lambda x: x.decode('utf-8'))\n", " | 'window checkin' >> beam.WindowInto(window.FixedWindows(30, 0)))\n", "\n", " # Task 4. Use the side input to key parking and check-in logs\n", " checkin_member_lookup = (\n", " checkin_logs\n", " | 'lookup member checkin' >> beam.ParDo(member_lookup())\n", " )\n", "\n", " # Task 5. 
Merge the keyed parking and check-in logs\n", " parking_member_only = parking_member_lookup | 'filter parking' >> beam.Filter(lambda x: x[0] is not None)\n", " checkin_member_only = checkin_member_lookup | 'filter checkin' >> beam.Filter(lambda x: x[0] is not None)\n", " upsell = (({\n", " 'parking': parking_member_only, 'checkin': checkin_member_only\n", " })\n", " | 'Merge' >> beam.CoGroupByKey()\n", " )\n", " push_upsell = upsell | beam.Filter(lambda merged: len(merged[1]['parking']) > 0 and len(merged[1]['checkin']) > 0)\n", "\n", " # Task 6. Output the joined data\n", " # You may opt to use something else\n", " # Standard Output is only for demonstration purposes\n", " _ = (push_upsell | beam.Map(print))\n", "\n", "\n", " # Task 2. Read from the various sources\n", " # Reading line logs\n", " line_logs = (\n", " p\n", " | \"read line logs\" >> beam.io.ReadFromPubSub(subscription=format_subscription(pubsub_topics['line_status'][1]))\n", " | 'decode' >> beam.Map(lambda x: x.decode('utf-8')))\n", "\n", " # Task 8. RunInference\n", " inference_result = (\n", " line_logs\n", " | 'format line logs to tuple' >> beam.ParDo(format_to_tuple)\n", " | 'get bytes' >> beam.ParDo(get_image_bytes())\n", " | 'run inference' >> RunInference(keyed_custom_model_handler)\n", " )\n", "\n", " # Task 9. State updates\n", " merged = ((inference_result,area_check_formatted) | 'Merge PCollections' >> beam.Flatten())\n", "\n", " return_stats = merged | beam.ParDo(busy_check())\n", "\n", " # Task 10. 
Output the line status\n", " _ = return_stats | \"print returned status \" >> beam.Map(print)\n", "\n", "run()" ] } ], "metadata": { "colab": { "collapsed_sections": [ "8esns_iU01cb", "GqIf8pJ806Cl" ], "name": "2024_beam_workshop", "provenance": [], "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.10" } }, "nbformat": 4, "nbformat_minor": 5 }