in src/markov/agent_ctrl/utils.py [0:0]
def set_reward_and_metrics(reward_params, step_metrics, agent_name, pos_dict, track_data,
                           data_dict, action, json_actions, car_pose):
    '''Populates the reward_params and step_metrics dictionaries, in place, with the
       common metrics and parameters. Nothing is returned.

    reward_params - Dictionary containing the input parameters to the reward function,
                    it is validated with RewardParam.validate_dict before being filled
    step_metrics - Dictionary containing the metrics that are sent to s3
    agent_name - String of agent name
    pos_dict - Dictionary containing the agent position data, keys defined in AgentPos
    track_data - Object containing all the track information and geometry
    data_dict - Dictionary containing previous progress, steps, and start distance,
                read through the keys 'prev_progress', 'steps' and 'start_ndist'
    action - Integer containing the action to take
    json_actions - Dictionary holding the steering angle and speed of the action, read
                   through ModelMetadataKeys.STEERING_ANGLE and ModelMetadataKeys.SPEED
    car_pose - Gazebo Pose of the agent

    Raises GenericRolloutException if a key is missing or if any other error occurs
    while the values are being computed, the original error is chained as the cause.
    '''
    try:
        # Check that the required keys are present in the dicts that are being
        # passed in, these methods will throw an exception if a key is missing
        RewardParam.validate_dict(reward_params)
        # Model point and its normalized distance along the track
        model_point = pos_dict[AgentPos.POINT.value]
        current_ndist = track_data.get_norm_dist(model_point)
        prev_index, next_index = track_data.find_prev_next_waypoints(current_ndist,
                                                                     normalized=True)
        reverse_dir = track_data.reverse_dir
        # Model progress starting at the initial waypoint
        current_progress = compute_current_prog(current_ndist - data_dict['start_ndist'],
                                                data_dict['prev_progress'])
        # Get the nearest points
        nearest_pnts_dict = track_data.get_nearest_points(model_point)
        # Compute distance from center and road width
        nearest_dist_dict = track_data.get_nearest_dist(nearest_pnts_dict, model_point)
        # Compute the distance from the previous and next points
        distance_from_prev, distance_from_next = \
            track_data.get_distance_from_next_and_prev(model_point, prev_index,
                                                       next_index)
        # Compute which points are on the track
        wheel_on_track = track_data.points_on_track(pos_dict[AgentPos.LINK_POINTS.value])
        # Get the model orientation
        model_orientation = pos_dict[AgentPos.ORIENTATION.value]
        # The track length feeds several entries below, so it is fetched only once
        track_length = track_data.get_track_length()

        # Set the reward and metric parameters
        step_metrics[StepMetrics.STEPS.value] = \
            reward_params[RewardParam.STEPS.value[0]] = data_dict['steps']
        reward_params[RewardParam.REVERSE.value[0]] = reverse_dir
        step_metrics[StepMetrics.PROG.value] = \
            reward_params[RewardParam.PROG.value[0]] = current_progress
        reward_params[RewardParam.CENTER_DIST.value[0]] = \
            nearest_dist_dict[TrackNearDist.NEAR_DIST_CENT.value]
        reward_params[RewardParam.PROJECTION_DISTANCE.value[0]] = \
            current_ndist * track_length
        reward_params[RewardParam.CLS_WAYPNY.value[0]] = [prev_index, next_index]
        # This is the only place LEFT_CENT is written: the agent is nearer to the inner
        # border than the outer one, XOR-ed with the track not being counter clockwise.
        # NOTE(review): presumably track_data.is_ccw already reflects reverse_dir - confirm
        closer_to_inner = nearest_dist_dict[TrackNearDist.NEAR_DIST_IN.value] < \
            nearest_dist_dict[TrackNearDist.NEAR_DIST_OUT.value]
        reward_params[RewardParam.LEFT_CENT.value[0]] = \
            closer_to_inner ^ (not track_data.is_ccw)
        reward_params[RewardParam.WAYPNTS.value[0]] = track_data.get_way_pnts()
        # Track width is the distance between the nearest inner and outer border points
        reward_params[RewardParam.TRACK_WIDTH.value[0]] = \
            nearest_pnts_dict[TrackNearPnts.NEAR_PNT_IN.value] \
            .distance(nearest_pnts_dict[TrackNearPnts.NEAR_PNT_OUT.value])
        reward_params[RewardParam.TRACK_LEN.value[0]] = track_length
        step_metrics[StepMetrics.X.value] = \
            reward_params[RewardParam.X.value[0]] = model_point.x
        step_metrics[StepMetrics.Y.value] = \
            reward_params[RewardParam.Y.value[0]] = model_point.y
        # First 'zyx' euler angle of the orientation quaternion, converted from
        # radians to degrees
        step_metrics[StepMetrics.YAW.value] = \
            reward_params[RewardParam.HEADING.value[0]] = \
            Rotation.from_quat(model_orientation).as_euler('zyx')[0] * 180.0 / math.pi
        # Report whichever of the two surrounding waypoints is closer, prev wins ties
        step_metrics[StepMetrics.CLS_WAYPNT.value] = \
            next_index if distance_from_next < distance_from_prev else prev_index
        step_metrics[StepMetrics.TRACK_LEN.value] = track_length
        step_metrics[StepMetrics.STEER.value] = \
            reward_params[RewardParam.STEER.value[0]] = \
            float(json_actions[ModelMetadataKeys.STEERING_ANGLE.value])
        step_metrics[StepMetrics.THROTTLE.value] = \
            reward_params[RewardParam.SPEED.value[0]] = \
            float(json_actions[ModelMetadataKeys.SPEED.value])
        # The flag is only true when every link point is on the track
        step_metrics[StepMetrics.WHEELS_TRACK.value] = \
            reward_params[RewardParam.WHEELS_ON_TRACK.value[0]] = all(wheel_on_track)
        step_metrics[StepMetrics.ACTION.value] = action
        # Set extra reward params for obstacles, skipped when nothing is returned
        obstacle_reward_params = track_data.get_object_reward_params(agent_name,
                                                                     model_point,
                                                                     car_pose)
        if obstacle_reward_params:
            reward_params.update(obstacle_reward_params)
    except KeyError as ex:
        raise GenericRolloutException("Key {}, not found".format(ex)) from ex
    except Exception as ex:
        raise GenericRolloutException('Cannot compute reward and metrics: {}'.format(ex)) from ex