in Advanced workshops/reInvent2019-400/src/markov/environments/deepracer_racetrack_env.py [0:0]
def infer_reward_state(self, steering_angle, speed):
try:
self.set_next_state()
except Exception as ex:
utils.json_format_logger("Unable to retrieve image from queue: {}".format(ex),
**utils.build_system_error_dict(utils.SIMAPP_ENVIRONMENT_EXCEPTION, utils.SIMAPP_EVENT_ERROR_CODE_500))
# Read model state from Gazebo
model_state = self.get_model_state('racecar', '')
model_orientation = Rotation.from_quat([
model_state.pose.orientation.x,
model_state.pose.orientation.y,
model_state.pose.orientation.z,
model_state.pose.orientation.w])
model_location = np.array([
model_state.pose.position.x,
model_state.pose.position.y,
model_state.pose.position.z]) + \
model_orientation.apply(RELATIVE_POSITION_OF_FRONT_OF_CAR)
model_point = Point(model_location[0], model_location[1])
model_heading = model_orientation.as_euler('zyx')[0]
# Read the wheel locations from Gazebo
left_rear_wheel_state = self.get_link_state('racecar::left_rear_wheel', '')
left_front_wheel_state = self.get_link_state('racecar::left_front_wheel', '')
right_rear_wheel_state = self.get_link_state('racecar::right_rear_wheel', '')
right_front_wheel_state = self.get_link_state('racecar::right_front_wheel', '')
wheel_points = [
Point(left_rear_wheel_state.link_state.pose.position.x,
left_rear_wheel_state.link_state.pose.position.y),
Point(left_front_wheel_state.link_state.pose.position.x,
left_front_wheel_state.link_state.pose.position.y),
Point(right_rear_wheel_state.link_state.pose.position.x,
right_rear_wheel_state.link_state.pose.position.y),
Point(right_front_wheel_state.link_state.pose.position.x,
right_front_wheel_state.link_state.pose.position.y)
]
# Project the current location onto the center line and find nearest points
current_ndist = self.center_line.project(model_point, normalized=True)
prev_index, next_index = self.find_prev_next_waypoints(current_ndist)
distance_from_prev = model_point.distance(Point(self.center_line.coords[prev_index]))
distance_from_next = model_point.distance(Point(self.center_line.coords[next_index]))
closest_waypoint_index = (prev_index, next_index)[distance_from_next < distance_from_prev]
# Compute distance from center and road width
nearest_point_center = self.center_line.interpolate(current_ndist, normalized=True)
nearest_point_inner = self.inner_border.interpolate(self.inner_border.project(nearest_point_center))
nearest_point_outer = self.outer_border.interpolate(self.outer_border.project(nearest_point_center))
distance_from_center = nearest_point_center.distance(model_point)
distance_from_inner = nearest_point_inner.distance(model_point)
distance_from_outer = nearest_point_outer.distance(model_point)
track_width = nearest_point_inner.distance(nearest_point_outer)
is_left_of_center = (distance_from_outer < distance_from_inner) if self.reverse_dir \
else (distance_from_inner < distance_from_outer)
# Compute the distance to the closest bot car
is_crashed = False
dist_closest_bot_car = 1e3 # large number
closest_bot = -1 # index of closest bot car
for kk in range(len(self.bot_cars)):
dist_to_bot_car = np.sqrt((model_state.pose.position.x - self.bot_cars[kk].car_model_state.pose.position.x) ** 2
+ (model_state.pose.position.y - self.bot_cars[kk].car_model_state.pose.position.y) ** 2)
if dist_to_bot_car < dist_closest_bot_car:
dist_closest_bot_car = dist_to_bot_car
closest_bot = kk
# if self.bot_cars[closest_bot].shapely_lane.length < 16:
# is_left_lane_bot = False if self.bot_cars[closest_bot].reverse_dir else True
# else:
# is_left_lane_bot = True if self.bot_cars[closest_bot].reverse_dir else False
botcar = self.bot_cars[closest_bot].car_model_state
botcar_orientation = Rotation.from_quat([
botcar.pose.orientation.x,
botcar.pose.orientation.y,
botcar.pose.orientation.z,
botcar.pose.orientation.w])
botcar_heading = botcar_orientation.as_euler('zyx')[0]
botcar_location = np.array([
botcar.pose.position.x,
botcar.pose.position.y,
botcar.pose.position.z]) + \
botcar_orientation.apply(RELATIVE_POSITION_OF_FRONT_OF_CAR)
botcar_point = Point(botcar_location[0], botcar_location[1])
botcar_current_ndist = self.center_line.project(botcar_point, normalized=True)
botcar_prev_index, botcar_next_index = self.find_prev_next_waypoints(botcar_current_ndist)
if next_index <= botcar_next_index: # corner case: last few waypoints
flag_model_car_behind = True
else:
flag_model_car_behind = False
if flag_model_car_behind:
behind_car_heading = model_heading * 180.0 / math.pi
angle_to_front_car = math.atan2(botcar_point.y - model_point.y, botcar_point.x - model_point.x)
angle_to_front_car = angle_to_front_car * 180.0 / math.pi
else:
behind_car_heading = botcar_heading * 180.0 / math.pi
angle_to_front_car = math.atan2(model_point.y - botcar_point.y, model_point.x - botcar_point.x)
angle_to_front_car = angle_to_front_car * 180.0 / math.pi
abs_diff = abs(angle_to_front_car - behind_car_heading)
if abs_diff > 180:
abs_diff = 360 - abs_diff
if abs_diff < self.safe_angle:
flag_unsafe = True
else:
flag_unsafe = False
if flag_unsafe and flag_model_car_behind and dist_closest_bot_car < 0.4:
is_crashed = True
elif flag_unsafe and not flag_model_car_behind and dist_closest_bot_car < 0.3:
is_crashed = True
elif not flag_unsafe and dist_closest_bot_car < 0.25:
is_crashed = True
# SAHIKA: Quantize the shapely projections to 2 digits
current_ndist = np.around(current_ndist, decimals=2)
self.start_ndist = np.around(self.start_ndist, decimals=2)
# Convert current progress to be [0,100] starting at the initial waypoint
if self.reverse_dir:
current_progress = self.start_ndist - current_ndist
else:
current_progress = current_ndist - self.start_ndist
# SAHIKA: adding the error margin because due to quantization errors some close points results in negative error
#if current_progress < 0.0: current_progress = current_progress + 1.0
if current_progress < 0.0:
if abs(current_progress) < 0.011:
current_progress = 0.0
else:
current_progress = current_progress + 1.0
current_progress = 100 * current_progress
if current_progress < self.prev_progress:
# Either: (1) we wrapped around and have finished the track,
delta1 = current_progress + 100 - self.prev_progress
# or (2) for some reason the car went backwards (this should be rare)
delta2 = self.prev_progress - current_progress
current_progress = (self.prev_progress, 100)[delta1 < delta2]
# Car is off track if all wheels are outside the borders
wheel_on_track = [self.road_poly.contains(p) for p in wheel_points]
all_wheels_on_track = all(wheel_on_track)
any_wheels_on_track = any(wheel_on_track)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# SAHIKA
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
learner_car_position = Point(model_state.pose.position.x, model_state.pose.position.y)
learner_car_progress = self.center_line.project(learner_car_position)
learner_car_lane = self.center_line.contains(learner_car_position)
bot_car_progress = list()
bot_car_lane_match = list()
for kk in range(len(self.bot_cars)):
bot_car_position = Point(self.bot_cars[kk].car_model_state.pose.position.x, self.bot_cars[kk].car_model_state.pose.position.y)
bot_car_progress.append(self.center_line.project(bot_car_position) - learner_car_progress)
bot_car_lane = self.center_line.contains(bot_car_position)
if learner_car_lane == bot_car_lane:
bot_car_lane_match.append(True)
else:
bot_car_lane_match.append(False)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
bot_car_progress = np.array(bot_car_progress)
if all(bot_car_progress < 0):
ahead_bot_index = np.argmin(bot_car_progress)
ahead_bot_lane_match = bot_car_lane_match[ahead_bot_index]
else:
ahead_bot_index = np.where(bot_car_progress > 0, bot_car_progress, np.inf).argmin()
ahead_bot_lane_match = bot_car_lane_match[ahead_bot_index]
THRESHOLD1, THRESHOLD2 = 5, 50
is_bot_in_left_camera = False
_, thr1 = cv2.threshold(self.next_state['STEREO_CAMERAS'][:,:,0], THRESHOLD1, 1, cv2.THRESH_BINARY)
_, thr2 = cv2.threshold(self.next_state['STEREO_CAMERAS'][:,:,0], THRESHOLD2, 1, cv2.THRESH_BINARY)
thr = cv2.bitwise_xor(thr1, thr2)
thr = cv2.convertScaleAbs(thr)
contours, hierarchy = cv2.findContours(thr, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
bot_size_in_left_camera = float('nan')
for c in contours:
x, y, w, h = cv2.boundingRect(c)
if w * h > 70:
is_bot_in_left_camera = True
bot_size_in_left_camera = w*h
break
is_bot_in_right_camera = False
_, thr1 = cv2.threshold(self.next_state['STEREO_CAMERAS'][:,:,1], THRESHOLD1, 1, cv2.THRESH_BINARY)
_, thr2 = cv2.threshold(self.next_state['STEREO_CAMERAS'][:,:,1], THRESHOLD2, 1, cv2.THRESH_BINARY)
thr = cv2.bitwise_xor(thr1, thr2)
thr = cv2.convertScaleAbs(thr)
contours, hierarchy = cv2.findContours(thr, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
bot_size_in_right_camera = float('nan')
for c in contours:
x, y, w, h = cv2.boundingRect(c)
if w * h > 70:
is_bot_in_right_camera = True
bot_size_in_right_camera = w*h
break
is_bot_in_camera = is_bot_in_left_camera or is_bot_in_right_camera
# Compute the reward
reward = 0.0
if (not is_crashed) and any_wheels_on_track:
done = False
params = {
'all_wheels_on_track': all_wheels_on_track,
'x': model_point.x,
'y': model_point.y,
'heading': model_heading * 180.0 / math.pi,
'distance_from_center': distance_from_center,
'progress': current_progress,
'steps': self.steps,
'speed': speed,
'steering_angle': steering_angle * 180.0 / math.pi,
'track_width': track_width,
'waypoints': list(self.center_line.coords),
'closest_waypoints': [prev_index, next_index],
'is_left_of_center': is_left_of_center,
'is_reversed': self.reverse_dir,
'is_crashed': is_crashed,
'dist_closest_bot': dist_closest_bot_car,
'flag_unsafe': flag_unsafe,
'bot_car_progress': bot_car_progress,
'bot_car_lane_match': ahead_bot_lane_match,
'is_bot_in_camera': is_bot_in_camera,
'bot_size_in_left_camera': bot_size_in_left_camera,
'bot_size_in_right_camera': bot_size_in_right_camera
}
try:
reward = float(self.reward_function(params))
except Exception as e:
utils.json_format_logger("Exception {} in customer reward function. Job failed!".format(e),
**utils.build_user_error_dict(utils.SIMAPP_SIMULATION_WORKER_EXCEPTION,
utils.SIMAPP_EVENT_ERROR_CODE_400))
traceback.print_exc()
utils.simapp_exit_gracefully()
else:
done = True
reward = CRASHED
# Reset if the car position hasn't changed in the last 2 steps
prev_pnt_dist = min(model_point.distance(self.prev_point), model_point.distance(self.prev_point_2))
if prev_pnt_dist <= 0.0001 and self.steps % NUM_STEPS_TO_CHECK_STUCK == 0:
done = True
reward = CRASHED # stuck
# Simulation jobs are done when progress reaches 100
if current_progress >= 100:
done = True
# reward = 1e3
# Keep data from the previous step around
self.prev_point_2 = self.prev_point
self.prev_point = model_point
self.prev_progress = current_progress
# Set the reward and done flag
self.reward = reward
self.reward_in_episode += reward
self.done = done
# Trace logs to help us debug and visualize the training runs
# btown TODO: This should be written to S3, not to CWL.
logger.info('SIM_TRACE_LOG:%d,%d,%.4f,%.4f,%.4f,%.2f,%.2f,%d,%.4f,%s,%s,%.4f,%d,%.2f,%s,%s,%.2f\n' % (
self.episodes, self.steps, model_location[0], model_location[1], model_heading,
self.steering_angle,
self.speed,
self.action_taken,
self.reward,
self.done,
all_wheels_on_track,
current_progress,
closest_waypoint_index,
self.track_length,
time.time(),
is_crashed,
dist_closest_bot_car))
#build json record of the reward metrics
reward_metrics = OrderedDict()
reward_metrics['episode'] = self.episodes
reward_metrics['steps'] = self.steps
reward_metrics['X'] = model_location[0]
reward_metrics['Y'] = model_location[1]
reward_metrics['yaw'] = model_heading
reward_metrics['steer'] = self.steering_angle
reward_metrics['throttle'] = self.speed
reward_metrics['action'] = self.action_taken
reward_metrics['reward'] = self.reward
reward_metrics['done'] = self.done
reward_metrics['all_wheels_on_track'] = all_wheels_on_track
reward_metrics['progress'] = current_progress
reward_metrics['closest_waypoint'] = closest_waypoint_index
reward_metrics['track_len'] = self.track_length
reward_metrics['tstamp'] = time.time()
# self.simtrace_data.write_simtrace_data(reward_metrics)
# Terminate this episode when ready
if done and node_type == SIMULATION_WORKER:
self.finish_episode(current_progress)