def run()

in provider/consumer.py [0:0]


    def run(self):
        """Run the consumer's main loop, then shut the consumer down.

        Creates the consumer, then repeatedly polls for messages and fires
        the trigger for every non-empty batch until ``__shouldRun()`` says to
        stop. An exception escaping the loop is logged with its traceback and
        ends the loop instead of propagating.

        After the loop the reason for stopping is logged according to
        ``desiredState()``, the consumer (if any) is unsubscribed and closed,
        and the desired state is passed to ``__recordState``. Exceptions
        raised while shutting the consumer down are logged, not propagated.
        """
        try:
            self.consumer = self.__createConsumer()

            while self.__shouldRun():
                messages = self.__pollForMessages()

                if len(messages) > 0:
                    self.__fireTrigger(messages)

                # short pause between polls
                time.sleep(0.1)

            logging.info("[{}] Consumer exiting main loop".format(self.trigger))
        except Exception as e:
            # exc_info=True keeps the traceback; the message alone rarely
            # identifies what actually killed the loop.
            logging.error('[{}] Uncaught exception: {}'.format(self.trigger, e), exc_info=True)

        # Read the desired state once so every branch below is evaluated
        # against the same value.
        desired = self.desiredState()
        if desired == Consumer.State.Dead:
            logging.info('[{}] Permanently killing consumer because desired state is Dead'.format(self.trigger))
        elif desired == Consumer.State.Restart:
            logging.info('[{}] Quietly letting the consumer thread stop in order to allow restart.'.format(self.trigger))
            # nothing else to do because this Thread is about to go away
        elif desired == Consumer.State.Disabled:
            logging.info('[{}] Quietly letting the consumer thread stop in order to disable the feed.'.format(self.trigger))
        else:
            # uh-oh... this really shouldn't happen
            logging.error('[{}] Consumer stopped without being asked'.format(self.trigger))

        try:
            if self.consumer is not None:
                logging.info('[{}] Cleaning up consumer'.format(self.trigger))
                logging.debug('[{}] Closing KafkaConsumer'.format(self.trigger))
                try:
                    self.consumer.unsubscribe()
                except Exception as e:
                    # A failed unsubscribe must not prevent close(); otherwise
                    # the consumer would never be released.
                    logging.error('[{}] Failed to unsubscribe consumer, closing anyway: {}'.format(self.trigger, e), exc_info=True)
                self.consumer.close()
                logging.info('[{}] Successfully closed KafkaConsumer'.format(self.trigger))

                logging.debug('[{}] Deallocating KafkaConsumer'.format(self.trigger))
                self.consumer = None
                logging.info('[{}] Successfully cleaned up consumer'.format(self.trigger))
        except Exception as e:
            logging.error('[{}] Uncaught exception while shutting down consumer: {}'.format(self.trigger, e), exc_info=True)
        finally:
            # Re-read the state after cleanup (it may have changed meanwhile),
            # but only once, so the logged and the recorded state agree.
            final_state = self.desiredState()
            logging.info('[{}] Recording consumer as {}. Bye bye!'.format(self.trigger, final_state))
            self.__recordState(final_state)