in 04_streaming/realtime/avg01.py [0:0]
def run(project, bucket, region):
    """Stream flight events from Pub/Sub into a BigQuery table.

    Messages are read from the ``arrived`` and ``departed`` topics of
    ``project``, each message is decoded with ``json.loads``, the two
    streams are flattened into one, and the result is written to
    ``dsongcp.streaming_events`` (created if needed).

    Args:
        project: Project id; used for ``--project`` and for the topic paths.
        bucket: Bucket name; used for the staging and temp ``gs://`` paths.
        region: Value passed through as ``--region``.
    """
    pipeline_args = [
        '--project={0}'.format(project),
        '--job_name=ch04avgdelay',
        '--streaming',
        '--save_main_session',
        '--staging_location=gs://{0}/flights/staging/'.format(bucket),
        '--temp_location=gs://{0}/flights/temp/'.format(bucket),
        '--setup_file=./setup.py',
        '--autoscaling_algorithm=THROUGHPUT_BASED',
        '--max_num_workers=8',
        '--region={}'.format(region),
        '--runner=DirectRunner'
    ]

    # Column specs for the output table: the flight columns first, followed
    # by the two event columns. Everything is joined into one comma-separated
    # "name:type" string, which is what gets passed as ``schema`` below.
    column_specs = [
        'FL_DATE:date,UNIQUE_CARRIER:string,ORIGIN_AIRPORT_SEQ_ID:string,ORIGIN:string',
        'DEST_AIRPORT_SEQ_ID:string,DEST:string,CRS_DEP_TIME:timestamp,DEP_TIME:timestamp',
        'DEP_DELAY:float,TAXI_OUT:float,WHEELS_OFF:timestamp,WHEELS_ON:timestamp,TAXI_IN:float',
        'CRS_ARR_TIME:timestamp,ARR_TIME:timestamp,ARR_DELAY:float,CANCELLED:boolean',
        'DIVERTED:boolean,DISTANCE:float',
        'DEP_AIRPORT_LAT:float,DEP_AIRPORT_LON:float,DEP_AIRPORT_TZOFFSET:float',
        'ARR_AIRPORT_LAT:float,ARR_AIRPORT_LON:float,ARR_AIRPORT_TZOFFSET:float',
        'EVENT_TYPE:string,EVENT_TIME:timestamp',
    ]
    table_schema = ','.join(column_specs)

    with beam.Pipeline(argv=pipeline_args) as pipeline:
        # One parsed stream per topic; step labels carry the topic name so
        # the two read/parse pairs stay uniquely labelled.
        streams = {
            kind: (
                pipeline
                | 'read:{}'.format(kind) >> beam.io.ReadFromPubSub(
                    topic="projects/{}/topics/{}".format(project, kind))
                | 'parse:{}'.format(kind) >> beam.Map(
                    lambda message: json.loads(message))
            )
            for kind in ('arrived', 'departed')
        }

        merged = (streams['arrived'], streams['departed']) | beam.Flatten()

        sink = beam.io.WriteToBigQuery(
            'dsongcp.streaming_events',
            schema=table_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
        merged | 'bqout' >> sink