in pipelines/ml_ai_python/ml_ai_pipeline/pipeline.py [0:0]
def create_pipeline(options: MyPipelineOptions) -> Pipeline:
""" Create the pipeline object.
Args:
options: The pipeline options, with type `MyPipelineOptions`.
Returns:
The pipeline object.
"""
pipeline = beam.Pipeline(options=options)
# Extract
msgs: PCollection[str] = pipeline | "Read" >> _extract(
subscription=options.messages_subscription)
# Transform
responses: PCollection[str] = msgs | "Transform" >> _transform(
model_path=options.model_path)
# Load
responses | "Publish Result" >> pubsub.WriteStringsToPubSub(
topic=options.responses_topic)
return pipeline