tools/can-to-someip/can-to-someip.py (175 lines of code) (raw):

#!/usr/bin/env python3 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import argparse import asyncio import logging import socket import textwrap import can from someip.sd import ServiceDiscoveryProtocol from someip.service import SimpleEventgroup, SimpleService log = logging.getLogger(__name__) class CanEventGroup(SimpleEventgroup): def __init__(self, service: SimpleService, event_group_id: int, event_id: int): super().__init__(service, id=event_group_id, interval=None) self._event_id = event_id self._queue = asyncio.Queue() async def add_message(self, data: bytes): # We can't just send the data here because we need to set the data in self.values and # then call self.notify_once. But self.notify_once happens asynchronously and it doesn't # copy the data. So by the time it sends the data it might be already overwritten. # We need to queue it and have them sent one by one by a single coroutine. await self._queue.put(data) async def run_send_messages(self): try: while True: data = await self._queue.get() self.values[self._event_id] = data # Using the non-public self._notify_all method as self.notify_once doesn't return # the coroutine so we can't wait for it to finish. await self._notify_all(events=[self._event_id], label="event") except asyncio.CancelledError: pass class CanListener(can.Listener): def __init__(self, event_loop: asyncio.AbstractEventLoop, can_service) -> None: super().__init__() self._event_loop = event_loop self._can_service = can_service def on_message_received(self, message: can.Message): timestamp_us = int(message.timestamp * 1e6) payload = b"".join( [ message.arbitration_id.to_bytes(4, "big"), timestamp_us.to_bytes(8, "big"), bytes(message.data), ] ) asyncio.run_coroutine_threadsafe( self._can_service.event_group.add_message(payload), self._event_loop ) async def create_service( local_addr: str, multicast_addr: str, port: int, service_id: int, instance_id: int, event_id: int, event_group_id: int, ): log.info( "Creating SOME/IP service" f" service_id=0x{service_id:02X}" f" instance_id=0x{instance_id:02X}" f" event_id=0x{event_id:02X}" f" event_group_id=0x{event_group_id:02X}" ) sd_trsp_u, sd_trsp_m, sd_prot = await ServiceDiscoveryProtocol.create_endpoints( family=socket.AF_INET, local_addr=local_addr, multicast_addr=multicast_addr ) sd_prot.timings.CYCLIC_OFFER_DELAY = 2 class_service_id = service_id class CanService(SimpleService): service_id = class_service_id # This can only be set as class variable version_major = 0 # interface_version version_minor = 0 def __init__(self, instance_id: int, event_group_id: int, event_id: int): super().__init__(instance_id) self.event_group = CanEventGroup(self, event_group_id=event_group_id, event_id=event_id) self.register_eventgroup(self.event_group) can_service: CanService _, can_service = await CanService.create_unicast_endpoint( instance_id=instance_id, local_addr=(local_addr, port), event_group_id=event_group_id, event_id=event_id, ) can_service.start_announce(sd_prot.announcer) return sd_trsp_u, sd_trsp_m, sd_prot, can_service async def run(sd_trsp_u, sd_trsp_m, sd_prot, can_service): sd_prot.start() asyncio.get_event_loop().create_task(can_service.event_group.run_send_messages()) try: while True: await asyncio.sleep(1) except asyncio.CancelledError: pass finally: sd_prot.stop() sd_trsp_u.close() sd_trsp_m.close() can_service.stop() if __name__ == "__main__": logging.basicConfig(level=logging.INFO) class CustomArgumentParser(argparse.ArgumentParser): def format_help(self): return super().format_help() + textwrap.dedent( """ The CAN data and metadata are sent as SOME/IP payload in the following format: _________________________________________________________ | CAN ID | Timestamp (in us) | CAN data | |___________|_____________________|_______________________| 4 bytes 8 bytes variable length CAN ID and Timestamp are unsigned integers encoded in network byte order (big endian). CAN ID is in the SocketCAN format: https://github.com/linux-can/can-utils/blob/88f0c753343bd863dd3110812d6b4698c4700b26/include/linux/can.h#L66-L78 """ # noqa: E501 ) parser = CustomArgumentParser( prog="can-to-someip", description="Listen to CAN messages and offer them as a SOME/IP service", ) parser.add_argument( "-i", "--can-interface", required=True, help="The CAN interface to listen to, e.g. vcan0", ) parser.add_argument( "--local-addr", required=True, help="The unicast IP address of this SOME/IP service. It should match the address of an" " existing network interface.", ) parser.add_argument( "--local-port", type=int, default=0, help="The port this SOME/IP service will listen to", ) parser.add_argument( "--multicast-addr", required=True, help="The multicast address that will be used for service discovery, e.g. 224.224.224.245", ) parser.add_argument( "--service-id", type=int, default=0x7777, help="The service id that will be announced to other SOME/IP applications", ) parser.add_argument( "--instance-id", type=int, default=0x5678, help="The instance id of this service that will be announced. Only a single instance will" " be created.", ) parser.add_argument( "--event-id", type=int, default=0x8778, metavar="[0x8000-0xFFFE]", help="ID of SOME/IP event that will be offered." " All CAN data is sent with the same event ID.", ) parser.add_argument( "--event-group-id", type=int, default=0x5555, help="ID of SOME/IP event group that will be offered." " Other applications will be able to subscribe to this event group.", ) args = parser.parse_args() if args.event_id < 0x8000 or args.event_id > 0xFFFE: raise ValueError("Event ID must be in the range [0x8000-0xFFFE]") (sd_trsp_u, sd_trsp_m, sd_prot, can_service) = asyncio.get_event_loop().run_until_complete( create_service( local_addr=args.local_addr, multicast_addr=args.multicast_addr, port=args.local_port, service_id=args.service_id, instance_id=args.instance_id, event_id=args.event_id, event_group_id=args.event_group_id, ) ) with can.interface.Bus(args.can_interface, bustype="socketcan") as can_bus: # The listener runs on another thread, so we need to explicitly pass the main thread's event # loop notifier = can.Notifier(can_bus, [CanListener(asyncio.get_event_loop(), can_service)]) try: asyncio.get_event_loop().run_until_complete( run(sd_trsp_u, sd_trsp_m, sd_prot, can_service) ) except KeyboardInterrupt: pass