mae_envs/modules/objects.py (281 lines of code) (raw):
import numpy as np
from mujoco_worldgen.util.types import store_args
from mujoco_worldgen.util.sim_funcs import (qpos_idxs_from_joint_prefix,
qvel_idxs_from_joint_prefix)
from mujoco_worldgen import Geom, Material, ObjFromXML
from mujoco_worldgen.transforms import set_geom_attr_transform
from mujoco_worldgen.util.rotation import normalize_angles
from mae_envs.util.transforms import remove_hinge_axis_transform
from mae_envs.modules import EnvModule, rejection_placement, get_size_from_xml
class Boxes(EnvModule):
'''
Add moveable boxes to the environment.
Args:
n_boxes (int or (int, int)): number of boxes. If tuple of ints, every episode the
number of boxes is drawn uniformly from range(n_boxes[0], n_boxes[1] + 1)
n_elongated_boxes (int or (int, int)): Number of elongated boxes. If tuple of ints,
every episode the number of elongated boxes is drawn uniformly from
range(n_elongated_boxes[0], min(curr_n_boxes, n_elongated_boxes[1]) + 1)
placement_fn (fn or list of fns): See mae_envs.modules.util:rejection_placement for spec
If list of functions, then it is assumed there is one function given per box
box_size (float): box size
box_mass (float): box mass
friction (float): box friction
box_only_z_rot (bool): If true, boxes can only be rotated around the z-axis
boxid_obs (bool): If true, the id of boxes is observed
boxsize_obs (bool): If true, the size of the boxes is observed (note that the size
is still observed if boxsize_obs is False but there are elongated boxes)
polar_obs (bool): Give observations about rotation in polar coordinates
mark_box_corners (bool): If true, puts a site in the middle of each of the 4 vertical
box edges for each box (these sites are used for calculating distances in the
blueprint construction task).
'''
@store_args
def __init__(self, n_boxes, n_elongated_boxes=0, placement_fn=None,
box_size=0.5, box_mass=1.0, friction=None, box_only_z_rot=False,
boxid_obs=True, boxsize_obs=False, polar_obs=True, mark_box_corners=False):
if type(n_boxes) not in [tuple, list, np.ndarray]:
self.n_boxes = [n_boxes, n_boxes]
if type(n_elongated_boxes) not in [tuple, list, np.ndarray]:
self.n_elongated_boxes = [n_elongated_boxes, n_elongated_boxes]
def build_world_step(self, env, floor, floor_size):
env.metadata['box_size'] = self.box_size
self.curr_n_boxes = env._random_state.randint(self.n_boxes[0], self.n_boxes[1] + 1)
env.metadata['curr_n_boxes'] = np.zeros((self.n_boxes[1]))
env.metadata['curr_n_boxes'][:self.curr_n_boxes] = 1
env.metadata['curr_n_boxes'] = env.metadata['curr_n_boxes'].astype(np.bool)
self.curr_n_elongated_boxes = env._random_state.randint(
self.n_elongated_boxes[0], min(self.n_elongated_boxes[1], self.curr_n_boxes) + 1)
self.box_size_array = self.box_size * np.ones((self.curr_n_boxes, 3))
if self.curr_n_elongated_boxes > 0:
# sample number of x-aligned boxes
n_xaligned = env._random_state.randint(self.curr_n_elongated_boxes + 1)
self.box_size_array[:n_xaligned, :] = self.box_size * np.array([3.3, 0.3, 1.0])
self.box_size_array[n_xaligned:self.curr_n_elongated_boxes, :] = (self.box_size * np.array([0.3, 3.3, 1.0]))
env.metadata['box_size_array'] = self.box_size_array
successful_placement = True
for i in range(self.curr_n_boxes):
char = chr(ord('A') + i % 26)
geom = Geom("box", self.box_size_array[i, :], name=f'moveable_box{i}')
geom.set_material(Material(texture="chars/" + char + ".png"))
geom.add_transform(set_geom_attr_transform('mass', self.box_mass))
if self.mark_box_corners:
for j, (x, y) in enumerate([[0, 0], [0, 1], [1, 0], [1, 1]]):
geom.mark(f'moveable_box{i}_corner{j}', relative_xyz=(x, y, 0.5),
rgba=[1., 1., 1., 0.])
if self.friction is not None:
geom.add_transform(set_geom_attr_transform('friction', self.friction))
if self.box_only_z_rot:
geom.add_transform(remove_hinge_axis_transform(np.array([1.0, 0.0, 0.0])))
geom.add_transform(remove_hinge_axis_transform(np.array([0.0, 1.0, 0.0])))
if self.placement_fn is not None:
_placement_fn = (self.placement_fn[i]
if isinstance(self.placement_fn, list)
else self.placement_fn)
pos, _ = rejection_placement(env, _placement_fn, floor_size,
self.box_size_array[i, :2])
if pos is not None:
floor.append(geom, placement_xy=pos)
else:
successful_placement = False
else:
floor.append(geom)
return successful_placement
def modify_sim_step(self, env, sim):
# Cache qpos, qvel idxs
self.box_geom_idxs = np.array([sim.model.geom_name2id(f'moveable_box{i}')
for i in range(self.curr_n_boxes)])
self.box_qpos_idxs = np.array([qpos_idxs_from_joint_prefix(sim, f'moveable_box{i}:')
for i in range(self.curr_n_boxes)])
self.box_qvel_idxs = np.array([qvel_idxs_from_joint_prefix(sim, f'moveable_box{i}:')
for i in range(self.curr_n_boxes)])
if self.mark_box_corners:
self.box_corner_idxs = np.array([sim.model.site_name2id(f'moveable_box{i}_corner{j}')
for i in range(self.curr_n_boxes)
for j in range(4)])
def observation_step(self, env, sim):
qpos = sim.data.qpos.copy()
qvel = sim.data.qvel.copy()
box_inds = np.expand_dims(np.arange(self.curr_n_boxes), -1)
box_geom_idxs = np.expand_dims(self.box_geom_idxs, -1)
box_qpos = qpos[self.box_qpos_idxs]
box_qvel = qvel[self.box_qvel_idxs]
box_angle = normalize_angles(box_qpos[:, 3:])
polar_angle = np.concatenate([np.cos(box_angle), np.sin(box_angle)], -1)
if self.polar_obs:
box_qpos = np.concatenate([box_qpos[:, :3], polar_angle], -1)
box_obs = np.concatenate([box_qpos, box_qvel], -1)
if self.boxid_obs:
box_obs = np.concatenate([box_obs, box_inds], -1)
if self.n_elongated_boxes[1] > 0 or self.boxsize_obs:
box_obs = np.concatenate([box_obs, self.box_size_array], -1)
obs = {'box_obs': box_obs,
'box_angle': box_angle,
'box_geom_idxs': box_geom_idxs,
'box_pos': box_qpos[:, :3],
'box_xpos': sim.data.geom_xpos[self.box_geom_idxs]}
if self.mark_box_corners:
obs.update({'box_corner_pos': sim.data.site_xpos[self.box_corner_idxs],
'box_corner_idxs': np.expand_dims(self.box_corner_idxs, -1)})
return obs
class Ramps(EnvModule):
'''
Add moveable ramps to the environment.
Args:
n_ramps (int): number of ramps
placement_fn (fn or list of fns): See mae_envs.modules.util:rejection_placement for spec
If list of functions, then it is assumed there is one function given per ramp
friction (float): ramp friction
polar_obs (bool): Give observations about rotation in polar coordinates
pad_ramp_size (bool): pads 3 rows of zeros to the ramp observation. This makes
ramp observations match the dimensions of elongated box observations.
'''
@store_args
def __init__(self, n_ramps, placement_fn=None, friction=None, polar_obs=True,
pad_ramp_size=False):
pass
def build_world_step(self, env, floor, floor_size):
successful_placement = True
env.metadata['curr_n_ramps'] = np.ones((self.n_ramps)).astype(np.bool)
for i in range(self.n_ramps):
char = chr(ord('A') + i % 26)
geom = geom = ObjFromXML('ramp', name=f"ramp{i}")
geom.set_material(Material(texture="chars/" + char + ".png"))
if self.friction is not None:
geom.add_transform(set_geom_attr_transform('friction', self.friction))
if self.placement_fn is not None:
_placement_fn = (self.placement_fn[i]
if isinstance(self.placement_fn, list)
else self.placement_fn)
pos, _ = rejection_placement(env, _placement_fn, floor_size, get_size_from_xml(geom))
if pos is not None:
floor.append(geom, placement_xy=pos)
else:
successful_placement = False
else:
floor.append(geom)
return successful_placement
def modify_sim_step(self, env, sim):
# Cache qpos, qvel idxs
self.ramp_qpos_idxs = np.array([qpos_idxs_from_joint_prefix(sim, f'ramp{i}')
for i in range(self.n_ramps)])
self.ramp_qvel_idxs = np.array([qvel_idxs_from_joint_prefix(sim, f'ramp{i}')
for i in range(self.n_ramps)])
self.ramp_geom_idxs = np.array([sim.model.geom_name2id(f'ramp{i}:ramp')
for i in range(self.n_ramps)])
def observation_step(self, env, sim):
qpos = sim.data.qpos.copy()
qvel = sim.data.qvel.copy()
ramp_geom_idxs = np.expand_dims(self.ramp_geom_idxs, -1)
ramp_qpos = qpos[self.ramp_qpos_idxs]
ramp_qvel = qvel[self.ramp_qvel_idxs]
ramp_angle = normalize_angles(ramp_qpos[:, 3:])
polar_angle = np.concatenate([np.cos(ramp_angle), np.sin(ramp_angle)], -1)
if self.polar_obs:
ramp_qpos = np.concatenate([ramp_qpos[:, :3], polar_angle], -1)
ramp_obs = np.concatenate([ramp_qpos, ramp_qvel], -1)
if self.pad_ramp_size:
ramp_obs = np.concatenate([ramp_obs, np.zeros((ramp_obs.shape[0], 3))], -1)
obs = {'ramp_obs': ramp_obs,
'ramp_angle': ramp_angle,
'ramp_geom_idxs': ramp_geom_idxs,
'ramp_pos': ramp_qpos[:, :3]}
return obs
class Cylinders(EnvModule):
'''
Add cylinders to the environment.
Args:
n_objects (int): Number of cylinders
diameter (float or (float, float)): Diameter of cylinders. If tuple of floats, every
episode the diameter is drawn uniformly from (diameter[0], diameter[1]).
(Note that all cylinders within an episode still share the same diameter)
height (float or (float, float)): Height of cylinders. If tuple of floats, every
episode the height is drawn uniformly from (height[0], height[1]).
(Note that all cylinders within an episode still share the same height)
make_static (bool): Makes the cylinders static, preventing them from moving. Note that
the observations (and observation keys) are different when make_static=True
placement_fn (fn or list of fns): See mae_envs.modules.util:rejection_placement for spec
If list of functions, then it is assumed there is one function given per cylinder
rgba ([float, float, float, float]): Determines cylinder color.
'''
@store_args
def __init__(self, n_objects, diameter, height, make_static=False,
placement_fn=None, rgba=[1., 1., 1., 1.]):
if type(diameter) not in [list, np.ndarray]:
self.diameter = [diameter, diameter]
if type(height) not in [list, np.ndarray]:
self.height = [height, height]
def build_world_step(self, env, floor, floor_size):
default_name = 'static_cylinder' if self.make_static else 'moveable_cylinder'
diameter = env._random_state.uniform(self.diameter[0], self.diameter[1])
height = env._random_state.uniform(self.height[0], self.height[1])
obj_size = (diameter, height, 0)
successful_placement = True
for i in range(self.n_objects):
geom = Geom('cylinder', obj_size, name=f'{default_name}{i}', rgba=self.rgba)
if self.make_static:
geom.mark_static()
if self.placement_fn is not None:
_placement_fn = (self.placement_fn[i]
if isinstance(self.placement_fn, list)
else self.placement_fn)
pos, _ = rejection_placement(env, _placement_fn, floor_size, diameter * np.ones(2))
if pos is not None:
floor.append(geom, placement_xy=pos)
else:
successful_placement = False
else:
floor.append(geom)
return successful_placement
def modify_sim_step(self, env, sim):
if self.make_static:
self.s_cylinder_geom_idxs = np.array([sim.model.geom_name2id(f'static_cylinder{i}')
for i in range(self.n_objects)])
else:
self.m_cylinder_geom_idxs = np.array([sim.model.geom_name2id(f'moveable_cylinder{i}')
for i in range(self.n_objects)])
qpos_idxs = [qpos_idxs_from_joint_prefix(sim, f'moveable_cylinder{i}')
for i in range(self.n_objects)]
qvel_idxs = [qvel_idxs_from_joint_prefix(sim, f'moveable_cylinder{i}')
for i in range(self.n_objects)]
self.m_cylinder_qpos_idxs = np.array(qpos_idxs)
self.m_cylinder_qvel_idxs = np.array(qvel_idxs)
def observation_step(self, env, sim):
qpos = sim.data.qpos.copy()
qvel = sim.data.qvel.copy()
if self.make_static:
s_cylinder_geom_idxs = np.expand_dims(self.s_cylinder_geom_idxs, -1)
s_cylinder_xpos = sim.data.geom_xpos[self.s_cylinder_geom_idxs]
obs = {'static_cylinder_geom_idxs': s_cylinder_geom_idxs,
'static_cylinder_xpos': s_cylinder_xpos}
else:
m_cylinder_geom_idxs = np.expand_dims(self.m_cylinder_geom_idxs, -1)
m_cylinder_xpos = sim.data.geom_xpos[self.m_cylinder_geom_idxs]
m_cylinder_qpos = qpos[self.m_cylinder_qpos_idxs]
m_cylinder_qvel = qvel[self.m_cylinder_qvel_idxs]
mc_angle = normalize_angles(m_cylinder_qpos[:, 3:])
polar_angle = np.concatenate([np.cos(mc_angle), np.sin(mc_angle)], -1)
m_cylinder_qpos = np.concatenate([m_cylinder_qpos[:, :3], polar_angle], -1)
m_cylinder_obs = np.concatenate([m_cylinder_qpos, m_cylinder_qvel], -1)
obs = {'moveable_cylinder_geom_idxs': m_cylinder_geom_idxs,
'moveable_cylinder_xpos': m_cylinder_xpos,
'moveable_cylinder_obs': m_cylinder_obs}
return obs
class LidarSites(EnvModule):
'''
Adds sites to visualize Lidar rays
Args:
n_agents (int): number of agents
n_lidar_per_agent (int): number of lidar sites per agent
'''
@store_args
def __init__(self, n_agents, n_lidar_per_agent):
pass
def build_world_step(self, env, floor, floor_size):
for i in range(self.n_agents):
for j in range(self.n_lidar_per_agent):
floor.mark(f"agent{i}:lidar{j}", (0.0, 0.0, 0.0), rgba=np.zeros((4,)))
return True
def modify_sim_step(self, env, sim):
# set lidar size and shape
self.lidar_ids = np.array([[sim.model.site_name2id(f"agent{i}:lidar{j}")
for j in range(self.n_lidar_per_agent)]
for i in range(self.n_agents)])
# set lidar site shape to cylinder
sim.model.site_type[self.lidar_ids] = 5
sim.model.site_size[self.lidar_ids, 0] = 0.02