tools/cloud/firehose-to-html.py (152 lines of code) (raw):

#!/usr/bin/env python3
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Create an HTML plot from data collected through Firehose.

Input files are either ``.json`` (one JSON object per line) or ``.parquet``.
Every row must carry the keys ``measure_name``, ``time`` and ``vehicleName``
plus one ``measure_value_*`` entry holding the value. ``time`` is interpreted
as milliseconds when the plot is built.
"""

import argparse
import json
import math

import numpy
import pandas as pd


def is_included(name, include_list, exclude_list):
    """Return True when *name* passes the include and exclude filters.

    Matching is by substring, and empty filter entries never match.

    Args:
        name: Signal name to test.
        include_list: When non-empty, *name* must contain at least one entry.
        exclude_list: *name* must not contain any entry.
    """
    if include_list and not any(include and include in name for include in include_list):
        return False
    return not any(exclude and exclude in name for exclude in exclude_list)


def expand(struct_data, expanded_data, name_prefix, s3_links, include_list, exclude_list):
    """Flatten a nested value into ``expanded_data``, collecting S3 links.

    Dicts are walked as ``prefix.key`` and lists / numpy arrays as
    ``prefix[i]``. The include/exclude filters are applied to the full name of
    each leaf. String leaves containing ``s3://`` are appended to ``s3_links``
    and other strings are dropped; every remaining leaf is stored as a float.

    Args:
        struct_data: The (possibly nested) value to flatten.
        expanded_data: Dict updated in place with ``name -> float`` entries.
        name_prefix: Name accumulated so far for ``struct_data``.
        s3_links: List updated in place with the S3 links found.
        include_list: Substrings a leaf name must contain (empty means all).
        exclude_list: Substrings a leaf name must not contain.
    """
    if struct_data is None:
        # A null leaf has nothing to plot; top-level null struct values are
        # skipped by process_row in the same way.
        return
    if isinstance(struct_data, dict):
        for key, value in struct_data.items():
            expand(
                value, expanded_data, f"{name_prefix}.{key}", s3_links, include_list, exclude_list
            )
    elif isinstance(struct_data, (list, numpy.ndarray)):
        for i, item in enumerate(struct_data):
            expand(
                item,
                expanded_data,
                f"{name_prefix}[{i}]",
                s3_links,
                include_list,
                exclude_list,
            )
    elif not is_included(name_prefix, include_list, exclude_list):
        return
    elif isinstance(struct_data, str):
        if "s3://" in struct_data:
            s3_links.append(struct_data)
    else:
        expanded_data[name_prefix] = float(struct_data)


def process_row(vehicle_name, row, data, s3_links, include_list, exclude_list):
    """Convert one input row into a plot sample and append it to ``data``.

    Rows belonging to another vehicle, and scalar rows whose signal is
    filtered out, are ignored.

    Args:
        vehicle_name: Only rows whose ``vehicleName`` equals this are kept.
        row: Mapping-like row (a dict from JSON or a pandas Series).
        data: List updated in place with ``{"time": ..., name: value}`` dicts.
        s3_links: List updated in place with S3 links found in struct values.
        include_list: Substrings a signal name must contain (empty means all).
        exclude_list: Substrings a signal name must not contain.

    Raises:
        Exception: If a mandatory key is missing or no supported
            ``measure_value_*`` entry is present.
    """
    # Validate before touching row["time"] so that a malformed row yields the
    # "Unsupported format" error rather than a bare KeyError.
    if "measure_name" not in row or "time" not in row or "vehicleName" not in row:
        raise Exception("Unsupported format")
    if row["vehicleName"] != vehicle_name:
        return

    measure_name = row["measure_name"]
    expanded_data = {"time": row["time"]}

    if "measure_value_BOOLEAN" in row:
        value = 1 if row["measure_value_BOOLEAN"] else 0
    elif "measure_value_DOUBLE" in row and not math.isnan(row["measure_value_DOUBLE"]):
        value = row["measure_value_DOUBLE"]
    elif "measure_value_BIGINT" in row:
        value = row["measure_value_BIGINT"]
    elif "measure_value_VARCHAR" in row:
        value = row["measure_value_VARCHAR"]
    elif "measure_value_STRUCT" in row and row["measure_value_STRUCT"] is not None:
        # Only the first non-null member of the struct is expanded.
        for struct_data in row["measure_value_STRUCT"].values():
            if struct_data is None:
                continue
            expand(
                struct_data,
                expanded_data,
                measure_name,
                s3_links,
                include_list,
                exclude_list,
            )
            break
        data.append(expanded_data)
        return
    else:
        raise Exception(f"Unsupported format: {row}")

    if not is_included(measure_name, include_list, exclude_list):
        return
    expanded_data[measure_name] = value
    data.append(expanded_data)


def _split_signal_list(value):
    """Split a comma separated option value into stripped entries ([] if unset)."""
    return [item.strip() for item in value.split(",")] if value else []


def main():
    """Parse the command line, read the input files and write the outputs.

    Raises:
        Exception: If a file cannot be processed (the message is suffixed with
            the file name) or if no matching data was found.
    """
    # plotly is only needed for rendering; importing it here keeps the
    # row-processing helpers above importable without it.
    import plotly.graph_objects as go

    parser = argparse.ArgumentParser(description="Creates plots for collected data from Firehose")
    parser.add_argument(
        "--vehicle-name",
        required=True,
        help="Vehicle name",
    )
    parser.add_argument(
        "--files",
        type=argparse.FileType("r"),
        nargs="+",
        required=True,
        help="List files to process",
    )
    parser.add_argument(
        "--html-filename",
        metavar="FILE",
        required=True,
        help="HTML output filename",
    )
    parser.add_argument(
        "--s3-links-filename",
        metavar="FILE",
        help="S3 links output filename",
    )
    parser.add_argument(
        "--include-signals",
        metavar="SIGNAL_LIST",
        help="Comma separated list of signals to include",
    )
    parser.add_argument(
        "--exclude-signals",
        metavar="SIGNAL_LIST",
        help="Comma separated list of signals to exclude",
    )
    args = parser.parse_args()

    data = []
    s3_links = []
    include_list = _split_signal_list(args.include_signals)
    exclude_list = _split_signal_list(args.exclude_signals)

    for file in args.files:
        path = file.name
        try:
            # argparse already opened the file; the with-block makes sure that
            # handle is closed instead of being leaked.
            with file as fp:
                if path.endswith(".json"):
                    for line in fp:
                        if not line.strip():
                            # Tolerate blank lines between JSON records.
                            continue
                        process_row(
                            args.vehicle_name,
                            json.loads(line),
                            data,
                            s3_links,
                            include_list,
                            exclude_list,
                        )
                elif path.endswith(".parquet"):
                    # The text-mode handle is unusable for parquet; read by path.
                    df = pd.read_parquet(path, engine="pyarrow")
                    for _, row in df.iterrows():
                        process_row(
                            args.vehicle_name, row, data, s3_links, include_list, exclude_list
                        )
                else:
                    raise Exception("Unsupported format")
        except Exception as e:
            # Add the file name for context while keeping the original traceback.
            raise Exception(f"{e} in {path}") from e

    if not data:
        raise Exception("No data found")

    df = pd.DataFrame(data)
    df["time"] = pd.to_datetime(df["time"], unit="ms")

    fig = go.Figure()
    for column in df:
        if column != "time":
            fig.add_trace(go.Scatter(x=df["time"], y=df[column], mode="markers", name=column))

    # NOTE(review): plotly's full-page HTML is understood to declare
    # charset="utf-8"; writing with that encoding avoids locale-dependent
    # failures on non-ASCII signal names - confirm against the plotly docs.
    with open(args.html_filename, "w", encoding="utf-8") as fp:
        fp.write(fig.to_html())

    if args.s3_links_filename:
        with open(args.s3_links_filename, "w") as fp:
            fp.write("\n".join(s3_links))


if __name__ == "__main__":
    main()