def create_pipeline()

in pipelines/iot_analytics/iot_analytics_pipeline/pipeline.py [0:0]


def create_pipeline(pipeline_options: MyPipelineOptions) -> Pipeline:
  """Create the IoT analytics pipeline object (constructed, not run).

  The graph reads JSON messages from Pub/Sub, parses them into
  `VehicleStateEvent`s keyed by `vehicle_id`, assigns them to 60-second
  fixed windows, applies the `AggregateMetrics` DoFn, enriches the results
  from Bigtable, runs model inference and appends the predictions to a
  BigQuery table (created if it does not exist).

  Args:
    pipeline_options: The pipeline options, with type `MyPipelineOptions`.
      Fields read here: `project`, `topic`, `bigtable_instance_id`,
      `bigtable_table_id`, `row_key`, `dataset` and `table`.

  Returns:
    The constructed `beam.Pipeline`; the caller is responsible for running it.
  """
  # Handler used by the Enrichment step to look up rows in Bigtable.
  bigtable_handler = BigTableEnrichmentHandler(
      project_id=pipeline_options.project,
      instance_id=pipeline_options.bigtable_instance_id,
      table_id=pipeline_options.bigtable_table_id,
      row_key=pipeline_options.row_key)

  # BigQuery "name:TYPE, name:TYPE, ..." schema string. Built with join so no
  # stray indentation ends up inside the literal.
  bq_schema = ", ".join((
      "vehicle_id:STRING",
      "max_temperature:INTEGER",
      "max_vibration:FLOAT",
      "latest_timestamp:TIMESTAMP",
      "last_service_date:STRING",
      "maintenance_type:STRING",
      "model:STRING",
      "needs_maintenance:INTEGER",
  ))

  pipeline = beam.Pipeline(options=pipeline_options)
  # NOTE(review): ReadFromPubSub is an unbounded source; presumably
  # pipeline_options enables streaming mode -- confirm with the caller.
  enriched_data = (
      pipeline
      | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
          topic=pipeline_options.topic)
      | "Read JSON" >> beam.Map(json.loads)
      | "Parse&EventTimestamp" >> beam.Map(
          VehicleStateEvent.convert_json_to_vehicleobj).with_output_types(
              VehicleStateEvent)
      | "AddKeys" >> beam.WithKeys(
          lambda event: event.vehicle_id).with_output_types(
              Tuple[str, VehicleStateEvent])
      | "Window" >> beam.WindowInto(
          FixedWindows(60),
          trigger=AfterWatermark(),
          accumulation_mode=AccumulationMode.ACCUMULATING)
      # NOTE(review): no GroupByKey precedes this ParDo, so AggregateMetrics
      # receives individual (vehicle_id, event) pairs; presumably it is a
      # stateful DoFn that aggregates per key -- confirm.
      | "AggregateMetrics" >> beam.ParDo(
          AggregateMetrics()).with_output_types(
              VehicleStateEvent).with_input_types(
                  Tuple[str, VehicleStateEvent])
      | "EnrichWithBigtable" >> Enrichment(
          bigtable_handler, join_fn=custom_join, timeout=10))

  # RunInference is a PTransform (not a DoFn), so it is applied directly
  # rather than wrapped in beam.ParDo; its model handler is its first
  # positional argument (there is no `model=` keyword).
  # NOTE(review): RunInference normally emits PredictionResult elements;
  # confirm the handler's post-processing yields dict rows matching bq_schema.
  predictions = enriched_data | "RunInference" >> RunInference(
      sklearn_model_handler)

  predictions | "WriteToBigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
      method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
      project=pipeline_options.project,
      dataset=pipeline_options.dataset,
      table=pipeline_options.table,
      schema=bq_schema,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
  return pipeline