in pipelines/iot_analytics/iot_analytics_pipeline/aggregate_metrics.py [0:0]
def process(self,
element: Tuple[str, VehicleStateEvent],
timestamp=beam.DoFn.TimestampParam,
window=beam.DoFn.WindowParam,
vehicle_events_bag=beam.DoFn.StateParam(VEHICLE_EVENTS_BAG),
max_timestamp_seen=beam.DoFn.StateParam(MAX_TIMESTAMP),
max_temperature=beam.DoFn.StateParam(MAX_TEMPERATURE),
max_vibration=beam.DoFn.StateParam(MAX_VIBRATION),
sum_mileage=beam.DoFn.StateParam(SUM_MILEAGE),
count_mileage=beam.DoFn.StateParam(COUNT_MILEAGE),
window_timer=beam.DoFn.TimerParam(WINDOW_TIMER)):
# Add event to bag state
vehicle_events_bag.add(element[1]) # Add the VehicleStateEvent object
# Update state with current event's values
max_timestamp_seen.add(timestamp)
max_temperature.add(element[1].temperature)
max_vibration.add(element[1].vibration)
sum_mileage.add(element[1].mileage)
count_mileage.add(1)
# Set timer for the end of the window
window_timer.set(window.end)