tools/cansim/can_command_server.py (125 lines of code) (raw):

#!/usr/bin/env python3 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import argparse import asyncio import struct import time from threading import Thread import can class CanCommandServer: command_configs = [ { "canRequestId": 0x00000123, # standard 11-bit CAN ID "canResponseId": 0x00000456, # standard 11-bit CAN ID "actuatorName": "Vehicle.actuator6", "argumentType": ">i", # big-endian int32 "inProgressCount": 0, "status": 1, # SUCCEEDED "reasonCode": 0x1234, "reasonDescription": "hello", "responseDelay": 1, }, { "canRequestId": 0x80000789, # extended 29-bit CAN ID (MSB sets extended) "canResponseId": 0x80000ABC, # extended 29-bit CAN ID (MSB sets extended) "actuatorName": "Vehicle.actuator7", "argumentType": ">d", # big-endian double "inProgressCount": 10, "status": 1, # SUCCEEDED "reasonCode": 0x5678, "reasonDescription": "goodbye", "responseDelay": 1, }, ] args = {} # used to record the passed arguments for each execution def is_extended_id(self, can_id): return (can_id & 0x80000000) != 0 def mask_id(self, can_id): return (0x1FFFFFFF if self.is_extended_id(can_id) else 0x7FF) & can_id def pop_string(self): for i in range(len(self.data)): if self.data[i] == 0x00: break else: raise RuntimeError("Could not find null terminator") string_data = self.data[0:i] self.data = self.data[i + 1 :] return string_data.decode("utf-8") def pop_arg(self, struct_type): dummy = struct.pack(struct_type, 0) arg_data = self.data[0 : len(dummy)] self.data = self.data[len(dummy) :] return struct.unpack(struct_type, arg_data)[0] def push_string(self, val): val += "\0" self.data += val.encode("utf-8") def push_arg(self, struct_type, val): self.data += struct.pack(struct_type, val) def handle_message(self, receive_msg): for config in self.command_configs: if receive_msg.arbitration_id == self.mask_id( config["canRequestId"] ) and receive_msg.is_extended_id == self.is_extended_id(config["canRequestId"]): break else: return try: self.data = bytearray(receive_msg.data) command_id = self.pop_string() issued_timestamp_ms = self.pop_arg(">Q") execution_timeout_ms = self.pop_arg(">Q") arg_value = self.pop_arg(config["argumentType"]) print( f"Received request for {config['actuatorName']}" f" with command id {command_id}, value {arg_value}," f" issued timestamp {issued_timestamp_ms}, execution timeout {execution_timeout_ms}" ) self.args[command_id] = arg_value execution_state = {"in_progress_counter": config["inProgressCount"]} def send_response(): status = 10 if execution_state["in_progress_counter"] > 0 else config["status"] print( f"Sending response to request for {config['actuatorName']}" f" with command id {command_id}, status {status}," f" reason code {config['reasonCode']}," f" reason description {config['reasonDescription']}" ) self.data = bytearray() send_msg = can.Message() send_msg.arbitration_id = self.mask_id(config["canResponseId"]) send_msg.is_extended_id = self.is_extended_id(config["canResponseId"]) send_msg.is_fd = True self.push_string(command_id) self.push_arg("B", status) self.push_arg(">I", config["reasonCode"]) self.push_string(config["reasonDescription"]) send_msg.data = self.data send_msg.dlc = len(self.data) self.can_bus.send(send_msg) if execution_state["in_progress_counter"] > 0: execution_state["in_progress_counter"] -= 1 self.loop.call_later(config["responseDelay"], send_response) self.loop.call_later(config["responseDelay"], send_response) except Exception as e: print(e) def __init__(self, interface): self.can_bus = can.Bus(interface, interface="socketcan", fd=True) self.loop = asyncio.new_event_loop() can.Notifier(bus=self.can_bus, listeners=[self.handle_message], loop=self.loop) self.thread = Thread(name="CanCommandServer", target=self.loop.run_forever) self.thread.start() def stop(self): async def stop_loop(): self.loop.stop() asyncio.run_coroutine_threadsafe(stop_loop(), self.loop) self.thread.join() self.can_bus.shutdown() if __name__ == "__main__": parser = argparse.ArgumentParser(description="Runs a server to respond to CAN command messages") parser.add_argument("-i", "--interface", required=True, help="CAN interface, e.g. vcan0") args = parser.parse_args() command_server = CanCommandServer(args.interface) try: while True: time.sleep(60) except KeyboardInterrupt: pass command_server.stop()