in ss_baselines/common/sync_vector_env.py [0:0]
def __call__(self, command, data):
    """Run a single command against the wrapped environment.

    Args:
        command: One of the module-level ``*_COMMAND`` constants selecting
            the operation to perform.
        data: Payload whose expected form depends on ``command``:
            * STEP: mapping expanded as ``self._env.step(**data)``.
            * RENDER: pair ``(args, kwargs)`` expanded as
              ``self._env.render(*args, **kwargs)``.
            * CALL: pair ``(function_name, function_args)``;
              ``function_args`` may be ``None`` or empty for a call
              without arguments, otherwise it is expanded as keyword
              arguments.
            * all other commands: not read.

    Returns:
        * STEP on ``habitat.RLEnv`` / ``gym.Env``: the tuple
          ``(observations, reward, done, info)``.
        * STEP on ``habitat.Env``: the observations only.
        * RESET: the observations returned by ``self._env.reset()``.
        * RENDER / CALL: whatever the underlying method returns.
        * OBSERVATION_SPACE / ACTION_SPACE: the environment attribute
          named by ``command``.
        * EPISODE: ``self._env.current_episode``.
        * CLOSE: ``None``; the environment is not touched.

    Raises:
        NotImplementedError: If ``command`` is not recognised, or STEP is
            requested on an environment type that is not supported.
        TypeError: If a space command is not a string and therefore
            cannot be used as an attribute name.
    """
    # The command never changes within one invocation, so a plain guard
    # replaces the former ``while`` loop, which could spin forever on any
    # path that neither returned nor raised.
    if command == CLOSE_COMMAND:
        return None

    if command == STEP_COMMAND:
        # habitat.RLEnv / gym.Env hand back a 4-tuple from step(), whereas
        # habitat.Env hands back only the observations.
        if isinstance(self._env, (habitat.RLEnv, gym.Env)):
            observations, reward, done, info = self._env.step(**data)
            if self._auto_reset_done and done:
                # Replace the terminal observations with those of the new
                # episode; reward/done/info still describe the last step.
                observations = self._env.reset()
            return observations, reward, done, info
        if isinstance(self._env, habitat.Env):
            observations = self._env.step(**data)
            if self._auto_reset_done and self._env.episode_over:
                observations = self._env.reset()
            return observations
        raise NotImplementedError(
            f"step is not supported for {type(self._env).__name__}"
        )

    if command == RESET_COMMAND:
        return self._env.reset()

    if command == RENDER_COMMAND:
        return self._env.render(*data[0], **data[1])

    if command in (OBSERVATION_SPACE_COMMAND, ACTION_SPACE_COMMAND):
        # The command value doubles as the attribute name on the env.
        if not isinstance(command, str):
            raise TypeError(
                f"space command must be a string, got {type(command).__name__}"
            )
        return getattr(self._env, command)

    if command == CALL_COMMAND:
        function_name, function_args = data
        method = getattr(self._env, function_name)
        if not function_args:
            return method()
        return method(**function_args)

    # TODO: update CALL_COMMAND for getting attribute like this
    if command == EPISODE_COMMAND:
        return self._env.current_episode

    raise NotImplementedError(f"unknown command: {command!r}")