in tools/cansim/canigen.py [0:0]
def _obd_thread(self, ecu):
    """Answer OBD / UDS diagnostic requests for one simulated ECU until ``self._stop`` is set.

    Two ISO-TP sockets are opened for the ECU: one on its physical request ID and one
    on the functional (broadcast) request ID. Requests are read from whichever socket
    becomes readable first; responses are always sent on the physical socket.

    Handled services:

    * ``0x01`` - OBD PID requests (including the "supported PIDs" PIDs, i.e. multiples
      of ``0x20``).
    * ``0x03`` - OBD stored DTCs.
    * ``0x19`` - UDS ReadDTCInformation, sub-functions ``0x02``, ``0x03``, ``0x04``
      and ``0x06``.

    Anything else is answered with a negative response (``0x7F``). A positive response
    that would consist of the response SID only is not sent at all.

    Args:
        ecu: ECU configuration mapping. Keys read here: ``"tx_id"`` (string parsed with
            ``int(..., 0)``), ``"zero_padding"``, ``"dtcs"`` (DTC name -> mapping with a
            hex string ``"num"`` and an optional ``"type"`` defaulting to ``"OBD"``),
            and the optional flags ``"require_broadcast_requests"`` and
            ``"ignore_unsupported_pid_requests"``.
    """
    isotp_socket_phys = None
    isotp_socket_func = None
    create_socket = True
    try:
        while not self._stop:
            if create_socket:
                create_socket = False
                if isotp_socket_phys is not None:
                    # Release the failed sockets before replacing them; otherwise every
                    # bus error would leak a pair of sockets.
                    for stale_socket in (isotp_socket_phys, isotp_socket_func):
                        if stale_socket is not None:
                            stale_socket.close()
                    isotp_socket_phys = None
                    isotp_socket_func = None
                    # Wait one sec, to avoid high CPU usage in the case of persistent bus errors
                    time.sleep(1)
                txid = int(ecu["tx_id"], 0)
                if txid <= self._MAX_TX_ID_STANDARD:
                    rxid_phys = txid - 8
                    rxid_func = self._BROADCAST_ID_STANDARD
                else:
                    rxid_phys = self._MIN_RX_ID_EXTENDED + ((txid & 0xFF) << 8)
                    rxid_func = self._BROADCAST_ID_EXTENDED
                isotp_socket_phys = self._create_isotp_socket(txid, rxid_phys, ecu["zero_padding"])
                isotp_socket_func = self._create_isotp_socket(txid, rxid_func, ecu["zero_padding"])

            try:
                readable, _, _ = select.select(
                    [isotp_socket_phys, isotp_socket_func], [], [], 0.5
                )
                if not readable:
                    continue
                # Only the first readable socket is served per iteration; the other one
                # (if also readable) is picked up on the next pass.
                request_socket = readable[0]
                payload = request_socket.recv()
            except OSError:
                create_socket = True
                continue

            # Defensive: a missing or empty payload carries no service ID, so there is
            # nothing to answer. Without this guard the thread would die on the pop below.
            if not payload:
                continue
            rx = list(payload)
            sid = rx.pop(0)

            try:
                tx = [sid | 0x40]
                if (
                    ecu.get("require_broadcast_requests", False)
                    and request_socket != isotp_socket_func
                ):
                    tx = [0x7F, sid, 0x11]  # NRC Service not supported
                elif sid == 0x01:  # OBD PID
                    while rx:
                        pid_num = rx.pop(-1 if self._obd_answer_reverse_order else 0)
                        if (pid_num % 0x20) == 0:  # Supported PIDs
                            supported, data = self._get_supported_pids(pid_num, ecu)
                            if (
                                pid_num == 0
                                or supported
                                or not ecu.get("ignore_unsupported_pid_requests", False)
                            ):
                                tx += [pid_num] + data
                        else:
                            data = self._encode_pid_data(pid_num, ecu)
                            if data is not None:
                                tx += [pid_num] + data
                elif sid == 0x03:  # OBD DTCs
                    num_dtcs = 0
                    dtc_data = []
                    for dtc_name, dtc_cfg in ecu["dtcs"].items():
                        if dtc_cfg.get("type", "OBD") == "OBD" and self._values["dtc"][dtc_name]:
                            dtc_num = int(dtc_cfg["num"], 16)
                            dtc_data.append((dtc_num >> 8) & 0xFF)
                            dtc_data.append(dtc_num & 0xFF)
                            num_dtcs += 1
                    tx += [num_dtcs] + dtc_data
                elif sid == 0x19:  # UDS ReadDTCInformation
                    subfn = rx.pop(0)
                    tx += [subfn]
                    if subfn == 0x02:  # reportDTCByStatusMask
                        status_mask = rx.pop(0)
                        status_availability_mask = 0xFF  # All bits supported
                        tx.append(status_availability_mask)
                        for dtc_name, dtc_cfg in ecu["dtcs"].items():
                            dtc_status = int(self._values["dtc"][dtc_name]) & 0xFF
                            if dtc_cfg.get("type", "OBD") == "UDS" and dtc_status & status_mask:
                                dtc_num = int(dtc_cfg["num"], 16)
                                tx.append((dtc_num >> 16) & 0xFF)
                                tx.append((dtc_num >> 8) & 0xFF)
                                tx.append(dtc_num & 0xFF)
                                tx.append(dtc_status)
                    elif subfn == 0x03:  # reportDTCSnapshotIdentification
                        snapshots = self._values.get("dtc_snapshots", {})
                        for dtc_name in snapshots:
                            if (
                                dtc_name in ecu["dtcs"]
                                and ecu["dtcs"][dtc_name].get("type", "OBD") == "UDS"
                            ):
                                dtc_num = int(ecu["dtcs"][dtc_name]["num"], 16)
                                for record_number in snapshots[dtc_name]:
                                    tx.append((dtc_num >> 16) & 0xFF)
                                    tx.append((dtc_num >> 8) & 0xFF)
                                    tx.append(dtc_num & 0xFF)
                                    tx.append(int(record_number))
                    elif subfn in (0x04, 0x06):
                        # reportDTCSnapshotRecordByDTCNumber | reportDTCExtDataRecordByDTCNumber
                        dtc_num = rx.pop(0) << 16 | rx.pop(0) << 8 | rx.pop(0)
                        record_num = rx.pop(0)
                        vals = (
                            self._values.get("dtc_snapshots", {})
                            if subfn == 0x04
                            else self._values.get("dtc_ext_data", {})
                        )
                        record_key = str(record_num)
                        for dtc_name in vals:
                            if (
                                dtc_name in ecu["dtcs"]
                                and ecu["dtcs"][dtc_name].get("type", "OBD") == "UDS"
                                and dtc_num == int(ecu["dtcs"][dtc_name]["num"], 16)
                                and record_key in vals[dtc_name]
                            ):
                                dtc_status = int(self._values["dtc"][dtc_name]) & 0xFF
                                # Record bytes are stored as hex strings
                                data = [int(byte, 16) for byte in vals[dtc_name][record_key]]
                                tx.append((dtc_num >> 16) & 0xFF)
                                tx.append((dtc_num >> 8) & 0xFF)
                                tx.append(dtc_num & 0xFF)
                                tx.append(dtc_status)
                                tx.append(record_num)
                                tx += data  # Includes number of DIDs as first byte
                                break
                        else:
                            # No configured DTC/record matched the request
                            tx = [0x7F, sid, 0x31]  # NRC request out of range
                    else:
                        tx = [0x7F, sid, 0x12]  # NRC subfunction not supported
                else:
                    tx = [0x7F, sid, 0x11]  # NRC Service not supported
            except IndexError:
                # The request ended before all expected parameter bytes were read
                tx = [0x7F, sid, 0x13]  # NRC incorrectMessageLengthOrInvalidFormat

            # A response holding only the response SID carries no data and is not sent
            if len(tx) > 1:
                try:
                    isotp_socket_phys.send(bytearray(tx))
                except Exception:
                    pass  # Deliberately best effort: ignore timeout errors
    finally:
        # Also reached when stop was requested before any socket was created, or when
        # an unexpected exception ends the loop, hence the None checks.
        for open_socket in (isotp_socket_phys, isotp_socket_func):
            if open_socket is not None:
                open_socket.close()