in deepracer_navigation_pkg/deepracer_navigation_pkg/deepracer_navigation_node.py [0:0]
def process_inference_data(self, inference_msg, servo_msg):
"""Helper method that selects the class with highest probability amplitude
and maps it to angle and throttle.
Args:
inference_msg (InferResultsArray): Message containing all relevant
inference data.
servo (ServoCtrlMsg): Message to be published to the servo with all
relevant servo data.
"""
# Negative value moves the car forward, positive values move the car backwards
servo_msg.throttle = self.throttle_scale
try:
if self.action_space_type == constants.ActionSpaceTypes.DISCRETE:
max_prob = max(inference_msg.results, key=lambda result: result.class_prob)
action_id = max_prob.class_label
servo_msg.angle = self.get_max_scaled_value(self.action_space[action_id]
[constants.ModelMetadataKeys.STEERING],
constants.ModelMetadataKeys.STEERING)
servo_msg.throttle *= self.get_non_linearly_mapped_speed(self.action_space[action_id]
[constants.ModelMetadataKeys.SPEED])
elif self.action_space_type == constants.ActionSpaceTypes.CONTINUOUS:
action_values = dict()
for result in inference_msg.results:
action_values[int(result.class_label)] = max(min(result.class_prob, 1.0), -1.0)
scaled_angle = self.scale_continuous_value(action_values[0],
-1.0,
1.0,
self.action_space[constants.ModelMetadataKeys.STEERING]
[constants.ModelMetadataKeys.CONTINUOUS_LOW],
self.action_space[constants.ModelMetadataKeys.STEERING]
[constants.ModelMetadataKeys.CONTINUOUS_HIGH])
servo_msg.angle = self.get_max_scaled_value(scaled_angle, constants.ModelMetadataKeys.STEERING)
scaled_throttle = self.scale_continuous_value(action_values[1],
-1.0,
1.0,
self.action_space[constants.ModelMetadataKeys.SPEED]
[constants.ModelMetadataKeys.CONTINUOUS_LOW],
self.action_space[constants.ModelMetadataKeys.SPEED]
[constants.ModelMetadataKeys.CONTINUOUS_HIGH])
servo_msg.throttle *= self.get_non_linearly_mapped_speed(scaled_throttle)
else:
raise Exception("Action space type {} is not supported".format(self.action_space_type))
except Exception as ex:
self.get_logger().error("Error while processing data in navigation node: {}".format(ex))
servo_msg.throttle = 0.0
servo_msg.angle = 0.0