in mae_envs/envs/blueprint_construction.py [0:0]
def make_env(n_substeps=15, horizon=80, deterministic_mode=False,
             floor_size=6.0, grid_size=30,
             n_agents=1,
             n_rooms=4, random_room_number=True, scenario='empty', door_size=2,
             n_sites=3, n_elongated_sites=0, site_placement='uniform_away_from_walls',
             reward_infos=[{'type': 'construction_dense'}],
             n_boxes=2, n_elongated_boxes=0,
             n_min_boxes=None, box_size=0.5, box_only_z_rot=False,
             lock_box=True, grab_box=True, grab_selective=False, lock_grab_radius=0.25,
             lock_type='any_lock_specific', grab_exclusive=False,
             grab_out_of_vision=False, lock_out_of_vision=True,
             box_floor_friction=0.2, other_friction=0.01, gravity=[0, 0, -50],
             action_lims=(-0.9, 0.9), polar_obs=True,
             n_lidar_per_agent=0, visualize_lidar=False, compress_lidar_scale=None,
             boxid_obs=True, boxsize_obs=True, team_size_obs=False, additional_obs={}):
    """Build the blueprint-construction environment and wrap it for training.

    The function creates a ``Base`` environment, adds world modules (walls,
    agents, boxes, construction sites, ...), resets it once, and then stacks
    observation / action / reward wrappers on top of it.

    Args:
        n_substeps, horizon, floor_size, grid_size, action_lims,
        deterministic_mode: forwarded unchanged to ``Base``.
        n_agents: number of agents; forwarded to ``Base``, ``Agents`` and
            ``LidarSites``. All agents are placed on team 0.
        scenario: ``'randomwalls'`` adds a ``RandomWalls`` module, ``'empty'``
            adds ``WallScenarios(scenario='empty')``. Any other value adds no
            wall module at all.
        n_rooms, random_room_number: forwarded to ``RandomWalls``.
        door_size: forwarded to whichever wall module is added.
        n_sites: a scalar, or a list / ndarray. A scalar ``s`` is expanded to
            ``[s, s]``. ``n_sites[1]`` decides whether construction sites are
            added and is the entity count used when spoofing site observations.
            (Presumably ``[min, max]`` -- confirm against ``ConstructionSites``.)
        n_elongated_sites: forwarded to ``ConstructionSites``.
        site_placement: one of ``'center'``, ``'uniform'`` or
            ``'uniform_away_from_walls'``. Only checked when ``n_sites[1] > 0``.
        reward_infos: iterable of dicts. Each must contain a ``'type'`` key
            (``'construction_dense'`` or ``'construction_completed'``); the
            remaining items are passed as keyword arguments to the matching
            reward wrapper. The dicts are not modified.
        n_boxes: forwarded to ``Boxes``. It is also passed to ``range`` and used
            as a spoofed entity count below, so it must be a plain int here.
        n_elongated_boxes, box_only_z_rot, boxid_obs, boxsize_obs: forwarded to
            ``Boxes``.
        n_min_boxes: accepted but not used by this function.
        box_size: forwarded to ``Boxes``; also used as the site size (with half
            of it as the site height) and as the divisor of ``lock_grab_radius``.
        lock_box, grab_box: enable ``LockObjWrapper`` / ``GrabObjWrapper``
            (both additionally require ``np.max(n_boxes) > 0``).
        grab_selective: when false (and ``grab_box``), ``GrabClosestWrapper``
            is applied.
        lock_grab_radius: divided by ``box_size`` to obtain the
            ``radius_multiplier`` given to the lock and grab wrappers.
        lock_type: forwarded to ``LockObjWrapper``.
        grab_exclusive: forwarded to ``GrabObjWrapper``.
        grab_out_of_vision: when false (and ``grab_box``), ``'action_pull'`` is
            masked with ``'mask_ab_obs'``.
        lock_out_of_vision: when false, locking is restricted through
            ``'mask_ab_obs'``.
        box_floor_friction: friction for ``Boxes``; if not ``None`` it is also
            applied through ``FloorAttributes``.
        other_friction: friction for ``WallScenarios`` and ``Agents``.
        gravity: forwarded to ``WorldConstants``.
        polar_obs: forwarded to ``Agents`` and ``Boxes``.
        n_lidar_per_agent, visualize_lidar, compress_lidar_scale: lidar
            settings; the ``Lidar`` wrapper is added when
            ``n_lidar_per_agent > 0``.
        team_size_obs: when true, ``'team_size'`` is added to the self keys.
        additional_obs: dict given to ``AddConstantObservationsWrapper``. Its
            keys become external observation keys; keys containing ``'mask'``
            are additionally treated as mask keys.

    Returns:
        The fully wrapped environment (outermost wrapper: ``SelectKeysWrapper``).

    Raises:
        ValueError: if ``site_placement`` is not a supported option.
        KeyError: if a reward info has no ``'type'`` entry or names an unknown
            reward type.
    """
    # NOTE: the list / dict defaults in the signature are shared between calls
    # and are therefore only ever read here, never mutated.

    # Lock and grab share the same radius, expressed relative to the box size.
    grab_radius_multiplier = lock_grab_radius / box_size
    lock_radius_multiplier = lock_grab_radius / box_size

    # Normalise a scalar site count to a two-element list.
    if not isinstance(n_sites, (list, np.ndarray)):
        n_sites = [n_sites, n_sites]

    has_boxes = np.max(n_boxes) > 0

    env = Base(n_agents=n_agents, n_substeps=n_substeps, horizon=horizon,
               floor_size=floor_size, grid_size=grid_size,
               action_lims=action_lims, deterministic_mode=deterministic_mode)

    # ---- World modules -------------------------------------------------
    # NOTE(review): an unrecognised scenario silently adds no wall module.
    if scenario == 'randomwalls':
        env.add_module(RandomWalls(grid_size=grid_size, num_rooms=n_rooms,
                                   random_room_number=random_room_number, min_room_size=6,
                                   door_size=door_size, gen_door_obs=False))
    elif scenario == 'empty':
        env.add_module(WallScenarios(grid_size=grid_size, door_size=door_size,
                                     scenario='empty',
                                     friction=other_friction))

    env.add_module(Agents(n_agents,
                          placement_fn=uniform_placement,
                          color=[np.array((66., 235., 244., 255.)) / 255] * n_agents,
                          friction=other_friction,
                          polar_obs=polar_obs))
    if has_boxes:
        env.add_module(Boxes(n_boxes=n_boxes, placement_fn=uniform_placement,
                             friction=box_floor_friction, polar_obs=polar_obs,
                             n_elongated_boxes=n_elongated_boxes,
                             boxid_obs=boxid_obs, boxsize_obs=boxsize_obs,
                             box_size=box_size,
                             box_only_z_rot=box_only_z_rot,
                             mark_box_corners=True))
    if n_sites[1] > 0:
        if site_placement == 'center':
            site_placement_fn = center_placement
        elif site_placement == 'uniform':
            site_placement_fn = uniform_placement
        elif site_placement == 'uniform_away_from_walls':
            site_placement_fn = uniform_placement_middle(0.85)
        else:
            raise ValueError(f'Site placement option: {site_placement} not implemented.'
                             ' Please choose from center, uniform and uniform_away_from_walls.')
        env.add_module(ConstructionSites(n_sites, placement_fn=site_placement_fn,
                                         site_size=box_size, site_height=box_size / 2,
                                         n_elongated_sites=n_elongated_sites))
    if n_lidar_per_agent > 0 and visualize_lidar:
        env.add_module(LidarSites(n_agents=n_agents, n_lidar_per_agent=n_lidar_per_agent))
    if has_boxes and grab_box:
        env.add_module(AgentManipulation())
    if box_floor_friction is not None:
        env.add_module(FloorAttributes(friction=box_floor_friction))
    env.add_module(WorldConstants(gravity=gravity))
    env.reset()

    # ---- Observation key bookkeeping ------------------------------------
    keys_self = ['agent_qpos_qvel', 'hider', 'prep_obs']
    keys_mask_self = ['mask_aa_obs']
    keys_external = ['agent_qpos_qvel', 'construction_site_obs']
    keys_copy = ['you_lock', 'team_lock', 'ramp_you_lock', 'ramp_team_lock']
    keys_mask_external = []

    # ---- Wrappers --------------------------------------------------------
    env = AddConstantObservationsWrapper(env, new_obs=additional_obs)
    keys_external += list(additional_obs)
    keys_mask_external += [ob for ob in additional_obs if 'mask' in ob]

    env = SplitMultiAgentActions(env)
    if team_size_obs:
        keys_self += ['team_size']
    # Every agent belongs to team 0.
    env = TeamMembership(env, np.zeros((n_agents,)))
    env = AgentAgentObsMask2D(env)
    env = DiscretizeActionWrapper(env, 'action_movement')
    if has_boxes:
        env = AgentGeomObsMask2D(env, pos_obs_key='box_pos', mask_obs_key='mask_ab_obs',
                                 geom_idxs_obs_key='box_geom_idxs')
        keys_external += ['mask_ab_obs', 'box_obs']
        keys_mask_external.append('mask_ab_obs')
    if lock_box and has_boxes:
        agent_allowed_to_lock_keys = None if lock_out_of_vision else ["mask_ab_obs"]
        env = LockObjWrapper(env, body_names=[f'moveable_box{i}' for i in range(n_boxes)],
                             agent_idx_allowed_to_lock=np.arange(n_agents),
                             lock_type=lock_type,
                             radius_multiplier=lock_radius_multiplier,
                             obj_in_game_metadata_keys=["curr_n_boxes"],
                             agent_allowed_to_lock_keys=agent_allowed_to_lock_keys)
    if grab_box and has_boxes:
        env = GrabObjWrapper(env, [f'moveable_box{i}' for i in range(n_boxes)],
                             radius_multiplier=grab_radius_multiplier,
                             grab_exclusive=grab_exclusive,
                             obj_in_game_metadata_keys=['curr_n_boxes'])

    if n_lidar_per_agent > 0:
        env = Lidar(env, n_lidar_per_agent=n_lidar_per_agent, visualize_lidar=visualize_lidar,
                    compress_lidar_scale=compress_lidar_scale)
        keys_copy += ['lidar']
        keys_external += ['lidar']

    env = ConstructionDistancesWrapper(env)
    env = NumpyArrayRewardWrapper(env)

    reward_wrappers = {
        'construction_dense': ConstructionDenseRewardWrapper,
        'construction_completed': ConstructionCompletedRewardWrapper,
    }

    for rew_info in reward_infos:
        # Work on a copy: deleting 'type' from the caller's dict (or from the
        # shared default argument) would make the next make_env() call fail
        # with KeyError: 'type'.
        rew_kwargs = dict(rew_info)
        rew_type = rew_kwargs.pop('type')
        env = reward_wrappers[rew_type](env, **rew_kwargs)

    env = SplitObservations(env, keys_self + keys_mask_self, keys_copy=keys_copy)
    if n_agents == 1:
        env = SpoofEntityWrapper(env, 2, ['agent_qpos_qvel', 'hider', 'prep_obs'], ['mask_aa_obs'])
    env = SpoofEntityWrapper(env, n_boxes,
                             ['box_obs', 'you_lock', 'team_lock', 'obj_lock'],
                             ['mask_ab_obs'])
    env = SpoofEntityWrapper(env, n_sites[1], ['construction_site_obs'], ['mask_acs_obs'])
    keys_mask_external += ['mask_ab_obs_spoof', 'mask_acs_obs_spoof']
    env = LockAllWrapper(env, remove_object_specific_lock=True)
    if not grab_out_of_vision and grab_box:
        env = MaskActionWrapper(env, 'action_pull', ['mask_ab_obs'])  # Can only pull if in vision
    if not grab_selective and grab_box:
        env = GrabClosestWrapper(env)
    env = DiscardMujocoExceptionEpisodes(env)
    env = ConcatenateObsWrapper(env, {'agent_qpos_qvel': ['agent_qpos_qvel', 'hider', 'prep_obs'],
                                      'box_obs': ['box_obs', 'you_lock', 'team_lock', 'obj_lock']})
    env = SelectKeysWrapper(env, keys_self=keys_self,
                            keys_other=keys_external + keys_mask_self + keys_mask_external)
    return env