def getProducer()

in action/messageHubProduce.py [0:0]


def getProducer(validatedParams, timeout_ms):
    """Return a KafkaProducer for the given connection parameters, reusing a cached one when possible.

    Producers are kept in the module-level ``cached_producers`` dict, keyed
    by the value ``getConnectionHash(validatedParams)`` returns, so that
    subsequent invocations with the same connection key can skip creating a
    new producer.

    Args:
        validatedParams: mapping that provides at least the keys
            ``'kafka_brokers_sasl'``, ``'user'`` and ``'password'``.
        timeout_ms: value passed to the producer as both ``max_block_ms``
            and ``request_timeout_ms``.

    Returns:
        The cached or newly created ``KafkaProducer``.
    """
    connectionHash = getConnectionHash(validatedParams)

    # Lazily create the module-level cache on first use (or if it was reset
    # to None), then work through a local alias of the same dict object.
    cache = globals().get("cached_producers")
    if cache is None:
        logging.info("dictionary was None")
        cache = dict()
        globals()["cached_producers"] = cache

    # Cache hit: hand back the existing producer without touching the cache,
    # so a full cache never evicts a producer that is about to be reused.
    if connectionHash in cache:
        logging.info("Reusing existing producer")
        return cache[connectionHash]

    logging.info("cache miss")

    # Only a miss needs room: remove an arbitrary connection to make space.
    # ``>=`` also covers a cache that somehow grew past the limit, and the
    # emptiness check avoids popitem() on an empty dict.
    if cache and len(cache) >= max_cached_producers:
        poppedProducer = cache.popitem()[1]
        poppedProducer.close(timeout=1)
        logging.info("Removed cached producer")

    # create a new connection
    sasl_mechanism = 'PLAIN'
    security_protocol = 'SASL_SSL'

    # Create a new context using system defaults and additionally disable
    # TLS 1.0 and TLS 1.1. The flags must be OR-ed into the option mask;
    # AND-ing would clear every other option set by create_default_context().
    context = ssl.create_default_context()
    context.options |= ssl.OP_NO_TLSv1
    context.options |= ssl.OP_NO_TLSv1_1

    producer = KafkaProducer(
        api_version=(0, 10),
        batch_size=0,
        bootstrap_servers=validatedParams['kafka_brokers_sasl'],
        max_block_ms=timeout_ms,
        request_timeout_ms=timeout_ms,
        sasl_plain_username=validatedParams['user'],
        sasl_plain_password=validatedParams['password'],
        security_protocol=security_protocol,
        ssl_context=context,
        sasl_mechanism=sasl_mechanism
    )

    logging.info("Created producer")

    # store the producer globally for subsequent invocations
    cache[connectionHash] = producer

    return producer