in a2d2/src/rosbag_consumer.py [0:0]
def run(self):
    """Consume the response topic and publish every message until a close message.

    Steps, in order:

    1. Log a start message and initialise a ROS node named
       ``mozart_rosbag_<random suffix>``.
    2. Create a ``KafkaConsumer`` on ``self.response_topic`` that starts from
       the earliest offset.
    3. When ``self.s3`` is truthy, create and start the ``S3Reader`` and
       ``S3Deleter`` workers.
    4. For each consumed message: JSON-decode ``msg.value``; stop on a close
       message (it is printed first), otherwise hand the decoded message to
       ``self.__publish_bag``. A failure on one message is printed and the
       loop moves on to the next message.
    5. After the loop: in S3 mode send ``"__close__"`` to both workers, drain
       pending reads and stop the workers; otherwise remove every directory
       listed in ``self.clean_up``.
    6. Close the consumer, delete the response topic and close the admin
       client.

    Any exception escaping the steps above is printed to stdout (traceback,
    type, value) and is not re-raised. The consumer and the admin client are
    closed even when a step fails.

    NOTE(review): the reviewed source had lost its indentation; the nesting
    below (shutdown/cleanup after the message loop, not inside the
    per-message handler) is reconstructed from the syntax -- please confirm.
    """

    def report_current_exception():
        # Print the traceback (at most 20 entries), type and value of the
        # exception currently being handled; output goes to stdout.
        exc_type, exc_value, exc_traceback = sys.exc_info()
        traceback.print_tb(exc_traceback, limit=20, file=sys.stdout)
        print(str(exc_type))
        print(str(exc_value))

    try:
        self.logger.info("starting rosbag_consumer:{0}".format(self.response_topic))
        node_name = "mozart_rosbag_{0}".format(random_string(6))
        rospy.init_node(node_name)

        consumer = KafkaConsumer(self.response_topic,
                                 bootstrap_servers=self.servers,
                                 auto_offset_reset="earliest",
                                 client_id=random_string())
        try:
            if self.s3:
                self.s3_reader = S3Reader(self.s3_read_req, self.s3_read_resp)
                self.s3_deleter = S3Deleter(self.s3_delete_req)
                self.s3_reader.start()
                self.s3_deleter.start()

            for msg in consumer:
                try:
                    json_str = msg.value
                    json_msg = json.loads(json_str)

                    if is_close_msg(json_msg):
                        print(json_str)
                        break

                    self.__publish_bag(json_msg)
                except Exception:
                    # One bad message must not stop consumption.
                    report_current_exception()

            if self.s3:
                # Ask the reader to stop, give it time to react, then drain
                # whatever responses are still pending before joining it.
                self.s3_read_req.put("__close__")
                time.sleep(5)
                self.__read_s3(drain=True)
                self.s3_reader.join(timeout=2)
                if self.s3_reader.is_alive():
                    self.s3_reader.terminate()

                # Same stop / wait / force-terminate sequence for the deleter.
                self.s3_delete_req.put("__close__")
                time.sleep(5)
                self.s3_deleter.join(timeout=2)
                if self.s3_deleter.is_alive():
                    self.s3_deleter.terminate()
            else:
                for path in self.clean_up:
                    shutil.rmtree(path, ignore_errors=True)
        finally:
            # Release the Kafka consumer on every exit path, not only on success.
            consumer.close()

        # Reached only after a clean shutdown: remove the per-request topic.
        admin = KafkaAdminClient(bootstrap_servers=self.servers)
        try:
            admin.delete_topics([self.response_topic])
        finally:
            admin.close()
    except Exception:
        # Top-level boundary: report and swallow, as before.
        report_current_exception()