def create_pipeline()

in pipelines/anomaly_detection/anomaly_detection_pipeline/pipeline.py [0:0]


def create_pipeline(options: MyPipelineOptions) -> Pipeline:
  """Assemble the extract / transform / load pipeline from `options`.

  The pipeline is only constructed here; this function does not run it.

  Args:
    options: Pipeline options of type `MyPipelineOptions`. The attributes
      `messages_subscription`, `model_endpoint`, `project`, `location` and
      `responses_topic` are read from it.

  Returns:
    The `beam.Pipeline` with the "Read", "Transform" and "Publish Result"
    steps applied, in that order.
  """
  p = beam.Pipeline(options=options)

  # Extract: the source transform is built from the configured subscription.
  read_step = _extract(subscription=options.messages_subscription)
  raw_messages: PCollection[str] = p | "Read" >> read_step

  # Transform: the step is parameterised by model endpoint, project and
  # location, all taken from the options.
  transform_step = _transform(
      model_endpoint=options.model_endpoint,
      project=options.project,
      location=options.location)
  model_responses: PCollection[str] = (
      raw_messages | "Transform" >> transform_step)

  # Load: write the resulting strings to the configured responses topic.
  publish_step = pubsub.WriteStringsToPubSub(topic=options.responses_topic)
  model_responses | "Publish Result" >> publish_step

  return p