in a2d2/src/rosbag_producer.py [0:0]
def run(self):
try:
self.producer = KafkaProducer(bootstrap_servers=self.servers,
client_id=random_string())
self.__create_bag_dir()
tasks = []
sensors = self.request["sensor_id"]
sensor_topics = self.request['ros_topic']
sensor_data_types = self.request["data_type"]
sensor_frame_id = self.request.get("frame_id", dict())
for s in sensors:
manifest = self.manifests[s]
data_type = sensor_data_types[s]
ros_topic = sensor_topics[s]
frame_id = sensor_frame_id.get(s, "map")
t = threading.Thread(target=self.__record_sensor, name=s,
kwargs={"manifest": manifest, "data_type": data_type,
"ros_topic": ros_topic, "sensor": s, "frame_id": frame_id})
tasks.append(t)
t.start()
self.logger.info("Started thread:" + t.getName())
for t in tasks:
self.logger.info("Wait on thread:" + t.getName())
t.join()
self.logger.info("Thread finished:" + t.getName())
self.logger.info("Flush ROS bag")
self.__flush_bag()
self.logger.info("Close ROS bag")
self.__close_bag()
json_msg = {"__close__": True}
resp_topic = self.request['response_topic']
self.producer.send(resp_topic, json.dumps(json_msg).encode('utf-8'))
self.producer.flush()
self.producer.close()
print("completed request:"+resp_topic)
except Exception as _:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback, limit=20, file=sys.stdout)
self.logger.error(str(exc_type))
self.logger.error(str(exc_value))