def run()

in a2d2/src/manifest_consumer.py [0:0]


    def run(self):
        """Consume the response topic until a close message, then delete the topic.

        Opens a ``KafkaConsumer`` on ``self.response_topic`` (reading from the
        earliest offset, with a random client id), and decodes each message
        value as JSON. The first message for which ``is_close_msg`` returns a
        true value is printed to stdout and ends the loop; other messages are
        ignored. A failure while handling a message is reported and also ends
        the loop. Afterwards the consumer is closed and the response topic is
        deleted through a ``KafkaAdminClient``.

        Exceptions are never propagated to the caller: they are reported
        (traceback on stdout, exception type and value via ``self.logger``)
        and the method returns. The consumer and the admin client are closed
        even when an exception interrupts the normal flow.
        """

        def report_failure():
            # Report the exception currently being handled: traceback to
            # stdout, type and value to the instance logger.
            exc_type, exc_value, exc_traceback = sys.exc_info()
            traceback.print_tb(exc_traceback, limit=20, file=sys.stdout)
            self.logger.error(str(exc_type))
            self.logger.error(str(exc_value))

        try:
            consumer = KafkaConsumer(self.response_topic,
                                     bootstrap_servers=self.servers,
                                     auto_offset_reset="earliest",
                                     client_id=random_string())
            try:
                self.logger.info("manifest consumer on {0} kafka topic".format(self.response_topic))

                for message in consumer:
                    try:
                        json_str = message.value
                        json_msg = json.loads(json_str)
                        if is_close_msg(json_msg):
                            print(json_str)
                            break
                    except Exception:
                        # A message that cannot be processed stops consumption;
                        # the topic is still cleaned up below.
                        report_failure()
                        break
            finally:
                # Release the consumer even if iterating it raised.
                consumer.close()

            admin = KafkaAdminClient(bootstrap_servers=self.servers)
            try:
                admin.delete_topics([self.response_topic])
            finally:
                # Release the admin client even if topic deletion failed.
                admin.close()
        except Exception:
            # Top-level boundary: report and return instead of propagating.
            report_failure()