in Advanced workshops/reInvent2019-400/customize/deepracer_racetrack_env_lidar_3cars.py [0:0]
def __init__(self):
# Create the observation space
# self.observation_space = spaces.Box(low=0, high=255,
# shape=(TRAINING_IMAGE_SIZE[1], TRAINING_IMAGE_SIZE[0], 3),
# dtype=np.uint8)
# self.observation_space = spaces.Dict({
# # 'left_camera': spaces.Box(low=0, high=255,
# # shape=(TRAINING_IMAGE_SIZE[1], TRAINING_IMAGE_SIZE[0], 3),
# # dtype=np.uint8),
# 'stereo': spaces.Box(low=0, high=255,
# shape=(TRAINING_IMAGE_SIZE[1], TRAINING_IMAGE_SIZE[0], 2),
# dtype=np.uint8)
# })
self.observation_space = spaces.Dict({
# 'left_camera': spaces.Box(low=0, high=255,
# shape=(TRAINING_IMAGE_SIZE[1], TRAINING_IMAGE_SIZE[0], 3),
# dtype=np.uint8),
'STEREO_CAMERAS': spaces.Box(low=0, high=255,
shape=(TRAINING_IMAGE_SIZE[1], TRAINING_IMAGE_SIZE[0], 2),
dtype=np.uint8),
'LIDAR': spaces.Box(low=0.15, high=1.0, shape=(64,))
})
# Create the action space
self.action_space = spaces.Box(low=np.array([-1, 0]), high=np.array([+1, +1]), dtype=np.float32)
if node_type == SIMULATION_WORKER:
# ROS initialization
rospy.init_node('rl_coach', anonymous=True)
# wait for required services
rospy.wait_for_service('/deepracer_simulation_environment/get_waypoints')
rospy.wait_for_service('/deepracer_simulation_environment/reset_car')
rospy.wait_for_service('/gazebo/get_model_state')
rospy.wait_for_service('/gazebo/get_link_state')
rospy.wait_for_service('/gazebo/clear_joint_forces')
rospy.wait_for_service('/gazebo/spawn_urdf_model')
self.get_model_state = rospy.ServiceProxy('/gazebo/get_model_state', GetModelState)
self.get_link_state = rospy.ServiceProxy('/gazebo/get_link_state', GetLinkState)
self.clear_forces_client = rospy.ServiceProxy('/gazebo/clear_joint_forces', JointRequest)
self.reset_car_client = rospy.ServiceProxy('/deepracer_simulation_environment/reset_car', ResetCarSrv)
get_waypoints_client = rospy.ServiceProxy('/deepracer_simulation_environment/get_waypoints', GetWaypointSrv)
# Create the publishers for sending speed and steering info to the car
self.velocity_pub_dict = OrderedDict()
self.steering_pub_dict = OrderedDict()
for topic in VELOCITY_TOPICS:
self.velocity_pub_dict[topic] = rospy.Publisher(topic, Float64, queue_size=1)
for topic in STEERING_TOPICS:
self.steering_pub_dict[topic] = rospy.Publisher(topic, Float64, queue_size=1)
# Read in parameters
self.world_name = rospy.get_param('WORLD_NAME')
self.job_type = rospy.get_param('JOB_TYPE')
self.aws_region = rospy.get_param('AWS_REGION')
self.metrics_s3_bucket = rospy.get_param('METRICS_S3_BUCKET')
self.metrics_s3_object_key = rospy.get_param('METRICS_S3_OBJECT_KEY')
self.metrics = []
self.simulation_job_arn = 'arn:aws:robomaker:' + self.aws_region + ':' + \
rospy.get_param('ROBOMAKER_SIMULATION_JOB_ACCOUNT_ID') + \
':simulation-job/' + rospy.get_param('AWS_ROBOMAKER_SIMULATION_JOB_ID')
# Setup SIM_TRACE_LOG data upload to s3
# self.setup_simtrace_data_upload_to_s3()
if self.job_type == TRAINING_JOB:
from custom_files.customer_reward_function import reward_function
self.reward_function = reward_function
self.target_number_of_episodes = rospy.get_param('NUMBER_OF_EPISODES')
self.target_reward_score = rospy.get_param('TARGET_REWARD_SCORE')
else:
from markov.defaults import reward_function
self.reward_function = reward_function
self.number_of_trials = 0
self.target_number_of_trials = rospy.get_param('NUMBER_OF_TRIALS')
# Request the waypoints
waypoints = None
try:
resp = get_waypoints_client()
waypoints = np.array(resp.waypoints).reshape(resp.row, resp.col)
except Exception as ex:
utils.json_format_logger("Unable to retrieve waypoints: {}".format(ex),
**utils.build_system_error_dict(utils.SIMAPP_ENVIRONMENT_EXCEPTION,
utils.SIMAPP_EVENT_ERROR_CODE_500))
is_loop = np.all(waypoints[0,:] == waypoints[-1,:])
if 'inch' in self.world_name:
coef = 0.5
else:
coef = 0.6
if is_loop:
self.center_line = LinearRing(waypoints[:,0:2])
self.inner_border = LinearRing(waypoints[:,2:4])
self.outer_border = LinearRing(waypoints[:,4:6])
self.left_lane = LinearRing(waypoints[:, 2:4]*coef + waypoints[:, 0:2]*(1-coef))
self.right_lane = LinearRing(waypoints[:, 4:6]*coef + waypoints[:, 0:2]*(1-coef))
self.road_poly = Polygon(self.outer_border, [self.inner_border])
else:
self.center_line = LineString(waypoints[:,0:2])
self.inner_border = LineString(waypoints[:,2:4])
self.outer_border = LineString(waypoints[:,4:6])
self.left_lane = LinearRing(waypoints[:, 2:4]*coef + waypoints[:, 0:2]*(1-coef))
self.right_lane = LinearRing(waypoints[:, 4:6]*coef + waypoints[:, 0:2]*(1-coef))
self.road_poly = Polygon(np.vstack((self.outer_border, np.flipud(self.inner_border))))
self.center_dists = [self.center_line.project(Point(p), normalized=True) for p in self.center_line.coords[:-1]] + [1.0]
self.track_length = self.center_line.length
self.safe_angle = 30
# Queue used to maintain image consumption synchronicity
self.image_queue_left_camera = queue.Queue(IMG_QUEUE_BUF_SIZE)
rospy.Subscriber('/racecar/camera/zed/rgb/image_rect_color', sensor_image, self.callback_image_left_camera)
self.image_queue_right_camera = queue.Queue(IMG_QUEUE_BUF_SIZE)
rospy.Subscriber('/racecar/camera/zed_right/rgb/image_rect_color_right', sensor_image, self.callback_image_right_camera)
print(self)
# Initialize state data
self.episodes = 0
self.reverse_dir = False
# if self.reverse_dir:
# self.start_ndist_map = [0.28, 0.5, 0.97]
# else:
# self.start_ndist_map = [0.2, 0.42, 0.78] # 36inch
# # self.start_ndist_map = [0.19, 0.67, 0.87] # base
if 'base' in self.world_name:
self.start_ndist_map = [0.19, 0.67, 0.87]
elif 'inch' in self.world_name:
self.start_ndist_map = [0.23, 0.45, 0.93]
elif 'mirror' in self.world_name:
self.start_ndist_map = [0.12, 0.34, 0.83]
else:
self.start_ndist_map = [0.3, 0.6, 0.97]
self.start_ndist_index = 0
# self.start_ndist = 0.0
self.start_ndist = self.start_ndist_map[0]
# self.change_start = rospy.get_param('CHANGE_START_POSITION', (self.job_type == TRAINING_JOB))
self.change_start = bool(rospy.get_param('CHANGE_START_POSITION', True))
self.alternate_dir = rospy.get_param('ALTERNATE_DRIVING_DIRECTION')
# if self.job_type == TRAINING_JOB:
# self.alternate_dir = True
# else:
# self.alternate_dir = True
print("Alternate direction is set to:", self.alternate_dir)
self.is_simulation_done = False
self.steering_angle = 0
self.speed = 0
self.action_taken = 0
self.prev_progress = 0
self.prev_point = Point(0, 0)
self.prev_point_2 = Point(0, 0)
self.next_state = None
self.reward = None
self.reward_in_episode = 0
self.done = False
self.steps = 0
self.simulation_start_time = 0
self.allow_servo_step_signals = False
self.checkpoint_num = 0
# Creating bot cars
self.lanes_list_np_inner = np.array([self.left_lane, self.right_lane], dtype=object)
self.lanes_list_np_outer = np.array([self.right_lane, self.left_lane], dtype=object)
self.lanes_list_np_center = np.array([self.center_line, self.left_lane], dtype=object)
self.lanes_list_np = np.array([self.left_lane, self.right_lane], dtype=object)
# self.bot_cars = [
# BotCarController(car_id=1, start_dist=0.1, speed=0, lanes_list_np=self.lanes_list_np_inner, change_lane_freq_sec=0),
# BotCarController(car_id=2, start_dist=2.9, speed=0, lanes_list_np=self.lanes_list_np_inner, change_lane_freq_sec=0),
# BotCarController(car_id=3, start_dist=7.3, speed=0, lanes_list_np=self.lanes_list_np_inner, change_lane_freq_sec=0),
# BotCarController(car_id=4, start_dist=10.8, speed=0, lanes_list_np=self.lanes_list_np_inner, change_lane_freq_sec=0),
# BotCarController(car_id=5, start_dist=14, speed=0, lanes_list_np=self.lanes_list_np_inner, change_lane_freq_sec=0)
# ]
# # reinvent 36inch
# # inner_lane: 14.7032; outer_lane: 18.1877; ratio=1.237
# # forward dir: inner lane: 0-2.6, 5.1, 7.6-11. outer lane: 0.4-3, 6.2, 9.6-13.3
# # reverse dir: inner lane: 14.2032-1.8, 4.4, 7.3-10.5. outer lane: 0.1-2.5, 5.8, 9.9-12.7
# # reinvent_base
# # inner_lane: 16.5382; outer_lane: 18.8032; ratio=1.137
# # inner lane: 0.1-2.8, 7.3-10.6, 13.9. outer lane: 0.1-2.8, 8.5-11.8, 15.6
# # 36inch
# self.bot_cars = [
# BotCarController(car_id=1, is_inner=True, speed=0.1, lanes_list_np=self.lanes_list_np),
# BotCarController(car_id=2, is_inner=False, speed=0, lanes_list_np=self.lanes_list_np),
# BotCarController(car_id=3, is_inner=True, speed=0.1, lanes_list_np=self.lanes_list_np)
# ]
if 'base' in self.world_name:
# base
self.bot_cars = [
BotCarController(car_id=1, is_inner=True, speed=0.1, lanes_list_np=self.lanes_list_np, world_name=self.world_name),
BotCarController(car_id=2, is_inner=False, speed=0.1, lanes_list_np=self.lanes_list_np, world_name=self.world_name),
BotCarController(car_id=3, is_inner=True, speed=0, lanes_list_np=self.lanes_list_np, world_name=self.world_name)
]
elif 'inch' in self.world_name:
# 36inch
self.bot_cars = [
BotCarController(car_id=1, is_inner=True, speed=0.1, lanes_list_np=self.lanes_list_np, world_name=self.world_name),
BotCarController(car_id=2, is_inner=False, speed=0, lanes_list_np=self.lanes_list_np, world_name=self.world_name),
BotCarController(car_id=3, is_inner=True, speed=0.1, lanes_list_np=self.lanes_list_np, world_name=self.world_name)
]
elif 'mirror' in self.world_name:
# mirror
self.bot_cars = [
BotCarController(car_id=1, is_inner=True, speed=0, lanes_list_np=self.lanes_list_np, world_name=self.world_name),
BotCarController(car_id=2, is_inner=False, speed=0.2, lanes_list_np=self.lanes_list_np, world_name=self.world_name),
BotCarController(car_id=3, is_inner=True, speed=0.1, lanes_list_np=self.lanes_list_np, world_name=self.world_name)
]
else:
# 2019
self.bot_cars = [
BotCarController(car_id=1, is_inner=True, speed=0.16, lanes_list_np=self.lanes_list_np, world_name=self.world_name),
BotCarController(car_id=2, is_inner=False, speed=0.1, lanes_list_np=self.lanes_list_np, world_name=self.world_name),
BotCarController(car_id=3, is_inner=True, speed=0.08, lanes_list_np=self.lanes_list_np, world_name=self.world_name)
]
# print(self.bot_cars[0].shapely_lane.length)
# print(self.bot_cars[1].shapely_lane.length)
self.bot_reset_callback = []
for bot in self.bot_cars:
self.bot_reset_callback.append(rospy.Subscriber('/clock', clock, bot.update_bot_sim))