in Advanced workshops/AI Driving Olympics 2019/challenge_train_DQN/src/markov/environments/deepracer_racetrack_env.py [0:0]
def __init__(self):
# Create the observation space
self.observation_space = spaces.Box(low=0, high=255,
shape=(TRAINING_IMAGE_SIZE[1], TRAINING_IMAGE_SIZE[0], 3),
dtype=np.uint8)
# Create the action space
self.action_space = spaces.Box(low=np.array([-1, 0]), high=np.array([+1, +1]), dtype=np.float32)
if node_type == SIMULATION_WORKER:
# ROS initialization
rospy.init_node('rl_coach', anonymous=True)
# wait for required services
rospy.wait_for_service('/deepracer_simulation_environment/get_waypoints')
rospy.wait_for_service('/deepracer_simulation_environment/reset_car')
rospy.wait_for_service('/gazebo/get_model_state')
rospy.wait_for_service('/gazebo/get_link_state')
rospy.wait_for_service('/gazebo/clear_joint_forces')
self.get_model_state = rospy.ServiceProxy('/gazebo/get_model_state', GetModelState)
self.get_link_state = rospy.ServiceProxy('/gazebo/get_link_state', GetLinkState)
self.clear_forces_client = rospy.ServiceProxy('/gazebo/clear_joint_forces',
JointRequest)
self.reset_car_client = rospy.ServiceProxy('/deepracer_simulation_environment/reset_car',
ResetCarSrv)
get_waypoints_client = rospy.ServiceProxy('/deepracer_simulation_environment/get_waypoints',
GetWaypointSrv)
# Create the publishers for sending speed and steering info to the car
self.velocity_pub_dict = OrderedDict()
self.steering_pub_dict = OrderedDict()
for topic in VELOCITY_TOPICS:
self.velocity_pub_dict[topic] = rospy.Publisher(topic, Float64, queue_size=1)
for topic in STEERING_TOPICS:
self.steering_pub_dict[topic] = rospy.Publisher(topic, Float64, queue_size=1)
# Read in parameters
self.world_name = rospy.get_param('WORLD_NAME')
self.job_type = rospy.get_param('JOB_TYPE')
self.aws_region = rospy.get_param('AWS_REGION')
self.metrics_s3_bucket = rospy.get_param('METRICS_S3_BUCKET')
self.metrics_s3_object_key = rospy.get_param('METRICS_S3_OBJECT_KEY')
self.metrics = []
self.simulation_job_arn = 'arn:aws:robomaker:' + self.aws_region + ':' + \
rospy.get_param('ROBOMAKER_SIMULATION_JOB_ACCOUNT_ID') + \
':simulation-job/' + rospy.get_param('AWS_ROBOMAKER_SIMULATION_JOB_ID')
if self.job_type == TRAINING_JOB:
from custom_files.customer_reward_function import reward_function
self.reward_function = reward_function
self.metric_name = rospy.get_param('METRIC_NAME')
self.metric_namespace = rospy.get_param('METRIC_NAMESPACE')
self.training_job_arn = rospy.get_param('TRAINING_JOB_ARN')
self.target_number_of_episodes = rospy.get_param('NUMBER_OF_EPISODES')
self.target_reward_score = rospy.get_param('TARGET_REWARD_SCORE')
else:
from markov.defaults import reward_function
self.reward_function = reward_function
self.number_of_trials = 0
self.target_number_of_trials = rospy.get_param('NUMBER_OF_TRIALS')
# Request the waypoints
waypoints = None
try:
resp = get_waypoints_client()
waypoints = np.array(resp.waypoints).reshape(resp.row, resp.col)
except Exception as ex:
utils.json_format_logger("Unable to retrieve waypoints: {}".format(ex),
**utils.build_system_error_dict(utils.SIMAPP_ENVIRONMENT_EXCEPTION,
utils.SIMAPP_EVENT_ERROR_CODE_500))
is_loop = np.all(waypoints[0,:] == waypoints[-1,:])
if is_loop:
self.center_line = LinearRing(waypoints[:,0:2])
self.inner_border = LinearRing(waypoints[:,2:4])
self.outer_border = LinearRing(waypoints[:,4:6])
self.road_poly = Polygon(self.outer_border, [self.inner_border])
else:
self.center_line = LineString(waypoints[:,0:2])
self.inner_border = LineString(waypoints[:,2:4])
self.outer_border = LineString(waypoints[:,4:6])
self.road_poly = Polygon(np.vstack((self.outer_border, np.flipud(self.inner_border))))
self.center_dists = [self.center_line.project(Point(p), normalized=True) for p in self.center_line.coords[:-1]] + [1.0]
self.track_length = self.center_line.length
# Queue used to maintain image consumption synchronicity
self.image_queue = queue.Queue(IMG_QUEUE_BUF_SIZE)
rospy.Subscriber('/camera/zed/rgb/image_rect_color', sensor_image, self.callback_image)
# Initialize state data
self.episodes = 0
self.start_ndist = 0.0
self.reverse_dir = False
self.change_start = rospy.get_param('CHANGE_START_POSITION', (self.job_type == TRAINING_JOB))
self.alternate_dir = rospy.get_param('ALTERNATE_DRIVING_DIRECTION', False)
self.is_simulation_done = False
self.steering_angle = 0
self.speed = 0
self.action_taken = 0
self.prev_progress = 0
self.prev_point = Point(0, 0)
self.prev_point_2 = Point(0, 0)
self.next_state = None
self.reward = None
self.reward_in_episode = 0
self.done = False
self.steps = 0
self.simulation_start_time = 0
self.allow_servo_step_signals = False