def transform_events_to_features()

in 11_realtime/flightstxf/flights_transforms.py [0:0]


def transform_events_to_features(events, for_training=True):
    """Turn a collection of flight events into model features.

    The pipeline timestamps every event, keeps only normal operations,
    groups the events per origin airport inside sliding windows, attaches
    per-airport statistics, and finally emits features (and label).

    Args:
        events: Input collection that Beam transforms can be applied to
            with ``|``. Each element must support ``element['ORIGIN']``
            by the time it reaches the keying step.
        for_training: Forwarded unchanged as the second argument of
            ``create_features_and_label``. Defaults to ``True``.
            NOTE(review): presumably toggles label generation -- confirm
            against ``create_features_and_label``.

    Returns:
        The collection produced by the final ``events_to_features`` step.
    """
    # Each event gets the time at which a prediction would have to be
    # made for it -- the wheels-off time.
    timestamped = events | 'assign_time' >> beam.FlatMap(assign_timestamp)
    normal_ops = timestamped | 'remove_cancelled' >> beam.Filter(is_normal_operation)

    # Per-airport statistics are computed over sliding windows and then
    # attached to the events by add_stats.
    sliding = beam.window.SlidingWindows(WINDOW_DURATION, WINDOW_EVERY)
    windowed = normal_ops | 'window' >> beam.WindowInto(sliding)
    keyed_by_origin = windowed | 'by_airport' >> beam.Map(
        lambda event: (event['ORIGIN'], event)
    )
    per_airport = keyed_by_origin | 'group_by_airport' >> beam.GroupByKey()
    with_stats = per_airport | 'events_and_stats' >> beam.FlatMap(add_stats)

    return with_stats | 'events_to_features' >> beam.FlatMap(
        lambda enriched: create_features_and_label(enriched, for_training)
    )