in 04_streaming/transform/df07.py [0:0]
def run(project, bucket, region):
    """Build and launch the Dataflow job that time-corrects flights.

    The pipeline reads the airports CSV from Cloud Storage and the
    ``dsongcp.flights`` table from BigQuery, passes each flight through
    ``tz_correct`` with the airports as a dict side input, and writes the
    result to three sinks: JSON text files on Cloud Storage, the
    ``dsongcp.flights_tzcorr`` table, and (after expanding every flight
    with ``get_next_event``) the ``dsongcp.flights_simevents`` table.

    Args:
        project: Value for the ``--project`` pipeline option.
        bucket: Cloud Storage bucket name used for the staging and temp
            locations, the airports input file and the text output.
        region: Value for the ``--region`` pipeline option.
    """
    # Pipeline options, handed to Beam as command-line style arguments.
    pipeline_args = [
        '--project={0}'.format(project),
        '--job_name=ch04timecorr',
        '--save_main_session',
        '--staging_location=gs://{0}/flights/staging/'.format(bucket),
        '--temp_location=gs://{0}/flights/temp/'.format(bucket),
        '--setup_file=./setup.py',
        '--autoscaling_algorithm=THROUGHPUT_BASED',
        '--max_num_workers=8',
        '--region={}'.format(region),
        '--runner=DataflowRunner'
    ]

    airports_path = 'gs://{}/flights/airports/airports.csv.gz'.format(bucket)
    flights_text_prefix = 'gs://{}/flights/tzcorr/all_flights'.format(bucket)

    # BigQuery schemas as comma-separated "name:type" strings; the events
    # schema is the flights schema followed by three event columns.
    flight_columns = [
        'FL_DATE:date,UNIQUE_CARRIER:string,ORIGIN_AIRPORT_SEQ_ID:string,ORIGIN:string',
        'DEST_AIRPORT_SEQ_ID:string,DEST:string,CRS_DEP_TIME:timestamp,DEP_TIME:timestamp',
        'DEP_DELAY:float,TAXI_OUT:float,WHEELS_OFF:timestamp,WHEELS_ON:timestamp,TAXI_IN:float',
        'CRS_ARR_TIME:timestamp,ARR_TIME:timestamp,ARR_DELAY:float,CANCELLED:boolean',
        'DIVERTED:boolean,DISTANCE:float',
        'DEP_AIRPORT_LAT:float,DEP_AIRPORT_LON:float,DEP_AIRPORT_TZOFFSET:float',
        'ARR_AIRPORT_LAT:float,ARR_AIRPORT_LON:float,ARR_AIRPORT_TZOFFSET:float',
    ]
    flights_schema = ','.join(flight_columns)
    events_schema = ','.join(
        [flights_schema, 'EVENT_TYPE:string,EVENT_TIME:timestamp,EVENT_DATA:string'])

    # Both BigQuery sinks use the same write/create dispositions.
    on_write = beam.io.BigQueryDisposition.WRITE_TRUNCATE
    on_create = beam.io.BigQueryDisposition.CREATE_IF_NEEDED

    with beam.Pipeline(argv=pipeline_args) as p:
        # Airports: keep lines mentioning "United States", parse each as CSV,
        # and key the addtimezone() result by the first column.
        airport_lines = p | 'airports:read' >> beam.io.ReadFromText(airports_path)
        usa_lines = airport_lines | 'airports:onlyUSA' >> beam.Filter(
            lambda row: "United States" in row)
        airport_fields = usa_lines | 'airports:fields' >> beam.Map(
            lambda row: next(csv.reader([row])))
        # NOTE(review): columns 21 and 26 are presumably latitude and
        # longitude -- confirm against the airports file layout.
        airports = airport_fields | 'airports:tz' >> beam.Map(
            lambda cols: (cols[0], addtimezone(cols[21], cols[26])))

        # Flights: read every row and correct it with the airports lookup
        # supplied as a dict side input.
        raw_flights = p | 'flights:read' >> beam.io.ReadFromBigQuery(
            query='SELECT * FROM dsongcp.flights', use_standard_sql=True)
        flights = raw_flights | 'flights:tzcorr' >> beam.FlatMap(
            tz_correct, beam.pvalue.AsDict(airports))

        # Sink 1: one JSON document per corrected flight, as text files.
        flights_json = flights | 'flights:tostring' >> beam.Map(
            lambda record: json.dumps(record))
        flights_json | 'flights:gcsout' >> beam.io.textio.WriteToText(flights_text_prefix)

        # Sink 2: corrected flights table.
        flights | 'flights:bqout' >> beam.io.WriteToBigQuery(
            'dsongcp.flights_tzcorr',
            schema=flights_schema,
            write_disposition=on_write,
            create_disposition=on_create,
        )

        # Sink 3: events derived from each flight, converted to table rows.
        events = flights | beam.FlatMap(get_next_event)
        event_rows = events | 'events:totablerow' >> beam.Map(
            lambda event: create_event_row(event))
        event_rows | 'events:bqout' >> beam.io.WriteToBigQuery(
            'dsongcp.flights_simevents',
            schema=events_schema,
            write_disposition=on_write,
            create_disposition=on_create,
        )