in tools/cansim/canigen.py [0:0]
def _sig_thread(self, msg_name, cycle_time):
    """Cyclically encode and emit one CAN message until ``self._stop`` is set.

    On every cycle the message is looked up by name in ``self._db``, its
    signal values are collected from ``self._values["sig"]`` and the encoded
    frame is either handed to ``self._write_frame`` (when ``self._output_file``
    is set) or sent on ``self._can_bus``.

    Signals that have no entry yet in ``self._values["sig"]`` are registered
    there with the value 0. Signals whose stored value is ``None`` are encoded
    as 0, but the stored value itself is left untouched.

    Args:
        msg_name: Name of the message to look up in ``self._db``.
        cycle_time: Send period in milliseconds.
    """
    # Absolute schedule: each deadline is derived from the previous deadline
    # rather than from "now", so per-iteration processing time does not
    # accumulate as drift.
    send_time = time.monotonic()
    while not self._stop:
        msg = self._db.get_message_by_name(msg_name)
        sig_values = self._values["sig"]
        vals = {}
        for sig in msg.signals:
            # setdefault registers a not-yet-known signal with 0 and returns
            # the stored value otherwise.
            val = sig_values.setdefault(sig.name, 0)
            vals[sig.name] = 0 if val is None else val
        data = msg.encode(vals)
        if self._output_file is not None:
            self._write_frame(msg, data)
        else:
            frame = can.Message(
                is_extended_id=msg.is_extended_frame,
                # Payloads longer than 8 bytes are flagged as CAN FD frames.
                is_fd=msg.length > 8,
                arbitration_id=msg.frame_id,
                data=data,
            )
            try:
                # reraise=True makes the retry helper raise the last
                # can.CanError once all attempts are used up. Without it
                # (assuming Retrying is tenacity's) a RetryError is raised
                # instead, which the handler below would not catch and which
                # would terminate this thread.
                for attempt in Retrying(stop=stop_after_attempt(5), reraise=True):
                    with attempt:
                        self._can_bus.send(frame)
            except can.CanError:
                print("error: failed to push data to CAN bus, transmit buffer is full")
        send_time += cycle_time / 1000
        sleep_time = send_time - time.monotonic()
        if cycle_time >= 5 and sleep_time < 2.0 / 1000.0:
            # If multiple signal samples with millisecond time precision are
            # put into Timestream with the exact same timestamp, Timestream
            # dedupes them, which might break test scenarios as there will be
            # only one row for multiple samples.
            # To avoid sending multiple messages of the same id with a cycle
            # time of at least 5 milliseconds within the same millisecond,
            # slow the catch-up down to have at least 2 milliseconds between
            # messages.
            time.sleep(2 / 1000)
        elif sleep_time >= 0:
            time.sleep(sleep_time)
        # Otherwise the deadline has already passed (short cycle times only):
        # send again immediately to catch up.