04_streaming/transform/df03.py (52 lines of code) (raw):

#!/usr/bin/env python3

# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import csv
import json
import logging

import apache_beam as beam


def addtimezone(lat, lon):
    """Return ``(lat, lon, timezone_name)`` for a latitude/longitude pair.

    ``lat`` and ``lon`` are returned unchanged; they are converted with
    ``float()`` only for the timezone lookup.  If the conversion (or the
    lookup) raises ``ValueError`` -- e.g. for a header row whose cells are
    not numeric -- the placeholder name ``'TIMEZONE'`` is returned instead.
    """
    try:
        # Imported inside the function rather than at module level, matching
        # how this file handles its other optional third-party dependency.
        import timezonefinder
        # NOTE(review): a new TimezoneFinder is built on every call; reusing
        # one instance would likely be cheaper -- confirm it survives Beam's
        # function serialization before changing.
        tf = timezonefinder.TimezoneFinder()
        return lat, lon, tf.timezone_at(lng=float(lon), lat=float(lat))
        # return (lat, lon, 'America/Los_Angeles') # FIXME
    except ValueError:
        return lat, lon, 'TIMEZONE'  # header


def as_utc(date, hhmm, tzone):
    """Convert a local date and HHMM time to a UTC timestamp string.

    Args:
        date: local calendar date formatted as ``YYYY-MM-DD``.
        hhmm: local time as a digit string; the first two characters are the
            hours and the remainder the minutes.  ``None`` or ``''`` means
            the time is missing.
        tzone: timezone name understood by ``pytz.timezone``, or ``None``
            when the timezone is unknown.

    Returns:
        The instant formatted as ``YYYY-MM-DD HH:MM:SS`` in UTC, or ``''``
        when ``hhmm`` is missing or ``tzone`` is ``None``.

    Raises:
        ValueError: if ``date`` or ``hhmm`` cannot be parsed; the offending
            inputs are logged before the exception propagates.
    """
    # A missing time (empty string, or None from a JSON null) or an unknown
    # timezone cannot be converted.  Truthiness is used instead of len() so
    # that None does not raise TypeError.
    if not hhmm or tzone is None:
        return ''  # empty string corresponds to canceled flights

    try:
        import datetime
        import pytz

        loc_tz = pytz.timezone(tzone)
        # Localize midnight of the given date, then add the time of day.
        # NOTE(review): the UTC offset appears to be the one in effect at
        # local midnight (is_dst=False); no normalize() is applied after the
        # addition -- confirm this is acceptable around DST transitions.
        loc_dt = loc_tz.localize(
            datetime.datetime.strptime(date, '%Y-%m-%d'), is_dst=False)
        # can't just parse hhmm because the data contains 2400 and the like ...
        loc_dt += datetime.timedelta(hours=int(hhmm[:2]),
                                     minutes=int(hhmm[2:]))
        utc_dt = loc_dt.astimezone(pytz.utc)
        return utc_dt.strftime('%Y-%m-%d %H:%M:%S')
    except ValueError:
        logging.exception('%s %s %s', date, hhmm, tzone)
        raise


def tz_correct(line, airport_timezones):
    """Yield the flight record in ``line`` with its times converted to UTC.

    Args:
        line: one flight record serialized as a JSON object.
        airport_timezones: mapping from airport sequence id to a tuple whose
            third element (index 2) is the airport's timezone name.

    Yields:
        The record re-serialized as JSON with the departure-side fields
        (CRS_DEP_TIME, DEP_TIME, WHEELS_OFF) converted using the origin
        airport's timezone and the arrival-side fields (WHEELS_ON,
        CRS_ARR_TIME, ARR_TIME) using the destination's.  Nothing is yielded
        if a lookup raises ``KeyError``; the record is logged and skipped.
    """
    fields = json.loads(line)
    try:
        # convert all times to UTC
        dep_airport_id = fields["ORIGIN_AIRPORT_SEQ_ID"]
        arr_airport_id = fields["DEST_AIRPORT_SEQ_ID"]
        dep_timezone = airport_timezones[dep_airport_id][2]
        arr_timezone = airport_timezones[arr_airport_id][2]

        for f in ["CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"]:
            fields[f] = as_utc(fields["FL_DATE"], fields[f], dep_timezone)
        for f in ["WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
            fields[f] = as_utc(fields["FL_DATE"], fields[f], arr_timezone)

        yield json.dumps(fields)
    except KeyError:
        # Reached for an unknown airport id, and also for a record that is
        # missing one of the fields read above.
        logging.exception(" Ignoring %s because airport is not known", line)


def run():
    """Build and run the local pipeline that rewrites flight times as UTC.

    Reads ``airports.csv.gz`` to build an airport-id -> (lat, lon, timezone)
    side input, applies ``tz_correct`` to every line of
    ``flights_sample.json`` and writes the results with the output prefix
    ``all_flights``.
    """
    with beam.Pipeline('DirectRunner') as pipeline:
        airports = (
            pipeline
            | 'airports:read' >> beam.io.ReadFromText('airports.csv.gz')
            | beam.Filter(lambda line: "United States" in line)
            | 'airports:fields' >> beam.Map(
                lambda line: next(csv.reader([line])))
            # Column 0 is used as the key; columns 21 and 26 are passed to
            # addtimezone as latitude and longitude respectively.
            | 'airports:tz' >> beam.Map(
                lambda fields: (fields[0],
                                addtimezone(fields[21], fields[26])))
        )

        flights = (
            pipeline
            | 'flights:read' >> beam.io.ReadFromText('flights_sample.json')
            | 'flights:tzcorr' >> beam.FlatMap(
                tz_correct, beam.pvalue.AsDict(airports))
        )

        flights | beam.io.textio.WriteToText('all_flights')


if __name__ == '__main__':
    run()