in hucc/agents/hsd3.py [0:0]
def __init__(self, env, cfg: DictConfig):
    """Build the action/observation spaces and goal-space lookup tables.

    Args:
        env: Environment whose ``observation_space`` is a dict space that
            contains an entry named by ``cfg.goal_space.key``.
        cfg: Configuration. Reads ``cfg.goal_space`` (``subsets``,
            ``features``, ``robot``, ``rank_min``, ``rank_max``,
            ``delta_actions``, ``mask_feats``, ``seed``, ``key``) and
            ``cfg.device``.

    Raises:
        ValueError: If no subsets or no task map could be determined.
    """
    gscfg = cfg.goal_space

    # Best effort: information from the low-level policy is optional here.
    # A missing file is only fatal if the subsets must come 'from_lo', in
    # which case the check below raises.
    # NOTE(review): presumably parse_lo_info reads data stored next to a
    # low-level checkpoint -- confirm against its implementation.
    lo_subsets: Optional[List[str]] = None
    lo_task_map: Optional[Dict[str, int]] = None
    try:
        lo_subsets, lo_task_map = self.parse_lo_info(cfg)
    except FileNotFoundError:
        pass

    if gscfg.subsets == 'from_lo':
        subsets, task_map = lo_subsets, lo_task_map
    else:
        subsets, task_map = subsets_task_map(
            features=gscfg.features,
            robot=gscfg.robot,
            spec=gscfg.subsets,
            rank_min=gscfg.rank_min,
            rank_max=gscfg.rank_max,
        )
        # The task map from the low-level info takes precedence over the
        # freshly computed one whenever it is available.
        if lo_task_map is not None:
            task_map = lo_task_map
    if not subsets or task_map is None:
        raise ValueError('No goal space subsets selected')

    self.task_map = task_map
    # XXX Unify
    # Normalise '+' separators to ',' and drop repeated features within a
    # subset while keeping first-occurrence order.
    self.subsets = [
        ','.join(dict.fromkeys(s.replace('+', ',').split(',')))
        for s in subsets
    ]
    self.robot = gscfg.robot
    self.features = gscfg.features
    self.delta_actions = bool(gscfg.delta_actions)
    self.mask_gsfeats = _parse_list(gscfg.mask_feats, int)

    n_subsets = len(self.subsets)
    self.max_rank = max(len(s.split(',')) for s in self.subsets)
    # Subgoal vector length: largest integer feature id in any subset, +1.
    ng = max(max(map(int, s.split(','))) for s in self.subsets) + 1

    # High-level action: a subset index plus a subgoal vector in [-1, 1].
    task_space = gym.spaces.Discrete(n_subsets)
    subgoal_space = gym.spaces.Box(
        low=-1, high=1, shape=(ng,), dtype=np.float32
    )
    self.action_space_hi = gym.spaces.Dict(
        [('task', task_space), ('subgoal', subgoal_space)]
    )
    self.action_space_hi.seed(gscfg.seed)

    # One row per subset; entry is 1 at the task-map index of every
    # feature contained in that subset.
    self.task = th.zeros((n_subsets, len(self.task_map)), dtype=th.float32)
    for i, s in enumerate(self.subsets):
        for dim in s.split(','):
            self.task[i][self.task_map[dim]] = 1

    # XXX A very poor way of querying psi etc -- unify this.
    fdist = {a: 1.0 for a in self.subsets}
    dummy_env = CtrlgsPreTrainingEnv(
        gscfg.robot,
        gscfg.features,
        feature_dist=fdist,
        task_map=self.task_map,
    )
    # The environment is only needed to read these attributes; make sure
    # it is closed even if one of the lookups fails.
    try:
        self.gobs_space = dummy_env.observation_space.spaces[
            'gs_observation'
        ]
        self.gobs_names = dummy_env.goal_featurizer.feature_names()
        self.goal_space = dummy_env.observation_space.spaces['desired_goal']
        self.delta_feats = dummy_env.goal_space['delta_feats']
        self.twist_feats = [
            self.task_map[str(f)]
            for f in dummy_env.goal_space['twist_feats']
        ]
        # NOTE(review): psi/offset and psi_1/offset_1 look like a feature
        # transform and its inverse -- confirm in CtrlgsPreTrainingEnv.
        self.psi = dummy_env.psi
        self.offset = dummy_env.offset
        self.psi_1 = dummy_env.psi_1
        self.offset_1 = dummy_env.offset_1
        self.obs_mask = dummy_env.obs_mask
        self.task_idx = dummy_env.task_idx
        gsdim = self.psi.shape[0]
    finally:
        dummy_env.close()

    self.observation_space_lo = gym.spaces.Dict(
        {
            'observation': gym.spaces.Box(
                low=-np.inf,
                high=np.inf,
                shape=(len(self.obs_mask),),
                dtype=np.float32,
            ),
            'desired_goal': self.goal_space,
            'task': gym.spaces.Box(
                low=0, high=1, shape=(len(self.task_map),), dtype=np.float32
            ),
        }
    )

    # Ignore the goal space in both policies
    spaces = copy(env.observation_space.spaces)
    self.gs_key = gscfg.key
    del spaces[self.gs_key]
    self.observation_space_hi = gym.spaces.Dict(spaces)

    # Inverse psi matrix indexed by available subsets (plus psi and the
    # offsets). Subsets with fewer than max_rank features keep zero rows.
    self.psi_1_by_ss = th.zeros(
        (n_subsets, self.max_rank, gsdim), dtype=th.float32
    )
    self.psi_by_ss = th.zeros(
        (n_subsets, self.max_rank, gsdim), dtype=th.float32
    )
    self.offset_by_ss = th.zeros(
        (n_subsets, self.max_rank), dtype=th.float32
    )
    for i, s in enumerate(self.subsets):
        for j, dim in enumerate(s.split(',')):
            feat = int(dim)
            self.psi_1_by_ss[i][j] = th.tensor(self.psi_1[feat])
            self.psi_by_ss[i][j] = th.tensor(self.psi[feat])
            self.offset_by_ss[i][j] = self.offset[feat]

    # Move everything used for batched lookups to the configured device.
    device = cfg.device
    self.psi_1_by_ss = self.psi_1_by_ss.to(device)
    self.psi_by_ss = self.psi_by_ss.to(device)
    self.offset_by_ss = self.offset_by_ss.to(device)
    self.offset_1 = th.tensor(
        self.offset_1, device=device, dtype=th.float32
    )
    self.task = self.task.to(device)

    # For every subset, the task-map indices of its features, in order.
    self.subgoal_idxs = [
        [self.task_map[f] for f in s.split(',')] for s in self.subsets
    ]