# 11_realtime/flightstxf/flights_transforms.py
#!/usr/bin/env python3
# Copyright 2021 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import apache_beam as beam
import datetime as dt
import logging
import numpy as np
import farmhash # pip install pyfarmhash
# Timestamp layout used by both BigQuery exports and the JSON feed
# (after the 'T' separator is normalized to a space in to_datetime).
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
# Length in seconds of the sliding window over which per-airport averages are computed.
WINDOW_DURATION = 60 * 60
# Seconds between successive window starts; also the emit horizon used in add_stats.
WINDOW_EVERY = 5 * 60
def get_data_split(fl_date):
    """Assign a flight date to 'TRAIN' (60%), 'VALIDATE' (20%), or 'TEST' (20%).

    Hashes the date with farm fingerprint — the same function BigQuery's
    FARM_FINGERPRINT uses — so the split is reproducible across systems.
    """
    # Stable bucket in [0, 100) derived from the date string
    bucket = np.abs(np.uint64(farmhash.fingerprint64(str(fl_date))).astype('int64') % 100)
    if bucket < 60:
        return 'TRAIN'
    if bucket < 80:
        return 'VALIDATE'
    return 'TEST'
def get_data_split_2019(fl_date):
    """Assign a flight date to a split, holding out 2019-and-later as 'TEST'.

    Any date string lexicographically above '2019' (i.e. dates in 2019 or later)
    goes to 'TEST'; earlier dates are hashed into 'TRAIN' (95%) / 'VALIDATE' (5%)
    with the same farm fingerprint that BigQuery uses, for reproducibility.
    """
    date_str = str(fl_date)
    if date_str > '2019':
        return 'TEST'
    # Stable bucket in [0, 100) derived from the date string
    bucket = np.abs(np.uint64(farmhash.fingerprint64(date_str)).astype('int64') % 100)
    return 'TRAIN' if bucket < 95 else 'VALIDATE'
def to_datetime(event_time):
    """Coerce an event time to a datetime.datetime.

    BigQuery delivers datetime objects directly and those pass through
    untouched; JSON delivers strings that may separate date and time with
    either 'T' or a space, so strings are normalized and parsed.
    """
    if not isinstance(event_time, str):
        return event_time
    # Same layout as the module-level DATETIME_FORMAT
    normalized = event_time.replace('T', ' ')
    return dt.datetime.strptime(normalized, '%Y-%m-%d %H:%M:%S')
def approx_miles_between(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles between two (lat, lon) points in degrees.

    Applies the Haversine formula on a spherical Earth of radius 6371 km and
    converts the result from kilometers to miles. Inputs may be numbers or
    numeric strings.
    """
    deg2rad = np.pi / 180.0
    phi1 = float(lat1) * deg2rad
    phi2 = float(lat2) * deg2rad
    lam1 = float(lon1) * deg2rad
    lam2 = float(lon2) * deg2rad
    # Haversine: 'a' is the square of half the chord length between the points
    half_dphi = (phi2 - phi1) / 2
    half_dlam = (lam2 - lam1) / 2
    a = np.sin(half_dphi) ** 2 + np.sin(half_dlam) ** 2 * np.cos(phi1) * np.cos(phi2)
    central_angle = 2 * np.arcsin(np.sqrt(a))
    return float(6371 * central_angle * 0.621371)  # km -> miles
def create_features_and_label(event, for_training):
    """Turn one raw flight event dict into a dict of model inputs.

    When for_training is True, a binary 'ontime' label (arrival delay under
    15 minutes) and a 'data_split' assignment are added; otherwise the
    wheels-off time is carried through as 'event_time' for prediction output.
    Events missing any required field are logged and dropped rather than
    failing the pipeline.
    """
    try:
        model_input = {}
        if for_training:
            # Label: 1.0 when the flight arrived less than 15 minutes late
            model_input['ontime'] = 1.0 if float(event['ARR_DELAY'] or 0) < 15 else 0
        # Features shared by training and prediction (same set as in ch9)
        model_input.update({
            'dep_delay': event['DEP_DELAY'],
            'taxi_out': event['TAXI_OUT'],
            # distance is not in the wheelsoff feed, so derive it from coordinates
            'distance': approx_miles_between(event['DEP_AIRPORT_LAT'], event['DEP_AIRPORT_LON'],
                                             event['ARR_AIRPORT_LAT'], event['ARR_AIRPORT_LON']),
            'origin': event['ORIGIN'],
            'dest': event['DEST'],
            'dep_hour': to_datetime(event['DEP_TIME']).hour,
            'is_weekday': 1.0 if to_datetime(event['DEP_TIME']).isoweekday() < 6 else 0.0,
            'carrier': event['UNIQUE_CARRIER'],
            'dep_airport_lat': event['DEP_AIRPORT_LAT'],
            'dep_airport_lon': event['DEP_AIRPORT_LON'],
            'arr_airport_lat': event['ARR_AIRPORT_LAT'],
            'arr_airport_lon': event['ARR_AIRPORT_LON'],
            # rolling per-airport averages computed upstream in add_stats
            'avg_dep_delay': event['AVG_DEP_DELAY'],
            'avg_taxi_out': event['AVG_TAXI_OUT'],
        })
        if for_training:
            # reproducible TRAIN/VALIDATE/TEST assignment by flight date
            model_input['data_split'] = get_data_split(event['FL_DATE'])
        else:
            # prediction output should include the event timestamp
            model_input['event_time'] = event['WHEELS_OFF']
        yield model_input
    except Exception as e:
        # best effort: if any key is absent or malformed, skip this event
        logging.warning('Ignoring {} because: {}'.format(event, e), exc_info=True)
def compute_mean(events, col_name):
    """Mean of float(event[col_name]) over events with a truthy value for the
    column; returns None when no event contributes a usable value."""
    usable = [float(event[col_name]) for event in events
              if col_name in event and event[col_name]]
    if not usable:
        return None
    return float(np.mean(usable))
def add_stats(element, window=beam.DoFn.WindowParam):
    """For one (airport, events) group within a sliding window, attach the
    per-airport averages and emit only the events that just entered the window.

    Each event lives in the window for WINDOW_DURATION seconds, so to avoid
    duplicates it is emitted only from the window whose first WINDOW_EVERY
    seconds contain its wheels-off time.
    """
    # Called once per airport per window (result of a group-by); the key
    # itself is not needed because the events carry all their fields.
    _, events = element
    avg_dep_delay = compute_mean(events, 'DEP_DELAY')
    avg_taxi_out = compute_mean(events, 'TAXI_OUT')
    cutoff = window.start + WINDOW_EVERY
    for event in events:
        if to_datetime(event['WHEELS_OFF']).timestamp() < cutoff:
            enriched = event.copy()
            enriched['AVG_DEP_DELAY'] = avg_dep_delay
            enriched['AVG_TAXI_OUT'] = avg_taxi_out
            yield enriched
def assign_timestamp(event):
    """Yield the event as a TimestampedValue stamped with its wheels-off time.

    Events whose WHEELS_OFF is missing or unparseable are logged and dropped so
    a single bad record cannot fail the pipeline.
    """
    try:
        event_time = to_datetime(event['WHEELS_OFF'])
        yield beam.window.TimestampedValue(event, event_time.timestamp())
    except Exception as e:
        # Was a bare `except: pass`, which silently swallowed everything —
        # including KeyboardInterrupt/SystemExit. Keep the best-effort drop
        # but catch only Exception and log, matching create_features_and_label.
        logging.warning('Ignoring %s because: %s', event, e, exc_info=True)
def is_normal_operation(event):
    """Return False for cancelled or diverted flights, True otherwise.

    The flag fields may hold booleans or strings; any present flag whose
    lowercase string form equals 'true' marks the flight as not normal.
    """
    disrupted = any(
        str(event[flag]).lower() == 'true'
        for flag in ('CANCELLED', 'DIVERTED')
        if flag in event
    )
    return not disrupted
def transform_events_to_features(events, for_training=True):
    """Beam composite transform: raw flight events -> model feature dicts.

    Each event is timestamped at its wheels-off time (the moment a prediction
    would have to be made), cancelled/diverted flights are filtered out,
    per-airport rolling averages are computed over a sliding window, and each
    surviving event is converted into model features (plus label when training).
    """
    timestamped = events | 'assign_time' >> beam.FlatMap(assign_timestamp)
    flown = timestamped | 'remove_cancelled' >> beam.Filter(is_normal_operation)
    # Stats are computed per airport within each sliding window, then joined
    # back onto the individual events before featurization.
    return (
        flown
        | 'window' >> beam.WindowInto(beam.window.SlidingWindows(WINDOW_DURATION, WINDOW_EVERY))
        | 'by_airport' >> beam.Map(lambda ev: (ev['ORIGIN'], ev))
        | 'group_by_airport' >> beam.GroupByKey()
        | 'events_and_stats' >> beam.FlatMap(add_stats)
        | 'events_to_features' >> beam.FlatMap(lambda ev: create_features_and_label(ev, for_training))
    )