tools/cloud/dbc-to-nodes.py (96 lines of code) (raw):
#!/usr/bin/env python3
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import argparse
import json
import sys
import cantools
# Dictionary key under which each emitted node stores its dotted path.
FQN_KEY = "fullyQualifiedName"

# Command-line interface: one optional flag plus two optional positional file
# arguments that fall back to the standard streams when omitted.
parser = argparse.ArgumentParser(
    description="Converts a DBC file to AWS IoT FleetWise 'nodes' format for use with"
    " CreateSignalCatalog"
)

# The flag is inverted and forwarded as the `strict` option when the DBC is loaded.
parser.add_argument(
    "-p",
    "--permissive",
    help="Apply the cantools strict=False option when loading the DBC file",
    action="store_true",
)

# argparse opens the input for reading itself; stdin is used when no path is given.
parser.add_argument(
    "infile",
    help="Input DBC file, default stdin",
    default=sys.stdin,
    type=argparse.FileType("r"),
    nargs="?",
)

# argparse opens the output for writing itself; stdout is used when no path is given.
parser.add_argument(
    "outfile",
    help="Output filename, default stdout",
    default=sys.stdout,
    type=argparse.FileType("w"),
    nargs="?",
)

args = parser.parse_args()
def _select_datatype(signal):
    """Return the node ``dataType`` string that fits a decoded DBC signal.

    Args:
        signal: A signal object exposing ``choices``, ``scale``, ``offset``,
            ``length``, ``is_float`` and ``is_signed``.

    Returns:
        ``"BOOLEAN"`` when the signal has at most two choices and value 0 is
        labelled "false" or value 1 is labelled "true" (case-insensitive);
        ``"DOUBLE"`` when the signal is scaled, offset, floating point or wider
        than 64 bits; otherwise the narrowest ``INT``/``UINT`` type of 8, 16,
        32 or 64 bits that holds ``signal.length`` bits.
    """
    choices = signal.choices
    if (
        choices
        and len(choices) <= 2
        and (
            (0 in choices and str(choices[0]).lower() == "false")
            or (1 in choices and str(choices[1]).lower() == "true")
        )
    ):
        return "BOOLEAN"
    # Any scaling, offset or float encoding produces non-integer physical values.
    if signal.scale != 1 or signal.offset != 0 or signal.length > 64 or signal.is_float:
        return "DOUBLE"
    for bits in (8, 16, 32):
        if signal.length <= bits:
            return f"INT{bits}" if signal.is_signed else f"UINT{bits}"
    return "INT64" if signal.is_signed else "UINT64"


# Load the DBC; strict checking is on unless --permissive was passed.
db = cantools.database.load(args.infile, strict=not args.permissive)

# Only used as the path prefix for message branches; the "Vehicle" branch node
# itself is not added to the output list.
vehicle_branch = {FQN_KEY: "Vehicle"}
nodes = []
processed_messages = set()
for message in db.messages:
    # Fall back to the frame ID for unnamed messages. It is converted to str so
    # that it is compared consistently against the string names already seen;
    # with a raw int, a message literally named e.g. "256" and an unnamed
    # message with frame ID 256 would both be emitted under the same path.
    message_text = message.name if message.name else str(message.frame_id)
    if message_text in processed_messages:
        # Disambiguate a repeated name by appending the frame ID.
        message_text = f"{message_text}_{message.frame_id}"
        if message_text in processed_messages:
            print(
                f"Message {message.frame_id} occurs multiple times, only the first occurrence "
                "will be used",
                file=sys.stderr,
            )
            continue
    processed_messages.add(message_text)

    # One branch node per message, followed by one sensor node per signal.
    message_branch = {FQN_KEY: f"{vehicle_branch[FQN_KEY]}.{message_text}"}
    nodes.append({"branch": message_branch})

    processed_signals = set()
    for signal in message.signals:
        if signal.name in processed_signals:
            print(
                f"Signal {signal.name} occurs multiple times in the message {message_text}, only"
                " the first occurrence will be used",
                file=sys.stderr,
            )
            continue
        processed_signals.add(signal.name)

        datatype = _select_datatype(signal)
        node = {
            "sensor": {
                "dataType": datatype,
                FQN_KEY: f"{message_branch[FQN_KEY]}.{signal.name}",
            }
        }
        # Optional attributes are only emitted when the DBC provides them.
        if signal.comment:
            node["sensor"]["description"] = signal.comment
        if signal.unit:
            node["sensor"]["unit"] = signal.unit
        # Numeric limits are omitted for BOOLEAN sensors.
        if signal.minimum is not None and datatype != "BOOLEAN":
            node["sensor"]["min"] = signal.minimum
        if signal.maximum is not None and datatype != "BOOLEAN":
            node["sensor"]["max"] = signal.maximum
        nodes.append(node)

out = json.dumps(nodes, indent=4, sort_keys=True)
args.outfile.write(out)