def window_expiry_callback()

in pipelines/iot_analytics/iot_analytics_pipeline/aggregate_metrics.py [0:0]


  def window_expiry_callback(
      self,
      vehicle_id=beam.DoFn.KeyParam,
      max_timestamp_seen=beam.DoFn.StateParam(MAX_TIMESTAMP),
      max_temperature=beam.DoFn.StateParam(MAX_TEMPERATURE),
      max_vibration=beam.DoFn.StateParam(MAX_VIBRATION),
      sum_mileage=beam.DoFn.StateParam(SUM_MILEAGE),
      count_mileage=beam.DoFn.StateParam(COUNT_MILEAGE)):
    # Calculate average mileage
    avg_mileage = sum_mileage.read() / count_mileage.read(
    ) if count_mileage.read() > 0 else 0.0

    # Create output Row object
    output_row = beam.Row(
        vehicle_id=vehicle_id,
        max_timestamp=max_timestamp_seen.read(),
        max_temperature=max_temperature.read(),
        max_vibration=max_vibration.read(),
        avg_mileage=int(avg_mileage))

    # Output the aggregated metrics
    yield output_row

    # Clear state
    max_timestamp_seen.clear()
    max_temperature.clear()
    max_vibration.clear()
    sum_mileage.clear()
    count_mileage.clear()