def run()

in a2d2/src/rosbag_consumer.py [0:0]


    def run(self):
        
        try: 
            self.logger.info("starting rosbag_consumer:{0}".format(self.response_topic))
            node_name = "mozart_rosbag_{0}".format(random_string(6))
            rospy.init_node(node_name)
            
            consumer = KafkaConsumer(self.response_topic, 
                                bootstrap_servers=self.servers,
                                auto_offset_reset="earliest",
                                client_id=random_string())

            if self.s3:
                self.s3_reader = S3Reader(self.s3_read_req, self.s3_read_resp)
                self.s3_deleter = S3Deleter(self.s3_delete_req)
                self.s3_reader.start()
                self.s3_deleter.start()

            for msg in consumer:
                try:
                    json_str = msg.value
                    json_msg = json.loads(json_str)

                    if is_close_msg(json_msg):
                        print(json_str)
                        break

                    self.__publish_bag(json_msg)
                except Exception as _:
                    exc_type, exc_value, exc_traceback = sys.exc_info()
                    traceback.print_tb(exc_traceback, limit=20, file=sys.stdout)
                    print(str(exc_type))
                    print(str(exc_value))

            if self.s3:
                self.s3_read_req.put("__close__")
                time.sleep(5)
                self.__read_s3(drain=True)
                
                self.s3_reader.join(timeout=2)
                if self.s3_reader.is_alive():
                    self.s3_reader.terminate()

                self.s3_delete_req.put("__close__")
                time.sleep(5)
                self.s3_deleter.join(timeout=2)
                if self.s3_deleter.is_alive():
                    self.s3_deleter.terminate()
            else:
                for dir in self.clean_up:
                    shutil.rmtree(dir, ignore_errors=True)

            consumer.close()
            admin = KafkaAdminClient(bootstrap_servers=self.servers)
            admin.delete_topics([self.response_topic])
            admin.close()

        except Exception as _:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            traceback.print_tb(exc_traceback, limit=20, file=sys.stdout)
            print(str(exc_type))
            print(str(exc_value))