in 11_realtime/flightstxf/flights_transforms.py [0:0]
def transform_events_to_features(events, for_training=True):
# events are assigned the time at which predictions will have to be made -- the wheels off time
events = events | 'assign_time' >> beam.FlatMap(assign_timestamp)
events = events | 'remove_cancelled' >> beam.Filter(is_normal_operation)
# compute stats by airport, and add to events
features = (
events
| 'window' >> beam.WindowInto(beam.window.SlidingWindows(WINDOW_DURATION, WINDOW_EVERY))
| 'by_airport' >> beam.Map(lambda x: (x['ORIGIN'], x))
| 'group_by_airport' >> beam.GroupByKey()
| 'events_and_stats' >> beam.FlatMap(add_stats)
| 'events_to_features' >> beam.FlatMap(lambda x: create_features_and_label(x, for_training))
)
return features