def process_file()

in service/app/main.py
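
# Imports this function relies on (assumed to sit at module level in
# main.py; the helpers save_metadata_to_dynamo, parse_yaml_val,
# clean_directory and print_files_in_path are defined elsewhere in the
# module):
import logging
import os

import fastparquet
import pandas as pd
from bagpy import bagreader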


def process_file(local_file, s3_prefix, s3_bucket, output_dir, topics_to_extract):
    """
    Extract Rosbag Topics from input file to output_dir

    :param local_file:
    :param output_dir:
    :param topics_to_extract:
    :return:
    """

    # Load the bag with bagpy's bagreader, then record the bag's metadata
    # in DynamoDB before extracting individual topics.
    bag = bagreader(local_file)

    local_file_name = local_file.split("/")[-1].replace(".bag", "")
    save_metadata_to_dynamo(bag, s3_prefix, local_file_name, s3_bucket)

    for topic in topics_to_extract:
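        # bagpy extracts the topic's messages to a CSV file on disk and
        # returns its path, or None when the topic is absent from the bag.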
        data = bag.message_by_topic(topic)
        if data is None:
            logging.info("No data found for %s", topic)
        else:
            logging.info("Reading data for %s", topic)
            df_out = pd.read_csv(data)
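            # Flattened message fields come back with dotted names; replace
            # the dots so the column names stay friendly to downstream tools.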
            df_out.columns = [x.replace(".", "_") for x in df_out.columns]
            for col in df_out.columns:
                # Detect columns whose string values look like serialized
                # YAML objects (they contain a ":" key separator).
                example = None
                for x in df_out[col]:
                    if isinstance(x, str) and ":" in x:
                        example = x
                        break
                if example:
                    # Use the sample's first key as the object prefix and
                    # parse each value into a companion "<col>_clean" column.
                    obj_start = example.split(":")[0].replace("[", "")
                    df_out[f"{col}_clean"] = df_out[col].apply(
                        lambda x: parse_yaml_val(x, obj_start)
                    )

            df_out["bag_file_prefix"] = s3_prefix
            df_out["bag_file_bucket"] = s3_bucket
            clean_topic = topic.replace("/", "_")[1:]
            topic_output_dir = os.path.join(output_dir, clean_topic)
            clean_directory(topic_output_dir)
            topic_output_dir = os.path.join(
                topic_output_dir, "bag_file=" + local_file_name
            )
            clean_directory(topic_output_dir)
            output_path = os.path.join(topic_output_dir, "data.parq")
            fastparquet.write(output_path, df_out)
    print_files_in_path(output_dir)
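
# A minimal usage sketch (hypothetical paths, bucket, and topic names; it
# assumes the bag file has already been downloaded from S3 to local disk):
#
#   process_file(
#       local_file="/tmp/example.bag",
#       s3_prefix="raw/example.bag",
#       s3_bucket="my-rosbag-bucket",
#       output_dir="/tmp/extracted",
#       topics_to_extract=["/gps/fix", "/imu/data"],
#   )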