in playground-internal/statefun-playground-producer/main.py [0:0]
def main():
# setup an exit signal handler
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
# get the key,value request generators
requests = create_requests(path=env(APP_PATH), loop=env(APP_LOOP), json_path=env(APP_JSON_PATH))
# produce forever
while True:
try:
producer = KProducer(broker=env(APP_KAFKA_HOST), topic=env(APP_KAFKA_TOPIC))
produce(producer=producer, delay_seconds=env(APP_DELAY_SECONDS), requests=requests)
print("Done producing, good bye!", flush=True)
return
except SystemExit:
print("Good bye!", flush=True)
return
except NoBrokersAvailable:
print("No brokers available... retrying in 2 seconds.")
time.sleep(2)
continue
except Exception as e:
print(e)
return