data-analytics/beam_multimodal_streaming/skeleton/streaming_pipeline.ipynb (498 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "id": "1z4ltDfJ00Sp", "metadata": { "id": "1z4ltDfJ00Sp" }, "source": [ "# Streaming Multi-Modal, Multi-Input, Multi-Output with Apache Beam" ] }, { "cell_type": "markdown", "id": "GqIf8pJ806Cl", "metadata": { "id": "GqIf8pJ806Cl" }, "source": [ "## Setup" ] }, { "cell_type": "markdown", "id": "8esns_iU01cb", "metadata": { "id": "8esns_iU01cb" }, "source": [ "### Library installs" ] }, { "cell_type": "code", "execution_count": null, "id": "rw1J3mIe4jPL4BesHfDIAgaG", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "executionInfo": { "elapsed": 54654, "status": "ok", "timestamp": 1724343050445, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "rw1J3mIe4jPL4BesHfDIAgaG", "outputId": "f5351818-1713-4af7-8c01-3b42d7cc3dfb", "tags": [] }, "outputs": [], "source": [ "%pip install apache-beam[gcp]\n", "%pip install google-cloud-aiplatform" ] }, { "cell_type": "markdown", "id": "H7DMWLDH3i5W", "metadata": { "id": "H7DMWLDH3i5W" }, "source": [ "### Authentication" ] }, { "cell_type": "code", "execution_count": null, "id": "mZBPPJWY1Bec", "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "executionInfo": { "elapsed": 5, "status": "ok", "timestamp": 1724343050445, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "mZBPPJWY1Bec", "outputId": "41d8e6a7-ceec-48cf-aba7-9c1972b897a3" }, "outputs": [], "source": [ "# If using colab\n", "from google.colab import auth\n", "auth.authenticate_user()\n", "\n", "# else authenticate using \n", "# https://cloud.google.com/docs/authentication/client-libraries#python" ] }, { "cell_type": "code", "execution_count": null, "id": "ly1NcQxNUXpl", "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 228 }, "executionInfo": { "elapsed": 1385, "status": "error", "timestamp": 1724343051827, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "ly1NcQxNUXpl", "outputId": "07b384b6-fe7b-4993-a707-02cb4e1997aa" }, "outputs": [], "source": [ "# This is not necessary\n", "# This will just let you know who you're authenticated as\n", "import requests\n", "gcloud_token = !gcloud auth print-access-token\n", "gcloud_tokeninfo = requests.get('https://www.googleapis.com/oauth2/v3/tokeninfo?access_token=' + gcloud_token[0]).json()\n", "print(gcloud_tokeninfo['email'])" ] }, { "cell_type": "markdown", "id": "e94ad7e8", "metadata": {}, "source": [ "### Imports" ] }, { "cell_type": "code", "execution_count": null, "id": "J3gxOg2E13Ga", "metadata": { "executionInfo": { "elapsed": 4672, "status": "ok", "timestamp": 1724368704250, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "J3gxOg2E13Ga" }, "outputs": [], "source": [ "import os\n", "from typing import Dict\n", "import argparse\n", "import logging\n", "import json\n", "import apache_beam as beam\n", "from apache_beam.io import PubsubMessage\n", "from apache_beam.io import WriteToPubSub\n", "from apache_beam.ml.inference.base import RunInference, PredictionResult, KeyedModelHandler, ModelHandler\n", "from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON\n", "from apache_beam.options.pipeline_options import PipelineOptions\n", "from apache_beam.options.pipeline_options import SetupOptions\n", "from apache_beam.options.pipeline_options import StandardOptions\n", "from apache_beam.transforms import window\n", "from apache_beam.coders import Coder\n", "from apache_beam.coders import StrUtf8Coder\n", "from apache_beam.transforms.userstate import BagStateSpec\n", "from apache_beam.transforms.userstate import ReadModifyWriteStateSpec\n", "from google.cloud import storage\n", "import vertexai\n", "from vertexai.vision_models import Image, ImageQnAModel, Video\n", "from vertexai.vision_models import VideoSegmentConfig\n", "from urllib.parse import urlparse" ] }, { "cell_type": "markdown", "id": "u1bESm45z-eH", "metadata": { "id": "u1bESm45z-eH" }, "source": [ "## Pipeline" ] }, { "cell_type": "markdown", "id": "7df29b64", "metadata": {}, "source": [ "### Global Variables" ] }, { "cell_type": "code", "execution_count": null, "id": "3e104fcd", "metadata": {}, "outputs": [], "source": [ "# Task 1. Setup your environment\n", "# Step 3 \n", "\n", "# Fill in below\n", "os.environ['GOOGLE_CLOUD_PROJECT'] = \"\"" ] }, { "cell_type": "markdown", "id": "076979bb", "metadata": {}, "source": [ "### Upload the photos to your Google Cloud Storage Bucket" ] }, { "cell_type": "code", "execution_count": null, "id": "513e079c", "metadata": {}, "outputs": [], "source": [ "# Task 1. Setup your environment\n", "# Step 4\n", "!gcloud storage create gs://${GOOGLE_CLOUD_PROJECT}-gcs/ --location=us-central1\n", "!gcloud storage cp ../*jpg gs://${GOOGLE_CLOUD_PROJECT}-gcs/\n", "!gcloud storage cp ../members.txt gs://${GOOGLE_CLOUD_PROJECT}-gcs/" ] }, { "cell_type": "code", "execution_count": null, "id": "2d9f2bc1", "metadata": {}, "outputs": [], "source": [ "# Don't need to alter\n", "google_cloud_project = os.environ.get(\"GOOGLE_CLOUD_PROJECT\")\n", "user_file = f\"gs://{google_cloud_project}-gcs/members.txt\"\n", "pubsub_topics = {\n", " 'parking' : ('beam24-workshop-parking-input-topic',\n", " 'beam24-workshop-parking-input-sub'),\n", " 'checkin' : ('beam24-workshop-checkin-input-topic',\n", " 'beam24-workshop-checkin-input-sub'),\n", " 'area' : ('beam24-workshop-area-input-topic',\n", " 'beam24-workshop-area-input-sub'),\n", " 'parking_output' : ('beam24-workshop-parking-output-topic',\n", " 'beam24-workshop-parking-output-sub'),\n", " 'discount_output' : ('beam24-workshop-discount-output-topic',\n", " 'beam24-workshop-discount-output-sub'),\n", " 'inventory_output' : ('beam24-workshop-inventory-output-topic',\n", " 'beam24-workshop-inventory-output-sub'),\n", " 'line_status': ('beam24-workshop-line-input-topic',\n", " 'beam24-workshop-line-input-sub')\n", "}\n", "\n", "def format_subscription(subscription):\n", " return 'projects/{}/subscriptions/{}'.format(google_cloud_project, subscription)" ] }, { "cell_type": "markdown", "id": "8193a82e", "metadata": {}, "source": [ "### Storage Helper Functions" ] }, { "cell_type": "code", "execution_count": null, "id": "NS5rYHE0QY_9", "metadata": { "executionInfo": { "elapsed": 3, "status": "ok", "timestamp": 1724368704250, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "NS5rYHE0QY_9" }, "outputs": [], "source": [ "# Task 3. Create side input and read\n", "\n", "# Helper function to split apart the GCS URI\n", "def decode_gcs_url(url):\n", " # Read the URI and parse it\n", " p = urlparse(url)\n", " bucket = p.netloc\n", " file_path = p.path[0:].split('/', 1)\n", " # Return the relevant objects (bucket, path to object)\n", " return bucket, file_path[1]\n", "\n", "# We can't use the image load from local file since it expects a local path\n", "# We use a GCS URL and get the bytes of the image\n", "def read_file(object_path):\n", " # Parse the path\n", " bucket, file_path = decode_gcs_url(object_path)\n", " storage_client = storage.Client()\n", " bucket = storage_client.bucket(bucket)\n", " blob = bucket.blob(file_path)\n", " # Return the object as bytes\n", " return blob.download_as_bytes()" ] }, { "cell_type": "code", "execution_count": null, "id": "urjOgdnkEJsJ", "metadata": { "executionInfo": { "elapsed": 2, "status": "ok", "timestamp": 1724368704250, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "urjOgdnkEJsJ" }, "outputs": [], "source": [ "# Task 3. Create side input and read\n", "# Due to https://github.com/apache/beam/issues/21103\n", "# We will simulate this for direct running\n", "\n", "def create_side_input():\n", " all_data = read_file(user_file).decode('utf-8')\n", " lines = all_data.splitlines()\n", " user_dict = {}\n", " for line in lines[1:]:\n", " user = {}\n", " member_id,first_name,last_name,parking_benefits,tier = line.split(\"|\")\n", " user[\"first_name\"] = first_name\n", " user[\"last_name\"] = last_name\n", " user[\"parking_benefits\"] = parking_benefits\n", " user[\"tier\"] = tier\n", " user_dict[member_id] = user\n", "\n", " return user_dict\n", "\n", "class member_lookup(beam.DoFn):\n", " def __init__(self):\n", "\n", " def setup(self):\n", "\n", " def teardown(self):\n", "\n", " def process(self, element):\n" ] }, { "cell_type": "code", "execution_count": null, "id": "jZ6p6c7RGKd1", "metadata": { "executionInfo": { "elapsed": 2, "status": "ok", "timestamp": 1724368704250, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "jZ6p6c7RGKd1" }, "outputs": [], "source": [ "# Task 9. State updates\n", "class busy_check(beam.DoFn):\n", "\n", " def process(self, element, previous_status_state=beam.DoFn.StateParam(STATUS_STATE),):\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "4041YAKyz8uy", "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 880 }, "executionInfo": { "elapsed": 163917, "status": "error", "timestamp": 1724368886549, "user": { "displayName": "", "userId": "" }, "user_tz": 420 }, "id": "4041YAKyz8uy", "outputId": "bf992510-cf44-496b-e9f3-93e7b25c3a05" }, "outputs": [], "source": [ "# Task 7. Create a custom model handler\n", "# We reuse the decode_gcs_url_function from Task 3\n", "class get_image_bytes(beam.DoFn):\n", " def setup(self):\n", " self.client = storage.Client()\n", "\n", " def process(self, element):\n", " key, image_url = element[0], element[1]\n", " bucket, file_path = decode_gcs_url(image_url)\n", " bucket = self.client.bucket(bucket)\n", " blob = bucket.blob(file_path)\n", " # Return the object as bytes\n", " return [(key,(image_url,blob.download_as_bytes()))]\n", "\n", "# Task 7. Create a custom model handler\n", "# Multi Modal Custom Handler\n", "class Cloud_Multi_Modal_ModelHandler(ModelHandler):\n", " def load_model(self):\n", "\n", " def run_inference(self, batch, model,inference):\n", "\n", "\n", "# Task 2. Read from the various sources\n", "def format_to_tuple(element):\n", " incoming = json.loads(element)\n", " return [(incoming[\"area\"],incoming[\"image\"])]\n", "\n", "# Task 2. Read from the various sources\n", "def format_area(element):\n", " incoming = json.loads(element)\n", " return (incoming['area'],(incoming['transaction_id'],[None]))\n", "\n", "def run(argv=None, save_main_session=True):\n", " parser = argparse.ArgumentParser()\n", " known_args, pipeline_args = parser.parse_known_args(argv)\n", " pipeline_options = PipelineOptions(pipeline_args,experiments=['pickle_library=cloudpickle'])\n", " pipeline_options.view_as(StandardOptions).streaming = True\n", " pipeline_options.view_as(SetupOptions).save_main_session = save_main_session\n", "\n", " # Task 8. RunInference\n", "\n", " # Put everything together in a streaming pipeline.\n", " with beam.Pipeline(options=pipeline_options) as p:\n", "\n", "\n", "run()" ] } ], "metadata": { "colab": { "collapsed_sections": [ "8esns_iU01cb", "GqIf8pJ806Cl" ], "name": "2024_beam_workshop", "provenance": [], "toc_visible": true }, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.10" } }, "nbformat": 4, "nbformat_minor": 5 }