tools/cansim/canigen.py (453 lines of code) (raw):
#!/usr/bin/env python3
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import json
import select
import struct
import time
from datetime import datetime
from threading import Thread
import can
import cantools
import isotp
from tenacity import Retrying, stop_after_attempt
if __name__ == "__main__":
import argparse
from prompt_toolkit import PromptSession
from prompt_toolkit.completion import NestedCompleter, PathCompleter, WordCompleter
class Canigen:
_BROADCAST_ID_STANDARD = 0x7DF
_BROADCAST_ID_EXTENDED = 0x18DB33F1
_MAX_TX_ID_STANDARD = 0x7EF
_MIN_RX_ID_EXTENDED = 0x18DA00F1
def __init__(
self,
interface,
output_filename=None,
database_filename=None,
values_filename=None,
obd_config_filename=None,
default_cycle_time_ms=100,
obd_answer_reverse_order=False,
):
self._stop = False
self._interface = interface
self._output_file = None
self._sig_names = []
self._obd_answer_reverse_order = obd_answer_reverse_order
fd = False
self._values = {"sig": {}, "pid": {}, "dtc": {}, "dtc_snapshots": {}, "dtc_ext_data": {}}
if database_filename is not None:
self._db = cantools.database.load_file(database_filename)
for msg in self._db.messages:
if not fd and msg.length > 8:
fd = True
for sig in msg.signals:
self._sig_names.append(sig.name)
self._values["sig"][sig.name] = (
sig.offset if sig.initial is None else sig.initial
)
if values_filename is not None:
self._values = self._load_json(values_filename)
if output_filename is not None:
self._output_file = open(output_filename, "w")
else:
self._can_bus = can.interface.Bus(self._interface, interface="socketcan", fd=fd)
self._obd_config = {}
self._pid_names = []
self._dtc_names = []
if obd_config_filename is not None:
self._obd_config = self._load_json(obd_config_filename)
for ecu in self._obd_config["ecus"]:
for pid_name in ecu["pids"]:
self._pid_names.append(pid_name)
if pid_name not in self._values["pid"]:
self._values["pid"][pid_name] = 0.0
for dtc_name in ecu["dtcs"]:
self._dtc_names.append(dtc_name)
if dtc_name not in self._values["dtc"]:
self._values["dtc"][dtc_name] = 0.0
if dtc_name not in self._values["dtc_snapshots"]:
self._values["dtc_snapshots"][dtc_name] = {}
if dtc_name not in self._values["dtc_ext_data"]:
self._values["dtc_ext_data"][dtc_name] = {}
self._threads = []
if database_filename is not None:
for msg in self._db.messages:
cycle_time = msg.cycle_time
if cycle_time is None or cycle_time == 0:
cycle_time = default_cycle_time_ms
print(
f"Warning: Cycle time is None or zero for frame '{msg.name}',"
f" setting it to default of {default_cycle_time_ms} ms"
)
thread = Thread(
name=f"CanigenSignals-{msg.frame_id}",
target=self._sig_thread,
args=(
msg.name,
cycle_time,
),
)
thread.start()
self._threads.append(thread)
if obd_config_filename is not None:
for ecu in self._obd_config["ecus"]:
thread = Thread(
name=f"CanigenObd-{ecu['tx_id']}", target=self._obd_thread, args=(ecu,)
)
thread.start()
self._threads.append(thread)
def stop(self):
self._stop = True
for thread in self._threads:
thread.join()
if self._output_file:
self._output_file.close()
def _load_json(self, filename):
try:
with open(filename) as fp:
return json.load(fp)
except Exception:
print("error: failed to load " + filename)
raise
def _save_json(self, filename, data):
try:
with open(filename, "w") as fp:
return json.dump(data, fp, sort_keys=True, indent=4)
except Exception:
print("error: failed to save " + filename)
def _write_frame(self, msg, data):
if msg.is_extended_frame:
can_id = "%07X" % msg.frame_id
else:
can_id = "%03X" % msg.frame_id
data_hex = ""
for byte in data:
data_hex += "%02X" % byte
self._output_file.write(
"({:f}) {} {}#{}\n".format(
datetime.now().timestamp(), self._interface, can_id, data_hex
)
)
def _sig_thread(self, msg_name, cycle_time):
send_time = time.monotonic()
while not self._stop:
msg = self._db.get_message_by_name(msg_name)
vals = {}
for sig in msg.signals:
if sig.name not in self._values["sig"]:
val = self._values["sig"][sig.name] = 0
else:
val = self._values["sig"][sig.name]
vals[sig.name] = 0 if val is None else val
data = msg.encode(vals)
if self._output_file is not None:
self._write_frame(msg, data)
else:
frame = can.Message(
is_extended_id=msg.is_extended_frame,
is_fd=msg.length > 8,
arbitration_id=msg.frame_id,
data=data,
)
try:
for attempt in Retrying(stop=stop_after_attempt(5)):
with attempt:
self._can_bus.send(frame)
except can.CanError:
print("error: failed to push data to CAN bus, transmit buffer is full")
send_time += cycle_time / 1000
sleep_time = send_time - time.monotonic()
if cycle_time >= 5 and sleep_time < 2.0 / 1000.0:
# If multiple signal samples with millisecond time precision are
# put into Timestream with exact same timestamp Timstream dedupes them, which
# might break test scenarios as there will be only one row for multiple samples.
# To avoid sending multiple messages of the same id with a cyclic time
# above 5 milliseconds within the same milliseconds slow the catch-up down
# to have at least 2 milliseconds between messages.
time.sleep(2 / 1000)
elif sleep_time >= 0:
time.sleep(sleep_time)
def _get_supported_pids(self, num_range, ecu):
supported = False
out = [0, 0, 0, 0]
for data in ecu["pids"].values():
pid_num = int(data["num"], 0)
if pid_num >= num_range and pid_num < (num_range + 0x20):
i = int((pid_num - num_range - 1) / 8)
j = (pid_num - num_range - 1) % 8
out[i] |= 1 << (7 - j)
supported = True
return supported, out
def _encode_pid_data(self, num, ecu):
for name, data in ecu["pids"].items():
if num == int(data["num"], 0):
scaled_val = (self._values["pid"][name] + data["offset"]) * data["scale"]
is_float = data.get("signal_value_type", "INTEGER") == "FLOATING_POINT"
if is_float:
if data["size"] == 4:
return list(struct.pack(">f", scaled_val))
elif data["size"] == 8:
return list(struct.pack(">d", scaled_val))
else:
print(
"error: invalid size for floating point value,"
f" expected 4 or 8 bytes but got {data['size']}"
)
return None
else:
val = int(scaled_val)
out = []
for i in range(data["size"]):
out.append((val >> ((data["size"] - i - 1) * 8)) & 0xFF)
return out
return None
def _create_isotp_socket(self, txid, rxid, zero_padding):
s = isotp.socket()
if zero_padding:
s.set_opts(txpad=0, rxpad=0)
addressing_mode = (
isotp.AddressingMode.Normal_11bits
if txid <= self._MAX_TX_ID_STANDARD
else isotp.AddressingMode.Normal_29bits
)
s.bind(
self._interface,
isotp.Address(addressing_mode=addressing_mode, rxid=rxid, txid=txid),
)
return s
def _obd_thread(self, ecu):
isotp_socket_phys = None
create_socket = True
while not self._stop:
if create_socket:
create_socket = False
if isotp_socket_phys:
# Wait one sec, to avoid high CPU usage in the case of persistent bus errors
time.sleep(1)
txid = int(ecu["tx_id"], 0)
if txid <= self._MAX_TX_ID_STANDARD:
rxid_phys = txid - 8
rxid_func = self._BROADCAST_ID_STANDARD
else:
rxid_phys = self._MIN_RX_ID_EXTENDED + ((txid & 0xFF) << 8)
rxid_func = self._BROADCAST_ID_EXTENDED
isotp_socket_phys = self._create_isotp_socket(txid, rxid_phys, ecu["zero_padding"])
isotp_socket_func = self._create_isotp_socket(txid, rxid_func, ecu["zero_padding"])
try:
res = select.select([isotp_socket_phys, isotp_socket_func], [], [], 0.5)
if len(res[0]) == 0:
continue
rx = list(res[0][0].recv())
except OSError:
create_socket = True
continue
# print(ecu['name']+' rx: '+str(rx))
sid = rx.pop(0)
try:
tx = [sid | 0x40]
if ecu.get("require_broadcast_requests", False) and res[0][0] != isotp_socket_func:
tx = [0x7F, sid, 0x11] # NRC Service not supported
elif sid == 0x01: # OBD PID
while len(rx) > 0:
pid_num = rx.pop(-1 if self._obd_answer_reverse_order else 0)
if (pid_num % 0x20) == 0: # Supported PIDs
supported, data = self._get_supported_pids(pid_num, ecu)
if (
pid_num == 0
or supported
or not ecu.get("ignore_unsupported_pid_requests", False)
):
tx += [pid_num] + data
else:
data = self._encode_pid_data(pid_num, ecu)
if data is not None:
tx += [pid_num] + data
elif sid == 0x03: # OBD DTCs
num_dtcs = 0
dtc_data = []
for dtc_name in ecu["dtcs"]:
if (
ecu["dtcs"][dtc_name].get("type", "OBD") == "OBD"
and self._values["dtc"][dtc_name]
):
dtc_num = int(ecu["dtcs"][dtc_name]["num"], 16)
dtc_data.append((dtc_num >> 8) & 0xFF)
dtc_data.append(dtc_num & 0xFF)
num_dtcs += 1
tx += [num_dtcs] + dtc_data
elif sid == 0x19: # UDS ReadDTCInformation
subfn = rx.pop(0)
tx += [subfn]
if subfn == 0x02: # reportDTCByStatusMask
status_mask = rx.pop(0)
status_availability_mask = 0xFF # All bits supported
tx.append(status_availability_mask)
for dtc_name in ecu["dtcs"]:
dtc_status = int(self._values["dtc"][dtc_name]) & 0xFF
if (
ecu["dtcs"][dtc_name].get("type", "OBD") == "UDS"
and dtc_status & status_mask
):
dtc_num = int(ecu["dtcs"][dtc_name]["num"], 16)
tx.append((dtc_num >> 16) & 0xFF)
tx.append((dtc_num >> 8) & 0xFF)
tx.append(dtc_num & 0xFF)
tx.append(dtc_status)
elif subfn == 0x03: # reportDTCSnapshotIdentification
for dtc_name in self._values.get("dtc_snapshots", {}):
if (
dtc_name in ecu["dtcs"]
and ecu["dtcs"][dtc_name].get("type", "OBD") == "UDS"
):
dtc_num = int(ecu["dtcs"][dtc_name]["num"], 16)
for record_number in self._values["dtc_snapshots"][dtc_name].keys():
tx.append((dtc_num >> 16) & 0xFF)
tx.append((dtc_num >> 8) & 0xFF)
tx.append(dtc_num & 0xFF)
tx.append(int(record_number))
elif subfn in [
0x04,
0x06,
]: # reportDTCSnapshotRecordByDTCNumber | reportDTCExtDataRecordByDTCNumber
dtc_num = rx.pop(0) << 16 | rx.pop(0) << 8 | rx.pop(0)
record_num = rx.pop(0)
vals = (
self._values.get("dtc_snapshots", {})
if subfn == 0x04
else self._values.get("dtc_ext_data", {})
)
for dtc_name in vals:
if (
dtc_name in ecu["dtcs"]
and ecu["dtcs"][dtc_name].get("type", "OBD") == "UDS"
and dtc_num == int(ecu["dtcs"][dtc_name]["num"], 16)
and str(record_num) in vals[dtc_name]
):
dtc_status = int(self._values["dtc"][dtc_name]) & 0xFF
data = []
for byte in vals[dtc_name][str(record_num)]:
data.append(int(byte, 16))
tx.append((dtc_num >> 16) & 0xFF)
tx.append((dtc_num >> 8) & 0xFF)
tx.append(dtc_num & 0xFF)
tx.append(dtc_status)
tx.append(record_num)
tx += data # Includes number of DIDs as first byte
break
else:
tx = [0x7F, sid, 0x31] # NRC request out of range
else:
tx = [0x7F, sid, 0x12] # NRC subfunction not supported
else:
tx = [0x7F, sid, 0x11] # NRC Service not supported
except IndexError:
tx = [0x7F, sid, 0x13] # NRC incorrectMessageLengthOrInvalidFormat
# print(ecu['name']+' tx: '+str(tx))
if len(tx) > 1:
try:
isotp_socket_phys.send(bytearray(tx))
except Exception:
pass # Ignore timeout errors
isotp_socket_phys.close()
isotp_socket_func.close()
def get_sig_names(self):
return self._sig_names
def get_pid_names(self):
return self._pid_names
def get_dtc_names(self):
return self._dtc_names
def set_value(self, val_type, name, value):
self._values[val_type][name] = value
def set_sig(self, name, value):
self._values["sig"][name] = value
def set_pid(self, name, value):
self._values["pid"][name] = value
def set_dtc(self, name, value):
self._values["dtc"][name] = value
def set_dtc_snapshot(self, name, record_number, data):
self._values["dtc_snapshots"][name][str(record_number)] = data
def set_dtc_ext_data(self, name, record_number, data):
self._values["dtc_ext_data"][name][str(record_number)] = data
def get_value(self, val_type, name):
return self._values[val_type][name]
def get_sig(self, name):
return self._values["sig"][name]
def get_pid(self, name):
return self._values["pid"][name]
def get_dtc(self, name):
return self._values["dtc"][name]
def get_dtc_snapshot(self, name, record_number):
return self._values["dtc_snapshot"][name][str(record_number)]
def get_dtc_ext_data(self, name, record_number):
return self._values["dtc_ext_data"][name][str(record_number)]
def load_values(self, filename):
self._values = self._load_json(filename)
def save_values(self, filename):
self._save_json(filename, self._values)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=(
"Generates SocketCAN messages interactively according to a DBC file and OBD config"
)
)
parser.add_argument("-i", "--interface", required=True, help="CAN interface, e.g. vcan0")
parser.add_argument("-d", "--database", help="DBC file")
parser.add_argument("-f", "--output_filename", help="Output to file in canplayer format")
parser.add_argument("-o", "--obd_config", help="OBD config JSON file")
parser.add_argument("-v", "--values", help="Values JSON file")
args = parser.parse_args()
if args.output_filename is None and args.interface is None:
print("error: --interface argument is required")
exit(1)
c = Canigen(
args.interface,
args.output_filename,
args.database,
args.values,
args.obd_config,
)
sig_completer = WordCompleter(c.get_sig_names())
path_completer = PathCompleter()
pid_completer = WordCompleter(c.get_pid_names())
dtc_completer = WordCompleter(c.get_dtc_names())
cmd_completion_dict = {
"set": {"sig": sig_completer, "pid": pid_completer, "dtc": dtc_completer},
"get": {"sig": sig_completer, "pid": pid_completer, "dtc": dtc_completer},
"exit": None,
"load": path_completer,
"save": path_completer,
}
cmd_completer = NestedCompleter.from_nested_dict(cmd_completion_dict)
def print_help():
print("Usage:")
print(" set sig <SIGNAL> <VALUE>")
print(" get sig <SIGNAL>")
print(" set pid <PID> <VALUE>")
print(" get pid <PID>")
print(" set dtc <DTC> <FAILED>")
print(" get dtc <DTC>")
print(" load <VALUE_JSON_FILE>")
print(" save <VALUE_JSON_FILE>")
session = PromptSession()
try:
while True:
cmd = session.prompt("canigen$ ", completer=cmd_completer).split()
try:
if len(cmd) == 0:
pass
elif cmd[0] == "exit" or cmd[0] == "quit":
break
elif cmd[0] == "set":
try:
c.set_value(cmd[1], cmd[2], float(cmd[3]))
except Exception:
print("error: invalid value")
elif cmd[0] == "get":
print(c.get_value(cmd[1], cmd[2]))
elif cmd[0] == "load":
try:
c.load_values(cmd[1])
except Exception:
pass
elif cmd[0] == "save":
c.save_values(cmd[1])
else:
print_help()
except Exception:
print("error: invalid command")
print_help()
except KeyboardInterrupt:
pass
except Exception:
print("error: unknown")
c.stop()