playground-internal/statefun-playground-producer/main.py (73 lines of code) (raw):

################################################################################ # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ import json import signal import sys import time import os from collections import namedtuple from itertools import cycle from kafka.errors import NoBrokersAvailable from kafka import KafkaProducer from jsonpath_ng import parse Arg = namedtuple('Arg', ['key', 'default', 'type']) APP_PATH = Arg(key="APP_PATH", default="/opt/statefun/example.json", type=str) APP_KAFKA_HOST = Arg(key="APP_KAFKA_HOST", default="kafka-broker:9092", type=str) APP_KAFKA_TOPIC = Arg(key="APP_KAFKA_TOPIC", default="input", type=str) APP_DELAY_SECONDS = Arg(key="APP_DELAY_SECONDS", default="1", type=int) APP_LOOP = Arg(key="APP_LOOP", default="true", type=lambda s: s.lower() == "true") APP_JSON_PATH = Arg(key="APP_JSON_PATH", default="name", type=parse) def env(arg: Arg): val = os.environ.get(arg.key, arg.default) return arg.type(val) def read_jsons(path: str): with open(path) as f: for line in f.read().splitlines(): yield json.loads(line) def create_requests(path: str, loop: bool, json_path): jsons = [js for js in read_jsons(path)] if loop: jsons = cycle(jsons) for js in jsons: matches = json_path.find(js) if len(matches) != 1: raise ValueError(f"Unable to find exactly one key at {js}, please check the correctness of your {APP_JSON_PATH.key} value") match = matches[0] yield match.value, js class KProducer(object): def __init__(self, broker, topic): self.producer = KafkaProducer(bootstrap_servers=[broker]) self.topic = topic def send(self, key: str, value: str): key = key.encode('utf-8') value = value.encode('utf-8') self.producer.send(topic=self.topic, key=key, value=value) self.producer.flush() def produce(producer, delay_seconds: int, requests): for key, js in requests: value = json.dumps(js) producer.send(key=key, value=value) if delay_seconds > 0: time.sleep(delay_seconds) def handler(number, frame): sys.exit(0) def main(): # setup an exit signal handler signal.signal(signal.SIGTERM, handler) signal.signal(signal.SIGINT, handler) # get the key,value request generators requests = create_requests(path=env(APP_PATH), loop=env(APP_LOOP), json_path=env(APP_JSON_PATH)) # produce forever while True: try: producer = KProducer(broker=env(APP_KAFKA_HOST), topic=env(APP_KAFKA_TOPIC)) produce(producer=producer, delay_seconds=env(APP_DELAY_SECONDS), requests=requests) print("Done producing, good bye!", flush=True) return except SystemExit: print("Good bye!", flush=True) return except NoBrokersAvailable: print("No brokers available... retrying in 2 seconds.") time.sleep(2) continue except Exception as e: print(e) return if __name__ == "__main__": main()