pipelines/marketing_intelligence/marketing_intelligence_pipeline/pipeline.py (38 lines of code) (raw):

# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Pipeline of the Marketing Intelligence Dataflow Solution guide. """ from apache_beam import Pipeline, PCollection from apache_beam.ml.inference import RunInference from apache_beam.io.gcp import pubsub import json import apache_beam as beam from apache_beam.ml.inference.base import PredictionResult from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON from .options import MyPipelineOptions # Format the predictions sent by the Vertex AI Endpoint def _format_output(element: PredictionResult) -> str: return f"Input: \n{element.example}, \n\n\nOutput: \n{element.infernece}" # Format the input and send each input as a dictionary def _format_input(x: bytes) -> dict: instance_dict = json.loads(x.decode("utf-8")) return instance_dict # Read input from Pub/Sub (all input data to be sent in String) and format it @beam.ptransform_fn def _extract(p: Pipeline, subscription: str) -> PCollection[str]: msgs: PCollection[bytes] = p | "Read subscription" >> beam.io.ReadFromPubSub( subscription=subscription) return msgs | "Parse and format Input" >> beam.Map(_format_input) # TODO Add transformation for BigTable Enrichment # Request predictions from the Vertex AI endpoint by sending the formatted input @beam.ptransform_fn def _transform(msgs: PCollection[str], model_endpoint: str, project: str, location: str) -> PCollection[str]: model_handler = VertexAIModelHandlerJSON( endpoint_id=model_endpoint, project=project, location=location) preds: PCollection[ PredictionResult] = msgs | "RunInference-vertexai" >> RunInference( model_handler) return preds | "Format Output" >> beam.Map(_format_output) def create_pipeline(options: MyPipelineOptions) -> Pipeline: """ Create the pipeline object. Args: options: The pipeline options, with type `MyPipelineOptions`. Returns: The pipeline object. """ pipeline = beam.Pipeline(options=options) # Extract messages: PCollection[str] = pipeline | "Read" >> _extract( subscription=options.messages_subscription) # Transform predictions: PCollection[str] = messages | "Transform" >> _transform( model_endpoint=options.model_endpoint, project=options.project_id, location=options.location) # Load predictions | "Publish Result" >> pubsub.WriteStringsToPubSub( topic=options.responses_topic) return pipeline