in a2d2/src/rosbag_producer.py [0:0]
def __init__(self, dbconfig=None, servers=None,
request=None, data_store=None, calibration=None):
Process.__init__(self)
logging.basicConfig(
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
level=logging.INFO)
self.logger = logging.getLogger("rosbag_producer")
self.servers = servers
self.request = request
self.data_store = data_store
self.calibration = calibration
self.tmp = os.getenv("TMP", default="/tmp")
self.img_cv_bridge = cv_bridge.CvBridge()
self.manifests = dict()
sensors = self.request['sensor_id']
self.topic_dict = dict()
self.topic_list = list()
self.topic_active = dict()
self.topic_index = 0
self.round_robin = list()
self.bus_topic = None
for s in sensors:
self.manifests[s] = create_manifest(request=request, dbconfig=dbconfig, sensor_id=s)
_topic = self.request['ros_topic'][s]
self.topic_dict[_topic] = []
self.topic_list.append(_topic)
self.topic_active[_topic] = True
if len(sensors) > 1:
self.bag_lock = threading.Lock()
else:
self.bag_lock = None
self.bag = None
self.bag_path = None
self.bag_name = None
self.multipart = 'multipart' in request['accept']
if self.multipart:
self.msg_count = 0
self.accept = request['accept']
if self.accept.startswith('s3/'):
s3_config = self.data_store['s3']
self.rosbag_bucket = s3_config['rosbag_bucket']
self.rosbag_prefix = s3_config['rosbag_prefix']
if not self.rosbag_prefix.endswith("/"):
self.rosbag_prefix += "/"
if self.multipart:
self.chunk_count = len(self.topic_list)*2
cal_obj = get_s3_resource().Object(calibration["cal_bucket"], calibration["cal_key"])
cal_data = cal_obj.get()['Body'].read().decode('utf-8')
self.cal_json = json.loads(cal_data)