def lock_obj()

in mae_envs/wrappers/manipulation.py [0:0]


    def lock_obj(self, action_lock):
        '''
            Implements object gluing for all agents.

            From each agent's lock action and its distance to each object, works out
            (according to self.lock_type) which objects get locked and which get unlocked
            this round, applies that to the simulator model (joint limits, joint ranges and
            material emission) and updates self.obj_locked / self.which_locked.
            Args:
                action_lock: (n_agent, n_obj) boolean matrix; entry [i, j] is truthy when
                    agent i wants object j to be locked.
            Raises:
                AssertionError: if the number of object geoms does not match the number of
                    object bodies (checked with an assert statement), or if self.lock_type
                    is not one of the implemented lock types.
        '''
        sim = self.unwrapped.sim
        # Keep only the rows of agents that may lock and the columns selected by actual_body_slice.
        action_lock = action_lock[self.agent_idx_allowed_to_lock]
        action_lock = action_lock[:, self.actual_body_slice]
        agent_pos = sim.data.body_xpos[self.agent_body_idxs]
        obj_pos = sim.data.body_xpos[self.obj_body_idxs]

        obj_width = sim.model.geom_size[np.concatenate(self.obj_geom_ids)]
        obj_quat = sim.data.body_xquat[self.obj_body_idxs]
        assert len(obj_width) == len(obj_quat), (
            "Number of object widths must be equal to number of quaternions for direct distance calculation method. " +
            "This might be caused by a body that contains several geoms.")
        # dist_pt_to_cuboid is defined outside this block; presumably it returns an
        # (agent, obj) distance matrix matching action_lock -- confirm against its definition.
        obj_dist = dist_pt_to_cuboid(agent_pos, obj_pos, obj_width, obj_quat)

        # "desired": the agent asks for the lock; "not desired": it does not (1 - action_lock
        # inverts the mask, assuming 0/1 or boolean entries). Both additionally require the
        # object to be within lock_radius and the agent_allowed_to_lock_mask to permit it.
        allowed_and_desired = np.logical_and(action_lock, obj_dist <= self.lock_radius)
        allowed_and_desired = np.logical_and(allowed_and_desired, self.agent_allowed_to_lock_mask)
        allowed_and_not_desired = np.logical_and(1 - action_lock, obj_dist <= self.lock_radius)
        allowed_and_not_desired = np.logical_and(allowed_and_not_desired, self.agent_allowed_to_lock_mask)

        def pick_locker(obj):
            # Randomly choose one of the agents that validly requested the lock on obj.
            # allowed_and_desired is never rebound below, so the closure sees its final value.
            return np.random.choice(self.agent_idx_allowed_to_lock[
                        np.argwhere(allowed_and_desired[:, obj]).flatten()])

        # objs_to_lock should _all_ be locked this round. new_objs_to_lock are objs that were not locked last round
        # objs_to_unlock are objs that no one wants to lock this round
        if self.lock_type == "any_lock":  # If any agent wants to lock, the obj becomes locked
            objs_to_lock = np.any(allowed_and_desired, axis=0)
            objs_to_unlock = np.logical_and(np.any(allowed_and_not_desired, axis=0), ~objs_to_lock)
            new_objs_to_lock = np.logical_and(objs_to_lock, ~self.obj_locked)
        elif self.lock_type == "all_lock":  # All agents that are close enough must want to lock the obj
            objs_to_unlock = np.any(allowed_and_not_desired, axis=0)
            objs_to_lock = np.logical_and(np.any(allowed_and_desired, axis=0), ~objs_to_unlock)
            new_objs_to_lock = np.logical_and(objs_to_lock, ~self.obj_locked)
        elif self.lock_type == "any_lock_specific":  # If any agent wants to lock, the obj becomes locked
            # Only the agent recorded in which_locked may unlock a given obj.
            allowed_to_unlock = np.arange(self.n_agents)[:, None] == self.which_locked[None, :]  # (n_agent, n_obj)
            allowed_to_unlock = np.logical_and(allowed_to_unlock, self.obj_locked[None, :])  # Can't unlock an obj that isn't locked
            allowed_and_not_desired = np.logical_and(allowed_to_unlock[self.agent_idx_allowed_to_lock],
                                                     allowed_and_not_desired)
            objs_to_unlock = np.any(allowed_and_not_desired, axis=0)
            objs_to_lock = np.any(allowed_and_desired, axis=0)
            # Relock: the current owner lets go while some agent asks for the lock in the same
            # round. The obj stays locked and only its recorded owner is re-drawn.
            objs_to_relock = np.logical_and(objs_to_unlock, objs_to_lock)
            new_objs_to_lock = np.logical_and(np.logical_and(objs_to_lock, ~objs_to_relock), ~self.obj_locked)
            objs_to_unlock = np.logical_and(objs_to_unlock, ~objs_to_lock)

            for obj in np.argwhere(objs_to_relock)[:, 0]:
                self.which_locked[obj] = pick_locker(obj)
        elif self.lock_type == "all_lock_team_specific":
            # all close agents must want to lock the object
            # only agents from the same team as the locker can unlock
            allowed_to_unlock = self.metadata['team_index'][:, None] == (
                                self.metadata['team_index'][None, self.which_locked])
            allowed_and_not_desired = np.logical_and(allowed_to_unlock[self.agent_idx_allowed_to_lock], allowed_and_not_desired)
            objs_to_unlock = np.any(allowed_and_not_desired, axis=0)
            objs_to_lock = np.logical_and(np.any(allowed_and_desired, axis=0), ~objs_to_unlock)
            new_objs_to_lock = np.logical_and(objs_to_lock, ~self.obj_locked)
        else:
            # Raised explicitly instead of `assert False`: assert statements are removed under
            # `python -O`, which would let an unknown lock type fall through to an unrelated
            # UnboundLocalError below.
            raise AssertionError(f"{self.lock_type} lock type is not implemented")

        # Joints attached to the bodies that change state this round.
        joints_to_unlock = np.isin(sim.model.jnt_bodyid, self.obj_body_idxs[objs_to_unlock])
        joints_to_lock = np.isin(sim.model.jnt_bodyid, self.obj_body_idxs[new_objs_to_lock])

        # Turn on/off emission and joint limit
        matids_to_darken = sim.model.geom_matid[np.isin(sim.model.geom_bodyid, self.obj_body_idxs[objs_to_unlock])]
        matids_to_lighten = sim.model.geom_matid[np.isin(sim.model.geom_bodyid, self.obj_body_idxs[new_objs_to_lock])]
        # Drop geoms whose material id is -1 so they are not used as an index below.
        matids_to_darken = matids_to_darken[matids_to_darken != -1]
        matids_to_lighten = matids_to_lighten[matids_to_lighten != -1]
        # Unlock writes come first, so where both masks hit the same entry the lock write wins.
        sim.model.mat_emission[matids_to_darken] = 0
        sim.model.mat_emission[matids_to_lighten] = 1
        sim.model.jnt_limited[joints_to_unlock] = 0
        sim.model.jnt_limited[joints_to_lock] = 1

        # For objs we need to newly lock, set the joint ranges to the current qpos of the obj.
        for obj in np.argwhere(new_objs_to_lock)[:, 0]:
            # Lower and upper bound both become the current qpos, pinning the joint in place.
            # NOTE(review): qpos is indexed with joint indices here -- confirm they coincide
            # with the qpos addresses for these object joints.
            sim.model.jnt_range[self.obj_jnt_idxs[obj], :] = sim.data.qpos[self.obj_jnt_idxs[obj], None]
            self.which_locked[obj] = pick_locker(obj)

        # Locked after this round: previously locked and not released, or (re)locked this round.
        self.obj_locked = np.logical_or(np.logical_and(self.obj_locked, ~objs_to_unlock), objs_to_lock)