in action/messageHubProduce.py [0:0]
def getProducer(validatedParams, timeout_ms):
connectionHash = getConnectionHash(validatedParams)
if globals().get("cached_producers") is None:
logging.info("dictionary was None")
globals()["cached_producers"] = dict()
# remove arbitrary connection to make room for new one
if len(globals()["cached_producers"]) == max_cached_producers:
poppedProducer = globals()["cached_producers"].popitem()[1]
poppedProducer.close(timeout=1)
logging.info("Removed cached producer")
if connectionHash not in globals()["cached_producers"]:
logging.info("cache miss")
# create a new connection
sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_SSL'
# Create a new context using system defaults, disable all but TLS1.2
context = ssl.create_default_context()
context.options &= ssl.OP_NO_TLSv1
context.options &= ssl.OP_NO_TLSv1_1
producer = KafkaProducer(
api_version=(0, 10),
batch_size=0,
bootstrap_servers=validatedParams['kafka_brokers_sasl'],
max_block_ms=timeout_ms,
request_timeout_ms=timeout_ms,
sasl_plain_username=validatedParams['user'],
sasl_plain_password=validatedParams['password'],
security_protocol=security_protocol,
ssl_context=context,
sasl_mechanism=sasl_mechanism
)
logging.info("Created producer")
# store the producer globally for subsequent invocations
globals()["cached_producers"][connectionHash] = producer
# return it
return producer
else:
logging.info("Reusing existing producer")
return globals()["cached_producers"][connectionHash]