# playground-internal/statefun-playground-producer/main.py (73 lines of code) (raw):
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import json
import signal
import sys
import time
import os
from collections import namedtuple
from itertools import cycle
from kafka.errors import NoBrokersAvailable
from kafka import KafkaProducer
from jsonpath_ng import parse
# One environment-driven setting: the variable name to look up, the raw string
# used when that variable is unset, and the callable that converts the raw
# string into the value the program works with.
Arg = namedtuple('Arg', 'key default type')

# File that is read line by line, one JSON document per line.
APP_PATH = Arg("APP_PATH", "/opt/statefun/example.json", str)
# Bootstrap server handed to the Kafka producer.
APP_KAFKA_HOST = Arg("APP_KAFKA_HOST", "kafka-broker:9092", str)
# Topic every record is sent to.
APP_KAFKA_TOPIC = Arg("APP_KAFKA_TOPIC", "input", str)
# Pause between two consecutive records, in seconds.
APP_DELAY_SECONDS = Arg("APP_DELAY_SECONDS", "1", int)
# Only the (case-insensitive) string "true" enables endless replay of the input.
APP_LOOP = Arg("APP_LOOP", "true", lambda raw: raw.lower() == "true")
# JSONPath expression that selects the record key inside each JSON document.
APP_JSON_PATH = Arg("APP_JSON_PATH", "name", parse)
def env(arg: Arg):
    """Resolve one setting from the process environment.

    The environment variable named ``arg.key`` is looked up; when it is not
    set, ``arg.default`` is used instead. The resulting raw string is then
    passed through ``arg.type`` and the converted value is returned.
    """
    raw = os.getenv(arg.key, arg.default)
    convert = arg.type
    return convert(raw)
def read_jsons(path: str):
    """Yield one decoded JSON document per non-blank line of the file at ``path``.

    The file is read as UTF-8 and iterated line by line, so records are only
    separated at real line endings. Characters such as U+2028, which may appear
    unescaped inside a JSON string, therefore do not cut a record in half.
    Empty and whitespace-only lines (for example a trailing blank line) are
    skipped instead of being handed to the JSON parser.

    :param path: path of the line-delimited JSON file.
    :raises OSError: if the file cannot be opened.
    :raises json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                # Nothing to decode on a blank line.
                continue
            yield json.loads(line)
def create_requests(path: str, loop: bool, json_path):
    """Generate ``(key, document)`` pairs from the JSON documents stored at ``path``.

    All documents are loaded up front (on first iteration of this generator).
    When ``loop`` is true they are replayed endlessly, otherwise each one is
    produced once. The key of a document is the value of the single match that
    ``json_path.find`` returns for it.

    :param path: file with one JSON document per line.
    :param loop: replay the documents forever when true.
    :param json_path: object exposing ``find(document)`` that returns the
        matches for the key expression.
    :raises ValueError: if a document does not yield exactly one match.
    """
    documents = list(read_jsons(path))
    source = cycle(documents) if loop else documents
    for js in source:
        found = json_path.find(js)
        if len(found) != 1:
            raise ValueError(f"Unable to find exactly one key at {js}, please check the correctness of your {APP_JSON_PATH.key} value")
        (only_match,) = found
        yield only_match.value, js
class KProducer:
    """Small wrapper around ``KafkaProducer`` bound to a single topic."""

    def __init__(self, broker, topic):
        """Connect a producer to ``broker`` and remember the target ``topic``."""
        self.topic = topic
        self.producer = KafkaProducer(bootstrap_servers=[broker])

    def send(self, key: str, value: str):
        """Send one record to the configured topic and flush the producer.

        Both ``key`` and ``value`` are encoded as UTF-8 before being handed to
        the underlying producer.
        """
        payload = {
            "key": key.encode('utf-8'),
            "value": value.encode('utf-8'),
        }
        self.producer.send(topic=self.topic, **payload)
        self.producer.flush()
def produce(producer, delay_seconds: int, requests):
    """Send every ``(key, document)`` pair in ``requests`` through ``producer``.

    Each document is serialized with ``json.dumps`` and passed, together with
    its key, to ``producer.send``. After every record the function sleeps for
    ``delay_seconds`` seconds, unless that value is zero or negative.
    """
    should_pause = delay_seconds > 0
    for key, js in requests:
        producer.send(key=key, value=json.dumps(js))
        if should_pause:
            time.sleep(delay_seconds)
def handler(number, frame):
    """Signal handler that terminates the program with exit status 0.

    ``number`` and ``frame`` are the arguments the ``signal`` module passes to
    every handler; neither is used.
    """
    raise SystemExit(0)
def main():
    """Read the configuration from the environment and produce records to Kafka.

    SIGTERM and SIGINT are routed to ``handler``, which raises ``SystemExit``;
    the loop below catches it and says good bye. While no broker is reachable
    the connection attempt is repeated every two seconds. Any other error is
    printed and ends the function.
    """
    # setup an exit signal handler
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, handler)

    # get the key,value request generator; it is lazy, so the input file is
    # only read once the first record is requested.
    requests = create_requests(path=env(APP_PATH), loop=env(APP_LOOP), json_path=env(APP_JSON_PATH))

    # keep trying until a broker accepts the connection, then produce until the
    # requests run out (never, when looping) or the process is asked to stop.
    while True:
        try:
            producer = KProducer(broker=env(APP_KAFKA_HOST), topic=env(APP_KAFKA_TOPIC))
            produce(producer=producer, delay_seconds=env(APP_DELAY_SECONDS), requests=requests)
            print("Done producing, good bye!", flush=True)
            return
        except SystemExit:
            print("Good bye!", flush=True)
            return
        except NoBrokersAvailable:
            # Flush like the other status messages: when stdout is not a
            # terminal it is buffered, and this line would otherwise stay
            # invisible for as long as the retry loop keeps running.
            print("No brokers available... retrying in 2 seconds.", flush=True)
            time.sleep(2)
        except Exception as e:
            print(e, flush=True)
            return
if __name__ == "__main__":
main()