in 04_streaming/simulate/simulate.py [0:0]
def notify(publisher, topics, rows, simStartTime, programStart, speedFactor):
    """Replay ``rows`` against the wall clock, publishing them in batches.

    Each row is unpacked as ``(event_type, notify_time, event_data)``.
    ``event_data`` is accumulated per ``event_type``; whenever the next row's
    ``notify_time`` lies more than one second ahead of the (scaled) wall
    clock, the accumulated batch is handed to ``publish`` and the function
    sleeps until the simulation catches up.  Any records still pending after
    the last row are published once more at the end.

    Args:
        publisher: passed through unchanged to ``publish``.
        topics: iterable of event-type keys (iterating it must yield the keys
            used as ``event_type`` in ``rows``); also passed to ``publish``.
        rows: iterable of ``(event_type, notify_time, event_data)`` triples.
            NOTE(review): presumably ordered by ``notify_time`` -- confirm
            against the caller.
        simStartTime: simulated timestamp corresponding to ``programStart``;
            ``notify_time - simStartTime`` must yield a ``timedelta``.
        programStart: wall-clock start time; compared against
            ``datetime.datetime.utcnow()``, so it is assumed to be a naive
            UTC datetime -- TODO confirm.
        speedFactor: divisor applied to simulated elapsed seconds (values
            greater than 1 make the replay run faster than real time).

    Returns:
        None.  If ``rows`` yields nothing, nothing is published.
    """

    def compute_sleep_secs(notify_time):
        """Return how many seconds the wall clock lags the simulated clock."""
        time_elapsed = (datetime.datetime.utcnow() - programStart).total_seconds()
        sim_time_elapsed = (notify_time - simStartTime).total_seconds() / speedFactor
        return sim_time_elapsed - time_elapsed

    # One pending-events bucket per topic key.
    tonotify = {key: [] for key in topics}

    # Stays None when ``rows`` is empty, so the final flush can be skipped
    # instead of failing on an unbound ``notify_time``.
    notify_time = None

    for event_type, notify_time, event_data in rows:
        # How much time should we sleep before this event is due?
        if compute_sleep_secs(notify_time) > 1:
            # Flush what has accumulated so far, then start fresh buckets.
            publish(publisher, topics, tonotify, notify_time)
            for key in topics:
                tonotify[key] = []

            # Recompute the sleep, since publishing itself takes a while.
            to_sleep_secs = compute_sleep_secs(notify_time)
            if to_sleep_secs > 0:
                logging.info('Sleeping {} seconds'.format(to_sleep_secs))
                time.sleep(to_sleep_secs)
        tonotify[event_type].append(event_data)

    # Left-over records; notify again (only if at least one row was seen).
    if notify_time is not None:
        publish(publisher, topics, tonotify, notify_time)