in tools/rossim/rosigen.py [0:0]
def __init__(self, config_filename, values_filename=None):
rclpy.init()
self._node = Node("Rosigen")
self._publishers = {} # Dict of publishers, indexed by ROS2 topic name
self._timers = {} # Dict of timers used to trigger publishing, index by ROS2 topic name
# Dict of ROS2 message values, indexed by ROS2 topic name. Each message is of a ROS2 message
# type, structured according to the corresponding interface definition.
self._vals = {}
config = self._load_json(config_filename)
for topic in config["topics"]:
module = importlib.import_module(topic["module"])
topic_type = getattr(module, topic["type"])
self._publishers[topic["name"]] = self._node.create_publisher(
topic_type, topic["name"], self.PUBLISH_HISTORY_DEPTH
)
self._vals[topic["name"]] = topic_type()
self._timers[topic["name"]] = self._node.create_timer(
topic["period_sec"],
lambda name=topic["name"]: self.publish_single_message(name),
)
if values_filename is not None:
self.load_values(values_filename)
self._thread = Thread(target=self._publish_thread)
self._thread.start()