# run() — excerpt from 04_streaming/realtime/avg02.py


def run(project, bucket, region):
    """Launch a streaming Beam pipeline computing windowed flight-delay stats.

    Reads 'arrived' and 'departed' flight events from the project's Pub/Sub
    topics, keys them by airport, aggregates delays over 60-minute sliding
    windows (advancing every 5 minutes), and writes the per-window statistics
    to the dsongcp.streaming_delays BigQuery table.

    Args:
        project: GCP project id hosting the Pub/Sub topics and BigQuery dataset.
        bucket: GCS bucket name used for Beam staging and temp locations.
        region: GCP region passed to the runner.
    """
    argv = [
        '--project={0}'.format(project),
        '--job_name=ch04avgdelay',
        '--streaming',
        '--save_main_session',
        '--staging_location=gs://{0}/flights/staging/'.format(bucket),
        '--temp_location=gs://{0}/flights/temp/'.format(bucket),
        # NOTE(review): autoscaling_algorithm / max_num_workers only take
        # effect on DataflowRunner; they are inert under DirectRunner below.
        '--autoscaling_algorithm=THROUGHPUT_BASED',
        '--max_num_workers=8',
        '--region={}'.format(region),
        '--runner=DirectRunner'
    ]

    with beam.Pipeline(argv=argv) as pipeline:
        events = {}

        # One Pub/Sub source per event type; element timestamps are taken from
        # the 'EventTimeStamp' message attribute so windowing uses event time.
        for event_name in ['arrived', 'departed']:
            topic_name = "projects/{}/topics/{}".format(project, event_name)

            events[event_name] = (pipeline
                                  | 'read:{}'.format(event_name) >> beam.io.ReadFromPubSub(
                                                topic=topic_name, timestamp_attribute='EventTimeStamp')
                                  | 'parse:{}'.format(event_name) >> beam.Map(json.loads)
                                  )

        # Merge the two event streams into a single collection.
        all_events = (events['arrived'], events['departed']) | beam.Flatten()

        # Key by airport, window, group, then compute per-airport stats.
        stats = (all_events
                 | 'byairport' >> beam.Map(by_airport)
                 | 'window' >> beam.WindowInto(beam.window.SlidingWindows(60 * 60, 5 * 60))
                 | 'group' >> beam.GroupByKey()
                 | 'stats' >> beam.Map(lambda x: compute_stats(x[0], x[1]))
        )

        # Comma-join keeps the schema on two readable source lines.
        stats_schema = ','.join(['AIRPORT:string,AVG_ARR_DELAY:float,AVG_DEP_DELAY:float',
                                 'NUM_FLIGHTS:int64,START_TIME:timestamp,END_TIME:timestamp'])
        (stats
         | 'bqout' >> beam.io.WriteToBigQuery(
                    'dsongcp.streaming_delays', schema=stats_schema,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                )
         )