# 04_streaming/simulate/simulate.py
#!/usr/bin/env python3
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import pytz
import logging
import argparse
import datetime
import google.cloud.pubsub_v1 as pubsub # Use v1 of the API
import google.cloud.bigquery as bq
TIME_FORMAT = '%Y-%m-%d %H:%M:%S %Z'
RFC3339_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S-00:00'


def publish(publisher, topics, allevents, notify_time):
    """Send every accumulated event to its per-type Pub/Sub topic.

    Args:
        publisher: Pub/Sub publisher client (anything exposing .publish).
        topics: dict mapping event type ('departed', 'arrived', ...) to
            the fully-qualified topic path for that type.
        allevents: dict mapping event type to a list of event payload
            strings accumulated so far.
        notify_time: datetime stamped onto every message as the
            EventTimeStamp attribute (RFC 3339, fixed -00:00 offset).
    """
    timestamp = notify_time.strftime(RFC3339_TIME_FORMAT)
    for event_type, topic in topics.items():
        batch = allevents[event_type]
        # the client automatically batches
        logging.info('Publishing {} {} till {}'.format(len(batch), event_type, timestamp))
        for payload in batch:
            publisher.publish(topic, payload.encode(), EventTimeStamp=timestamp)
def notify(publisher, topics, rows, simStartTime, programStart, speedFactor):
    """Replay event rows to Pub/Sub, pacing them at speedFactor x real time.

    Args:
        publisher: Pub/Sub publisher client.
        topics: dict mapping event type -> topic path (its keys define the
            accumulator buckets, so every row's event_type must be a key).
        rows: iterable of (event_type, notify_time, event_data) tuples,
            ordered by notify_time ascending.
        simStartTime: timezone-aware datetime of the first simulated instant.
        programStart: naive UTC datetime captured when the program started
            (compared against datetime.utcnow(), so it must stay naive).
        speedFactor: simulated seconds that elapse per wall-clock second.
    """

    def compute_sleep_secs(notify_time):
        # Positive result: we are ahead of schedule and must sleep that many
        # wall-clock seconds; negative: we are behind and should catch up.
        time_elapsed = (datetime.datetime.utcnow() - programStart).total_seconds()
        sim_time_elapsed = (notify_time - simStartTime).total_seconds() / speedFactor
        return sim_time_elapsed - time_elapsed

    tonotify = {key: [] for key in topics}
    last_notify_time = None  # remembers the newest row's timestamp for the final flush
    for row in rows:
        event_type, notify_time, event_data = row
        last_notify_time = notify_time
        # Flush the accumulated batch whenever we are due to wait more than
        # a second before this row's notification time.
        if compute_sleep_secs(notify_time) > 1:
            publish(publisher, topics, tonotify, notify_time)
            tonotify = {key: [] for key in topics}
            # recompute sleep, since notification takes a while
            to_sleep_secs = compute_sleep_secs(notify_time)
            if to_sleep_secs > 0:
                logging.info('Sleeping {} seconds'.format(to_sleep_secs))
                time.sleep(to_sleep_secs)
        tonotify[event_type].append(event_data)
    # Flush left-over records. Guarded so an empty result set no longer
    # raises NameError (notify_time was previously unbound when rows was empty).
    if last_notify_time is not None:
        publish(publisher, topics, tonotify, last_notify_time)
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Send simulated flight events to Cloud Pub/Sub')
    parser.add_argument('--startTime', help='Example: 2015-05-01 00:00:00 UTC', required=True)
    parser.add_argument('--endTime', help='Example: 2015-05-03 00:00:00 UTC', required=True)
    parser.add_argument('--project', help='your project id, to create pubsub topic', required=True)
    parser.add_argument('--speedFactor', help='Example: 60 implies 1 hour of data sent to Cloud Pub/Sub in 1 minute', required=True, type=float)
    parser.add_argument('--jitter', help='type of jitter to add: None, uniform, exp are the three options', default='None')

    # set up BigQuery bqclient
    logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
    args = parser.parse_args()
    bqclient = bq.Client(args.project)
    bqclient.get_table('dsongcp.flights_simevents')  # throws exception on failure

    # jitter is a SQL *expression* chosen from three trusted literals below,
    # not a user-supplied value, so it is spliced into the query text.
    # (A query parameter cannot carry an expression such as CAST(...) --
    # binding it as an INT64 ScalarQueryParameter fails for exp/uniform.)
    if args.jitter == 'exp':
        jitter = 'CAST (-LN(RAND()*0.99 + 0.01)*30 + 90.5 AS INT64)'
    elif args.jitter == 'uniform':
        jitter = 'CAST(90.5 + RAND()*30 AS INT64)'
    else:
        jitter = '0'

    # run the query to pull simulated events; startTime/endTime come from the
    # command line, so they remain bound parameters.
    querystr = """
SELECT
  EVENT_TYPE,
  TIMESTAMP_ADD(EVENT_TIME, INTERVAL {} SECOND) AS NOTIFY_TIME,
  EVENT_DATA
FROM
  dsongcp.flights_simevents
WHERE
  EVENT_TIME >= @startTime
  AND EVENT_TIME < @endTime
ORDER BY
  EVENT_TIME ASC
""".format(jitter)
    job_config = bq.QueryJobConfig(
        query_parameters=[
            bq.ScalarQueryParameter("startTime", "TIMESTAMP", args.startTime),
            bq.ScalarQueryParameter("endTime", "TIMESTAMP", args.endTime),
        ]
    )
    rows = bqclient.query(querystr, job_config=job_config)

    # create one Pub/Sub notification topic for each type of event
    publisher = pubsub.PublisherClient()
    topics = {}
    for event_type in ['wheelsoff', 'arrived', 'departed']:
        topics[event_type] = publisher.topic_path(args.project, event_type)
        try:
            publisher.get_topic(topic=topics[event_type])
            logging.info("Already exists: {}".format(topics[event_type]))
        except Exception:
            # Narrowed from a bare except so SystemExit/KeyboardInterrupt are
            # not swallowed; any client error here means the topic is missing.
            logging.info("Creating {}".format(topics[event_type]))
            publisher.create_topic(name=topics[event_type])

    # notify about each row in the dataset, paced by speedFactor
    programStartTime = datetime.datetime.utcnow()
    simStartTime = datetime.datetime.strptime(args.startTime, TIME_FORMAT).replace(tzinfo=pytz.UTC)
    logging.info('Simulation start time is {}'.format(simStartTime))
    notify(publisher, topics, rows, simStartTime, programStartTime, args.speedFactor)