data-analytics/beam_multimodal_streaming/full/streaming_pipeline.ipynb (609 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"id": "1z4ltDfJ00Sp",
"metadata": {
"id": "1z4ltDfJ00Sp"
},
"source": [
"# Streaming Multi-Modal, Multi-Input, Multi-Output with Apache Beam"
]
},
{
"cell_type": "markdown",
"id": "GqIf8pJ806Cl",
"metadata": {
"id": "GqIf8pJ806Cl"
},
"source": [
"## Setup"
]
},
{
"cell_type": "markdown",
"id": "8esns_iU01cb",
"metadata": {
"id": "8esns_iU01cb"
},
"source": [
"### Library installs"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "rw1J3mIe4jPL4BesHfDIAgaG",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"executionInfo": {
"elapsed": 54654,
"status": "ok",
"timestamp": 1724343050445,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "rw1J3mIe4jPL4BesHfDIAgaG",
"outputId": "f5351818-1713-4af7-8c01-3b42d7cc3dfb",
"tags": []
},
"outputs": [],
"source": [
"%pip install apache-beam[gcp]\n",
"%pip install google-cloud-aiplatform"
]
},
{
"cell_type": "markdown",
"id": "H7DMWLDH3i5W",
"metadata": {
"id": "H7DMWLDH3i5W"
},
"source": [
"### Authentication"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "mZBPPJWY1Bec",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"executionInfo": {
"elapsed": 5,
"status": "ok",
"timestamp": 1724343050445,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "mZBPPJWY1Bec",
"outputId": "41d8e6a7-ceec-48cf-aba7-9c1972b897a3"
},
"outputs": [],
"source": [
"# If using colab\n",
"from google.colab import auth\n",
"auth.authenticate_user()\n",
"\n",
"# else authenticate using \n",
"# https://cloud.google.com/docs/authentication/client-libraries#python"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ly1NcQxNUXpl",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 228
},
"executionInfo": {
"elapsed": 1385,
"status": "error",
"timestamp": 1724343051827,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "ly1NcQxNUXpl",
"outputId": "07b384b6-fe7b-4993-a707-02cb4e1997aa"
},
"outputs": [],
"source": [
"# This is not necessary\n",
"# This will just let you know who you're authenticated as\n",
"import requests\n",
"gcloud_token = !gcloud auth print-access-token\n",
"gcloud_tokeninfo = requests.get('https://www.googleapis.com/oauth2/v3/tokeninfo?access_token=' + gcloud_token[0]).json()\n",
"print(gcloud_tokeninfo['email'])"
]
},
{
"cell_type": "markdown",
"id": "e94ad7e8",
"metadata": {},
"source": [
"### Imports"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "J3gxOg2E13Ga",
"metadata": {
"executionInfo": {
"elapsed": 4672,
"status": "ok",
"timestamp": 1724368704250,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "J3gxOg2E13Ga"
},
"outputs": [],
"source": [
"import os\n",
"from typing import Dict\n",
"import argparse\n",
"import logging\n",
"import json\n",
"import apache_beam as beam\n",
"from apache_beam.io import PubsubMessage\n",
"from apache_beam.io import WriteToPubSub\n",
"from apache_beam.ml.inference.base import RunInference, PredictionResult, KeyedModelHandler, ModelHandler\n",
"from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"from apache_beam.options.pipeline_options import SetupOptions\n",
"from apache_beam.options.pipeline_options import StandardOptions\n",
"from apache_beam.transforms import window\n",
"from apache_beam.coders import Coder\n",
"from apache_beam.coders import StrUtf8Coder\n",
"from apache_beam.transforms.userstate import BagStateSpec\n",
"from apache_beam.transforms.userstate import ReadModifyWriteStateSpec\n",
"from google.cloud import storage\n",
"import vertexai\n",
"from vertexai.vision_models import Image, ImageQnAModel, Video\n",
"from vertexai.vision_models import VideoSegmentConfig\n",
"from urllib.parse import urlparse"
]
},
{
"cell_type": "markdown",
"id": "u1bESm45z-eH",
"metadata": {
"id": "u1bESm45z-eH"
},
"source": [
"## Pipeline"
]
},
{
"cell_type": "markdown",
"id": "7df29b64",
"metadata": {},
"source": [
"### Global Variables"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3e104fcd",
"metadata": {},
"outputs": [],
"source": [
"# Task 1. Setup your environment\n",
"# Step 3 \n",
"\n",
"# Fill in below\n",
"os.environ['GOOGLE_CLOUD_PROJECT'] = \"\""
]
},
{
"cell_type": "markdown",
"id": "076979bb",
"metadata": {},
"source": [
"### Upload the photos to your Google Cloud Storage Bucket"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "513e079c",
"metadata": {},
"outputs": [],
"source": [
"# Task 1. Setup your environment\n",
"# Step 4\n",
"# Skip this if you've already done it \n",
"!gcloud storage create gs://${GOOGLE_CLOUD_PROJECT}-gcs/ --location=us-central1\n",
"!gcloud storage cp ../*jpg gs://${GOOGLE_CLOUD_PROJECT}-gcs/\n",
"!gcloud storage cp ../members.txt gs://${GOOGLE_CLOUD_PROJECT}-gcs/"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2d9f2bc1",
"metadata": {},
"outputs": [],
"source": [
"# Don't need to alter\n",
"google_cloud_project = os.environ.get(\"GOOGLE_CLOUD_PROJECT\")\n",
"user_file = f\"gs://{google_cloud_project}-gcs/members.txt\"\n",
"pubsub_topics = {\n",
" 'parking' : ('beam24-workshop-parking-input-topic',\n",
" 'beam24-workshop-parking-input-sub'),\n",
" 'checkin' : ('beam24-workshop-checkin-input-topic',\n",
" 'beam24-workshop-checkin-input-sub'),\n",
" 'area' : ('beam24-workshop-area-input-topic',\n",
" 'beam24-workshop-area-input-sub'),\n",
" 'parking_output' : ('beam24-workshop-parking-output-topic',\n",
" 'beam24-workshop-parking-output-sub'),\n",
" 'discount_output' : ('beam24-workshop-discount-output-topic',\n",
" 'beam24-workshop-discount-output-sub'),\n",
" 'inventory_output' : ('beam24-workshop-inventory-output-topic',\n",
" 'beam24-workshop-inventory-output-sub'),\n",
" 'line_status': ('beam24-workshop-line-input-topic',\n",
" 'beam24-workshop-line-input-sub')\n",
"}\n",
"\n",
"def format_subscription(subscription):\n",
" return 'projects/{}/subscriptions/{}'.format(google_cloud_project, subscription)"
]
},
{
"cell_type": "markdown",
"id": "8193a82e",
"metadata": {},
"source": [
"### Storage Helper Functions"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "NS5rYHE0QY_9",
"metadata": {
"executionInfo": {
"elapsed": 3,
"status": "ok",
"timestamp": 1724368704250,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "NS5rYHE0QY_9"
},
"outputs": [],
"source": [
"# Task 3. Create side input and read\n",
"\n",
"# Helper function to split apart the GCS URI\n",
"def decode_gcs_url(url):\n",
" # Read the URI and parse it\n",
" p = urlparse(url)\n",
" bucket = p.netloc\n",
" file_path = p.path[0:].split('/', 1)\n",
" # Return the relevant objects (bucket, path to object)\n",
" return bucket, file_path[1]\n",
"\n",
"# We can't use the image load from local file since it expects a local path\n",
"# We use a GCS URL and get the bytes of the image\n",
"def read_file(object_path):\n",
" # Parse the path\n",
" bucket, file_path = decode_gcs_url(object_path)\n",
" storage_client = storage.Client()\n",
" bucket = storage_client.bucket(bucket)\n",
" blob = bucket.blob(file_path)\n",
" # Return the object as bytes\n",
" return blob.download_as_bytes()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "urjOgdnkEJsJ",
"metadata": {
"executionInfo": {
"elapsed": 2,
"status": "ok",
"timestamp": 1724368704250,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "urjOgdnkEJsJ"
},
"outputs": [],
"source": [
"# Task 3. Create side input and read\n",
"# Due to https://github.com/apache/beam/issues/21103\n",
"# We will simulate this for direct running\n",
"\n",
"def create_side_input():\n",
" all_data = read_file(user_file).decode('utf-8')\n",
" lines = all_data.splitlines()\n",
" user_dict = {}\n",
" for line in lines[1:]:\n",
" user = {}\n",
" member_id,first_name,last_name,parking_benefits,tier = line.split(\"|\")\n",
" user[\"first_name\"] = first_name\n",
" user[\"last_name\"] = last_name\n",
" user[\"parking_benefits\"] = parking_benefits\n",
" user[\"tier\"] = tier\n",
" user_dict[member_id] = user\n",
"\n",
" return user_dict\n",
"\n",
"class member_lookup(beam.DoFn):\n",
" def __init__(self):\n",
" self.user_dict = None\n",
"\n",
" def setup(self):\n",
" self.user_dict = create_side_input()\n",
"\n",
" def teardown(self):\n",
" self.user_dict = None\n",
"\n",
" def process(self, element):\n",
" lookup = json.loads(element)\n",
" member_id = lookup['member_id']\n",
" if member_id is None:\n",
" return [(None,(None,element))]\n",
" return [(member_id,(self.user_dict[member_id],element))]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "jZ6p6c7RGKd1",
"metadata": {
"executionInfo": {
"elapsed": 2,
"status": "ok",
"timestamp": 1724368704250,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "jZ6p6c7RGKd1"
},
"outputs": [],
"source": [
"# Task 9. State updates\n",
"class busy_check(beam.DoFn):\n",
" STATUS_STATE = ReadModifyWriteStateSpec('previous_status_state', StrUtf8Coder())\n",
"\n",
" def process(self, element, previous_status_state=beam.DoFn.StateParam(STATUS_STATE),):\n",
" key = element[0]\n",
" transaction_id = element[1][0]\n",
" incoming_state = element[1][1][0]\n",
" previous_status = previous_status_state.read()\n",
" output = None\n",
" if incoming_state != None:\n",
" output = (\"discard {}\".format(key), (previous_status,incoming_state))\n",
" previous_status_state.write(incoming_state)\n",
" elif previous_status is None:\n",
" output = (key, (transaction_id,\"Unknown\"))\n",
" else:\n",
" output = (key, (transaction_id, previous_status))\n",
" return [output]\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4041YAKyz8uy",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 880
},
"executionInfo": {
"elapsed": 163917,
"status": "error",
"timestamp": 1724368886549,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 420
},
"id": "4041YAKyz8uy",
"outputId": "bf992510-cf44-496b-e9f3-93e7b25c3a05"
},
"outputs": [],
"source": [
"# Task 7. Create a custom model handler\n",
"# We reuse the decode_gcs_url_function from Task 3\n",
"class get_image_bytes(beam.DoFn):\n",
" def setup(self):\n",
" self.client = storage.Client()\n",
"\n",
" def process(self, element):\n",
" key, image_url = element[0], element[1]\n",
" bucket, file_path = decode_gcs_url(image_url)\n",
" bucket = self.client.bucket(bucket)\n",
" blob = bucket.blob(file_path)\n",
" # Return the object as bytes\n",
" return [(key,(image_url,blob.download_as_bytes()))]\n",
"\n",
"# Task 7. Create a custom model handler\n",
"# Multi Modal Custom Handler\n",
"class Cloud_Multi_Modal_ModelHandler(ModelHandler):\n",
" def load_model(self):\n",
" \"\"\"Initiate the Google Vision API client.\"\"\"\n",
" vertexai.init(project=google_cloud_project, location=\"us-central1\")\n",
" client = ImageQnAModel.from_pretrained(\"imagetext@001\")\n",
" return client\n",
"\n",
" def run_inference(self, batch, model,inference):\n",
" image_url = batch[0][0]\n",
" image_bytes = batch[0][1]\n",
" image = Image(image_bytes)\n",
"\n",
" results = model.ask_question(\n",
" image=image,\n",
" question=\"Are there any people in this picture\",\n",
" number_of_results=1\n",
" )\n",
"\n",
" return [(image_url, results)]\n",
"\n",
"# Task 2. Read from the various sources\n",
"def format_to_tuple(element):\n",
" incoming = json.loads(element)\n",
" return [(incoming[\"area\"],incoming[\"image\"])]\n",
"\n",
"# Task 2. Read from the various sources\n",
"def format_area(element):\n",
" incoming = json.loads(element)\n",
" return (incoming['area'],(incoming['transaction_id'],[None]))\n",
"\n",
"def run(argv=None, save_main_session=True):\n",
" parser = argparse.ArgumentParser()\n",
" known_args, pipeline_args = parser.parse_known_args(argv)\n",
" pipeline_options = PipelineOptions(pipeline_args,experiments=['pickle_library=cloudpickle'])\n",
" pipeline_options.view_as(StandardOptions).streaming = True\n",
" pipeline_options.view_as(SetupOptions).save_main_session = save_main_session\n",
"\n",
" # Task 8. RunInference\n",
" keyed_custom_model_handler = KeyedModelHandler(Cloud_Multi_Modal_ModelHandler())\n",
"\n",
" # Put everything together in a streaming pipeline.\n",
" with beam.Pipeline(options=pipeline_options) as p:\n",
" # Task 2. Read from the various sources\n",
" # Reading area check logs\n",
" area_check_logs = (\n",
" p\n",
" | \"read area logs\" >> beam.io.ReadFromPubSub(subscription=format_subscription(pubsub_topics['area'][1]))\n",
" | 'decode pos' >> beam.Map(lambda x: x.decode('utf-8')))\n",
"\n",
" area_check_formatted = area_check_logs | beam.Map(format_area)\n",
"\n",
" # Task 2. Read from the various sources\n",
" # Reading parking logs\n",
" parking_logs = (\n",
" p\n",
" | beam.io.ReadFromPubSub(subscription=format_subscription(pubsub_topics['parking'][1]))\n",
" | 'decode parking' >> beam.Map(lambda x: x.decode('utf-8'))\n",
" | 'window parking' >> beam.WindowInto(window.FixedWindows(30, 0)))\n",
"\n",
" # Task 4. Use the side input to key parking and check-in logs\n",
" parking_member_lookup = (\n",
" parking_logs\n",
" | 'lookup member parking' >> beam.ParDo(member_lookup())\n",
" )\n",
"\n",
" # Task 2. Read from the various sources\n",
" # Reading check-in logs\n",
" checkin_logs = (\n",
" p\n",
" | \"read checkin\" >> beam.io.ReadFromPubSub(subscription=format_subscription(pubsub_topics['checkin'][1]))\n",
" | 'decode checkin' >> beam.Map(lambda x: x.decode('utf-8'))\n",
" | 'window checkin' >> beam.WindowInto(window.FixedWindows(30, 0)))\n",
"\n",
" # Task 4. Use the side input to key parking and check-in logs\n",
" checkin_member_lookup = (\n",
" checkin_logs\n",
" | 'lookup member checkin' >> beam.ParDo(member_lookup())\n",
" )\n",
"\n",
" # Task 5. Merge the keyed parking and check-in logs\n",
" parking_member_only = parking_member_lookup | 'filter parking' >> beam.Filter(lambda x: x[0] is not None)\n",
" checkin_member_only = checkin_member_lookup | 'filter checkin' >> beam.Filter(lambda x: x[0] is not None)\n",
" upsell = (({\n",
" 'parking': parking_member_only, 'checkin': checkin_member_only\n",
" })\n",
" | 'Merge' >> beam.CoGroupByKey()\n",
" )\n",
" push_upsell = upsell | beam.Filter(lambda merged: len(merged[1]['parking']) > 0 and len(merged[1]['checkin']) > 0)\n",
"\n",
" # Task 6. Output the joined data\n",
" # You may opt to use something else\n",
" # Standard Output is only for demonstration purposes\n",
" _ = (push_upsell | beam.Map(print))\n",
"\n",
"\n",
" # Task 2. Read from the various sources\n",
" # Reading line logs\n",
" line_logs = (\n",
" p\n",
" | \"read line logs\" >> beam.io.ReadFromPubSub(subscription=format_subscription(pubsub_topics['line_status'][1]))\n",
" | 'decode' >> beam.Map(lambda x: x.decode('utf-8')))\n",
"\n",
" # Task 8. RunInference\n",
" inference_result = (\n",
" line_logs\n",
" | 'format line logs to tuple' >> beam.ParDo(format_to_tuple)\n",
" | 'get bytes' >> beam.ParDo(get_image_bytes())\n",
" | 'run inference' >> RunInference(keyed_custom_model_handler)\n",
" )\n",
"\n",
" # Task 9. State updates\n",
" merged = ((inference_result,area_check_formatted) | 'Merge PCollections' >> beam.Flatten())\n",
"\n",
" return_stats = merged | beam.ParDo(busy_check())\n",
"\n",
" # Task 10. Output the line status\n",
" _ = return_stats | \"print returned status \" >> beam.Map(print)\n",
"\n",
"run()"
]
}
],
"metadata": {
"colab": {
"collapsed_sections": [
"8esns_iU01cb",
"GqIf8pJ806Cl"
],
"name": "2024_beam_workshop",
"provenance": [],
"toc_visible": true
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.10"
}
},
"nbformat": 4,
"nbformat_minor": 5
}