action/kafkaProduce.py [139:173]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    if 'base64DecodeValue' in params and params['base64DecodeValue'] == True:
        try:
            validatedParams['value'] = base64.b64decode(params['value']).decode('utf-8')
        except:
            return (False, "value parameter is not Base64 encoded")

        if len(validatedParams['value']) == 0:
            return (False, "value parameter is not Base64 encoded")

    if 'base64DecodeKey' in params and params['base64DecodeKey'] == True:
        try:
            validatedParams['key'] = base64.b64decode(params['key']).decode('utf-8')
        except:
            return (False, "key parameter is not Base64 encoded")

        if len(validatedParams['key']) == 0:
            return (False, "key parameter is not Base64 encoded")

    return (True, validatedParams)

def getProducer(validatedParams, timeout_ms):
    connectionHash = getConnectionHash(validatedParams)

    if globals().get("cached_producers") is None:
        logging.info("dictionary was None")
        globals()["cached_producers"] = dict()

    # remove arbitrary connection to make room for new one
    if len(globals()["cached_producers"]) == max_cached_producers:
        poppedProducer = globals()["cached_producers"].popitem()[1]
        poppedProducer.close(timeout=1)
        logging.info("Removed cached producer")

    if connectionHash not in globals()["cached_producers"]:
        logging.info("cache miss")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



action/messageHubProduce.py [140:174]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    if 'base64DecodeValue' in params and params['base64DecodeValue'] == True:
        try:
            validatedParams['value'] = base64.b64decode(params['value']).decode('utf-8')
        except:
            return (False, "value parameter is not Base64 encoded")

        if len(validatedParams['value']) == 0:
            return (False, "value parameter is not Base64 encoded")

    if 'base64DecodeKey' in params and params['base64DecodeKey'] == True:
        try:
            validatedParams['key'] = base64.b64decode(params['key']).decode('utf-8')
        except:
            return (False, "key parameter is not Base64 encoded")

        if len(validatedParams['key']) == 0:
            return (False, "key parameter is not Base64 encoded")

    return (True, validatedParams)

def getProducer(validatedParams, timeout_ms):
    connectionHash = getConnectionHash(validatedParams)

    if globals().get("cached_producers") is None:
        logging.info("dictionary was None")
        globals()["cached_producers"] = dict()

    # remove arbitrary connection to make room for new one
    if len(globals()["cached_producers"]) == max_cached_producers:
        poppedProducer = globals()["cached_producers"].popitem()[1]
        poppedProducer.close(timeout=1)
        logging.info("Removed cached producer")

    if connectionHash not in globals()["cached_producers"]:
        logging.info("cache miss")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



