mae_envs/wrappers/manipulation.py (319 lines of code) (raw):
import gym
from gym.spaces import Discrete, MultiDiscrete, Tuple
import numpy as np
from mujoco_worldgen.util.rotation import mat2quat
from mae_envs.wrappers.util import update_obs_space
from mae_envs.util.geometry import dist_pt_to_cuboid
from copy import deepcopy
from itertools import compress
class GrabObjWrapper(gym.Wrapper):
    '''
        Allows agents to grab an object using a weld constraint.

        Adds the action key 'action_pull' (one MultiDiscrete([2] * n_obj) per agent) and
        the observation keys 'obj_pull' (n_obj, 1) and 'you_pull' (n_obj, n_agents).

        Args:
            body_names (list): list of body names that the agent can grab
            radius_multiplier (float): How far away can this be activated (multiplier on box size)
            grab_dist (float): If set, the object is held at a specific distance during
                               grabbing (default: None).
                               Note: This does not work well with oblong objects
            grab_exclusive (bool): If set true, each object can only be grabbed by
                                   a single agent. If several agents attempt to
                                   grab the same object, only the closest agent succeeds.
            obj_in_game_metadata_keys (list of string): keys in metadata with boolean array saying
                which objects are currently in the game. This is used in the event we are randomizing
                number of objects
    '''
    def __init__(self, env, body_names, radius_multiplier=1.7,
                 grab_dist=None, grab_exclusive=False,
                 obj_in_game_metadata_keys=None):
        super().__init__(env)
        self.n_agents = self.unwrapped.n_agents
        self.body_names = body_names
        self.n_obj = len(body_names)
        self.obj_in_game_metadata_keys = obj_in_game_metadata_keys
        # The action space is sized for *all* candidate bodies; reset() may narrow
        # self.n_obj to the bodies that are actually in the game this episode.
        self.action_space.spaces['action_pull'] = (
            Tuple([MultiDiscrete([2] * self.n_obj) for _ in range(self.n_agents)]))
        self.observation_space = update_obs_space(
            env, {'obj_pull': (self.n_obj, 1),
                  'you_pull': (self.n_obj, self.n_agents)})

        self.grab_radius = radius_multiplier * self.metadata['box_size']
        self.grab_dist = grab_dist
        self.grab_exclusive = grab_exclusive

    def observation(self, obs):
        '''
            Adds the grab state to the observation dictionary.
            'you_pull' is the transposed (n_agents, n_obj) grab matrix and
            'obj_pull' flags every object that is grabbed by at least one agent.
        '''
        obs['you_pull'] = self.obj_grabbed.T
        obs['obj_pull'] = np.any(obs['you_pull'], axis=-1, keepdims=True)
        return obs

    def reset(self):
        '''
            Resets the wrapped env, re-caches all simulator ids, deactivates the
            agents' weld constraints and clears the grab state.
        '''
        obs = self.env.reset()
        sim = self.unwrapped.sim

        if self.obj_in_game_metadata_keys is not None:
            # Coerce to bool so that compress() below and the NumPy column indexing
            # in grab_obj() both interpret this array as a mask.
            self.actual_body_slice = np.concatenate(
                [self.metadata[k] for k in self.obj_in_game_metadata_keys]).astype(bool)
        else:
            # The np.bool alias was removed in NumPy 1.24; the builtin bool is the same dtype.
            self.actual_body_slice = np.ones(len(self.body_names), dtype=bool)
        actual_body_names = list(compress(self.body_names, self.actual_body_slice))
        self.n_obj = len(actual_body_names)

        # Cache body ids
        self.obj_body_idxs = np.array([sim.model.body_name2id(body_name) for body_name in actual_body_names])
        self.agent_body_idxs = np.array([sim.model.body_name2id(f"agent{i}:particle") for i in range(self.n_agents)])
        # Cache geom ids
        self.obj_geom_ids = np.array([sim.model.geom_name2id(body_name) for body_name in actual_body_names])
        self.agent_geom_ids = np.array([sim.model.geom_name2id(f'agent{i}:agent') for i in range(self.n_agents)])
        # Cache constraint ids. Only constraint i whose first body is agent i's particle
        # is accepted, i.e. the i-th equality constraint must belong to the i-th agent.
        self.agent_eq_ids = np.array(
            [i for i, obj1 in enumerate(sim.model.eq_obj1id)
             if sim.model.body_names[obj1] == f"agent{i}:particle"])
        assert len(self.agent_eq_ids) == self.n_agents

        # turn off equality constraints
        sim.model.eq_active[self.agent_eq_ids] = 0
        self.obj_grabbed = np.zeros((self.n_agents, self.n_obj), dtype=bool)
        self.last_obj_grabbed = np.zeros((self.n_agents, self.n_obj), dtype=bool)

        return self.observation(obs)

    def grab_obj(self, action):
        '''
            Implements object grabbing for all agents
            Args:
                action: Action dictionary. action['action_pull'] is indexed as an
                    (n_agents, number of candidate bodies) array.
        '''
        # Drop the columns of objects that are not in the game this episode
        action_pull = action['action_pull'][:, self.actual_body_slice]
        sim = self.unwrapped.sim

        agent_pos = sim.data.body_xpos[self.agent_body_idxs]
        obj_pos = sim.data.body_xpos[self.obj_body_idxs]
        obj_width = sim.model.geom_size[self.obj_geom_ids]
        obj_quat = sim.data.body_xquat[self.obj_body_idxs]
        assert len(obj_width) == len(obj_quat), (
            "Number of object widths must be equal to number of quaternions for direct distance calculation method. " +
            "This might be caused by a body that contains several geoms.")
        # Used below as an (n_agents, n_obj) matrix of agent-to-object distances
        obj_dist = dist_pt_to_cuboid(agent_pos, obj_pos, obj_width, obj_quat)

        allowed_and_desired = np.logical_and(action_pull, obj_dist <= self.grab_radius)
        obj_dist_masked = obj_dist.copy()  # Mask the obj dists to find a valid argmin
        obj_dist_masked[~allowed_and_desired] = np.inf

        if self.grab_exclusive:
            # Greedy matching: repeatedly pick the globally closest (agent, object) pair,
            # then remove that agent and that object from further consideration.
            closest_obj = np.zeros((self.n_agents,), dtype=int)
            while np.any(obj_dist_masked < np.inf):
                # find agent and object of closest object distance
                agent_idx, obj_idx = np.unravel_index(np.argmin(obj_dist_masked), obj_dist_masked.shape)
                # set closest object for this agent
                closest_obj[agent_idx] = obj_idx
                # ensure exclusivity of grabbing
                obj_dist_masked[:, obj_idx] = np.inf
                obj_dist_masked[agent_idx, :] = np.inf
                # mark same object as undesired for all other agents
                allowed_and_desired[:agent_idx, obj_idx] = False
                allowed_and_desired[(agent_idx + 1):, obj_idx] = False
        else:
            closest_obj = np.argmin(obj_dist_masked, axis=-1)

        valid_grabs = np.any(allowed_and_desired, axis=-1)  # (n_agent,) which agents have valid grabs

        # Turn on/off agents with valid grabs
        sim.model.eq_active[self.agent_eq_ids] = valid_grabs
        sim.model.eq_obj2id[self.agent_eq_ids] = self.obj_body_idxs[closest_obj]

        # keep track of which object is being grabbed
        self.obj_grabbed = np.zeros((self.n_agents, self.n_obj), dtype=bool)
        agent_with_valid_grab = np.argwhere(valid_grabs)[:, 0]
        self.obj_grabbed[agent_with_valid_grab, closest_obj[agent_with_valid_grab]] = 1

        # If there are new grabs, then setup the weld constraint parameters
        new_grabs = np.logical_and(
            valid_grabs, np.any(self.obj_grabbed != self.last_obj_grabbed, axis=-1))
        for agent_idx in np.argwhere(new_grabs)[:, 0]:
            agent_rot = sim.data.body_xmat[self.agent_body_idxs[agent_idx]].reshape((3, 3))
            obj_rot = sim.data.body_xmat[self.obj_body_idxs[closest_obj[agent_idx]]].reshape((3, 3))
            # Use the bodies' xpos (from sim.data.body_xpos) rather than the qpos
            obj_pos = sim.data.body_xpos[self.obj_body_idxs[closest_obj[agent_idx]]]
            agent_pos = sim.data.body_xpos[self.agent_body_idxs[agent_idx]]

            grab_vec = agent_pos - obj_pos
            if self.grab_dist is not None:
                # Rescale to the requested holding distance; 1e-3 guards against a zero-length vector
                grab_vec = self.grab_dist / (1e-3 + np.linalg.norm(grab_vec)) * grab_vec

            # The distance constraint needs to be rotated into the frame of reference of the agent
            sim.model.eq_data[self.agent_eq_ids[agent_idx], :3] = np.matmul(agent_rot.T, grab_vec)
            # The angle constraint is the difference between the agents frame and the objects frame
            sim.model.eq_data[self.agent_eq_ids[agent_idx], 3:] = mat2quat(np.matmul(agent_rot.T, obj_rot))

        # Safe to alias: self.obj_grabbed is rebuilt from scratch on every call
        self.last_obj_grabbed = self.obj_grabbed

    def step(self, action):
        '''
            Applies the grab actions to the simulator constraints, then steps the wrapped env.
        '''
        self.grab_obj(action)
        obs, rew, done, info = self.env.step(action)
        return self.observation(obs), rew, done, info
class GrabClosestWrapper(gym.ActionWrapper):
    '''
        Collapses the per-object 'action_pull' into a single binary choice per agent.
        The incoming binary action is expanded to one identical column per object
        before being passed on, so an agent that pulls asks for every object at once
        (the grab wrapper then only grabs the closest box).
    '''
    def __init__(self, env):
        super().__init__(env)
        # Work on a private copy before replacing the 'action_pull' entry
        self.action_space = deepcopy(self.action_space)
        per_object_pull = self.action_space.spaces['action_pull']
        # Number of objects = length of one agent's MultiDiscrete pull vector
        self.n_obj = len(per_object_pull.spaces[0].nvec)
        binary_pull = [Discrete(2) for _ in range(self.unwrapped.n_agents)]
        self.action_space.spaces['action_pull'] = Tuple(binary_pull)

    def action(self, action):
        '''
            Returns a deep copy of the action dictionary in which 'action_pull' is
            repeated n_obj times along a new last axis.
        '''
        expanded = deepcopy(action)
        pull_column = expanded['action_pull'][:, None]
        expanded['action_pull'] = np.repeat(pull_column, self.n_obj, -1)
        return expanded
class LockObjWrapper(gym.Wrapper):
    '''
        Allows agents to lock objects at their current position.

        Adds the action key 'action_{ac_obs_prefix}glue' (one MultiDiscrete([2] * n_obj) per agent)
        and the observation keys '{ac_obs_prefix}obj_lock', '{ac_obs_prefix}you_lock' and
        '{ac_obs_prefix}team_lock'.

        Args:
            body_names (list): list of body names that the agent can lock
            radius_multiplier (float): How far away can this be activated (multiplier on box size)
            agent_idx_allowed_to_lock (np array of ints): Indices of agents that are allowed to lock.
                Defaults to all
            lock_type (string): Options are
                any_lock: if any agent wants to lock an object it will get locked
                all_lock: all agents that are close enough must want to lock the object
                any_lock_specific: if any agent wants to lock an object it will get locked. However,
                    now the lock is agent specific, and only the agent that locked the object can unlock it.
                all_lock_team_specific: like all_lock, but only team members of the agent that
                    locked the object can unlock it.
            ac_obs_prefix (string): prefix for the action and observation keys. This is useful if using
                the lock wrapper more than once.
            obj_in_game_metadata_keys (list of string): keys in metadata with boolean array saying
                which objects are currently in the game. This is used in the event we are randomizing
                number of objects
            agent_allowed_to_lock_keys (list of string): keys in obs determining whether agent is allowed
                to lock a certain object. Each key should be a mask matrix of dim (n_agents, n_obj)
    '''
    def __init__(self, env, body_names, radius_multiplier=1.5, agent_idx_allowed_to_lock=None,
                 lock_type="any_lock", ac_obs_prefix='', obj_in_game_metadata_keys=None,
                 agent_allowed_to_lock_keys=None):
        super().__init__(env)
        self.n_agents = self.unwrapped.n_agents
        self.n_obj = len(body_names)
        self.body_names = body_names
        self.agent_idx_allowed_to_lock = (np.arange(self.n_agents) if agent_idx_allowed_to_lock is None
                                          else agent_idx_allowed_to_lock)
        self.lock_type = lock_type
        self.ac_obs_prefix = ac_obs_prefix
        self.obj_in_game_metadata_keys = obj_in_game_metadata_keys
        self.agent_allowed_to_lock_keys = agent_allowed_to_lock_keys
        self.action_space.spaces[f'action_{ac_obs_prefix}glue'] = (
            Tuple([MultiDiscrete([2] * self.n_obj) for _ in range(self.n_agents)]))
        self.observation_space = update_obs_space(env, {f'{ac_obs_prefix}obj_lock': (self.n_obj, 1),
                                                        f'{ac_obs_prefix}you_lock': (self.n_agents, self.n_obj, 1),
                                                        f'{ac_obs_prefix}team_lock': (self.n_agents, self.n_obj, 1)})
        self.lock_radius = radius_multiplier*self.metadata['box_size']
        # Placeholder only; reset() replaces this with a boolean array.
        self.obj_locked = np.zeros((self.n_obj,), dtype=int)

    def observation(self, obs):
        '''
            Adds the lock state to the observation dictionary:
                obj_lock: (n_obj, 1) whether each object is locked
                you_lock: (n_agents, n_obj, 1) whether this agent holds the lock on the object
                team_lock: (n_agents, n_obj, 1) whether anyone on this agent's team holds the lock
        '''
        obs[f'{self.ac_obs_prefix}obj_lock'] = self.obj_locked[:, None]
        you_lock = np.arange(self.n_agents)[:, None] == self.which_locked[None, :]
        obs[f'{self.ac_obs_prefix}you_lock'] = np.expand_dims(you_lock * obs[f'{self.ac_obs_prefix}obj_lock'].T, axis=-1)
        obs[f'{self.ac_obs_prefix}team_lock'] = np.zeros((self.n_agents, self.n_obj, 1))
        for team in np.unique(self.metadata['team_index']):
            team_mask = self.metadata['team_index'] == team
            obs[f'{self.ac_obs_prefix}team_lock'][team_mask] = np.any(obs[f'{self.ac_obs_prefix}you_lock'][team_mask], 0)
        return obs

    def reset(self):
        '''
            Resets the wrapped env, re-caches all simulator ids, unlocks every object
            and clears the lock bookkeeping.
        '''
        obs = self.env.reset()
        sim = self.unwrapped.sim

        if self.obj_in_game_metadata_keys is not None:
            # Coerce to bool so that compress() below and the NumPy column indexing
            # in lock_obj() both interpret this array as a mask.
            self.actual_body_slice = np.concatenate(
                [self.metadata[k] for k in self.obj_in_game_metadata_keys]).astype(bool)
        else:
            # The np.bool alias was removed in NumPy 1.24; the builtin bool is the same dtype.
            self.actual_body_slice = np.ones(len(self.body_names), dtype=bool)
        actual_body_names = list(compress(self.body_names, self.actual_body_slice))
        self.n_obj = len(actual_body_names)

        # Cache ids
        self.obj_body_idxs = np.array([sim.model.body_name2id(body_name) for body_name in actual_body_names])
        self.obj_jnt_idxs = [np.where(sim.model.jnt_bodyid == body_idx)[0] for body_idx in self.obj_body_idxs]
        self.obj_geom_ids = [np.where(sim.model.geom_bodyid == body_idx)[0] for body_idx in self.obj_body_idxs]
        # Agent id caches only keep the agents that are allowed to lock
        self.agent_body_idxs = np.array([sim.model.body_name2id(f"agent{i}:particle") for i in range(self.n_agents)])
        self.agent_body_idxs = self.agent_body_idxs[self.agent_idx_allowed_to_lock]
        self.agent_geom_ids = np.array([sim.model.geom_name2id(f'agent{i}:agent') for i in range(self.n_agents)])
        self.agent_geom_ids = self.agent_geom_ids[self.agent_idx_allowed_to_lock]

        self.unlock_objs()
        self.obj_locked = np.zeros((self.n_obj,), dtype=bool)
        self.which_locked = np.zeros((self.n_obj,), dtype=int)

        self._refresh_allowed_to_lock_mask(obs)

        return self.observation(obs)

    def _refresh_allowed_to_lock_mask(self, obs):
        '''
            Rebuilds self.agent_allowed_to_lock_mask from the given observation, or
            sets it to all ones of shape (n_agents, n_obj) if no mask keys were configured.
        '''
        if self.agent_allowed_to_lock_keys is not None:
            self.agent_allowed_to_lock_mask = np.concatenate([obs[k] for k in self.agent_allowed_to_lock_keys])
        else:
            self.agent_allowed_to_lock_mask = np.ones((self.n_agents, self.n_obj))

    def lock_obj(self, action_lock):
        '''
            Implements object gluing for all agents
            Args:
                action_lock: (n_agent, n_obj) boolean matrix
        '''
        sim = self.unwrapped.sim
        # Keep only the rows of agents allowed to lock and the columns of objects in the game
        action_lock = action_lock[self.agent_idx_allowed_to_lock]
        action_lock = action_lock[:, self.actual_body_slice]
        # The mask has one row per agent, but everything below has one row per *allowed*
        # agent, so select the matching rows (a no-op when all agents are allowed).
        allowed_mask = self.agent_allowed_to_lock_mask[self.agent_idx_allowed_to_lock]

        agent_pos = sim.data.body_xpos[self.agent_body_idxs]
        obj_pos = sim.data.body_xpos[self.obj_body_idxs]

        obj_width = sim.model.geom_size[np.concatenate(self.obj_geom_ids)]
        obj_quat = sim.data.body_xquat[self.obj_body_idxs]
        assert len(obj_width) == len(obj_quat), (
            "Number of object widths must be equal to number of quaternions for direct distance calculation method. " +
            "This might be caused by a body that contains several geoms.")
        obj_dist = dist_pt_to_cuboid(agent_pos, obj_pos, obj_width, obj_quat)

        # In range, permitted by the mask, and the agent asked to lock
        allowed_and_desired = np.logical_and(action_lock, obj_dist <= self.lock_radius)
        allowed_and_desired = np.logical_and(allowed_and_desired, allowed_mask)
        # In range, permitted by the mask, and the agent did NOT ask to lock
        allowed_and_not_desired = np.logical_and(1 - action_lock, obj_dist <= self.lock_radius)
        allowed_and_not_desired = np.logical_and(allowed_and_not_desired, allowed_mask)

        # objs_to_lock should _all_ be locked this round. new_objs_to_lock are objs that were not locked last round
        # objs_to_unlock are objs that no one wants to lock this round
        if self.lock_type == "any_lock":  # If any agent wants to lock, the obj becomes locked
            objs_to_lock = np.any(allowed_and_desired, axis=0)
            objs_to_unlock = np.logical_and(np.any(allowed_and_not_desired, axis=0), ~objs_to_lock)
            new_objs_to_lock = np.logical_and(objs_to_lock, ~self.obj_locked)
        elif self.lock_type == "all_lock":  # All agents that are close enough must want to lock the obj
            objs_to_unlock = np.any(allowed_and_not_desired, axis=0)
            objs_to_lock = np.logical_and(np.any(allowed_and_desired, axis=0), ~objs_to_unlock)
            new_objs_to_lock = np.logical_and(objs_to_lock, ~self.obj_locked)
        elif self.lock_type == "any_lock_specific":  # If any agent wants to lock, the obj becomes locked
            allowed_to_unlock = np.arange(self.n_agents)[:, None] == self.which_locked[None, :]  # (n_agent, n_obj)
            allowed_to_unlock = np.logical_and(allowed_to_unlock, self.obj_locked[None, :])  # Can't unlock an obj that isn't locked
            allowed_and_not_desired = np.logical_and(allowed_to_unlock[self.agent_idx_allowed_to_lock],
                                                     allowed_and_not_desired)
            objs_to_unlock = np.any(allowed_and_not_desired, axis=0)
            objs_to_lock = np.any(allowed_and_desired, axis=0)
            # Released by its owner and requested by someone in the same step:
            # the object stays locked and ownership is handed to one of the requesters.
            objs_to_relock = np.logical_and(objs_to_unlock, objs_to_lock)
            new_objs_to_lock = np.logical_and(np.logical_and(objs_to_lock, ~objs_to_relock), ~self.obj_locked)
            objs_to_unlock = np.logical_and(objs_to_unlock, ~objs_to_lock)
            for obj in np.argwhere(objs_to_relock)[:, 0]:
                self.which_locked[obj] = np.random.choice(self.agent_idx_allowed_to_lock[
                    np.argwhere(allowed_and_desired[:, obj]).flatten()])
        elif self.lock_type == "all_lock_team_specific":
            # all close agents must want to lock the object
            # only agents from the same team as the locker can unlock
            allowed_to_unlock = self.metadata['team_index'][:, None] == (
                self.metadata['team_index'][None, self.which_locked])
            allowed_and_not_desired = np.logical_and(allowed_to_unlock[self.agent_idx_allowed_to_lock],
                                                     allowed_and_not_desired)
            objs_to_unlock = np.any(allowed_and_not_desired, axis=0)
            objs_to_lock = np.logical_and(np.any(allowed_and_desired, axis=0), ~objs_to_unlock)
            new_objs_to_lock = np.logical_and(objs_to_lock, ~self.obj_locked)
        else:
            # Raised explicitly (rather than `assert False`) so the check survives `python -O`
            raise AssertionError(f"{self.lock_type} lock type is not implemented")

        joints_to_unlock = np.isin(sim.model.jnt_bodyid, self.obj_body_idxs[objs_to_unlock])
        joints_to_lock = np.isin(sim.model.jnt_bodyid, self.obj_body_idxs[new_objs_to_lock])

        # Turn on/off emission and joint limit
        matids_to_darken = sim.model.geom_matid[np.isin(sim.model.geom_bodyid, self.obj_body_idxs[objs_to_unlock])]
        matids_to_lighten = sim.model.geom_matid[np.isin(sim.model.geom_bodyid, self.obj_body_idxs[new_objs_to_lock])]
        # Skip geoms whose material id is -1
        matids_to_darken = matids_to_darken[matids_to_darken != -1]
        matids_to_lighten = matids_to_lighten[matids_to_lighten != -1]
        sim.model.mat_emission[matids_to_darken] = 0
        sim.model.mat_emission[matids_to_lighten] = 1
        sim.model.jnt_limited[joints_to_unlock] = 0
        sim.model.jnt_limited[joints_to_lock] = 1

        # For objs we need to newly lock, set the joint ranges to the current qpos of the obj.
        for obj in np.argwhere(new_objs_to_lock)[:, 0]:
            sim.model.jnt_range[self.obj_jnt_idxs[obj], :] = sim.data.qpos[self.obj_jnt_idxs[obj], None]
            # Ownership goes to a random agent among those that requested the lock
            self.which_locked[obj] = np.random.choice(self.agent_idx_allowed_to_lock[
                np.argwhere(allowed_and_desired[:, obj]).flatten()])

        self.obj_locked = np.logical_or(np.logical_and(self.obj_locked, ~objs_to_unlock), objs_to_lock)

    def unlock_objs(self):
        '''
            Unlocks every cached object: disables its joint limits and turns off
            the emission of its materials.
        '''
        sim = self.unwrapped.sim
        joints_to_unlock = np.isin(sim.model.jnt_bodyid, self.obj_body_idxs)
        objs_to_darken = np.isin(sim.model.geom_bodyid, self.obj_body_idxs)
        matids_to_darken = sim.model.geom_matid[objs_to_darken]
        # Skip material id -1 (as lock_obj does); indexing with -1 would
        # otherwise address the last material.
        matids_to_darken = matids_to_darken[matids_to_darken != -1]
        sim.model.mat_emission[matids_to_darken] = 0
        sim.model.jnt_limited[joints_to_unlock] = 0

    def step(self, action):
        '''
            Applies the lock actions to the simulator, steps the wrapped env and
            refreshes the allowed-to-lock mask from the new observation.
        '''
        self.lock_obj(action[f'action_{self.ac_obs_prefix}glue'])
        obs, rew, done, info = self.env.step(action)

        self._refresh_allowed_to_lock_mask(obs)

        return self.observation(obs), rew, done, info
class LockAllWrapper(gym.ActionWrapper):
    '''
        Gives every agent a single binary 'action_glueall' action. When an agent sets it,
        all of that agent's per-object lock actions are switched on; otherwise they are
        all switched off. Any per-object lock values passed in are overwritten.
        Args:
            remove_object_specific_lock (bool): If true, removes the agent's ability to
                selectively lock individual boxes.
    '''
    def __init__(self, env, remove_object_specific_lock=False):
        super().__init__(env)
        self.n_agents = self.unwrapped.n_agents
        # Lock actions are all action keys containing 'glue' that exist *before*
        # 'action_glueall' itself is added below.
        self.lock_actions = [key for key in self.action_space.spaces.keys() if 'glue' in key]
        self.n_obj = {}
        for key in self.lock_actions:
            # Object count = length of one agent's MultiDiscrete lock vector
            self.n_obj[key] = len(self.action_space.spaces[key].spaces[0].nvec)
        glue_all_space = Tuple([Discrete(2) for _ in range(self.n_agents)])
        self.action_space.spaces['action_glueall'] = glue_all_space
        if not remove_object_specific_lock:
            return
        for key in self.lock_actions:
            del self.action_space.spaces[key]

    def action(self, action):
        '''
            Overwrites every per-object lock action in the (mutated in place) action
            dictionary with a (n_agents, n_obj) matrix whose rows are all ones for agents
            that chose 'action_glueall' and all zeros otherwise.
        '''
        for key in self.lock_actions:
            lock_matrix = np.zeros((self.n_agents, self.n_obj[key]))
            # Store first, then fill in place, matching the original statement order
            action[key] = lock_matrix
            lock_matrix[action['action_glueall'] == 1, :] = 1
        return action