in deepracer_follow_the_leader_ws/object_detection_pkg/object_detection_pkg/object_detection_node.py [0:0]
def __init__(self, qos_profile):
    """Create an ObjectDetectionNode.

    Declares the node parameters, initializes the inference network, creates
    the sensor subscription and the two publishers, and starts the background
    thread that runs inference.

    Args:
        qos_profile: QoS profile used for the sensor message subscription and
            for the detection delta publisher. The display image publisher
            uses a fixed queue depth of 10 instead.
    """
    super().__init__('object_detection_node')
    self.get_logger().info("object_detection_node started.")

    # Double buffer to hold the input images for inference.
    self.input_buffer = utils.DoubleBuffer(clear_data_on_get=True)

    # Get DEVICE parameter (CPU/MYRIAD) from launch file.
    # Declared with an explicit default so the parameter has a value and a
    # type even without a launch-file override; name-only declarations are
    # deprecated on newer ROS 2 distributions.
    self.declare_parameter("DEVICE", constants.DEVICE)
    self.device = self.get_parameter("DEVICE").get_parameter_value().string_value
    if not self.device:
        # An empty override still falls back to the packaged default.
        self.device = constants.DEVICE

    # Check if the inference output needs to be published to localhost
    # using web_video_server. Defaults to False when not overridden.
    self.declare_parameter("PUBLISH_DISPLAY_OUTPUT", False)
    self.publish_display_output = \
        self.get_parameter("PUBLISH_DISPLAY_OUTPUT").get_parameter_value().bool_value
    self.get_logger().info(f"Publish output set to {self.publish_display_output}")

    # Initialize Intel Inference Engine.
    self.init_network()

    # Calculate target position for bounding box center.
    # NOTE(review): self.w / self.h are not assigned in this method;
    # presumably init_network() sets them - confirm.
    self.target_x, self.target_y = self.calculate_target_center(self.w, self.h)

    # Create subscription to sensor messages from camera.
    self.image_subscriber = self.create_subscription(EvoSensorMsg,
                                                     constants.SENSOR_FUSION_TOPIC,
                                                     self.on_image_received_cb,
                                                     qos_profile)

    # Creating publisher for display_image.
    self.display_image_publisher = \
        self.create_publisher(Image,
                              constants.DISPLAY_IMAGE_PUBLISHER_TOPIC,
                              10)

    # Creating publisher for error (delta) from target bb position.
    self.delta_publisher = self.create_publisher(DetectionDeltaMsg,
                                                 constants.DELTA_PUBLISHER_TOPIC,
                                                 qos_profile)

    self.bridge = CvBridge()

    # Launching a separate thread to run inference.
    # stop_thread starts out False; thread_initialized flips to True only
    # once the thread has actually been started.
    self.stop_thread = False
    self.thread_initialized = False
    self.thread = threading.Thread(target=self.run_inference)
    self.thread.start()
    self.thread_initialized = True

    self.get_logger().info(f"Waiting for input images on {constants.SENSOR_FUSION_TOPIC}")