pipelines/iot_analytics/iot_analytics_pipeline/parse_timestamp.py (24 lines of code) (raw):

# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Pipeline of the IoT Analytics Dataflow Solution guide. """ import typing import datetime from apache_beam.transforms.window import TimestampedValue class VehicleStateEvent(typing.NamedTuple): """ Class to create VehicleState TimestampedValue """ vehicle_id: str timestamp: datetime.datetime temperature: int rpm: int vibration: float fuel_level: int mileage: int @staticmethod def convert_json_to_vehicleobj(input_json): dt_object = datetime.datetime.strptime(input_json["timestamp"], "%Y-%m-%dT%H:%M:%SZ") event = VehicleStateEvent( vehicle_id=input_json["vehicle_id"], timestamp=dt_object, temperature=input_json["temperature"], rpm=input_json["rpm"], vibration=input_json["vibration"], fuel_level=input_json["fuel_level"], mileage=input_json["mileage"]) return TimestampedValue(event, dt_object.timestamp())