04_streaming/transform/df06.py (163 lines of code) (raw):
#!/usr/bin/env python3
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import apache_beam as beam
import logging
import csv
import json
# strftime/strptime pattern shared by as_utc (which formats UTC timestamps with it)
# and add_24h_if_before (which parses and re-formats those same strings).
DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
def addtimezone(lat, lon):
    """
    Attach a timezone name to a latitude/longitude pair.

    Args:
        lat: latitude, as anything float() accepts (typically a CSV string).
        lon: longitude, as anything float() accepts (typically a CSV string).

    Returns:
        (lat, lon, timezone) with lat/lon converted to float and timezone
        taken from timezonefinder's timezone_at().
        If either coordinate is not numeric (e.g. the CSV header row), the
        inputs are returned unchanged with the placeholder 'TIMEZONE'.
    """
    # Validate both coordinates before touching timezonefinder, so header rows
    # never pay for building a TimezoneFinder and never come back with one
    # coordinate converted and the other left as a string.
    try:
        lat_deg = float(lat)
        lon_deg = float(lon)
    except ValueError:
        return lat, lon, 'TIMEZONE'  # header

    import timezonefinder
    try:
        tf = timezonefinder.TimezoneFinder()
        tzname = tf.timezone_at(lng=lon_deg, lat=lat_deg)
    except ValueError:
        # NOTE(review): presumably raised for out-of-range coordinates -- confirm
        # against the timezonefinder docs. Mapped to the same placeholder so the
        # caller sees a 3-tuple instead of an exception.
        tzname = 'TIMEZONE'
    return lat_deg, lon_deg, tzname
def as_utc(date, hhmm, tzone):
    """
    Returns date corrected for timezone, and the tzoffset

    Args:
        date: local calendar date as a 'YYYY-MM-DD' string.
        hhmm: local clock time as a digit string; the first two characters are
            the hours and the remainder the minutes. May be empty.
        tzone: timezone name accepted by pytz.timezone(), or None.

    Returns:
        (utc_timestamp, utc_offset_seconds): the instant formatted with
        DATETIME_FORMAT in UTC, and the local UTC offset in seconds.
        ('', 0) when hhmm is empty or tzone is None.

    Raises:
        ValueError: if date or hhmm cannot be parsed; the offending inputs are
            logged before the exception is re-raised.
    """
    try:
        if len(hhmm) > 0 and tzone is not None:
            import datetime
            import pytz
            loc_tz = pytz.timezone(tzone)
            # can't just parse hhmm because the data contains 2400 and the like ...
            # so add it as a timedelta, which rolls 2400 over to the next day.
            naive_dt = datetime.datetime.strptime(date, '%Y-%m-%d') + datetime.timedelta(
                hours=int(hhmm[:2]), minutes=int(hhmm[2:]))
            # Localize the full wall-clock time rather than midnight: arithmetic on
            # a pytz-aware datetime keeps the original UTC offset, which is wrong
            # for times after a DST switch on a transition day.
            loc_dt = loc_tz.localize(naive_dt, is_dst=False)
            utc_dt = loc_dt.astimezone(pytz.utc)
            return utc_dt.strftime(DATETIME_FORMAT), loc_dt.utcoffset().total_seconds()
        else:
            return '', 0  # empty string corresponds to canceled flights
    except ValueError:
        logging.exception('{} {} {}'.format(date, hhmm, tzone))
        raise  # bare raise keeps the original traceback intact
def add_24h_if_before(arrtime, deptime):
    """
    Push arrtime forward by 24 hours when it sorts before deptime.

    Both arguments are timestamp strings in DATETIME_FORMAT (or empty). The
    comparison is a plain string comparison. If either string is empty, or
    arrtime is not earlier than deptime, arrtime is returned unchanged.
    """
    import datetime

    # Guard clause: nothing to adjust unless both values exist and arrival < departure.
    if not (len(arrtime) > 0 and len(deptime) > 0 and arrtime < deptime):
        return arrtime

    one_day = datetime.timedelta(hours=24)
    shifted = datetime.datetime.strptime(arrtime, DATETIME_FORMAT) + one_day
    return shifted.strftime(DATETIME_FORMAT)
def tz_correct(fields, airport_timezones):
    """
    Convert a flight record's local times to UTC and attach airport coordinates.

    Written as a generator so it can be used with a FlatMap: it yields the
    updated record once, or yields nothing when the record is dropped.

    Args:
        fields: dict for one flight. 'FL_DATE' must have a strftime() method;
            the time columns are HHMM strings (possibly empty). The dict is
            modified in place.
        airport_timezones: mapping from airport sequence id to an indexable
            (latitude, longitude, timezone name) triple.

    Yields:
        The same dict with its time columns rewritten as UTC timestamp strings
        and the DEP_/ARR_AIRPORT_LAT, _LON and _TZOFFSET keys added.
    """
    fields['FL_DATE'] = fields['FL_DATE'].strftime('%Y-%m-%d')  # convert to a string so JSON code works
    try:
        # convert all times to UTC
        dep_airport_id = fields["ORIGIN_AIRPORT_SEQ_ID"]
        arr_airport_id = fields["DEST_AIRPORT_SEQ_ID"]
        dep_timezone = airport_timezones[dep_airport_id][2]
        arr_timezone = airport_timezones[arr_airport_id][2]

        # deptz / arrtz keep the offset returned for the last column of each loop.
        for f in ["CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"]:
            fields[f], deptz = as_utc(fields["FL_DATE"], fields[f], dep_timezone)
        for f in ["WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
            fields[f], arrtz = as_utc(fields["FL_DATE"], fields[f], arr_timezone)

        # Every time was built from FL_DATE, so anything that now sorts before
        # the departure time is moved forward by a day.
        for f in ["WHEELS_OFF", "WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
            fields[f] = add_24h_if_before(fields[f], fields["DEP_TIME"])

        fields["DEP_AIRPORT_LAT"] = airport_timezones[dep_airport_id][0]
        fields["DEP_AIRPORT_LON"] = airport_timezones[dep_airport_id][1]
        fields["DEP_AIRPORT_TZOFFSET"] = deptz
        fields["ARR_AIRPORT_LAT"] = airport_timezones[arr_airport_id][0]
        fields["ARR_AIRPORT_LON"] = airport_timezones[arr_airport_id][1]
        fields["ARR_AIRPORT_TZOFFSET"] = arrtz
        yield fields
    except KeyError:
        # Airport is not known: drop the record without logging (best effort).
        # A single handler is enough; a second `except KeyError` after this one
        # could never run.
        pass
def get_next_event(fields):
    """
    Yield the simulated events for one flight record.

    One event is produced for each of DEP_TIME ("departed"), WHEELS_OFF
    ("wheelsoff") and ARR_TIME ("arrived") whose value is non-empty, in that
    order. Each event is a copy of the record with EVENT_TYPE and EVENT_TIME
    added and the columns listed for that event removed.
    """
    # (source time column, event type, columns stripped from that event)
    event_specs = (
        ("DEP_TIME", "departed",
         ("TAXI_OUT", "WHEELS_OFF", "WHEELS_ON", "TAXI_IN", "ARR_TIME", "ARR_DELAY", "DISTANCE")),
        ("WHEELS_OFF", "wheelsoff",
         ("WHEELS_ON", "TAXI_IN", "ARR_TIME", "ARR_DELAY", "DISTANCE")),
        ("ARR_TIME", "arrived", ()),
    )
    for time_column, event_type, hidden_columns in event_specs:
        event_time = fields[time_column]
        if len(event_time) == 0:
            continue  # this stage has no timestamp, so no event
        event = dict(fields)  # copy, the input record is left untouched
        event["EVENT_TYPE"] = event_type
        event["EVENT_TIME"] = event_time
        for column in hidden_columns:
            event.pop(column, None)  # not knowable when this event fires
        yield event
def create_event_row(fields):
    """
    Return a copy of fields with an extra 'EVENT_DATA' entry.

    'EVENT_DATA' holds the JSON serialization of the input record. The input
    itself is not modified.
    """
    return dict(fields, EVENT_DATA=json.dumps(fields))
def run(project, bucket):
    """
    Build and execute the timezone-correction pipeline with the DirectRunner.

    The pipeline:
      * reads gs://<bucket>/flights/airports/airports.csv.gz, keeps lines
        containing "United States" and maps column 0 to the result of
        addtimezone() on columns 21 and 26;
      * reads a random sample (rand() < 0.001) of dsongcp.flights from BigQuery
        and runs each row through tz_correct() with the airports as a side input;
      * writes the corrected flights as JSON text under
        gs://<bucket>/flights/tzcorr/all_flights and to dsongcp.flights_tzcorr;
      * expands each flight with get_next_event() and writes the events to
        dsongcp.flights_simevents.
    Both BigQuery tables are truncated on write and created if needed.

    Args:
        project: project id passed to Beam as --project.
        bucket: bucket name used for staging, temp, input and output paths.
    """
    pipeline_args = [
        '--project={0}'.format(project),
        '--staging_location=gs://{0}/flights/staging/'.format(bucket),
        '--temp_location=gs://{0}/flights/temp/'.format(bucket),
        '--runner=DirectRunner'
    ]
    airports_path = 'gs://{}/flights/airports/airports.csv.gz'.format(bucket)
    tzcorr_prefix = 'gs://{}/flights/tzcorr/all_flights'.format(bucket)

    # Explicit schema: autodetect on JSON works, but is less reliable.
    flight_columns = [
        'FL_DATE:date',
        'UNIQUE_CARRIER:string',
        'ORIGIN_AIRPORT_SEQ_ID:string',
        'ORIGIN:string',
        'DEST_AIRPORT_SEQ_ID:string',
        'DEST:string',
        'CRS_DEP_TIME:timestamp',
        'DEP_TIME:timestamp',
        'DEP_DELAY:float',
        'TAXI_OUT:float',
        'WHEELS_OFF:timestamp',
        'WHEELS_ON:timestamp',
        'TAXI_IN:float',
        'CRS_ARR_TIME:timestamp',
        'ARR_TIME:timestamp',
        'ARR_DELAY:float',
        'CANCELLED:boolean',
        'DIVERTED:boolean',
        'DISTANCE:float',
        'DEP_AIRPORT_LAT:float',
        'DEP_AIRPORT_LON:float',
        'DEP_AIRPORT_TZOFFSET:float',
        'ARR_AIRPORT_LAT:float',
        'ARR_AIRPORT_LON:float',
        'ARR_AIRPORT_TZOFFSET:float',
        'Year:string',
    ]
    flights_schema = ','.join(flight_columns)
    # Event rows carry every flight column plus three event-specific ones.
    events_schema = flights_schema + ',' + 'EVENT_TYPE:string,EVENT_TIME:timestamp,EVENT_DATA:string'

    # Both BigQuery sinks replace the table contents and create the table if missing.
    bq_dispositions = dict(
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    )

    with beam.Pipeline(argv=pipeline_args) as p:
        airports = (
            p
            | 'airports:read' >> beam.io.ReadFromText(airports_path)
            | beam.Filter(lambda row: "United States" in row)
            | 'airports:fields' >> beam.Map(lambda row: next(csv.reader([row])))
            | 'airports:tz' >> beam.Map(lambda cols: (cols[0], addtimezone(cols[21], cols[26])))
        )

        flights = (
            p
            | 'flights:read' >> beam.io.ReadFromBigQuery(
                query='SELECT * FROM dsongcp.flights WHERE rand() < 0.001',
                use_standard_sql=True)
            | 'flights:tzcorr' >> beam.FlatMap(tz_correct, beam.pvalue.AsDict(airports))
        )

        # Corrected flights as newline-delimited JSON on Cloud Storage.
        _ = (
            flights
            | 'flights:tostring' >> beam.Map(lambda flight: json.dumps(flight))
            | 'flights:gcsout' >> beam.io.textio.WriteToText(tzcorr_prefix)
        )

        # Corrected flights as a BigQuery table.
        _ = (
            flights
            | 'flights:bqout' >> beam.io.WriteToBigQuery(
                'dsongcp.flights_tzcorr',
                schema=flights_schema,
                **bq_dispositions)
        )

        # Simulated events derived from each corrected flight.
        events = flights | beam.FlatMap(get_next_event)
        _ = (
            events
            | 'events:totablerow' >> beam.Map(lambda event: create_event_row(event))
            | 'events:bqout' >> beam.io.WriteToBigQuery(
                'dsongcp.flights_simevents',
                schema=events_schema,
                **bq_dispositions)
        )
if __name__ == '__main__':
    import argparse

    # Command-line entry point: both the project id and the bucket are mandatory.
    cli = argparse.ArgumentParser(description='Run pipeline on the cloud')
    cli.add_argument('-p', '--project', required=True, help='Unique project ID')
    cli.add_argument(
        '-b', '--bucket', required=True,
        help='Bucket where gs://BUCKET/flights/airports/airports.csv.gz exists')
    options = cli.parse_args()

    print("Correcting timestamps and writing to BigQuery dataset")
    run(project=options.project, bucket=options.bucket)