in src/markov/agent_ctrl/rollout_agent_ctrl.py [0:0]
def _judge_action_at_run_phase(self, episode_status, pause):
self._pause_duration = 0.0
current_car_pose = self._track_data_.get_object_pose(self._agent_name_)
try:
reward = float(self._reward_(copy.deepcopy(self._reward_params_)))
except Exception as ex:
raise RewardFunctionError('Reward function exception {}'.format(ex))
if math.isnan(reward) or math.isinf(reward):
raise RewardFunctionError('{} returned as reward'.format(reward))
# transition to AgentPhase.PARK.value when episode complete and done condition is all
if episode_status == EpisodeStatus.EPISODE_COMPLETE.value and \
self._done_condition == all:
self._park_position = self._track_data_.pop_park_position()
self._ctrl_status[AgentCtrlStatus.AGENT_PHASE.value] = AgentPhase.PARK.value
self._park_car_model()
# transition to AgentPhase.PAUSE.value
if pause:
should_reset_camera = False
pause_car_model_pose = current_car_pose
penalty = self._penalties[episode_status]
# add pause time based on different paused status
if episode_status == EpisodeStatus.CRASHED.value:
self._pause_duration += penalty
# add blink effect and remove current agent from collision list
if penalty > 0.0:
self._effect = BlinkEffect(model_name=self._agent_name_,
min_alpha=const.BLINK_MIN_ALPHA,
interval=const.BLINK_INTERVAL,
duration=penalty)
self._effect.attach()
# If crash into an static obstacle, reset first and then pause. This will prevent
# agent and obstacle wiggling around because bit mask is not used between agent
# and static obstacle
if 'obstacle' in self._curr_crashed_object_name:
pause_car_model_pose = self._get_car_reset_model_state(
car_pose=current_car_pose).pose
should_reset_camera = True
elif episode_status in \
[EpisodeStatus.OFF_TRACK.value,
EpisodeStatus.REVERSED.value,
EpisodeStatus.IMMOBILIZED.value]:
self._pause_duration += penalty
# add blink effect and remove current agent from collision list
if penalty > 0.0:
self._effect = BlinkEffect(model_name=self._agent_name_,
min_alpha=const.BLINK_MIN_ALPHA,
interval=const.BLINK_INTERVAL,
duration=penalty)
self._effect.attach()
# when agent off track current car pose might be closer
# to other part of the track. Therefore, instead of using
# current car pose to calculate reset position, the previous
# car pose is used.
pause_car_model_pose = self._get_car_reset_model_state(
car_pose=self._data_dict_['prev_car_pose']).pose
should_reset_camera = True
self._pause_car_model_pose = pause_car_model_pose
# pause car model through blocking call to make sure agent pose
# is updated in sync. Non blocking can cause problem during off track reset
# especially when there is a small gap betwee two parts of the track.
self._pause_car_model(car_model_pose=self._pause_car_model_pose,
should_reset_camera=should_reset_camera,
blocking=True)
self._ctrl_status[AgentCtrlStatus.AGENT_PHASE.value] = AgentPhase.PAUSE.value
self._data_dict_['prev_car_pose'] = current_car_pose
return reward