in action/kafkaProduce.py [0:0]
def getProducer(validatedParams, timeout_ms):
    """Return a KafkaProducer for the given connection, reusing a cached one if present.

    Producers are cached in the module-level ``cached_producers`` dict, keyed
    by ``getConnectionHash(validatedParams)``, so that subsequent invocations
    with the same connection parameters can skip reconnecting.

    Args:
        validatedParams: Mapping of connection parameters. Its ``'brokers'``
            entry is passed to ``KafkaProducer`` as ``bootstrap_servers``;
            the whole mapping is passed to ``getConnectionHash`` to derive
            the cache key.
        timeout_ms: Value used for both ``max_block_ms`` and
            ``request_timeout_ms`` when a new producer is created. It has no
            effect when a cached producer is returned.

    Returns:
        The cached or newly created ``KafkaProducer``.
    """
    connectionHash = getConnectionHash(validatedParams)

    # The cache is looked up through globals() so that it is created lazily
    # on first use and then kept for subsequent invocations.
    cache = globals().get("cached_producers")
    if cache is None:
        logging.info("dictionary was None")
        cache = dict()
        globals()["cached_producers"] = cache

    # Check for a hit BEFORE evicting anything: evicting first could close an
    # unrelated live producer for no reason, or even close the very producer
    # that is being requested.
    if connectionHash in cache:
        logging.info("Reusing existing producer")
        return cache[connectionHash]

    logging.info("cache miss")

    # Only a miss needs a free slot. Remove an arbitrary connection to make
    # room for the new one. ``>=`` (rather than ``==``) keeps the cache
    # bounded even if it somehow grew past the limit, and the emptiness check
    # avoids popitem() raising KeyError when the limit is 0.
    if cache and len(cache) >= max_cached_producers:
        poppedProducer = cache.popitem()[1]
        poppedProducer.close(timeout=1)
        logging.info("Removed cached producer")

    # create a new connection
    producer = KafkaProducer(
        api_version_auto_timeout_ms=15000,
        batch_size=0,
        bootstrap_servers=validatedParams['brokers'],
        max_block_ms=timeout_ms,
        request_timeout_ms=timeout_ms,
    )
    logging.info("Created producer")

    # store the producer globally for subsequent invocations
    cache[connectionHash] = producer
    return producer