in provider/consumer.py [0:0]
def __init__(self, trigger, params, sharedDictionary):
Process.__init__(self)
self.daemon = True
self.trigger = trigger
self.isMessageHub = params["isMessageHub"]
self.triggerURL = self.__triggerURL(params["triggerURL"])
self.brokers = params["brokers"]
self.topic = params["topic"]
self.sharedDictionary = sharedDictionary
if 'status' in params and params['status']['active'] == False:
self.sharedDictionary['currentState'] = Consumer.State.Disabled
self.sharedDictionary['desiredState'] = Consumer.State.Disabled
else:
self.sharedDictionary['currentState'] = Consumer.State.Initializing
self.sharedDictionary['desiredState'] = Consumer.State.Running
if self.isMessageHub:
self.username = params["username"]
self.password = params["password"]
if 'isIamKey' in params and params['isIamKey'] == True:
self.authHandler = IAMAuth(params['authKey'], params['iamUrl'])
else:
if 'authKey' in params:
auth = params['authKey'].split(':')
self.authHandler = HTTPBasicAuth(auth[0], auth[1])
else:
parsedUrl = urlparse(params["triggerURL"])
self.authHandler = HTTPBasicAuth(parsedUrl.username, parsedUrl.password)
# handle the case where there may be existing triggers that do not
# have the isJSONData field set
if "isJSONData" in params:
self.encodeValueAsJSON = params["isJSONData"]
else:
self.encodeValueAsJSON = False
if "isBinaryValue" in params:
self.encodeValueAsBase64 = params["isBinaryValue"]
else:
self.encodeValueAsBase64 = False
if "isBinaryKey" in params:
self.encodeKeyAsBase64 = params["isBinaryKey"]
else:
self.encodeKeyAsBase64 = False
# always init consumer to None in case the consumer needs to shut down
# before the KafkaConsumer is fully initialized/assigned
self.consumer = None
# potentially squirrel away the message that would overflow the payload
self.queuedMessage = None