mae_envs/envs/blueprint_construction.py (238 lines of code) (raw):
import numpy as np
import gym
from mae_envs.wrappers.multi_agent import (SplitMultiAgentActions, SplitObservations,
SelectKeysWrapper)
from mae_envs.wrappers.util import (DiscretizeActionWrapper, MaskActionWrapper,
DiscardMujocoExceptionEpisodes, SpoofEntityWrapper,
AddConstantObservationsWrapper,
ConcatenateObsWrapper, NumpyArrayRewardWrapper)
from mae_envs.wrappers.manipulation import (GrabObjWrapper, GrabClosestWrapper,
LockObjWrapper, LockAllWrapper)
from mae_envs.wrappers.lidar import Lidar
from mae_envs.wrappers.team import TeamMembership
from mae_envs.wrappers.line_of_sight import AgentAgentObsMask2D, AgentGeomObsMask2D
from mae_envs.envs.base import Base
from mae_envs.modules.agents import Agents, AgentManipulation
from mae_envs.modules.construction_sites import ConstructionSites
from mae_envs.modules.walls import WallScenarios, RandomWalls
from mae_envs.modules.objects import Boxes, LidarSites
from mae_envs.modules.world import FloorAttributes, WorldConstants
from mae_envs.modules.util import (uniform_placement, center_placement,
uniform_placement_middle)
class ConstructionDistancesWrapper(gym.ObservationWrapper):
    '''
        Augments the observation dict with pairwise Euclidean distances:
        box to box, box to construction site, and box corner to construction
        site corner.

        This wrapper should only be applied if both the Boxes module (with
        mark_box_corners set to True) and the ConstructionSites module have been
        added to the environment, because it reads the observation keys
        'box_xpos', 'box_corner_pos', 'construction_site_pos' and
        'construction_site_corner_pos'.

        New observation keys:
            box_box_dist
            box_site_dist
            boxcorner_sitecorner_dist
    '''
    def __init__(self, env):
        super().__init__(env)

    @staticmethod
    def _pairwise_dist(first, second):
        # Assumes both inputs are 2-D (n_entities, n_coords) arrays -- TODO confirm.
        # Under that assumption the difference broadcasts to
        # (n_first, n_coords, n_second) and the norm over axis 1 collapses the
        # coordinate axis, leaving an (n_first, n_second) distance matrix.
        return np.linalg.norm(first[..., None] - second.T[None, ...], axis=1)

    def observation(self, obs):
        # Read every required key up front, in a fixed order.
        box_centers = obs['box_xpos']
        box_corners = obs['box_corner_pos']
        site_centers = obs['construction_site_pos']
        site_corners = obs['construction_site_corner_pos']

        distances = {
            'box_box_dist': self._pairwise_dist(box_centers, box_centers),
            'box_site_dist': self._pairwise_dist(box_centers, site_centers),
            'boxcorner_sitecorner_dist': self._pairwise_dist(box_corners, site_corners),
        }
        # The incoming observation dict is extended in place and returned.
        obs.update(distances)
        return obs
class ConstructionDenseRewardWrapper(gym.Wrapper):
    '''
        Adds a dense reward for placing the boxes at the construction site locations.
        Reward is based on the smoothmin distance between each site and all the boxes:
        for every site, distances d to the boxes are averaged with weights
        exp(alpha * d), the per-site values are averaged, multiplied by
        reward_scale and subtracted from the reward.

        Requires the observation keys 'box_site_dist' and (when use_corners is
        True) 'boxcorner_sitecorner_dist' to be present in the wrapped env's
        observations.

        Args:
            use_corners (bool): If False, the reward uses obs['box_site_dist'];
                if True, it uses obs['boxcorner_sitecorner_dist'] instead.
            alpha (float): Smoothing parameter. Must be strictly negative.
            reward_scale (float): scales the reward by this factor
    '''
    def __init__(self, env, use_corners=False, alpha=-8, reward_scale=1):
        super().__init__(env)
        assert alpha < 0, 'alpha must be negative for the SmoothMin function to work'
        self.alpha = alpha
        self.reward_scale = reward_scale
        self.use_corners = use_corners

    def step(self, action):
        obs, rew, done, info = self.env.step(action)
        box_site_dist = (obs['boxcorner_sitecorner_dist']
                         if self.use_corners
                         else obs['box_site_dist'])

        exponents = self.alpha * box_site_dist
        if exponents.size > 0:
            # Shift each site's (column's) exponents so the largest is 0. The
            # weighted mean below is invariant to this shift, but without it
            # exp() can underflow to 0 for every box of a site (large distances
            # or strongly negative alpha), producing a 0 / 0 = NaN reward.
            exponents = exponents - np.max(exponents, axis=0, keepdims=True)
        scaling_factors = np.exp(exponents)

        # Reduce over axis 0 (the box axis of the distance matrix) to get one
        # smoothmin distance per site.
        site_box_smoothmin_dists = (np.sum(box_site_dist * scaling_factors, axis=0) /
                                    np.sum(scaling_factors, axis=0))
        # In-place update is kept on purpose so an array-valued reward is
        # modified exactly as before.
        rew -= np.mean(site_box_smoothmin_dists) * self.reward_scale
        return obs, rew, done, info
class ConstructionCompletedRewardWrapper(gym.Wrapper):
    '''
        Adds a sparse completion bonus and terminates the episode once every
        construction site has been 'activated', i.e. has a box within a given
        distance of it. The bonus equals the number of construction sites in the
        current episode (metadata['curr_n_sites']) times reward_scale.

        Requires the observation keys 'box_site_dist' and
        'boxcorner_sitecorner_dist' (both are read on every step).

        Args:
            use_corners (bool): If False, construction is finished as soon as all
                site centers have a box center within the activation radius. If
                True, all site corners must additionally have a box corner within
                the activation radius.
            site_activation_radius (float): a site is considered 'activated' if there is
                at least one box within the site activation radius.
            reward_scale (float): scales the reward by this factor
    '''
    def __init__(self, env, use_corners=False, site_activation_radius=0.2, reward_scale=1):
        super().__init__(env)
        self.use_corners = use_corners
        self.reward_scale = reward_scale
        self.site_activation_radius = site_activation_radius
        self.n_sites = self.metadata['curr_n_sites']

    def reset(self):
        obs = self.env.reset()
        # Re-read the site count after each reset of the wrapped env.
        self.n_sites = self.metadata['curr_n_sites']
        return obs

    def step(self, action):
        obs, rew, done, info = self.env.step(action)
        radius = self.site_activation_radius

        # Minimum over axis 0 gives, per site (or site corner), the distance to
        # the nearest box (or box corner).
        nearest_box = obs['box_site_dist'].min(axis=0)
        nearest_boxcorner = obs['boxcorner_sitecorner_dist'].min(axis=0)

        sites_done = np.all(nearest_box < radius)
        corners_done = np.all(nearest_boxcorner < radius)

        # Centers must always match; corners only matter when use_corners is set.
        if sites_done and (corners_done or not self.use_corners):
            rew += self.n_sites * self.reward_scale
            done = True

        return obs, rew, done, info
def make_env(n_substeps=15, horizon=80, deterministic_mode=False,
             floor_size=6.0, grid_size=30,
             n_agents=1,
             n_rooms=4, random_room_number=True, scenario='empty', door_size=2,
             n_sites=3, n_elongated_sites=0, site_placement='uniform_away_from_walls',
             reward_infos=[{'type': 'construction_dense'}],
             n_boxes=2, n_elongated_boxes=0,
             n_min_boxes=None, box_size=0.5, box_only_z_rot=False,
             lock_box=True, grab_box=True, grab_selective=False, lock_grab_radius=0.25,
             lock_type='any_lock_specific', grab_exclusive=False,
             grab_out_of_vision=False, lock_out_of_vision=True,
             box_floor_friction=0.2, other_friction=0.01, gravity=[0, 0, -50],
             action_lims=(-0.9, 0.9), polar_obs=True,
             n_lidar_per_agent=0, visualize_lidar=False, compress_lidar_scale=None,
             boxid_obs=True, boxsize_obs=True, team_size_obs=False, additional_obs={}):
    '''
        Builds the blueprint-construction environment: a Base env with wall, agent,
        box and construction-site modules, wrapped with observation, manipulation
        and reward wrappers.

        Args:
            n_substeps, horizon, deterministic_mode, floor_size, grid_size,
                action_lims: forwarded to Base.
            n_agents (int): number of agents.
            n_rooms, random_room_number: forwarded to RandomWalls
                (only used when scenario == 'randomwalls').
            scenario (str): 'randomwalls' adds a RandomWalls module, 'empty' adds an
                empty WallScenarios module. Any other value adds no wall module.
            door_size: forwarded to the wall module.
            n_sites (int or 2-element list/tuple/array): number of construction
                sites. A scalar is expanded to [n_sites, n_sites]; element 1 is used
                as the site count for entity spoofing (presumably (min, max) --
                confirm against ConstructionSites).
            n_elongated_sites: forwarded to ConstructionSites.
            site_placement (str): one of 'center', 'uniform',
                'uniform_away_from_walls'.
            reward_infos (list of dict): one dict per reward wrapper. Each dict must
                contain 'type' ('construction_dense' or 'construction_completed');
                all remaining entries are passed to the wrapper as keyword
                arguments. The dicts are not modified.
            n_boxes (int): number of boxes.
            n_elongated_boxes, box_size, box_only_z_rot, boxid_obs, boxsize_obs:
                forwarded to Boxes. box_size also sets the site size.
            n_min_boxes: not used by this function.
            lock_box (bool): add LockObjWrapper for the boxes.
            grab_box (bool): add AgentManipulation and GrabObjWrapper for the boxes.
            grab_selective (bool): if False (and grab_box), add GrabClosestWrapper.
            lock_grab_radius (float): divided by box_size to obtain the radius
                multiplier for both locking and grabbing.
            lock_type, grab_exclusive: forwarded to LockObjWrapper / GrabObjWrapper.
            grab_out_of_vision (bool): if False (and grab_box), 'action_pull' is
                masked by 'mask_ab_obs'.
            lock_out_of_vision (bool): if False, locking is restricted by
                'mask_ab_obs'.
            box_floor_friction: box friction; if not None also applied to the floor.
            other_friction: friction for walls (empty scenario) and agents.
            gravity: forwarded to WorldConstants.
            polar_obs: forwarded to Agents and Boxes.
            n_lidar_per_agent, visualize_lidar, compress_lidar_scale: lidar options;
                lidar is added only when n_lidar_per_agent > 0.
            team_size_obs (bool): include 'team_size' in the self observations.
            additional_obs (dict): constant observations added to the env.

        Returns:
            The fully wrapped environment.

        Raises:
            ValueError: if sites are requested and site_placement is not one of
                the supported options.
    '''
    grab_radius_multiplier = lock_grab_radius / box_size
    lock_radius_multiplier = lock_grab_radius / box_size

    # Normalise n_sites to a 2-element sequence. Tuples are converted to lists so
    # that a pair given as a tuple is not mistaken for a scalar.
    if isinstance(n_sites, tuple):
        n_sites = list(n_sites)
    elif not isinstance(n_sites, (list, np.ndarray)):
        n_sites = [n_sites, n_sites]

    env = Base(n_agents=n_agents, n_substeps=n_substeps, horizon=horizon,
               floor_size=floor_size, grid_size=grid_size,
               action_lims=action_lims, deterministic_mode=deterministic_mode)

    # NOTE(review): an unrecognised scenario silently results in no walls.
    if scenario == 'randomwalls':
        env.add_module(RandomWalls(grid_size=grid_size, num_rooms=n_rooms,
                                   random_room_number=random_room_number, min_room_size=6,
                                   door_size=door_size, gen_door_obs=False))
    elif scenario == 'empty':
        env.add_module(WallScenarios(grid_size=grid_size, door_size=door_size,
                                     scenario='empty',
                                     friction=other_friction))

    env.add_module(Agents(n_agents,
                          placement_fn=uniform_placement,
                          color=[np.array((66., 235., 244., 255.)) / 255] * n_agents,
                          friction=other_friction,
                          polar_obs=polar_obs))

    has_boxes = np.max(n_boxes) > 0
    if has_boxes:
        env.add_module(Boxes(n_boxes=n_boxes, placement_fn=uniform_placement,
                             friction=box_floor_friction, polar_obs=polar_obs,
                             n_elongated_boxes=n_elongated_boxes,
                             boxid_obs=boxid_obs, boxsize_obs=boxsize_obs,
                             box_size=box_size,
                             box_only_z_rot=box_only_z_rot,
                             mark_box_corners=True))

    if n_sites[1] > 0:
        if site_placement == 'center':
            site_placement_fn = center_placement
        elif site_placement == 'uniform':
            site_placement_fn = uniform_placement
        elif site_placement == 'uniform_away_from_walls':
            site_placement_fn = uniform_placement_middle(0.85)
        else:
            raise ValueError(f'Site placement option: {site_placement} not implemented.'
                             ' Please choose from center, uniform and uniform_away_from_walls.')

        env.add_module(ConstructionSites(n_sites, placement_fn=site_placement_fn,
                                         site_size=box_size, site_height=box_size / 2,
                                         n_elongated_sites=n_elongated_sites))

    if n_lidar_per_agent > 0 and visualize_lidar:
        env.add_module(LidarSites(n_agents=n_agents, n_lidar_per_agent=n_lidar_per_agent))
    if has_boxes and grab_box:
        env.add_module(AgentManipulation())
    if box_floor_friction is not None:
        env.add_module(FloorAttributes(friction=box_floor_friction))
    env.add_module(WorldConstants(gravity=gravity))
    # The env is reset before wrapping; ConstructionCompletedRewardWrapper reads
    # metadata['curr_n_sites'] in its constructor.
    env.reset()

    keys_self = ['agent_qpos_qvel', 'hider', 'prep_obs']
    keys_mask_self = ['mask_aa_obs']
    keys_external = ['agent_qpos_qvel', 'construction_site_obs']
    keys_copy = ['you_lock', 'team_lock', 'ramp_you_lock', 'ramp_team_lock']
    keys_mask_external = []

    env = AddConstantObservationsWrapper(env, new_obs=additional_obs)
    keys_external += list(additional_obs)
    keys_mask_external += [ob for ob in additional_obs if 'mask' in ob]

    env = SplitMultiAgentActions(env)
    if team_size_obs:
        keys_self += ['team_size']
    env = TeamMembership(env, np.zeros((n_agents,)))
    env = AgentAgentObsMask2D(env)
    env = DiscretizeActionWrapper(env, 'action_movement')

    if has_boxes:
        env = AgentGeomObsMask2D(env, pos_obs_key='box_pos', mask_obs_key='mask_ab_obs',
                                 geom_idxs_obs_key='box_geom_idxs')
        keys_external += ['mask_ab_obs', 'box_obs']
        keys_mask_external.append('mask_ab_obs')

    if lock_box and has_boxes:
        agent_allowed_to_lock_keys = None if lock_out_of_vision else ["mask_ab_obs"]
        env = LockObjWrapper(env, body_names=[f'moveable_box{i}' for i in range(n_boxes)],
                             agent_idx_allowed_to_lock=np.arange(n_agents),
                             lock_type=lock_type,
                             radius_multiplier=lock_radius_multiplier,
                             obj_in_game_metadata_keys=["curr_n_boxes"],
                             agent_allowed_to_lock_keys=agent_allowed_to_lock_keys)

    if grab_box and has_boxes:
        env = GrabObjWrapper(env, [f'moveable_box{i}' for i in range(n_boxes)],
                             radius_multiplier=grab_radius_multiplier,
                             grab_exclusive=grab_exclusive,
                             obj_in_game_metadata_keys=['curr_n_boxes'])

    if n_lidar_per_agent > 0:
        env = Lidar(env, n_lidar_per_agent=n_lidar_per_agent, visualize_lidar=visualize_lidar,
                    compress_lidar_scale=compress_lidar_scale)
        keys_copy += ['lidar']
        keys_external += ['lidar']

    env = ConstructionDistancesWrapper(env)
    env = NumpyArrayRewardWrapper(env)

    reward_wrappers = {
        'construction_dense': ConstructionDenseRewardWrapper,
        'construction_completed': ConstructionCompletedRewardWrapper,
    }

    for rew_info in reward_infos:
        # Work on a shallow copy: deleting 'type' from the caller's dict would
        # also strip it from the shared default argument, making the next
        # make_env() call fail with KeyError.
        rew_kwargs = dict(rew_info)
        rew_type = rew_kwargs.pop('type')
        env = reward_wrappers[rew_type](env, **rew_kwargs)

    env = SplitObservations(env, keys_self + keys_mask_self, keys_copy=keys_copy)
    if n_agents == 1:
        env = SpoofEntityWrapper(env, 2, ['agent_qpos_qvel', 'hider', 'prep_obs'], ['mask_aa_obs'])
    env = SpoofEntityWrapper(env, n_boxes,
                             ['box_obs', 'you_lock', 'team_lock', 'obj_lock'],
                             ['mask_ab_obs'])
    env = SpoofEntityWrapper(env, n_sites[1], ['construction_site_obs'], ['mask_acs_obs'])
    keys_mask_external += ['mask_ab_obs_spoof', 'mask_acs_obs_spoof']
    env = LockAllWrapper(env, remove_object_specific_lock=True)
    if not grab_out_of_vision and grab_box:
        env = MaskActionWrapper(env, 'action_pull', ['mask_ab_obs'])  # Can only pull if in vision
    if not grab_selective and grab_box:
        env = GrabClosestWrapper(env)
    env = DiscardMujocoExceptionEpisodes(env)
    env = ConcatenateObsWrapper(env, {'agent_qpos_qvel': ['agent_qpos_qvel', 'hider', 'prep_obs'],
                                      'box_obs': ['box_obs', 'you_lock', 'team_lock', 'obj_lock']})
    env = SelectKeysWrapper(env, keys_self=keys_self,
                            keys_other=keys_external + keys_mask_self + keys_mask_external)
    return env