in deepracer_follow_the_leader_ws/object_detection_pkg/object_detection_pkg/object_detection_node.py [0:0]
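The run_inference() method below assumes the OpenVINO network was already loaded, typically in the node's constructor. The following is a minimal sketch of that setup using the legacy Inference Engine API (which matches the exec_net.infer(inputs=...) call used below); model_xml, model_bin and the device name are placeholders, not values taken from this package.

    from openvino.inference_engine import IECore

    ie = IECore()
    net = ie.read_network(model=model_xml, weights=model_bin)
    # Names of the single input and output blobs of the detection model.
    self.input_name = next(iter(net.input_info))
    self.out_blob = next(iter(net.outputs))
    # Input shape is NCHW; keep H and W for scaling the normalized box coordinates.
    _, _, self.h, self.w = net.input_info[self.input_name].input_data.shape
    self.exec_net = ie.load_network(network=net, device_name="CPU")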
def run_inference(self):
    """Method for running inference on the received input image.
    """
    try:
        while not self.stop_thread:
            # Get an input image from the double buffer.
            sensor_data = self.input_buffer.get()
            start_time = time.time()
            # Pre-process the input.
            input_data = {}
            input_data[self.input_name] = self.preprocess(sensor_data)
            # Perform inference.
            res = self.exec_net.infer(inputs=input_data)
            # Read and postprocess the output.
            res = res[self.out_blob]
            boxes, classes = {}, {}
            output_data = res[0][0]
            detected = False
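            # Each proposal row follows the SSD-style detection output layout:
            # [image_id, label, confidence, x_min, y_min, x_max, y_max], with the
            # box coordinates normalized to the range [0, 1].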
            for number, proposal in enumerate(output_data):
                # Confidence for the predicted class.
                confidence = proposal[2]
                if (confidence > constants.CONFIDENCE_THRESHOLD and
                        constants.COCO_LABELS[int(proposal[1])] == constants.DETECT_CLASS):
                    # ID of the image in the batch.
                    imid = int(proposal[0])
                    # Predicted class ID.
                    label = int(proposal[1])
                    # Scale the normalized [0, 1] coordinates of the top left
                    # bounding box corner to pixel values.
                    top_left_x = int(self.w * proposal[3])
                    top_left_y = int(self.h * proposal[4])
                    # Scale the normalized [0, 1] coordinates of the bottom right
                    # bounding box corner to pixel values.
                    bottom_right_x = int(self.w * proposal[5])
                    bottom_right_y = int(self.h * proposal[6])
                    # Calculate the bounding box center.
                    bb_center_x, bb_center_y = self.calculate_bb_center(top_left_x,
                                                                        top_left_y,
                                                                        bottom_right_x,
                                                                        bottom_right_y)
                    # Calculate the detection delta.
                    detection_delta = self.calculate_delta(self.target_x,
                                                           self.target_y,
                                                           bb_center_x,
                                                           bb_center_y)
                    # Publish to the object_detection_delta topic.
                    self.delta_publisher.publish(detection_delta)
                    # Set the flag that an object has been detected.
                    detected = True
                    if imid not in boxes:
                        boxes[imid] = []
                    boxes[imid].append([top_left_x, top_left_y,
                                        bottom_right_x, bottom_right_y])
                    if imid not in classes:
                        classes[imid] = []
                    classes[imid].append(label)
                    # Break as soon as the specified class is detected.
                    break
            if not detected:
                # No object detected: publish a zero delta by treating the
                # detection center as already being at the target position.
                detection_delta = self.calculate_delta(self.target_x,
                                                       self.target_y,
                                                       self.target_x,
                                                       self.target_y)
                self.delta_publisher.publish(detection_delta)
            if self.publish_display_output:
                # Change the data layout from CHW to HWC.
                display_image = input_data[self.input_name].transpose((1, 2, 0))
                for imid in classes:
                    for box in boxes[imid]:
                        # Draw the bounding box on the image.
                        cv2.rectangle(display_image,
                                      (box[0], box[1]),
                                      (box[2], box[3]),
                                      (232, 35, 244),
                                      2)
                # Draw the target center on the image.
                cv2.circle(display_image,
                           (int(self.target_x), int(self.target_y)),
                           5,
                           (0, 255, 0),
                           -1)
                # Publish to the display topic (can be viewed on localhost:8080).
                display_image = self.bridge.cv2_to_imgmsg(np.array(display_image), "bgr8")
                self.display_image_publisher.publish(display_image)
            self.get_logger().info(f"Total execution time = {time.time() - start_time}")
    except Exception as ex:
        self.get_logger().error(f"Failed inference step: {ex}")
        # Destroy the ROS node running in the other thread as well.
        self.destroy_node()
        rclpy.shutdown()
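run_inference() delegates to several helpers defined elsewhere in this node. The sketches below are one plausible reading of their behavior, inferred only from how they are called above; DetectionDeltaMsg and its delta field are hypothetical stand-ins for the actual delta message type, and the real implementations in the package may differ.

    def preprocess(self, sensor_data):
        """Assumed behavior: convert the incoming image message to a BGR array,
        resize it to the network input size and return it in CHW layout."""
        image = self.bridge.imgmsg_to_cv2(sensor_data, "bgr8")
        image = cv2.resize(image, (self.w, self.h))
        # HWC -> CHW, mirroring the transpose((1, 2, 0)) used for display output.
        return image.transpose((2, 0, 1))

    def calculate_bb_center(self, top_left_x, top_left_y,
                            bottom_right_x, bottom_right_y):
        """Assumed behavior: return the pixel coordinates of the bounding box center."""
        return ((top_left_x + bottom_right_x) // 2,
                (top_left_y + bottom_right_y) // 2)

    def calculate_delta(self, target_x, target_y, bb_center_x, bb_center_y):
        """Assumed behavior: pack the offset between the target point and the
        detected bounding box center into the message published on the
        object_detection_delta topic."""
        delta = DetectionDeltaMsg()  # hypothetical message type
        delta.delta = [float(target_x - bb_center_x),
                       float(target_y - bb_center_y)]
        return delta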