def add_stats()

in 11_realtime/flightstxf/flights_transforms.py [0:0]


def add_stats(element, window=beam.DoFn.WindowParam):
    """Annotate newly arrived events of one group with that group's averages.

    Intended to run on the output of a group-by, i.e. once per key (airport)
    and window, so both averages are per-airport values.

    Args:
        element: indexable pair; ``element[0]`` is the grouping key and
            ``element[1]`` the collection of event records for that key.
        window: the window of this group, supplied through
            ``beam.DoFn.WindowParam``; only its ``start`` is used.

    Yields:
        A copy of every event whose ``WHEELS_OFF`` time is strictly before
        ``window.start + WINDOW_EVERY``, extended with ``AVG_DEP_DELAY`` and
        ``AVG_TAXI_OUT``.
    """
    _airport = element[0]  # grouping key; the statistics below are per key
    grouped_events = element[1]

    # Departure lateness for this group: mean departure delay and taxi-out.
    mean_dep_delay = compute_mean(grouped_events, 'DEP_DELAY')
    mean_taxi_out = compute_mean(grouped_events, 'TAXI_OUT')

    # The same event shows up in several overlapping windows (it stays
    # around for 60 minutes), so it is emitted only from the window in which
    # it has just arrived: its time must fall in the first WINDOW_EVERY
    # stretch after the window start.
    cutoff = window.start + WINDOW_EVERY
    for record in grouped_events:
        wheels_off_ts = to_datetime(record['WHEELS_OFF']).timestamp()
        if not (wheels_off_ts < cutoff):
            # Already emitted from an earlier window; skip it here.
            continue
        enriched = record.copy()
        enriched['AVG_DEP_DELAY'] = mean_dep_delay
        enriched['AVG_TAXI_OUT'] = mean_taxi_out
        yield enriched