in 11_realtime/flightstxf/flights_transforms.py [0:0]
def add_stats(element, window=beam.DoFn.WindowParam):
# result of a group-by, so this will be called once for each airport and window
# all averages here are by airport
airport = element[0]
events = element[1]
# how late are flights leaving?
avg_dep_delay = compute_mean(events, 'DEP_DELAY')
avg_taxiout = compute_mean(events, 'TAXI_OUT')
# remember that an event will be present for 60 minutes, but we want to emit
# it only if it has just arrived (if it is within 5 minutes of the start of the window)
emit_end_time = window.start + WINDOW_EVERY
for event in events:
event_time = to_datetime(event['WHEELS_OFF']).timestamp()
if event_time < emit_end_time:
event_plus_stat = event.copy()
event_plus_stat['AVG_DEP_DELAY'] = avg_dep_delay
event_plus_stat['AVG_TAXI_OUT'] = avg_taxiout
yield event_plus_stat