in provider/consumer.py [0:0]
def __createConsumer(self):
if self.__shouldRun():
config = {'metadata.broker.list': ','.join(self.brokers),
'group.id': self.trigger,
'default.topic.config': {'auto.offset.reset': 'latest'},
'enable.auto.commit': False,
'api.version.request': True,
'isolation.level': 'read_uncommitted'
}
if self.isMessageHub:
# append Message Hub specific config
config.update({'ssl.ca.location': '/etc/ssl/certs/',
'sasl.mechanisms': 'PLAIN',
'sasl.username': self.username,
'sasl.password': self.password,
'security.protocol': 'sasl_ssl'
})
consumer = KafkaConsumer(config)
consumer.subscribe([self.topic], self.__on_assign, self.__on_revoke)
logging.info("[{}] Now listening in order to fire trigger".format(self.trigger))
return consumer