pipelines/anomaly_detection/anomaly_detection_pipeline/pipeline.py (34 lines of code) (raw):
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Anomaly Detection Apache Beam pipeline.
"""
from apache_beam import Pipeline, PCollection
from apache_beam.ml.inference import RunInference
from apache_beam.io.gcp import pubsub
import apache_beam as beam
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON
from .options import MyPipelineOptions
def _format_output(element: PredictionResult) -> str:
return f"Input: \n{element.example}, \n\n\nOutput: \n{element.inference}"
@beam.ptransform_fn
def _extract(p: Pipeline, subscription: str) -> PCollection[str]:
msgs: PCollection[bytes] = p | "Read subscription" >> beam.io.ReadFromPubSub(
subscription=subscription)
return msgs | "Parse" >> beam.Map(lambda x: x.decode("utf-8"))
@beam.ptransform_fn
def _transform(msgs: PCollection[str], model_endpoint: str, project: str,
location: str) -> PCollection[str]:
model_handler = VertexAIModelHandlerJSON(
endpoint_id=model_endpoint, project=project, location=location)
preds: PCollection[
PredictionResult] = msgs | "RunInference-vertexai" >> RunInference(
model_handler)
return preds | "Format Output" >> beam.Map(_format_output)
def create_pipeline(options: MyPipelineOptions) -> Pipeline:
""" Create the pipeline object.
Args:
options: The pipeline options, with type `MyPipelineOptions`.
Returns:
The pipeline object.
"""
pipeline = beam.Pipeline(options=options)
# Extract
transactions: PCollection[str] = pipeline | "Read" >> _extract(
subscription=options.messages_subscription)
# Transform
responses: PCollection[str] = transactions | "Transform" >> _transform(
model_endpoint=options.model_endpoint,
project=options.project,
location=options.location)
# Load
responses | "Publish Result" >> pubsub.WriteStringsToPubSub(
topic=options.responses_topic)
return pipeline