in pipelines/anomaly_detection/anomaly_detection_pipeline/pipeline.py [0:0]
def create_pipeline(options: MyPipelineOptions) -> Pipeline:
    """Assemble the extract -> transform -> load pipeline.

    The pipeline is only constructed here; this function does not run it.

    Args:
        options: Pipeline options. The attributes read are
            `messages_subscription`, `model_endpoint`, `project`,
            `location` and `responses_topic`.

    Returns:
        The `beam.Pipeline` with all three stages attached.
    """
    p = beam.Pipeline(options=options)

    # Extract: the source transform is configured with the input subscription.
    read_step = "Read" >> _extract(
        subscription=options.messages_subscription,
    )
    incoming: PCollection[str] = p | read_step

    # Transform: each stage is built right before it is applied so that the
    # order of construction matches the order of the stages.
    transform_step = "Transform" >> _transform(
        model_endpoint=options.model_endpoint,
        project=options.project,
        location=options.location,
    )
    outgoing: PCollection[str] = incoming | transform_step

    # Load: publish the transformed strings to the responses topic.
    publish_step = "Publish Result" >> pubsub.WriteStringsToPubSub(
        topic=options.responses_topic,
    )
    outgoing | publish_step

    return p