in 11_realtime/make_predictions.py [0:0]
def run(project, bucket, region, source, sink):
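    # Build and run the prediction pipeline.
    #   source: 'local' (sample file), 'bigquery' (batch), or 'pubsub' (streaming)
    #   sink:   'file' (CSV output) or 'bigquery' (streaming_preds table)
    # Example invocation (values are illustrative only):
    #   run(project, bucket, region, source='pubsub', sink='bigquery')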
if source == 'local':
logging.info('Running locally on small extract')
argv = [
'--project={0}'.format(project),
'--runner=DirectRunner'
]
flights_output = '/tmp/predictions'
else:
        logging.info('Running in the cloud on the full dataset; input={}'.format(source))
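        # Dataflow options: stage code and temp files on GCS, ship dependencies
        # via setup.py, and autoscale on throughput up to 8 workers.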
argv = [
'--project={0}'.format(project),
'--job_name=ch10predictions',
'--save_main_session',
'--staging_location=gs://{0}/flights/staging/'.format(bucket),
'--temp_location=gs://{0}/flights/temp/'.format(bucket),
'--setup_file=./setup.py',
'--autoscaling_algorithm=THROUGHPUT_BASED',
'--max_num_workers=8',
'--region={}'.format(region),
'--runner=DataflowRunner'
]
if source == 'pubsub':
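            # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.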
logging.info("Turning on streaming. Cancel the pipeline from GCP console")
argv += ['--streaming']
flights_output = 'gs://{}/flights/ch11/predictions'.format(bucket)
with beam.Pipeline(argv=argv) as pipeline:
# read the event stream
if source == 'local':
input_file = './simevents_sample.json'
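            # small extract of the simulated events, one JSON-encoded event per line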
logging.info("Reading from {} ... Writing to {}".format(input_file, flights_output))
events = (
pipeline
| 'read_input' >> beam.io.ReadFromText(input_file)
| 'parse_input' >> beam.Map(lambda line: json.loads(line))
)
elif source == 'bigquery':
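            # batch mode: pull one day of simulated events; EVENT_DATA holds the JSON payload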
input_query = ("SELECT EVENT_DATA FROM dsongcp.flights_simevents " +
"WHERE EVENT_TIME BETWEEN '2015-03-01' AND '2015-03-02'")
logging.info("Reading from {} ... Writing to {}".format(input_query, flights_output))
events = (
pipeline
| 'read_input' >> beam.io.ReadFromBigQuery(query=input_query, use_standard_sql=True)
| 'parse_input' >> beam.Map(lambda row: json.loads(row['EVENT_DATA']))
)
elif source == 'pubsub':
input_topic = "projects/{}/topics/wheelsoff".format(project)
logging.info("Reading from {} ... Writing to {}".format(input_topic, flights_output))
events = (
pipeline
| 'read_input' >> beam.io.ReadFromPubSub(topic=input_topic,
timestamp_attribute='EventTimeStamp')
| 'parse_input' >> beam.Map(lambda s: json.loads(s))
)
else:
logging.error("Unknown input type {}".format(source))
return
# events -> features. See ./flights_transforms.py for the code shared between training & prediction
features = ftxf.transform_events_to_features(events, for_training=False)
# call model endpoint
# shared_handle = beam.utils.shared.Shared()
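    # Re-window into the global window so BatchElements can batch across whatever
    # windowing the feature transforms applied; batching up to 64 instances per
    # request amortizes the per-call overhead of the model endpoint.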
preds = (
features
| 'into_global' >> beam.WindowInto(beam.window.GlobalWindows())
| 'batch_instances' >> beam.BatchElements(min_batch_size=1, max_batch_size=64)
| 'model_predict' >> beam.ParDo(FlightsModelInvoker())
)
# write it out
if sink == 'file':
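            # each prediction is a dict; join its values into one CSV row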
(preds
| 'to_string' >> beam.Map(lambda f: ','.join([str(x) for x in f.values()]))
| 'to_gcs' >> beam.io.textio.WriteToText(flights_output,
file_name_suffix='.csv', header=CSV_HEADER,
# workaround b/207384805
num_shards=1)
)
elif sink == 'bigquery':
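            # BigQuery schema for the streaming_preds table (field_name:type pairs)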
preds_schema = ','.join([
'event_time:timestamp',
'prob_ontime:float',
'dep_delay:float',
'taxi_out:float',
'distance:float',
'origin:string',
'dest:string',
'dep_hour:integer',
'is_weekday:integer',
'carrier:string',
                'dep_airport_lat:float',
                'dep_airport_lon:float',
                'arr_airport_lat:float',
                'arr_airport_lon:float',
'avg_dep_delay:float',
'avg_taxi_out:float',
])
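            # streaming inserts make each prediction queryable as soon as it is written;
            # the table is created on first write if it does not exist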
(preds
| 'to_bigquery' >> beam.io.WriteToBigQuery(
'dsongcp.streaming_preds', schema=preds_schema,
# write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
method='STREAMING_INSERTS'
)
)
else:
logging.error("Unknown output type {}".format(sink))
return