in deepracer_follow_the_leader_ws/ftl_navigation_pkg/ftl_navigation_pkg/ftl_navigation_node.py [0:0]
def action_publish(self, msg):
"""Function which runs in a separate thread to read object detection delta
from double buffer, decides the action and sends it to servo.
Args:
msg: detection_delta (DetectionDeltaMsg): Message containing the normalized
detection delta in x and y axes respectively passed as a list.
"""
try:
while not self.stop_thread:
# Get a new message to plan action on
detection_delta = self.delta_buffer.get()
action_category = self.plan_action(detection_delta.delta)
msg.angle, msg.throttle = self.get_mapped_action(action_category,
self.max_speed_pct)
# Publish msg based on action planned and mapped from a new object detection.
self.action_publisher.publish(msg)
max_speed_pct = self.max_speed_pct
# Sleep for a default amount of time before checking if new data is available.
time.sleep(constants.DEFAULT_SLEEP)
# If new data is not available within default time, gracefully run blind.
while self.delta_buffer.is_empty() and not self.stop_thread:
# Decrease the max_speed_pct in every iteration so as to halt gradually.
max_speed_pct = max_speed_pct - 0.05
msg.angle, msg.throttle = self.get_mapped_action(action_category,
max_speed_pct)
# Reducing angle value
msg.angle = msg.angle / 2
# Publish blind action
self.action_publisher.publish(msg)
# Sleep before checking if new data is available.
time.sleep(0.1)
except Exception as ex:
self.get_logger().error(f"Failed to publish action to servo: {ex}")
# Stop the car
msg.angle, msg.throttle = constants.ActionValues.DEFAULT, constants.ActionValues.DEFAULT
self.action_publisher.publish(msg)
# Destroy the ROS Node running in another thread as well.
self.destroy_node()
rclpy.shutdown()