pipelines/iot_analytics/iot_analytics_pipeline/pipeline.py (68 lines of code) (raw):

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline of the IoT Analytics Dataflow Solution guide."""

import json
import pickle
from typing import Any, Dict, Tuple

import apache_beam as beam
from apache_beam import Pipeline
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import (
    BigTableEnrichmentHandler)
from apache_beam.transforms.trigger import AccumulationMode, AfterWatermark
from apache_beam.transforms.window import FixedWindows

from .aggregate_metrics import AggregateMetrics
from .options import MyPipelineOptions
from .parse_timestamp import VehicleStateEvent
from .trigger_inference import RunInference


def custom_join(left: Dict[str, Any],
                right: Dict[str, Any]) -> Dict[str, Any]:
    """Join aggregated vehicle metrics with Bigtable maintenance data.

    Used as the ``join_fn`` of the Enrichment transform below.

    Args:
        left: Aggregated metrics with keys ``vehicle_id``,
            ``max_temperature``, ``max_vibration``, ``max_timestamp`` and
            ``avg_mileage``.
        right: Bigtable enrichment row; the maintenance fields are nested
            under the ``maintenance`` column family.

    Returns:
        A flat dict combining both sides. ``max_timestamp`` is renamed to
        ``latest_timestamp`` to match the BigQuery sink schema.
    """
    return {
        "vehicle_id": left["vehicle_id"],
        "max_temperature": left["max_temperature"],
        "max_vibration": left["max_vibration"],
        # The aggregation step emits "max_timestamp"; the sink schema
        # calls this column "latest_timestamp".
        "latest_timestamp": left["max_timestamp"],
        "avg_mileage": left["avg_mileage"],
        "last_service_date": right["maintenance"]["last_service_date"],
        "maintenance_type": right["maintenance"]["maintenance_type"],
        "model": right["maintenance"]["model"],
    }


# NOTE(review): the model is unpickled at import time from a path relative to
# the current working directory, so "maintenance_model.pkl" must be staged
# alongside the worker process -- confirm the Dataflow setup ships it.
# pickle must only ever load trusted files; never point this at external data.
with open("maintenance_model.pkl", "rb") as model_file:
    sklearn_model_handler = pickle.load(model_file)


def create_pipeline(pipeline_options: MyPipelineOptions) -> Pipeline:
    """Create the pipeline object.

    Reads vehicle state events from Pub/Sub, aggregates them in 60-second
    windows per vehicle, enriches them with maintenance records from
    Bigtable, runs the maintenance-prediction model, and writes the
    predictions to BigQuery.

    Args:
        pipeline_options: The pipeline options, with type
            `MyPipelineOptions`.

    Returns:
        The pipeline object.
    """
    bigtable_handler = BigTableEnrichmentHandler(
        project_id=pipeline_options.project,
        instance_id=pipeline_options.bigtable_instance_id,
        table_id=pipeline_options.bigtable_table_id,
        row_key=pipeline_options.row_key)

    # Build the schema as one comma-separated string with no embedded
    # whitespace. (The previous backslash-continuation literal left runs of
    # indentation inside the string and relied on Beam stripping them.)
    bq_schema = ",".join((
        "vehicle_id:STRING",
        "max_temperature:INTEGER",
        "max_vibration:FLOAT",
        "latest_timestamp:TIMESTAMP",
        "last_service_date:STRING",
        "maintenance_type:STRING",
        "model:STRING",
        "needs_maintenance:INTEGER",
    ))

    pipeline = beam.Pipeline(options=pipeline_options)

    enriched_data = (
        pipeline
        | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
            topic=pipeline_options.topic)
        | "Read JSON" >> beam.Map(json.loads)
        | "Parse&EventTimestamp" >> beam.Map(
            VehicleStateEvent.convert_json_to_vehicleobj).with_output_types(
                VehicleStateEvent)
        | "AddKeys" >> beam.WithKeys(
            lambda event: event.vehicle_id).with_output_types(
                Tuple[str, VehicleStateEvent])
        # 60-second fixed windows firing on the watermark; panes accumulate.
        | "Window" >> beam.WindowInto(
            FixedWindows(60),
            trigger=AfterWatermark(),
            accumulation_mode=AccumulationMode.ACCUMULATING)
        | "AggregateMetrics" >> beam.ParDo(
            AggregateMetrics()).with_output_types(
                VehicleStateEvent).with_input_types(
                    Tuple[str, VehicleStateEvent])
        # Look up per-vehicle maintenance data in Bigtable; custom_join
        # flattens the two sides into one record.
        | "EnrichWithBigtable" >> Enrichment(
            bigtable_handler, join_fn=custom_join, timeout=10))

    predictions = enriched_data | "RunInference" >> beam.ParDo(
        RunInference(model=sklearn_model_handler))

    predictions | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
        project=pipeline_options.project,
        dataset=pipeline_options.dataset,
        table=pipeline_options.table,
        schema=bq_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

    return pipeline