in src/markov/agent_ctrl/rollout_agent_ctrl.py [0:0]
def __init__(self, config_dict, run_phase_sink, metrics):
    '''Initialize the rollout agent controller.

    Sets up the agent's control state, reset-rules manager, penalties, track and
    start-pose data, ROS publishers for velocity/steering, and — for virtual
    events — subscribers for WebRTC car-control messages. Finally registers this
    controller with the run-phase sink and the AbstractTracker machinery.

    config_dict (dict): containing all the keys in ConfigParams
    run_phase_sink (RunPhaseSubject): Sink to receive notification of a change in run phase
    metrics (EvalMetrics/TrainingMetrics): Training or evaluation metrics
    '''
    # NOTE(review): appears to hold a deferred effect object applied later by the
    # control logic; None means nothing pending — confirm against callers.
    self._effect = None
    # Last observed simulation time; updated elsewhere (starts at 0).
    self._current_sim_time = 0
    # Per-agent control status keyed by AgentCtrlStatus values; agent starts in PREPARE phase.
    self._ctrl_status = dict()
    self._ctrl_status[AgentCtrlStatus.AGENT_PHASE.value] = AgentPhase.PREPARE.value
    # Seconds the car remains paused (presumably after a penalty/reset) — TODO confirm.
    self._pause_duration = 0.0
    # virtual event
    self._is_virtual_event = config_dict.get(const.ConfigParams.IS_VIRTUAL_EVENT.value, False)
    # Default to model-controlled speed until overridden (e.g. by a WebRTC speed message).
    self._speed_mode = CarControlMode.MODEL_SPEED.value
    self._speed_value = 0.0
    self._race_car_ctrl_status = CarControlStatus.RESUME.value
    self._start_sim_time = None
    # Distance to place the car behind its position on reset; overridable via ROS param.
    self._reset_behind_dist = float(rospy.get_param("RESET_BEHIND_DIST", const.DEFAULT_RESET_BEHIND_DIST))
    # thread lock
    self._lock = RLock()
    # reset rules manager
    self._metrics = metrics
    self._is_continuous = config_dict[const.ConfigParams.IS_CONTINUOUS.value]
    self._reset_rules_manager = construct_reset_rules_manager(config_dict)
    self._config_dict = config_dict
    # Aggregation used to combine done flags across reset rules (defaults to any()).
    self._done_condition = config_dict.get(const.ConfigParams.DONE_CONDITION.value, any)
    self._number_of_resets = config_dict[const.ConfigParams.NUMBER_OF_RESETS.value]
    # Per-episode-status penalties. Note REVERSED falls back to the off-track
    # penalty when no explicit reverse penalty is configured.
    self._penalties = {EpisodeStatus.OFF_TRACK.value: config_dict.get(const.ConfigParams.OFF_TRACK_PENALTY.value, 0.0),
                       EpisodeStatus.CRASHED.value: config_dict.get(const.ConfigParams.COLLISION_PENALTY.value, 0.0),
                       EpisodeStatus.REVERSED.value: config_dict.get(const.ConfigParams.REVERSE_PENALTY.value,
                                                                     config_dict.get(const.ConfigParams.OFF_TRACK_PENALTY.value, 0.0)),
                       EpisodeStatus.IMMOBILIZED.value: config_dict.get(const.ConfigParams.IMMOBILIZED_PENALTY.value, 0.0)}
    self._reset_count = 0
    self._curr_crashed_object_name = ''
    self._simapp_version_ = config_dict[const.ConfigParams.VERSION.value]
    if self._is_virtual_event:
        # If virtual event then override the simapp version to the latest version.
        self._simapp_version_ = SIMAPP_VERSION_4
    # simapp_version speed scale: wheel radius differs per simapp version.
    self._wheel_radius_ = get_wheel_radius(self._simapp_version_)
    # Store the name of the agent used to set agents position on the track
    self._agent_name_ = config_dict[const.ConfigParams.AGENT_NAME.value]
    # For single agent, agent_name racecar will return None as index. For multi agents case
    # postfix x such as racecar_x will be returned as agent index.
    self._agent_idx_ = get_racecar_idx(self._agent_name_)
    # Get track data
    self._track_data_ = TrackData.get_instance()
    # Set start lane: odd-indexed agents start on the inner lane, even-indexed on
    # the outer lane; a single (index-less) agent starts on the center line.
    if self._agent_idx_ is not None:
        self._start_lane_ = self._track_data_.inner_lane \
            if self._agent_idx_ % 2 else self._track_data_.outer_lane
    else:
        self._start_lane_ = self._track_data_.center_line
    # Store the name of the links in the agent, this should be const
    self._agent_link_name_list_ = config_dict[const.ConfigParams.LINK_NAME_LIST.value]
    # Store the reward function
    self._reward_ = config_dict[const.ConfigParams.REWARD.value]
    # Create publishers for controlling the car (one per velocity/steering topic).
    self._velocity_pub_dict_ = OrderedDict()
    self._steering_pub_dict_ = OrderedDict()
    for topic in config_dict[const.ConfigParams.VELOCITY_LIST.value]:
        self._velocity_pub_dict_[topic] = rospy.Publisher(topic, Float64, queue_size=1)
    for topic in config_dict[const.ConfigParams.STEERING_LIST.value]:
        self._steering_pub_dict_[topic] = rospy.Publisher(topic, Float64, queue_size=1)
    # Create default reward parameters
    self._reward_params_ = const.RewardParam.make_default_param()
    # Create the default metrics dictionary
    self._step_metrics_ = StepMetrics.make_default_metric()
    # Dictionary of bools indicating starting position behavior
    self._start_pos_behavior_ = \
        {'change_start': config_dict[const.ConfigParams.CHANGE_START.value],
         'alternate_dir': config_dict[const.ConfigParams.ALT_DIR.value]}
    # Dictionary to track the previous way points
    self._prev_waypoints_ = {'prev_point' : Point(0, 0), 'prev_point_2' : Point(0, 0)}
    # Normalized distance of new start line from the original start line of the track.
    start_ndist = 0.0
    # Normalized start position offset w.r.t. start_ndist, which is the start line of the track.
    start_pos_offset = config_dict.get(const.ConfigParams.START_POSITION.value, 0.0)
    self._start_line_ndist_offset = start_pos_offset / self._track_data_.get_track_length()
    # Dictionary containing some of the data for the agent
    # - During the reset call, every value except start_ndist will get wiped out by self._clear_data
    #   (reset happens before every episode begins)
    # - If self._start_line_ndist_offset is not 0 (usually some minus value),
    #   then the initial current_progress is supposed to be non-zero (usually some minus value)
    #   as progress is supposed to be based on start_ndist.
    # - This will be correctly calculated by the first call of utils.compute_current_prog function.
    #   As prev_progress will initially be 0.0 and the physical position is not at start_ndist,
    #   utils.compute_current_prog will return negative progress if self._start_line_ndist_offset
    #   is a negative value (meaning behind the start line) and will return positive progress if
    #   self._start_line_ndist_offset is a positive value (meaning ahead of the start line).
    self._data_dict_ = {'max_progress': 0.0,
                        'current_progress': 0.0,
                        'prev_progress': 0.0,
                        'steps': 0.0,
                        'start_ndist': start_ndist,
                        'prev_car_pose': 0.0}
    # Load the action space
    self._model_metadata_ = config_dict[const.ConfigParams.MODEL_METADATA.value]
    self._action_space_ = load_action_space(self._model_metadata_)
    #! TODO evaluate if this is the best way to reset the car
    # subscriber to time to update camera position
    self.camera_manager = CameraManager.get_instance()
    # True if the agent is in the training phase
    self._is_training_ = False
    # Register to the phase sink
    run_phase_sink.register(self)
    # Make sure velocity and angle are set to 0
    send_action(self._velocity_pub_dict_, self._steering_pub_dict_, 0.0, 0.0)
    # start_dist should be hypothetical start line (start_ndist) plus
    # start position offset (start_line_ndist_offset).
    start_pose = self._start_lane_.interpolate_pose(
        (self._data_dict_['start_ndist'] + self._start_line_ndist_offset) * self._track_data_.get_track_length(),
        finite_difference=FiniteDifference.FORWARD_DIFFERENCE)
    self._track_data_.initialize_object(self._agent_name_, start_pose, \
                                        ObstacleDimensions.BOT_CAR_DIMENSION)
    # Project a link state's pose onto the 2-D plane (z is dropped).
    self.make_link_points = lambda link_state: Point(link_state.pose.position.x,
                                                     link_state.pose.position.y)
    # One (initially empty) reference-frame name per agent link.
    self.reference_frames = ['' for _ in self._agent_link_name_list_]
    # pause pose for car at pause state
    self._pause_car_model_pose = self._track_data_.get_object_pose(self._agent_name_)
    # prepare pose for car at prepare state
    self._prepare_car_model_pose = self._track_data_.get_object_pose(self._agent_name_)
    self._park_position = DEFAULT_PARK_POSITION
    if self._is_virtual_event:
        # Subscriber to update car speed if it's virtual event
        rospy.Subscriber(WEBRTC_CAR_CTRL_FORMAT.format(self._agent_name_,
                                                       CarControlTopic.SPEED_CTRL.value),
                         String,
                         self._get_speed_mode_value)
        # Subscriber to update car status if it's virtual event
        rospy.Subscriber(WEBRTC_CAR_CTRL_FORMAT.format(self._agent_name_,
                                                       CarControlTopic.STATUS_CTRL.value),
                         String,
                         self._update_car_status)
    # Register with the tracker framework at high priority.
    AbstractTracker.__init__(self, TrackerPriority.HIGH)