in pipelines/marketing_intelligence/marketing_intelligence_pipeline/pipeline.py [0:0]
def create_pipeline(options: MyPipelineOptions) -> Pipeline:
    """Assemble the extract -> transform -> publish pipeline graph.

    The pipeline is only constructed here; this function does not run it.

    Args:
        options: Pipeline options of type `MyPipelineOptions`. The fields
            read are `messages_subscription`, `model_endpoint`,
            `project_id`, `location` and `responses_topic`.

    Returns:
        The `beam.Pipeline` with the "Read", "Transform" and
        "Publish Result" steps attached.
    """
    pipeline = beam.Pipeline(options=options)

    # Extract: `_extract` is handed the configured input subscription.
    read_step = "Read" >> _extract(
        subscription=options.messages_subscription,
    )
    raw_messages: PCollection[str] = pipeline | read_step

    # Transform: `_transform` is handed the model endpoint together with
    # the project and location taken from the options.
    transform_step = "Transform" >> _transform(
        model_endpoint=options.model_endpoint,
        project=options.project_id,
        location=options.location,
    )
    model_outputs: PCollection[str] = raw_messages | transform_step

    # Load: write the resulting strings to the responses topic.
    # NOTE(review): WriteStringsToPubSub appears to be deprecated in newer
    # Apache Beam releases -- confirm against the pinned Beam version.
    publish_step = "Publish Result" >> pubsub.WriteStringsToPubSub(
        topic=options.responses_topic,
    )
    model_outputs | publish_step

    return pipeline