def run()

in 04_streaming/transform/df06.py [0:0]


def run(project, bucket):
    """Build and execute the flights time-zone-correction pipeline.

    The pipeline runs on the DirectRunner and does the following:

    * reads the gzipped airports CSV from the bucket, keeps lines containing
      "United States", and turns them into ``(key, addtimezone(...))`` pairs;
    * reads a random sample of ``dsongcp.flights`` from BigQuery and applies
      ``tz_correct`` to every row with the airports as a dict side input;
    * writes the corrected flights as JSON text to GCS and to the BigQuery
      table ``dsongcp.flights_tzcorr``;
    * expands each flight with ``get_next_event`` and writes the resulting
      rows to the BigQuery table ``dsongcp.flights_simevents``.

    Args:
        project: Value passed to the pipeline as ``--project``.
        bucket: Name of the GCS bucket used for staging, temp files, the
            airports input and the text output.
    """
    pipeline_args = [
        '--project={0}'.format(project),
        '--staging_location=gs://{0}/flights/staging/'.format(bucket),
        '--temp_location=gs://{0}/flights/temp/'.format(bucket),
        '--runner=DirectRunner'
    ]
    airports_csv = 'gs://{}/flights/airports/airports.csv.gz'.format(bucket)
    tzcorr_prefix = 'gs://{}/flights/tzcorr/all_flights'.format(bucket)

    # Explicit BigQuery schema for the corrected flights. Passing
    # 'SCHEMA_AUTODETECT' instead also works on JSON, but is less reliable.
    flight_columns = (
        'FL_DATE:date',
        'UNIQUE_CARRIER:string',
        'ORIGIN_AIRPORT_SEQ_ID:string',
        'ORIGIN:string',
        'DEST_AIRPORT_SEQ_ID:string',
        'DEST:string',
        'CRS_DEP_TIME:timestamp',
        'DEP_TIME:timestamp',
        'DEP_DELAY:float',
        'TAXI_OUT:float',
        'WHEELS_OFF:timestamp',
        'WHEELS_ON:timestamp',
        'TAXI_IN:float',
        'CRS_ARR_TIME:timestamp',
        'ARR_TIME:timestamp',
        'ARR_DELAY:float',
        'CANCELLED:boolean',
        'DIVERTED:boolean',
        'DISTANCE:float',
        'DEP_AIRPORT_LAT:float',
        'DEP_AIRPORT_LON:float',
        'DEP_AIRPORT_TZOFFSET:float',
        'ARR_AIRPORT_LAT:float',
        'ARR_AIRPORT_LON:float',
        'ARR_AIRPORT_TZOFFSET:float',
        'Year:string',
    )
    flights_schema = ','.join(flight_columns)
    # Event rows carry every flight column plus three event-specific ones.
    events_schema = ','.join(
        (flights_schema, 'EVENT_TYPE:string,EVENT_TIME:timestamp,EVENT_DATA:string'))

    # Both BigQuery sinks replace existing contents and create the table
    # when it does not exist yet.
    truncate = beam.io.BigQueryDisposition.WRITE_TRUNCATE
    create_if_needed = beam.io.BigQueryDisposition.CREATE_IF_NEEDED

    with beam.Pipeline(argv=pipeline_args) as p:
        # --- airports: CSV lines -> (column 0, addtimezone(col 21, col 26)) ---
        airport_lines = p | 'airports:read' >> beam.io.ReadFromText(airports_csv)
        us_airport_lines = airport_lines | beam.Filter(
            lambda text: "United States" in text)
        airport_fields = us_airport_lines | 'airports:fields' >> beam.Map(
            lambda text: next(csv.reader([text])))
        airports = airport_fields | 'airports:tz' >> beam.Map(
            lambda cols: (cols[0], addtimezone(cols[21], cols[26])))

        # --- flights: sampled BigQuery rows, corrected via airports side input ---
        raw_flights = p | 'flights:read' >> beam.io.ReadFromBigQuery(
            query='SELECT * FROM dsongcp.flights WHERE rand() < 0.001',
            use_standard_sql=True)
        flights = raw_flights | 'flights:tzcorr' >> beam.FlatMap(
            tz_correct, beam.pvalue.AsDict(airports))

        # Sink 1: newline-delimited JSON on GCS.
        flights_as_json = flights | 'flights:tostring' >> beam.Map(
            lambda row: json.dumps(row))
        flights_as_json | 'flights:gcsout' >> beam.io.textio.WriteToText(tzcorr_prefix)

        # Sink 2: BigQuery table of corrected flights.
        flights | 'flights:bqout' >> beam.io.WriteToBigQuery(
            'dsongcp.flights_tzcorr',
            schema=flights_schema,
            write_disposition=truncate,
            create_disposition=create_if_needed)

        # --- events: derived from each corrected flight, written to BigQuery ---
        events = flights | beam.FlatMap(get_next_event)
        event_rows = events | 'events:totablerow' >> beam.Map(
            lambda event: create_event_row(event))
        event_rows | 'events:bqout' >> beam.io.WriteToBigQuery(
            'dsongcp.flights_simevents',
            schema=events_schema,
            write_disposition=truncate,
            create_disposition=create_if_needed)