in gym3/trajectory_recorder.py [0:0]
def act(self, ac: Any) -> None:
    """Record one step for every environment and forward the action.

    The observation and info available before acting are appended to each
    environment's trajectory together with its slice of ``ac``. The action
    is then handed to the parent class's ``act``, the resulting reward is
    appended, and every environment whose ``first`` flag is set afterwards
    is passed to ``_write_and_reset_trajectory``.

    Args:
        ac: Batched action. It is sliced per environment along its leading
            axis and forwarded unchanged to the parent's ``act``.
    """
    _, ob, _ = self.observe()
    info = self.get_info()

    # Buffer creation is deferred until the first act() call because the
    # environment sometimes returns observations whose dtypes do not match
    # self.env.ob_space; the dtypes actually seen are captured here first.
    if self._trajectories is None:
        self._ob_actual_dtype = multimap(lambda leaf: leaf.dtype, ob)
        self._ac_actual_dtype = multimap(lambda leaf: leaf.dtype, ac)
        fresh_buffers = []
        for _ in range(self.env.num):
            fresh_buffers.append(self._new_trajectory_dict())
        self._trajectories = fresh_buffers

    for env_idx in range(self.env.num):
        # For non-dict spaces `ob` / `ac` is a single array shaped
        # [batch, ...], so one environment's step is simply row `env_idx`.
        #
        # For dict spaces they are nested dicts such as
        #   {
        #       'obs_key1': [batch, obs1_shape...],
        #       'obs_key2': [batch, obs2_shape...]
        #   }
        # so the row has to be taken from every leaf (ob['obs_key1'][env_idx],
        # ...) and appended to the matching leaf of the stored trajectory.
        # multimap applies the slicing to each leaf in both cases.
        row = slice(env_idx, env_idx + 1)  # keeps the leading axis for concat
        trajectory = self._trajectories[env_idx]

        ob_row = multimap(lambda leaf: leaf[row], ob)
        trajectory["ob"] = concat([trajectory["ob"], ob_row], axis=0)

        ac_row = multimap(lambda leaf: leaf[row], ac)
        trajectory["act"] = concat([trajectory["act"], ac_row], axis=0)

        trajectory["info"].append(info[env_idx])

    super().act(ac)

    # The reward observed after acting belongs to the step recorded above.
    reward, _, first = self.observe()
    for env_idx in range(self.env.num):
        self._trajectories[env_idx]["reward"].append(reward[env_idx])

    # For each completed trajectory, write it out
    for env_idx in range(self.env.num):
        if not first[env_idx]:
            continue
        self._write_and_reset_trajectory(env_idx)