in 04_streaming/transform/df06.py [0:0]
def run(project, bucket):
    """Assemble the flights time-zone-correction pipeline on the DirectRunner.

    The pipeline built here has four outputs, all derived from one sampled
    read of ``dsongcp.flights``:

    * JSON text files under ``gs://<bucket>/flights/tzcorr/all_flights``
    * the BigQuery table ``dsongcp.flights_tzcorr``
    * the BigQuery table ``dsongcp.flights_simevents`` (one row per event
      produced by ``get_next_event``)

    Airport rows are read from ``airports.csv.gz`` in the bucket and handed
    to ``tz_correct`` as a dict side input.

    Args:
        project: Value passed to Beam as ``--project``.
        bucket: Cloud Storage bucket name (without the ``gs://`` prefix) used
            for staging, temp, the airports input and the text output.
    """
    pipeline_args = [
        f'--project={project}',
        f'--staging_location=gs://{bucket}/flights/staging/',
        f'--temp_location=gs://{bucket}/flights/temp/',
        '--runner=DirectRunner',
    ]
    airports_filename = f'gs://{bucket}/flights/airports/airports.csv.gz'
    flights_output = f'gs://{bucket}/flights/tzcorr/all_flights'

    # Column name / BigQuery type pairs for the time-zone-corrected flights.
    # An explicit schema is used on purpose: 'SCHEMA_AUTODETECT' on JSON
    # works, but is less reliable.
    flight_columns = [
        ('FL_DATE', 'date'),
        ('UNIQUE_CARRIER', 'string'),
        ('ORIGIN_AIRPORT_SEQ_ID', 'string'),
        ('ORIGIN', 'string'),
        ('DEST_AIRPORT_SEQ_ID', 'string'),
        ('DEST', 'string'),
        ('CRS_DEP_TIME', 'timestamp'),
        ('DEP_TIME', 'timestamp'),
        ('DEP_DELAY', 'float'),
        ('TAXI_OUT', 'float'),
        ('WHEELS_OFF', 'timestamp'),
        ('WHEELS_ON', 'timestamp'),
        ('TAXI_IN', 'float'),
        ('CRS_ARR_TIME', 'timestamp'),
        ('ARR_TIME', 'timestamp'),
        ('ARR_DELAY', 'float'),
        ('CANCELLED', 'boolean'),
        ('DIVERTED', 'boolean'),
        ('DISTANCE', 'float'),
        ('DEP_AIRPORT_LAT', 'float'),
        ('DEP_AIRPORT_LON', 'float'),
        ('DEP_AIRPORT_TZOFFSET', 'float'),
        ('ARR_AIRPORT_LAT', 'float'),
        ('ARR_AIRPORT_LON', 'float'),
        ('ARR_AIRPORT_TZOFFSET', 'float'),
        ('Year', 'string'),
    ]
    flights_schema = ','.join(
        f'{column}:{bq_type}' for column, bq_type in flight_columns
    )
    # Event rows carry every flight column plus three event-specific ones.
    events_schema = (
        flights_schema
        + ','
        + 'EVENT_TYPE:string,EVENT_TIME:timestamp,EVENT_DATA:string'
    )

    # Both BigQuery sinks replace any existing table contents and create the
    # table when it does not exist yet.
    truncate = beam.io.BigQueryDisposition.WRITE_TRUNCATE
    create_if_needed = beam.io.BigQueryDisposition.CREATE_IF_NEEDED

    with beam.Pipeline(argv=pipeline_args) as pipeline:
        # --- airports: code -> result of addtimezone(...) -----------------
        airport_lines = (
            pipeline
            | 'airports:read' >> beam.io.ReadFromText(airports_filename)
        )
        us_airport_lines = (
            airport_lines
            | beam.Filter(lambda text: "United States" in text)
        )
        airports = (
            us_airport_lines
            | 'airports:fields' >> beam.Map(
                lambda text: next(csv.reader([text])))
            # Columns 21 and 26 are presumably latitude/longitude -- confirm
            # against the airports file layout and addtimezone().
            | 'airports:tz' >> beam.Map(
                lambda row: (row[0], addtimezone(row[21], row[26])))
        )

        # --- flights: sampled read, then time-zone correction -------------
        raw_flights = (
            pipeline
            | 'flights:read' >> beam.io.ReadFromBigQuery(
                query='SELECT * FROM dsongcp.flights WHERE rand() < 0.001',
                use_standard_sql=True)
        )
        flights = (
            raw_flights
            | 'flights:tzcorr' >> beam.FlatMap(
                tz_correct, beam.pvalue.AsDict(airports))
        )

        # Sink 1: corrected flights as JSON lines on Cloud Storage.
        (
            flights
            | 'flights:tostring' >> beam.Map(lambda rec: json.dumps(rec))
            | 'flights:gcsout' >> beam.io.textio.WriteToText(flights_output)
        )

        # Sink 2: corrected flights as a BigQuery table.
        (
            flights
            | 'flights:bqout' >> beam.io.WriteToBigQuery(
                'dsongcp.flights_tzcorr',
                schema=flights_schema,
                write_disposition=truncate,
                create_disposition=create_if_needed,
            )
        )

        # Sink 3: events derived from each flight, as a BigQuery table.
        events = flights | beam.FlatMap(get_next_event)
        (
            events
            | 'events:totablerow' >> beam.Map(
                lambda rec: create_event_row(rec))
            | 'events:bqout' >> beam.io.WriteToBigQuery(
                'dsongcp.flights_simevents',
                schema=events_schema,
                write_disposition=truncate,
                create_disposition=create_if_needed,
            )
        )