{
"cells": [
{
"cell_type": "markdown",
"id": "1z4ltDfJ00Sp",
"metadata": {
"id": "1z4ltDfJ00Sp"
},
"source": [
"# Streaming Multi-Modal, Multi-Input, Multi-Output with Apache Beam"
]
},
{
"cell_type": "markdown",
"id": "GqIf8pJ806Cl",
"metadata": {
"id": "GqIf8pJ806Cl"
},
"source": [
"## Setup"
]
},
{
"cell_type": "markdown",
"id": "8esns_iU01cb",
"metadata": {
"id": "8esns_iU01cb"
},
"source": [
"### Library installs"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "rw1J3mIe4jPL4BesHfDIAgaG",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"executionInfo": {
"elapsed": 54654,
"status": "ok",
"timestamp": 1724343050445,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "rw1J3mIe4jPL4BesHfDIAgaG",
"outputId": "f5351818-1713-4af7-8c01-3b42d7cc3dfb",
"tags": []
},
"outputs": [],
"source": [
"%pip install apache-beam[gcp]\n",
"%pip install google-cloud-aiplatform"
]
},
{
"cell_type": "markdown",
"id": "H7DMWLDH3i5W",
"metadata": {
"id": "H7DMWLDH3i5W"
},
"source": [
"### Authentication"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "mZBPPJWY1Bec",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"executionInfo": {
"elapsed": 5,
"status": "ok",
"timestamp": 1724343050445,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "mZBPPJWY1Bec",
"outputId": "41d8e6a7-ceec-48cf-aba7-9c1972b897a3"
},
"outputs": [],
"source": [
"# If using colab\n",
"from google.colab import auth\n",
"auth.authenticate_user()\n",
"\n",
"# else authenticate using \n",
"# https://cloud.google.com/docs/authentication/client-libraries#python"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ly1NcQxNUXpl",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 228
},
"executionInfo": {
"elapsed": 1385,
"status": "error",
"timestamp": 1724343051827,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "ly1NcQxNUXpl",
"outputId": "07b384b6-fe7b-4993-a707-02cb4e1997aa"
},
"outputs": [],
"source": [
"# This is not necessary\n",
"# This will just let you know who you're authenticated as\n",
"import requests\n",
"gcloud_token = !gcloud auth print-access-token\n",
"gcloud_tokeninfo = requests.get('https://www.googleapis.com/oauth2/v3/tokeninfo?access_token=' + gcloud_token[0]).json()\n",
"print(gcloud_tokeninfo['email'])"
]
},
{
"cell_type": "markdown",
"id": "e94ad7e8",
"metadata": {},
"source": [
"### Imports"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "J3gxOg2E13Ga",
"metadata": {
"executionInfo": {
"elapsed": 4672,
"status": "ok",
"timestamp": 1724368704250,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "J3gxOg2E13Ga"
},
"outputs": [],
"source": [
"import os\n",
"from typing import Dict\n",
"import argparse\n",
"import logging\n",
"import json\n",
"import apache_beam as beam\n",
"from apache_beam.io import PubsubMessage\n",
"from apache_beam.io import WriteToPubSub\n",
"from apache_beam.ml.inference.base import RunInference, PredictionResult, KeyedModelHandler, ModelHandler\n",
"from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"from apache_beam.options.pipeline_options import SetupOptions\n",
"from apache_beam.options.pipeline_options import StandardOptions\n",
"from apache_beam.transforms import window\n",
"from apache_beam.coders import Coder\n",
"from apache_beam.coders import StrUtf8Coder\n",
"from apache_beam.transforms.userstate import BagStateSpec\n",
"from apache_beam.transforms.userstate import ReadModifyWriteStateSpec\n",
"from google.cloud import storage\n",
"import vertexai\n",
"from vertexai.vision_models import Image, ImageQnAModel, Video\n",
"from vertexai.vision_models import VideoSegmentConfig\n",
"from urllib.parse import urlparse"
]
},
{
"cell_type": "markdown",
"id": "u1bESm45z-eH",
"metadata": {
"id": "u1bESm45z-eH"
},
"source": [
"## Pipeline"
]
},
{
"cell_type": "markdown",
"id": "7df29b64",
"metadata": {},
"source": [
"### Global Variables"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3e104fcd",
"metadata": {},
"outputs": [],
"source": [
"# Task 1. Setup your environment\n",
"# Step 3 \n",
"\n",
"# Fill in below\n",
"os.environ['GOOGLE_CLOUD_PROJECT'] = \"\""
]
},
{
"cell_type": "markdown",
"id": "076979bb",
"metadata": {},
"source": [
"### Upload the photos to your Google Cloud Storage Bucket"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "513e079c",
"metadata": {},
"outputs": [],
"source": [
"# Task 1. Setup your environment\n",
"# Step 4\n",
"!gcloud storage create gs://${GOOGLE_CLOUD_PROJECT}-gcs/ --location=us-central1\n",
"!gcloud storage cp ../*jpg gs://${GOOGLE_CLOUD_PROJECT}-gcs/\n",
"!gcloud storage cp ../members.txt gs://${GOOGLE_CLOUD_PROJECT}-gcs/"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2d9f2bc1",
"metadata": {},
"outputs": [],
"source": [
"# Don't need to alter\n",
"google_cloud_project = os.environ.get(\"GOOGLE_CLOUD_PROJECT\")\n",
"user_file = f\"gs://{google_cloud_project}-gcs/members.txt\"\n",
"pubsub_topics = {\n",
" 'parking' : ('beam24-workshop-parking-input-topic',\n",
" 'beam24-workshop-parking-input-sub'),\n",
" 'checkin' : ('beam24-workshop-checkin-input-topic',\n",
" 'beam24-workshop-checkin-input-sub'),\n",
" 'area' : ('beam24-workshop-area-input-topic',\n",
" 'beam24-workshop-area-input-sub'),\n",
" 'parking_output' : ('beam24-workshop-parking-output-topic',\n",
" 'beam24-workshop-parking-output-sub'),\n",
" 'discount_output' : ('beam24-workshop-discount-output-topic',\n",
" 'beam24-workshop-discount-output-sub'),\n",
" 'inventory_output' : ('beam24-workshop-inventory-output-topic',\n",
" 'beam24-workshop-inventory-output-sub'),\n",
" 'line_status': ('beam24-workshop-line-input-topic',\n",
" 'beam24-workshop-line-input-sub')\n",
"}\n",
"\n",
"def format_subscription(subscription):\n",
" return 'projects/{}/subscriptions/{}'.format(google_cloud_project, subscription)"
]
},
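{
"cell_type": "code",
"execution_count": null,
"id": "aa11bb22",
"metadata": {},
"outputs": [],
"source": [
"# Optional (editor sketch, hedged): if the topics and subscriptions above have not\n",
"# already been provisioned for the workshop, one way to create them is with gcloud.\n",
"# Skip this cell if they already exist in your project.\n",
"for topic, subscription in pubsub_topics.values():\n",
"    !gcloud pubsub topics create {topic}\n",
"    !gcloud pubsub subscriptions create {subscription} --topic={topic}\n"
]
},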
{
"cell_type": "markdown",
"id": "8193a82e",
"metadata": {},
"source": [
"### Storage Helper Functions"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "NS5rYHE0QY_9",
"metadata": {
"executionInfo": {
"elapsed": 3,
"status": "ok",
"timestamp": 1724368704250,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "NS5rYHE0QY_9"
},
"outputs": [],
"source": [
"# Task 3. Create side input and read\n",
"\n",
"# Helper function to split apart the GCS URI\n",
"def decode_gcs_url(url):\n",
" # Read the URI and parse it\n",
" p = urlparse(url)\n",
" bucket = p.netloc\n",
" file_path = p.path[0:].split('/', 1)\n",
" # Return the relevant objects (bucket, path to object)\n",
" return bucket, file_path[1]\n",
"\n",
"# We can't use the image load from local file since it expects a local path\n",
"# We use a GCS URL and get the bytes of the image\n",
"def read_file(object_path):\n",
" # Parse the path\n",
" bucket, file_path = decode_gcs_url(object_path)\n",
" storage_client = storage.Client()\n",
" bucket = storage_client.bucket(bucket)\n",
" blob = bucket.blob(file_path)\n",
" # Return the object as bytes\n",
" return blob.download_as_bytes()"
]
},
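{
"cell_type": "code",
"execution_count": null,
"id": "cc33dd44",
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check (editor sketch): decode_gcs_url splits a gs:// URI into\n",
"# (bucket, object path); for the members file uploaded in Task 1 this should print\n",
"# something like ('<project>-gcs', 'members.txt').\n",
"print(decode_gcs_url(user_file))"
]
},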
{
"cell_type": "code",
"execution_count": null,
"id": "urjOgdnkEJsJ",
"metadata": {
"executionInfo": {
"elapsed": 2,
"status": "ok",
"timestamp": 1724368704250,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "urjOgdnkEJsJ"
},
"outputs": [],
"source": [
"# Task 3. Create side input and read\n",
"# Due to https://github.com/apache/beam/issues/21103\n",
"# We will simulate this for direct running\n",
"\n",
"def create_side_input():\n",
" all_data = read_file(user_file).decode('utf-8')\n",
" lines = all_data.splitlines()\n",
" user_dict = {}\n",
" for line in lines[1:]:\n",
" user = {}\n",
" member_id,first_name,last_name,parking_benefits,tier = line.split(\"|\")\n",
" user[\"first_name\"] = first_name\n",
" user[\"last_name\"] = last_name\n",
" user[\"parking_benefits\"] = parking_benefits\n",
" user[\"tier\"] = tier\n",
" user_dict[member_id] = user\n",
"\n",
" return user_dict\n",
"\n",
"class member_lookup(beam.DoFn):\n",
" def __init__(self):\n",
"\n",
" def setup(self):\n",
"\n",
" def teardown(self):\n",
"\n",
" def process(self, element):\n"
]
},
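{
"cell_type": "code",
"execution_count": null,
"id": "ee55ff66",
"metadata": {},
"outputs": [],
"source": [
"# Editor sketch (hedged), not the workshop solution: one way the simulated side\n",
"# input could feed a lookup DoFn. Names and the assumed (member_id, payload) element\n",
"# shape are illustrative only; fill in member_lookup above as Task 3 asks.\n",
"class example_member_lookup(beam.DoFn):\n",
"    def __init__(self, members):\n",
"        # members is the dict built by create_side_input()\n",
"        self.members = members\n",
"\n",
"    def process(self, element):\n",
"        member_id, payload = element\n",
"        # Unknown members fall back to an empty record so downstream steps still get a tuple\n",
"        yield member_id, (payload, self.members.get(member_id, {}))"
]
},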
{
"cell_type": "code",
"execution_count": null,
"id": "jZ6p6c7RGKd1",
"metadata": {
"executionInfo": {
"elapsed": 2,
"status": "ok",
"timestamp": 1724368704250,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "jZ6p6c7RGKd1"
},
"outputs": [],
"source": [
"# Task 9. State updates\n",
"class busy_check(beam.DoFn):\n",
"\n",
" def process(self, element, previous_status_state=beam.DoFn.StateParam(STATUS_STATE),):\n",
"\n"
]
},
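{
"cell_type": "code",
"execution_count": null,
"id": "0a1b2c3d",
"metadata": {},
"outputs": [],
"source": [
"# Editor sketch (hedged), not the workshop solution: a minimal illustration of the\n",
"# per-key state API that Task 9 builds on. The spec name and element shape (key, status)\n",
"# are assumptions; busy_check above still needs to be filled in.\n",
"class example_status_tracker(beam.DoFn):\n",
"    # One stored value per key, encoded as a UTF-8 string\n",
"    PREVIOUS = ReadModifyWriteStateSpec('previous_status', StrUtf8Coder())\n",
"\n",
"    def process(self, element, previous=beam.DoFn.StateParam(PREVIOUS)):\n",
"        key, status = element\n",
"        # Emit only when the status changed since the last element seen for this key\n",
"        if status != previous.read():\n",
"            previous.write(status)\n",
"            yield key, status"
]
},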
{
"cell_type": "code",
"execution_count": null,
"id": "4041YAKyz8uy",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 880
},
"executionInfo": {
"elapsed": 163917,
"status": "error",
"timestamp": 1724368886549,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "4041YAKyz8uy",
"outputId": "bf992510-cf44-496b-e9f3-93e7b25c3a05"
},
"outputs": [],
"source": [
"# Task 7. Create a custom model handler\n",
"# We reuse the decode_gcs_url_function from Task 3\n",
"class get_image_bytes(beam.DoFn):\n",
" def setup(self):\n",
" self.client = storage.Client()\n",
"\n",
" def process(self, element):\n",
" key, image_url = element[0], element[1]\n",
" bucket, file_path = decode_gcs_url(image_url)\n",
" bucket = self.client.bucket(bucket)\n",
" blob = bucket.blob(file_path)\n",
" # Return the object as bytes\n",
" return [(key,(image_url,blob.download_as_bytes()))]\n",
"\n",
"# Task 7. Create a custom model handler\n",
"# Multi Modal Custom Handler\n",
"class Cloud_Multi_Modal_ModelHandler(ModelHandler):\n",
" def load_model(self):\n",
"\n",
" def run_inference(self, batch, model,inference):\n",
"\n",
"\n",
"# Task 2. Read from the various sources\n",
"def format_to_tuple(element):\n",
" incoming = json.loads(element)\n",
" return [(incoming[\"area\"],incoming[\"image\"])]\n",
"\n",
"# Task 2. Read from the various sources\n",
"def format_area(element):\n",
" incoming = json.loads(element)\n",
" return (incoming['area'],(incoming['transaction_id'],[None]))\n",
"\n",
"def run(argv=None, save_main_session=True):\n",
" parser = argparse.ArgumentParser()\n",
" known_args, pipeline_args = parser.parse_known_args(argv)\n",
" pipeline_options = PipelineOptions(pipeline_args,experiments=['pickle_library=cloudpickle'])\n",
" pipeline_options.view_as(StandardOptions).streaming = True\n",
" pipeline_options.view_as(SetupOptions).save_main_session = save_main_session\n",
"\n",
" # Task 8. RunInference\n",
"\n",
" # Put everything together in a streaming pipeline.\n",
" with beam.Pipeline(options=pipeline_options) as p:\n",
"\n",
"\n",
"run()"
]
}
],
"metadata": {
"colab": {
"collapsed_sections": [
"8esns_iU01cb",
"GqIf8pJ806Cl"
],
"name": "2024_beam_workshop",
"provenance": [],
"toc_visible": true
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.10"
}
},
"nbformat": 4,
"nbformat_minor": 5
}