in deepracer_offroad_ws/qr_detection_pkg/qr_detection_pkg/qr_detection_node.py [0:0]
def __init__(self, qos_profile):
"""Create a QRDetectionNode.
"""
super().__init__('qr_detection_node')
self.get_logger().info("qr_detection_node started.")
self.detect_waypoint_number = 1
# Double buffer to hold the input images for inference.
self.input_buffer = utils.DoubleBuffer(clear_data_on_get=True)
# Calculate target position to track.
self.target_x, self.target_y = self.calculate_target_reference_point(constants.WIDTH,
constants.HEIGHT)
# Create subscription to sensor messages from camera.
self.image_subscriber = self.create_subscription(EvoSensorMsg,
constants.SENSOR_FUSION_TOPIC,
self.on_image_received_cb,
qos_profile)
# Creating publisher for display_image.
self.display_image_publisher = \
self.create_publisher(Image,
constants.DISPLAY_IMAGE_PUBLISHER_TOPIC,
10)
# Creating publisher for error (delta) from target bb position.
self.delta_publisher = self.create_publisher(DetectionDeltaMsg,
constants.DELTA_PUBLISHER_TOPIC,
qos_profile)
self.bridge = CvBridge()
self.last_three_waypoints = list()
# Keep track of the action running.
self.ongoing_action = None
# Keep a limit counter to run a specific ongoing action for decoded number of frames.
self.limit = 1
# Launching a separate thread to run inference.
self.stop_thread = False
self.thread_initialized = False
self.thread = threading.Thread(target=self.run_waypoint_detection)
self.thread.start()
self.thread_initialized = True
self.get_logger().info(f"Waiting for input images on {constants.SENSOR_FUSION_TOPIC}")