in mae_envs/wrappers/manipulation.py [0:0]
def lock_obj(self, action_lock):
    '''
        Implements object gluing for all agents.

        Decides, from the agents' lock actions and their distance to every object, which
        objects get locked or unlocked this step. The decision is then written to the
        simulator (material emission, joint limits, joint ranges) and recorded in
        self.obj_locked and self.which_locked. Nothing is returned.

        Args:
            action_lock: (n_agent, n_obj) boolean matrix; a truthy entry [i, j] means
                agent i wants object j locked. It is restricted to the rows in
                self.agent_idx_allowed_to_lock and the columns in self.actual_body_slice
                before use.
        Raises:
            AssertionError: if the number of object geom sizes differs from the number of
                object quaternions, or if self.lock_type is not an implemented lock type.
    '''
    sim = self.unwrapped.sim
    action_lock = action_lock[self.agent_idx_allowed_to_lock]
    action_lock = action_lock[:, self.actual_body_slice]

    agent_pos = sim.data.body_xpos[self.agent_body_idxs]
    obj_pos = sim.data.body_xpos[self.obj_body_idxs]
    obj_width = sim.model.geom_size[np.concatenate(self.obj_geom_ids)]
    obj_quat = sim.data.body_xquat[self.obj_body_idxs]
    # Raised explicitly instead of via `assert` so the check is not stripped by `python -O`.
    if len(obj_width) != len(obj_quat):
        raise AssertionError(
            "Number of object widths must be equal to number of quaternions for direct distance calculation method. " +
            "This might be caused by a body that contains several geoms.")
    obj_dist = dist_pt_to_cuboid(agent_pos, obj_pos, obj_width, obj_quat)

    # An agent only has a say over an object when it is within lock_radius of it and
    # the allowed-to-lock mask permits it.
    in_range = obj_dist <= self.lock_radius
    allowed_and_desired = np.logical_and(action_lock, in_range)
    allowed_and_desired = np.logical_and(allowed_and_desired, self.agent_allowed_to_lock_mask)
    allowed_and_not_desired = np.logical_and(1 - action_lock, in_range)
    allowed_and_not_desired = np.logical_and(allowed_and_not_desired, self.agent_allowed_to_lock_mask)

    def pick_locker(obj):
        # Randomly pick one of the agents that is in range of `obj` and asked to lock it.
        candidates = np.argwhere(allowed_and_desired[:, obj]).flatten()
        return np.random.choice(self.agent_idx_allowed_to_lock[candidates])

    # objs_to_lock should _all_ be locked this round. new_objs_to_lock are objs that were not locked last round
    # objs_to_unlock are objs that no one wants to lock this round
    if self.lock_type == "any_lock":  # If any agent wants to lock, the obj becomes locked
        objs_to_lock = np.any(allowed_and_desired, axis=0)
        objs_to_unlock = np.logical_and(np.any(allowed_and_not_desired, axis=0), ~objs_to_lock)
        new_objs_to_lock = np.logical_and(objs_to_lock, ~self.obj_locked)
    elif self.lock_type == "all_lock":  # All agents that are close enough must want to lock the obj
        objs_to_unlock = np.any(allowed_and_not_desired, axis=0)
        objs_to_lock = np.logical_and(np.any(allowed_and_desired, axis=0), ~objs_to_unlock)
        new_objs_to_lock = np.logical_and(objs_to_lock, ~self.obj_locked)
    elif self.lock_type == "any_lock_specific":
        # If any agent wants to lock, the obj becomes locked; only the agent recorded in
        # which_locked for an obj may unlock it.
        allowed_to_unlock = np.arange(self.n_agents)[:, None] == self.which_locked[None, :]  # (n_agent, n_obj)
        # Can't unlock an obj that isn't locked
        allowed_to_unlock = np.logical_and(allowed_to_unlock, self.obj_locked[None, :])
        allowed_and_not_desired = np.logical_and(allowed_to_unlock[self.agent_idx_allowed_to_lock],
                                                 allowed_and_not_desired)
        objs_to_unlock = np.any(allowed_and_not_desired, axis=0)
        objs_to_lock = np.any(allowed_and_desired, axis=0)
        # The recorded locker releases an obj while another agent asks to lock it:
        # the obj stays locked and only its recorded locker changes.
        objs_to_relock = np.logical_and(objs_to_unlock, objs_to_lock)
        new_objs_to_lock = np.logical_and(np.logical_and(objs_to_lock, ~objs_to_relock), ~self.obj_locked)
        objs_to_unlock = np.logical_and(objs_to_unlock, ~objs_to_lock)
        for obj in np.argwhere(objs_to_relock)[:, 0]:
            self.which_locked[obj] = pick_locker(obj)
    elif self.lock_type == "all_lock_team_specific":
        # all close agents must want to lock the object
        # only agents from the same team as the locker can unlock
        # NOTE(review): unlike "any_lock_specific", allowed_to_unlock is not masked by
        # self.obj_locked here - confirm that is intended.
        allowed_to_unlock = self.metadata['team_index'][:, None] == (
            self.metadata['team_index'][None, self.which_locked])
        allowed_and_not_desired = np.logical_and(allowed_to_unlock[self.agent_idx_allowed_to_lock],
                                                 allowed_and_not_desired)
        objs_to_unlock = np.any(allowed_and_not_desired, axis=0)
        objs_to_lock = np.logical_and(np.any(allowed_and_desired, axis=0), ~objs_to_unlock)
        new_objs_to_lock = np.logical_and(objs_to_lock, ~self.obj_locked)
    else:
        # Explicit raise: a bare `assert False` vanishes under `python -O`, after which the
        # code below would fail with a NameError on the undefined objs_to_* arrays.
        raise AssertionError(f"{self.lock_type} lock type is not implemented")

    joints_to_unlock = np.isin(sim.model.jnt_bodyid, self.obj_body_idxs[objs_to_unlock])
    joints_to_lock = np.isin(sim.model.jnt_bodyid, self.obj_body_idxs[new_objs_to_lock])

    # Turn on/off emission and joint limit
    matids_to_darken = sim.model.geom_matid[np.isin(sim.model.geom_bodyid, self.obj_body_idxs[objs_to_unlock])]
    matids_to_lighten = sim.model.geom_matid[np.isin(sim.model.geom_bodyid, self.obj_body_idxs[new_objs_to_lock])]
    # Material id -1 is filtered out so it is never used as an index into mat_emission.
    matids_to_darken = matids_to_darken[matids_to_darken != -1]
    matids_to_lighten = matids_to_lighten[matids_to_lighten != -1]
    sim.model.mat_emission[matids_to_darken] = 0
    sim.model.mat_emission[matids_to_lighten] = 1
    sim.model.jnt_limited[joints_to_unlock] = 0
    sim.model.jnt_limited[joints_to_lock] = 1

    # For objs we need to newly lock, set the joint ranges to the current qpos of the obj.
    for obj in np.argwhere(new_objs_to_lock)[:, 0]:
        # The trailing None axis broadcasts the current qpos across the columns of jnt_range.
        sim.model.jnt_range[self.obj_jnt_idxs[obj], :] = sim.data.qpos[self.obj_jnt_idxs[obj], None]
        self.which_locked[obj] = pick_locker(obj)

    # Previously locked objs stay locked unless unlocked this round; objs_to_lock are always locked.
    self.obj_locked = np.logical_or(np.logical_and(self.obj_locked, ~objs_to_unlock), objs_to_lock)