in provider/consumer.py [0:0]
def run(self):
try:
self.consumer = self.__createConsumer()
while self.__shouldRun():
messages = self.__pollForMessages()
if len(messages) > 0:
self.__fireTrigger(messages)
time.sleep(0.1)
logging.info("[{}] Consumer exiting main loop".format(self.trigger))
except Exception as e:
logging.error('[{}] Uncaught exception: {}'.format(self.trigger, e))
if self.desiredState() == Consumer.State.Dead:
logging.info('[{}] Permanently killing consumer because desired state is Dead'.format(self.trigger))
elif self.desiredState() == Consumer.State.Restart:
logging.info('[{}] Quietly letting the consumer thread stop in order to allow restart.'.format(self.trigger))
# nothing else to do because this Thread is about to go away
elif self.desiredState() == Consumer.State.Disabled:
logging.info('[{}] Quietly letting the consumer thread stop in order to disable the feed.'.format(self.trigger))
else:
# uh-oh... this really shouldn't happen
logging.error('[{}] Consumer stopped without being asked'.format(self.trigger))
try:
if self.consumer is not None:
logging.info('[{}] Cleaning up consumer'.format(self.trigger))
logging.debug('[{}] Closing KafkaConsumer'.format(self.trigger))
self.consumer.unsubscribe()
self.consumer.close()
logging.info('[{}] Successfully closed KafkaConsumer'.format(self.trigger))
logging.debug('[{}] Deallocating KafkaConsumer'.format(self.trigger))
self.consumer = None
logging.info('[{}] Successfully cleaned up consumer'.format(self.trigger))
except Exception as e:
logging.error('[{}] Uncaught exception while shutting down consumer: {}'.format(self.trigger, e))
finally:
logging.info('[{}] Recording consumer as {}. Bye bye!'.format(self.trigger, self.desiredState()))
self.__recordState(self.desiredState())