in pipelines/iot_analytics/iot_analytics_pipeline/aggregate_metrics.py [0:0]
def window_expiry_callback(
        self,
        vehicle_id=beam.DoFn.KeyParam,
        max_timestamp_seen=beam.DoFn.StateParam(MAX_TIMESTAMP),
        max_temperature=beam.DoFn.StateParam(MAX_TEMPERATURE),
        max_vibration=beam.DoFn.StateParam(MAX_VIBRATION),
        sum_mileage=beam.DoFn.StateParam(SUM_MILEAGE),
        count_mileage=beam.DoFn.StateParam(COUNT_MILEAGE)):
    """Emit one aggregated row for the current key, then reset its state.

    NOTE(review): the name suggests this is registered as a timer callback;
    the decorator is not visible in this chunk -- confirm against the class.

    Args:
        vehicle_id: Key of the element group being flushed.
        max_timestamp_seen: State cell whose value becomes ``max_timestamp``.
        max_temperature: State cell whose value becomes ``max_temperature``.
        max_vibration: State cell whose value becomes ``max_vibration``.
        sum_mileage: State cell holding the numerator of the average.
        count_mileage: State cell holding the denominator of the average.

    Yields:
        A single ``beam.Row`` with the fields ``vehicle_id``,
        ``max_timestamp``, ``max_temperature``, ``max_vibration`` and
        ``avg_mileage``. The average is ``sum / count`` when the count is
        positive and ``0.0`` otherwise, truncated with ``int()``.
    """
    # Guard the division: a non-positive count falls back to 0.0.
    if count_mileage.read() > 0:
        mean_mileage = sum_mileage.read() / count_mileage.read()
    else:
        mean_mileage = 0.0

    yield beam.Row(
        vehicle_id=vehicle_id,
        max_timestamp=max_timestamp_seen.read(),
        max_temperature=max_temperature.read(),
        max_vibration=max_vibration.read(),
        avg_mileage=int(mean_mileage),
    )

    # This is a generator, so the reset below only runs once the consumer
    # resumes iteration after receiving the row.
    for state_cell in (
            max_timestamp_seen,
            max_temperature,
            max_vibration,
            sum_mileage,
            count_mileage,
    ):
        state_cell.clear()