in 04_streaming/realtime/avg02.py [0:0]
def run(project, bucket, region):
    """Launch a streaming Beam pipeline that averages flight delays per airport.

    Subscribes to the 'arrived' and 'departed' Pub/Sub topics, parses each
    JSON event, keys events by airport, applies 1-hour sliding windows that
    advance every 5 minutes, computes per-airport delay statistics, and
    streams the results into the BigQuery table dsongcp.streaming_delays.

    Args:
        project: GCP project id hosting the Pub/Sub topics.
        bucket: GCS bucket name used for staging and temp locations.
        region: GCP region passed through to the pipeline options.
    """
    # Pipeline options. NOTE: runs locally with the DirectRunner; the
    # autoscaling/max-worker flags only take effect on a distributed runner.
    argv = [
        f'--project={project}',
        '--job_name=ch04avgdelay',
        '--streaming',
        '--save_main_session',
        f'--staging_location=gs://{bucket}/flights/staging/',
        f'--temp_location=gs://{bucket}/flights/temp/',
        '--autoscaling_algorithm=THROUGHPUT_BASED',
        '--max_num_workers=8',
        f'--region={region}',
        '--runner=DirectRunner',
    ]

    with beam.Pipeline(argv=argv) as pipeline:
        # One Pub/Sub source per event type; event time comes from the
        # message's EventTimeStamp attribute rather than publish time.
        events = {}
        for event_name in ['arrived', 'departed']:
            topic_name = f"projects/{project}/topics/{event_name}"
            events[event_name] = (
                pipeline
                | f'read:{event_name}' >> beam.io.ReadFromPubSub(
                    topic=topic_name, timestamp_attribute='EventTimeStamp')
                | f'parse:{event_name}' >> beam.Map(json.loads)
            )

        # Merge both event streams into a single collection.
        all_events = (events['arrived'], events['departed']) | beam.Flatten()

        # Key by airport, window into hourly slides every 5 minutes,
        # then aggregate each (airport, events) group into stats.
        stats = (
            all_events
            | 'byairport' >> beam.Map(by_airport)
            | 'window' >> beam.WindowInto(
                beam.window.SlidingWindows(60 * 60, 5 * 60))
            | 'group' >> beam.GroupByKey()
            | 'stats' >> beam.Map(lambda kv: compute_stats(kv[0], kv[1]))
        )

        # Comma-separated BigQuery schema string for the output table.
        stats_schema = ','.join([
            'AIRPORT:string,AVG_ARR_DELAY:float,AVG_DEP_DELAY:float',
            'NUM_FLIGHTS:int64,START_TIME:timestamp,END_TIME:timestamp',
        ])
        (stats
            | 'bqout' >> beam.io.WriteToBigQuery(
                'dsongcp.streaming_delays', schema=stats_schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        )