in pipelines/iot_analytics/iot_analytics_pipeline/pipeline.py [0:0]
def create_pipeline(pipeline_options: MyPipelineOptions) -> Pipeline:
    """Create (but do not run) the IoT analytics streaming pipeline.

    The pipeline reads vehicle telemetry JSON from Pub/Sub, aggregates it
    per vehicle over fixed 60-second windows, enriches each aggregate with
    maintenance data from Bigtable, runs model inference on the enriched
    records, and appends the predictions to a BigQuery table.

    Args:
        pipeline_options: The pipeline options, with type `MyPipelineOptions`.

    Returns:
        The constructed `beam.Pipeline` object (caller is responsible for
        running it).
    """
    # Handler used by the Enrichment transform to look up per-vehicle
    # maintenance records in Bigtable, keyed by `row_key`.
    bigtable_handler = BigTableEnrichmentHandler(
        project_id=pipeline_options.project,
        instance_id=pipeline_options.bigtable_instance_id,
        table_id=pipeline_options.bigtable_table_id,
        row_key=pipeline_options.row_key)

    # BigQuery output schema as comma-separated "name:TYPE" pairs. Beam's
    # schema parser splits on commas and strips whitespace, so implicit
    # string concatenation here is equivalent to the legacy backslash-
    # continued literal.
    bq_schema = (
        "vehicle_id:STRING,"
        "max_temperature:INTEGER,"
        "max_vibration:FLOAT,"
        "latest_timestamp:TIMESTAMP,"
        "last_service_date:STRING,"
        "maintenance_type:STRING,"
        "model:STRING,"
        "needs_maintenance:INTEGER")

    pipeline = beam.Pipeline(options=pipeline_options)

    enriched_data = (
        pipeline
        | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
            topic=pipeline_options.topic)
        | "Read JSON" >> beam.Map(json.loads)
        | "Parse&EventTimestamp" >> beam.Map(
            VehicleStateEvent.convert_json_to_vehicleobj).with_output_types(
                VehicleStateEvent)
        # Key by vehicle so the windowed aggregation is per vehicle.
        | "AddKeys" >> beam.WithKeys(
            lambda event: event.vehicle_id).with_output_types(
                Tuple[str, VehicleStateEvent])
        | "Window" >> beam.WindowInto(
            FixedWindows(60),
            trigger=AfterWatermark(),
            accumulation_mode=AccumulationMode.ACCUMULATING)
        | "AggregateMetrics" >> beam.ParDo(AggregateMetrics()).with_input_types(
            Tuple[str, VehicleStateEvent]).with_output_types(VehicleStateEvent)
        # Join Bigtable maintenance data onto each aggregate; give each
        # lookup at most 10 seconds.
        | "EnrichWithBigtable" >> Enrichment(
            bigtable_handler, join_fn=custom_join, timeout=10))

    # NOTE(review): Beam's built-in RunInference is a PTransform applied
    # directly with `|`, not wrapped in ParDo, and takes `model_handler`
    # rather than `model` — this presumably is a project-local DoFn wrapper
    # with that name; confirm against its definition.
    predictions = enriched_data | "RunInference" >> beam.ParDo(
        RunInference(model=sklearn_model_handler))

    predictions | "WriteToBigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        project=pipeline_options.project,
        dataset=pipeline_options.dataset,
        table=pipeline_options.table,
        schema=bq_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

    return pipeline