in service/app/main.py [0:0]
def process_file(local_file, s3_prefix, s3_bucket, output_dir, topics_to_extract):
"""
Extract Rosbag Topics from input file to output_dir
:param local_file:
:param output_dir:
:param topics_to_extract:
:return:
"""
bag = bagreader(local_file)
local_file_name = local_file.split("/")[-1].replace(".bag", "")
save_metadata_to_dynamo(bag, s3_prefix, local_file_name, s3_bucket)
for topic in topics_to_extract:
data = bag.message_by_topic(topic)
if data is None:
logging.info("No data found for {topic}".format(topic=topic))
else:
logging.info("Reading data found for {topic}".format(topic=topic))
df_out = pd.read_csv(data)
df_out.columns = [x.replace(".", "_") for x in df_out.columns]
for col in df_out.columns:
# parse complex objects:
example = None
for x in df_out[col]:
if isinstance(x, str) and ":" in x:
# Column is yaml
example = x
break
if example:
obj_start = example.split(":")[0].replace("[", "")
df_out[f"{col}_clean"] = df_out[col].apply(
lambda x: parse_yaml_val(x, obj_start)
)
df_out["bag_file_prefix"] = s3_prefix
df_out["bag_file_bucket"] = s3_bucket
clean_topic = topic.replace("/", "_")[1:]
topic_output_dir = os.path.join(output_dir, clean_topic)
clean_directory(topic_output_dir)
topic_output_dir = os.path.join(
topic_output_dir, "bag_file=" + local_file_name
)
clean_directory(topic_output_dir)
output_path = os.path.join(topic_output_dir, "data.parq")
fastparquet.write(output_path, df_out)
print_files_in_path(output_dir)