# Streaming Multi-Modal, Multi-Input, Multi-Output with Apache Beam

## Setup

### Library installs

In [None]:
# Install Apache Beam with its [gcp] extras, and the Vertex AI SDK (imported below as `vertexai`).
%pip install apache-beam[gcp]
%pip install google-cloud-aiplatform

### Authentication

In [None]:
# If running in Google Colab, authenticate the notebook user interactively.
from google.colab import auth
auth.authenticate_user()

# Otherwise, authenticate as described in
# https://cloud.google.com/docs/authentication/client-libraries#python

In [None]:
# This is not necessary
# This will just let you know who you're authenticated as
import requests
gcloud_token = !gcloud auth print-access-token
gcloud_tokeninfo = requests.get('https://www.googleapis.com/oauth2/v3/tokeninfo?access_token=' + gcloud_token[0]).json()
print(gcloud_tokeninfo['email'])

### Imports

In [None]:
import os
from typing import Dict
import argparse
import logging
import json
import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.ml.inference.base import RunInference, PredictionResult, KeyedModelHandler, ModelHandler
from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.transforms import window
from apache_beam.coders import Coder
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
from google.cloud import storage
import vertexai
from vertexai.vision_models import Image, ImageQnAModel, Video
from vertexai.vision_models import VideoSegmentConfig
from urllib.parse import urlparse

## Pipeline

### Global Variables

In [None]:
# Task 1. Setup your environment
# Step 3

# Fill in your Google Cloud project ID below. Later cells read it back from this
# environment variable, and the `!gcloud` commands expand it as ${GOOGLE_CLOUD_PROJECT}.
# NOTE: this assignment overwrites any GOOGLE_CLOUD_PROJECT already present in the
# environment (with "" if left blank).
os.environ['GOOGLE_CLOUD_PROJECT'] = ""

### Upload the photos to your Google Cloud Storage Bucket

In [None]:
# Task 1. Setup your environment
# Step 4
# Skip this if you've already done it 
!gcloud storage create gs://${GOOGLE_CLOUD_PROJECT}-gcs/ --location=us-central1
!gcloud storage cp ../*jpg gs://${GOOGLE_CLOUD_PROJECT}-gcs/
!gcloud storage cp ../members.txt gs://${GOOGLE_CLOUD_PROJECT}-gcs/

In [None]:
# Don't need to alter
google_cloud_project = os.environ.get("GOOGLE_CLOUD_PROJECT")

# Pipe-delimited member list uploaded to the workshop bucket in Task 1, Step 4.
user_file = "gs://{}-gcs/members.txt".format(google_cloud_project)

# Logical stream name -> (Pub/Sub topic ID, Pub/Sub subscription ID).
pubsub_topics = dict(
    parking=('beam24-workshop-parking-input-topic', 'beam24-workshop-parking-input-sub'),
    checkin=('beam24-workshop-checkin-input-topic', 'beam24-workshop-checkin-input-sub'),
    area=('beam24-workshop-area-input-topic', 'beam24-workshop-area-input-sub'),
    parking_output=('beam24-workshop-parking-output-topic', 'beam24-workshop-parking-output-sub'),
    discount_output=('beam24-workshop-discount-output-topic', 'beam24-workshop-discount-output-sub'),
    inventory_output=('beam24-workshop-inventory-output-topic', 'beam24-workshop-inventory-output-sub'),
    line_status=('beam24-workshop-line-input-topic', 'beam24-workshop-line-input-sub'),
)

def format_subscription(subscription, project=None):
    """Return the fully qualified Pub/Sub subscription path.

    Args:
        subscription: Short subscription ID, e.g. ``'beam24-workshop-area-input-sub'``.
        project: Google Cloud project ID. Defaults to the module-level
            ``google_cloud_project`` (read from ``GOOGLE_CLOUD_PROJECT``).

    Returns:
        A string of the form ``projects/<project>/subscriptions/<subscription>``.

    Raises:
        ValueError: If no project ID is available. Without this check the
            result would be ``projects//subscriptions/...`` or
            ``projects/None/subscriptions/...``, which fails much later.
    """
    if project is None:
        project = google_cloud_project
    if not project:
        raise ValueError(
            "No Google Cloud project set; fill in GOOGLE_CLOUD_PROJECT before "
            "building the pipeline.")
    return 'projects/{}/subscriptions/{}'.format(project, subscription)

### Storage Helper Functions

In [None]:
# Task 3. Create side input and read

# Helper function to split a GCS URI into bucket and object path
def decode_gcs_url(url):
    """Split a ``gs://bucket/path/to/object`` URI into its components.

    Args:
        url: A Cloud Storage URI such as ``gs://my-bucket/dir/file.jpg``.

    Returns:
        A ``(bucket, object_path)`` tuple, e.g. ``("my-bucket", "dir/file.jpg")``.

    Raises:
        ValueError: If ``url`` is not a ``gs://`` URI, or names no bucket or
            no object.
    """
    scheme, separator, remainder = url.partition('://')
    if not separator or scheme.lower() != 'gs':
        raise ValueError(
            "Expected a URI of the form gs://bucket/object, got {!r}".format(url))
    # Split on the first "/" only. Object names may contain further "/", and
    # also "#" or "?", which a URL parser would treat as fragment/query.
    bucket, _, file_path = remainder.partition('/')
    if not bucket or not file_path:
        raise ValueError(
            "Expected a URI of the form gs://bucket/object, got {!r}".format(url))
    return bucket, file_path

# Loading an image from a local file needs a local path, but our objects live in
# GCS, so fetch their raw bytes through the Cloud Storage client instead.
def read_file(object_path):
    """Download a Cloud Storage object and return its contents.

    Args:
        object_path: URI of the form ``gs://bucket/path/to/object``.

    Returns:
        The object's raw bytes.
    """
    bucket_name, blob_name = decode_gcs_url(object_path)
    blob = storage.Client().bucket(bucket_name).blob(blob_name)
    return blob.download_as_bytes()

In [None]:
# Task 3. Create side input and read
# Due to https://github.com/apache/beam/issues/21103
# we simulate the side input by reading the member file directly (direct runner).

def create_side_input():
    """Load the member table from ``user_file``.

    The file is UTF-8 text with one ``|``-separated record per line:
    ``member_id|first_name|last_name|parking_benefits|tier``. The first line
    is a header and is skipped. Blank lines are ignored.

    Returns:
        ``{member_id: {"first_name", "last_name", "parking_benefits", "tier"}}``,
        with every value kept as the raw string from the file.

    Raises:
        ValueError: If a non-blank line does not have exactly five fields.
    """
    all_data = read_file(user_file).decode('utf-8')
    user_dict = {}
    # Line 1 is the header, so numbering of data lines starts at 2.
    for line_number, line in enumerate(all_data.splitlines()[1:], start=2):
        if not line.strip():
            # Tolerate blank lines such as extra trailing newlines.
            continue
        fields = line.split("|")
        if len(fields) != 5:
            raise ValueError(
                "Malformed member record on line {} of {}: expected 5 "
                "'|'-separated fields, got {}".format(line_number, user_file, len(fields)))
        member_id, first_name, last_name, parking_benefits, tier = fields
        user_dict[member_id] = {
            "first_name": first_name,
            "last_name": last_name,
            "parking_benefits": parking_benefits,
            "tier": tier,
        }

    return user_dict

class member_lookup(beam.DoFn):
    """Key JSON events by member ID and attach that member's record.

    The member table is loaded once per DoFn instance in ``setup`` via
    ``create_side_input``, instead of through a Beam side input.

    Each input element is a JSON string. The output is a one-item list:

    * ``(member_id, (member_record, element))`` for a known member, where
      ``member_id`` is the ID as a string (the member table's key type).
    * ``(None, (None, element))`` when the event has no ``member_id``, or the
      ID is not in the member table. Callers filter these out by key.
    """

    def __init__(self):
        self.user_dict = None

    def setup(self):
        self.user_dict = create_side_input()

    def teardown(self):
        self.user_dict = None

    def process(self, element):
        lookup = json.loads(element)
        # .get(): an event without a member_id field is treated like an
        # explicit null instead of failing the bundle with a KeyError.
        member_id = lookup.get('member_id')
        if member_id is None:
            return [(None, (None, element))]
        # Keys read from the member file are strings; normalise so a numeric
        # JSON ID still matches (and joins consistently across sources).
        member_id = str(member_id)
        user = self.user_dict.get(member_id)
        if user is None:
            # Unknown member: treat like a non-member rather than raising
            # KeyError and failing the streaming bundle.
            return [(None, (None, element))]
        return [(member_id, (user, element))]

In [None]:
# Task 9. State updates
class busy_check(beam.DoFn):
    """Track the latest status per key and answer status lookups.

    Input elements are ``(key, (id, values))``. ``values[0]`` tells the two
    kinds apart:

    * ``values[0]`` is not None: a status update. The value is stored in
      per-key state and re-emitted as
      ``("discard <key>", (previous_status, new_status))``. NOTE(review):
      presumably these come from RunInference as
      ``(area, (image_url, answers))``; confirm.
    * ``values[0]`` is None: a lookup, e.g. ``format_area`` output
      ``(area, (transaction_id, [None]))``. It is answered with
      ``(key, (id, status))``, where status is the stored value or
      ``"Unknown"`` if nothing has been stored for the key yet.

    The state uses ``StrUtf8Coder``, so stored statuses must be strings.
    """
    STATUS_STATE = ReadModifyWriteStateSpec('previous_status_state', StrUtf8Coder())

    def process(self, element, previous_status_state=beam.DoFn.StateParam(STATUS_STATE)):
        key = element[0]
        transaction_id = element[1][0]
        incoming_state = element[1][1][0]
        previous_status = previous_status_state.read()
        if incoming_state is not None:
            # Status update: remember it. Emit it under a "discard" key so it is
            # distinguishable from lookup answers downstream.
            previous_status_state.write(incoming_state)
            return [("discard {}".format(key), (previous_status, incoming_state))]
        if previous_status is None:
            # Lookup before any status has been recorded for this key.
            return [(key, (transaction_id, "Unknown"))]
        return [(key, (transaction_id, previous_status))]


In [None]:
# Task 7. Create a custom model handler
# We reuse the decode_gcs_url function from Task 3
class get_image_bytes(beam.DoFn):
    """Attach the raw bytes of a GCS image to each keyed element.

    Input elements are ``(key, image_url)`` pairs. Each is emitted as
    ``(key, (image_url, image_bytes))``.
    """

    def setup(self):
        # One Cloud Storage client per DoFn instance, reused across elements.
        self.client = storage.Client()

    def process(self, element):
        key = element[0]
        image_url = element[1]
        bucket_name, object_name = decode_gcs_url(image_url)
        payload = self.client.bucket(bucket_name).blob(object_name).download_as_bytes()
        return [(key, (image_url, payload))]

# Task 7. Create a custom model handler
# Multi Modal Custom Handler
class Cloud_Multi_Modal_ModelHandler(ModelHandler):
    """RunInference handler that asks a Vertex AI image Q&A model about images.

    Each example in a batch is an ``(image_url, image_bytes)`` pair. Each
    prediction is ``(image_url, answers)``, where ``answers`` is the value
    returned by ``ImageQnAModel.ask_question``. NOTE(review): presumably a list
    of answer strings; ``busy_check`` reads its first entry.
    """

    def load_model(self):
        """Initialise Vertex AI and load the ``imagetext@001`` image Q&A model."""
        vertexai.init(project=google_cloud_project, location="us-central1")
        client = ImageQnAModel.from_pretrained("imagetext@001")
        return client

    def run_inference(self, batch, model, inference=None):
        """Ask the model whether each image in ``batch`` contains people.

        Args:
            batch: Sequence of ``(image_url, image_bytes)`` examples.
            model: The model returned by ``load_model``.
            inference: Extra inference arguments passed by RunInference; unused.

        Returns:
            One ``(image_url, answers)`` tuple per example, in batch order.
        """
        predictions = []
        # Answer every example. RunInference may batch several elements, and
        # KeyedModelHandler pairs predictions with keys by position, so
        # answering only the first one would silently drop the rest.
        for image_url, image_bytes in batch:
            answers = model.ask_question(
                image=Image(image_bytes),
                question="Are there any people in this picture",
                number_of_results=1
            )
            predictions.append((image_url, answers))
        return predictions

# Task 2. Read from the various sources
def format_to_tuple(element):
    """Turn a line-status JSON message into ``[(area, image)]``.

    The result is a one-item list so the function can be used directly
    with ``beam.ParDo``.
    """
    payload = json.loads(element)
    area, image = payload["area"], payload["image"]
    return [(area, image)]

# Task 2. Read from the various sources
def format_area(element):
    """Key an area-check JSON message by area for the status lookup.

    Returns ``(area, (transaction_id, [None]))``. The ``[None]`` placeholder
    makes ``busy_check`` treat the element as a lookup, not a status update.
    """
    record = json.loads(element)
    area = record['area']
    transaction_id = record['transaction_id']
    return (area, (transaction_id, [None]))

def run(argv=None, save_main_session=True):
    """Build and run the streaming workshop pipeline.

    One Pipeline hosts two independent branches:

    * Upsell: parking and check-in events are decoded, placed in 30-second
      fixed windows, keyed by member via ``member_lookup``, joined with
      ``CoGroupByKey``, and printed when a member has both kinds of event.
    * Line status: line-status messages are mapped to ``(area, image_url)``,
      their image bytes fetched, and the images sent through the Vertex AI
      handler with ``RunInference``. The results are flattened together with
      area-check lookups, passed through the stateful ``busy_check``, and printed.

    Args:
        argv: Command-line arguments. No custom flags are defined, so every
            argument is forwarded to Beam as a pipeline option. ``None`` makes
            argparse read ``sys.argv[1:]``.
        save_main_session: Value for ``SetupOptions.save_main_session``.
    """
    parser = argparse.ArgumentParser()
    # known_args is unused: the parser defines no flags of its own.
    known_args, pipeline_args = parser.parse_known_args(argv)
    # NOTE(review): pickle_library=cloudpickle is passed as an experiment here;
    # confirm it takes effect versus the dedicated --pickle_library option.
    pipeline_options = PipelineOptions(pipeline_args,experiments=['pickle_library=cloudpickle'])
    pipeline_options.view_as(StandardOptions).streaming = True
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    # Task 8. RunInference
    # Keyed wrapper so (area, (image_url, image_bytes)) elements keep their key.
    keyed_custom_model_handler = KeyedModelHandler(Cloud_Multi_Modal_ModelHandler())

    # Put everything together in a streaming pipeline.
    # Leaving the `with` block runs the pipeline and waits for it to finish;
    # for this streaming pipeline that normally means until it is interrupted.
    with beam.Pipeline(options=pipeline_options) as p:
        # Task 2. Read from the various sources
        # Reading area check logs (left in the global window)
        area_check_logs = (
            p
            | "read area logs" >> beam.io.ReadFromPubSub(subscription=format_subscription(pubsub_topics['area'][1]))
            | 'decode pos' >> beam.Map(lambda x: x.decode('utf-8')))

        # -> (area, (transaction_id, [None])) lookup requests for busy_check
        area_check_formatted = area_check_logs | beam.Map(format_area)

        # Task 2. Read from the various sources
        # Reading parking logs
        parking_logs = (
            p
            | beam.io.ReadFromPubSub(subscription=format_subscription(pubsub_topics['parking'][1]))
            | 'decode parking' >> beam.Map(lambda x: x.decode('utf-8'))
            | 'window parking' >> beam.WindowInto(window.FixedWindows(30, 0)))

        # Task 4. Use the side input to key parking and check-in logs
        parking_member_lookup = (
            parking_logs
            | 'lookup member parking' >> beam.ParDo(member_lookup())
        )

        # Task 2. Read from the various sources
        # Reading check-in logs (same 30-second windows as parking, so they can be joined)
        checkin_logs = (
            p
            | "read checkin" >> beam.io.ReadFromPubSub(subscription=format_subscription(pubsub_topics['checkin'][1]))
            | 'decode checkin' >> beam.Map(lambda x: x.decode('utf-8'))
            | 'window checkin' >> beam.WindowInto(window.FixedWindows(30, 0)))

        # Task 4. Use the side input to key parking and check-in logs
        checkin_member_lookup = (
            checkin_logs
            | 'lookup member checkin' >> beam.ParDo(member_lookup())
        )

        # Task 5. Merge the keyed parking and check-in logs
        # member_lookup emits key None for non-members; drop those before joining.
        parking_member_only = parking_member_lookup | 'filter parking' >> beam.Filter(lambda x: x[0] is not None)
        checkin_member_only = checkin_member_lookup | 'filter checkin' >> beam.Filter(lambda x: x[0] is not None)
        # -> (member_id, {'parking': [...], 'checkin': [...]}) per window
        upsell = (({
                'parking': parking_member_only, 'checkin': checkin_member_only
            })
            | 'Merge' >> beam.CoGroupByKey()
        )
        # Keep members seen in both sources within the same window.
        push_upsell = upsell | beam.Filter(lambda merged: len(merged[1]['parking']) > 0 and len(merged[1]['checkin']) > 0)

        # Task 6. Output the joined data
        # You may opt to use something else
        # Standard Output is only for demonstration purposes
        _ = (push_upsell | beam.Map(print))


        # Task 2. Read from the various sources
        # Reading line logs
        line_logs = (
            p
            | "read line logs" >> beam.io.ReadFromPubSub(subscription=format_subscription(pubsub_topics['line_status'][1]))
            | 'decode' >> beam.Map(lambda x: x.decode('utf-8')))

        # Task 8. RunInference
        # (area, image_url) -> (area, (image_url, bytes)) -> model predictions per area
        inference_result = (
            line_logs
            | 'format line logs to tuple' >> beam.ParDo(format_to_tuple)
            | 'get bytes' >> beam.ParDo(get_image_bytes())
            | 'run inference' >> RunInference(keyed_custom_model_handler)
        )

        # Task 9. State updates
        # Status updates (inference) and lookups (area checks) share one keyed stream,
        # so busy_check's per-key state sees both.
        merged = ((inference_result,area_check_formatted) | 'Merge PCollections' >> beam.Flatten())

        return_stats = merged | beam.ParDo(busy_check())

        # Task 10. Output the line status
        _ = return_stats | "print returned status " >> beam.Map(print)

# NOTE(review): with argv=None, argparse reads sys.argv[1:]. Inside a Jupyter kernel
# that holds the kernel's own arguments, which then reach PipelineOptions.
# Consider run(argv=[]) when running from a notebook.
run()