def process()

in pipelines/iot_analytics/iot_analytics_pipeline/aggregate_metrics.py [0:0]


  def process(self,
              element: Tuple[str, VehicleStateEvent],
              timestamp=beam.DoFn.TimestampParam,
              window=beam.DoFn.WindowParam,
              vehicle_events_bag=beam.DoFn.StateParam(VEHICLE_EVENTS_BAG),
              max_timestamp_seen=beam.DoFn.StateParam(MAX_TIMESTAMP),
              max_temperature=beam.DoFn.StateParam(MAX_TEMPERATURE),
              max_vibration=beam.DoFn.StateParam(MAX_VIBRATION),
              sum_mileage=beam.DoFn.StateParam(SUM_MILEAGE),
              count_mileage=beam.DoFn.StateParam(COUNT_MILEAGE),
              window_timer=beam.DoFn.TimerParam(WINDOW_TIMER)):

    # Add event to bag state
    vehicle_events_bag.add(element[1])  # Add the VehicleStateEvent object

    # Update state with current event's values
    max_timestamp_seen.add(timestamp)
    max_temperature.add(element[1].temperature)
    max_vibration.add(element[1].vibration)
    sum_mileage.add(element[1].mileage)
    count_mileage.add(1)

    # Set timer for the end of the window
    window_timer.set(window.end)