mae_envs/envs/box_locking.py (310 lines of code) (raw):

import gym
import numpy as np
from mae_envs.wrappers.multi_agent import (SplitMultiAgentActions, SplitObservations,
                                           SelectKeysWrapper)
from mae_envs.wrappers.util import (DiscretizeActionWrapper, MaskActionWrapper,
                                    DiscardMujocoExceptionEpisodes, AddConstantObservationsWrapper,
                                    SpoofEntityWrapper, ConcatenateObsWrapper)
from mae_envs.wrappers.manipulation import (GrabObjWrapper, GrabClosestWrapper,
                                            LockObjWrapper, LockAllWrapper)
from mae_envs.wrappers.lidar import Lidar
from mae_envs.wrappers.line_of_sight import AgentAgentObsMask2D, AgentGeomObsMask2D
from mae_envs.wrappers.team import TeamMembership
from mae_envs.wrappers.util import NumpyArrayRewardWrapper
from mae_envs.modules.agents import Agents, AgentManipulation
from mae_envs.modules.walls import RandomWalls, WallScenarios
from mae_envs.modules.objects import Boxes, Ramps, LidarSites
from mae_envs.modules.world import FloorAttributes, WorldConstants
from mae_envs.modules.util import uniform_placement, center_placement
from mae_envs.envs.base import Base
from mae_envs.envs.hide_and_seek import quadrant_placement


class LockObjectsTask(gym.Wrapper):
    """
    Reward wrapper for the lock object family of tasks. The reward consists
    of four components:
    (1) A fixed reward for locking a new box;
    (2) A fixed penalty for unlocking a previously locked box;
    (3) A shaped reward proportional to the reduction in distance between the agent
        and its next target (which is either the next box that needs to be locked
        or the agent's spawning point);
    (4) A success reward that is received in every timestep during which the task
        is completed. E.g. in the 'all' task, the success reward is received in every
        timestep where all boxes are locked; but if the agent locks all boxes and later
        accidentally unlocks a box, it will stop receiving the reward until all boxes
        are locked again.
    Args:
        n_objs (int): number of objects
        task (str): {'all', 'order'} + ['-return']
            'all': success when all boxes are locked
            'order': success when boxes are locked in a specific order
            'xxx-return': after finishing the task of 'xxx', the agent needs to return
                to the location it was spawned at the beginning of the episode.
        fixed_order (bool): if True, the order/selection of boxes to lock for a success
            will always be fixed
        obj_lock_obs_key (str): Observation key for which objects are currently locked.
        obj_pos_obs_key (str): Observation key for object positions
        act_lock_key (str): Action key for lock action
        agent_pos_key (str): Observation key for agent positions
        lock_reward (float): Reward for locking a box
        unlock_penalty (float): Penalty for unlocking a box
        shaped_reward_scale (float): Scales the shaped reward by this factor
        success_reward (float): This reward is received in every timestep during which
            the task is completed.
        return_threshold (float): In 'xxx-return' tasks, after finishing the base task
            the agent needs to return within this distance of its original spawning
            position in order for the task to be considered completed
    """
    def __init__(self, env, n_objs, task='all', fixed_order=False,
                 obj_lock_obs_key='obj_lock', obj_pos_obs_key='box_pos',
                 act_lock_key='action_glue', agent_pos_key='agent_pos',
                 lock_reward=5.0, unlock_penalty=10.0, shaped_reward_scale=1.0,
                 success_reward=1, return_threshold=0.1):
        super().__init__(env)
        self.n_objs = n_objs
        self.task = task or 'all'
        assert task in ['all', 'order', 'all-return', 'order-return'], (
            f'task {task} is currently not supported')
        self.need_return = 'return' in self.task
        self.return_threshold = return_threshold
        if self.need_return:
            self.task = self.task.replace('-return', '')
        self.n_agents = self.unwrapped.n_agents
        assert self.n_agents == 1, 'The locking tasks only support 1 agent'
        self.agent_key = agent_pos_key
        self.obj_order = list(range(self.n_objs))
        self.fixed_order = fixed_order
        self.lock_key = obj_lock_obs_key
        self.pos_key = obj_pos_obs_key
        self.act_key = act_lock_key
        self.lock_reward = lock_reward
        self.unlock_penalty = unlock_penalty
        self.shaped_reward_scale = shaped_reward_scale
        self.success_reward = success_reward
        self.objs_locked = np.zeros((n_objs, ), dtype=np.int8)
        self.spawn_pos = None
        self.spawn_pos_dist = None
        self.next_obj = None
        self.next_obj_dist = 0
        self.unlocked_objs = []

    def reset(self):
        if not self.fixed_order:
            np.random.shuffle(self.obj_order)
        self.objs_locked[:] = 0
        self.unlocked_objs = self.obj_order
        obs = self.env.reset()
        self.spawn_pos = obs[self.agent_key][0, :2]
        self.spawn_pos_dist = 0
        self.next_obj, self.next_obj_dist = self._get_next_obj(obs)
        return obs

    def _get_next_obj(self, obs):
        '''
            Return the next object that needs to be locked & the distance to it.
        '''
        agent_pos = obs[self.agent_key][:, :2]
        if len(self.unlocked_objs) == 0:
            next_obj = None
            next_obj_dist = 0
        elif self.task == 'order':
            next_obj = self.unlocked_objs[0]
            next_obj_pos = obs[self.pos_key][next_obj, :2]
            next_obj_dist = np.linalg.norm(agent_pos - next_obj_pos)
        elif self.task == 'all':
            obj_dist = [(np.linalg.norm(obs[self.pos_key][i, :2] - agent_pos), i)
                        for i in self.unlocked_objs]
            next_obj_dist, next_obj = min(obj_dist)
        return next_obj, next_obj_dist

    def _get_lock_reward(self, curr_objs_locked, old_objs_locked):
        '''
            Calculates the locking reward / unlocking penalty
        '''
        n_new_lock = np.sum(np.logical_and(curr_objs_locked == 1, old_objs_locked == 0))
        n_new_unlock = np.sum(np.logical_and(curr_objs_locked == 0, old_objs_locked == 1))
        lock_reward = n_new_lock * self.lock_reward - n_new_unlock * self.unlock_penalty
        return lock_reward

    def _get_shaped_reward(self, new_next_obj, new_next_obj_dist, new_spawn_pos_dist):
        '''
            Calculates the shaped reward based on the change in distance from the target
        '''
        rew = 0
        if (self.next_obj is not None) and (new_next_obj == self.next_obj):
            rew += (self.next_obj_dist - new_next_obj_dist) * self.shaped_reward_scale
        elif (self.next_obj is not None) and (new_next_obj != self.next_obj):
            if self.objs_locked[self.next_obj] == 1:
                # previous target object locked
                rew += self.next_obj_dist * self.shaped_reward_scale
            else:
                # previously locked object unlocked
                rew -= new_next_obj_dist * self.shaped_reward_scale
        elif (self.next_obj is None) and (new_next_obj is not None):
            # previously locked object unlocked
            rew -= new_next_obj_dist * self.shaped_reward_scale
        elif (self.next_obj is None) and (new_next_obj is None):
            if self.need_return:
                # all objects locked; agent is rewarded for returning to its spawning point
                rew += (self.spawn_pos_dist - new_spawn_pos_dist) * self.shaped_reward_scale
        return rew

    def step(self, action):
        if self.task == 'order':
            # you can unlock any locked objs, but only lock objs when
            # all previous ones are locked
            if len(self.unlocked_objs) > 1:
                action[self.act_key][:, self.unlocked_objs[1:]] = 0
        obs, rew, done, info = self.env.step(action)
        curr_objs_locked = obs[self.lock_key].flatten().astype(np.int8)
        rew += self._get_lock_reward(curr_objs_locked, old_objs_locked=self.objs_locked)
        self.objs_locked = curr_objs_locked
        self.unlocked_objs = [i for i in self.obj_order if self.objs_locked[i] == 0]
        new_next_obj, new_next_obj_dist = self._get_next_obj(obs)
        agent_pos = obs[self.agent_key][:, :2]
        new_spawn_pos_dist = np.linalg.norm(agent_pos - self.spawn_pos)
        rew += self._get_shaped_reward(new_next_obj, new_next_obj_dist, new_spawn_pos_dist)
        self.spawn_pos_dist = new_spawn_pos_dist
        self.next_obj_dist = new_next_obj_dist
        self.next_obj = new_next_obj
        n_unlocked = len(self.unlocked_objs)
        if n_unlocked == 0 and ((not self.need_return) or
                                self.spawn_pos_dist <= self.return_threshold):
            # reward for successfully completing the task
            rew += self.success_reward
        return obs, rew, done, info


def tri_placement(tri_room_idx):
    '''
        This function expects the wall scenario to be 'var_tri'.
        Returns a placement function that randomly places objects in the room
        with index tri_room_idx
    '''
    def placement(grid, obj_size, metadata, random_state):
        assert 'tri_room_grid_cell_range' in metadata
        x_rag, y_rag = metadata['tri_room_grid_cell_range'][tri_room_idx]
        pos = np.array([random_state.randint(x_rag[0], x_rag[1] - obj_size[0]),
                        random_state.randint(y_rag[0], y_rag[1] - obj_size[1])])
        return pos

    return placement


def rotate_tri_placement(grid, obj_size, metadata, random_state):
    '''
        This function expects the wall scenario to be 'var_tri'.
        It places objects equally among the three rooms, so that any room
        contains at most 1 more object than any other room.
    '''
    if 'tri_placement_rotation' not in metadata:
        metadata['tri_placement_rotation'] = []
    filled_rooms = metadata['tri_placement_rotation']
    if len(filled_rooms) == 3:
        filled_rooms = []
    available_rooms = [i for i in range(3) if i not in filled_rooms]
    n_available_rooms = len(available_rooms)
    next_room = available_rooms[random_state.randint(0, 10000) % n_available_rooms]
    filled_rooms.append(next_room)
    metadata['tri_placement_rotation'] = filled_rooms
    return tri_placement(next_room)(grid, obj_size, metadata, random_state)


def make_env(n_substeps=15, horizon=80, deterministic_mode=False,
             floor_size=6.0, grid_size=30, door_size=2,
             n_agents=1, fixed_agent_spawn=False,
             lock_box=True, grab_box=True, grab_selective=False,
             lock_type='any_lock_specific', lock_grab_radius=0.25,
             grab_exclusive=False, grab_out_of_vision=False,
             lock_out_of_vision=True,
             box_floor_friction=0.2, other_friction=0.01, gravity=[0, 0, -50],
             action_lims=(-0.9, 0.9), polar_obs=True,
             scenario='quadrant', p_door_dropout=0.0,
             n_rooms=4, random_room_number=True,
             n_lidar_per_agent=0, visualize_lidar=False, compress_lidar_scale=None,
             n_boxes=2, box_size=0.5, box_only_z_rot=False,
             boxid_obs=True, boxsize_obs=True, pad_ramp_size=True,
             additional_obs={},
             # lock-box task
             task_type='all', lock_reward=5.0, unlock_penalty=7.0,
             shaped_reward_scale=0.25, return_threshold=0.1,
             # ramps
             n_ramps=0):
    grab_radius_multiplier = lock_grab_radius / box_size
    lock_radius_multiplier = lock_grab_radius / box_size

    env = Base(n_agents=n_agents, n_substeps=n_substeps, floor_size=floor_size,
               horizon=horizon, action_lims=action_lims,
               deterministic_mode=deterministic_mode, grid_size=grid_size)

    if scenario == 'randomwalls':
        env.add_module(RandomWalls(grid_size=grid_size, num_rooms=n_rooms,
                                   random_room_number=random_room_number,
                                   min_room_size=6, door_size=door_size,
                                   gen_door_obs=False))
        box_placement_fn = uniform_placement
        ramp_placement_fn = uniform_placement
        agent_placement_fn = uniform_placement if not fixed_agent_spawn else center_placement
    elif scenario == 'quadrant':
        env.add_module(WallScenarios(grid_size=grid_size, door_size=door_size,
                                     scenario=scenario, friction=other_friction,
                                     p_door_dropout=p_door_dropout))
        box_placement_fn = uniform_placement
        ramp_placement_fn = uniform_placement
        agent_placement_fn = quadrant_placement if not fixed_agent_spawn else center_placement
    elif scenario == 'empty':
        env.add_module(WallScenarios(grid_size=grid_size, door_size=2, scenario='empty'))
        box_placement_fn = uniform_placement
        ramp_placement_fn = uniform_placement
        agent_placement_fn = center_placement
    elif 'var_tri' in scenario:
        env.add_module(WallScenarios(grid_size=grid_size, door_size=door_size,
                                     scenario='var_tri'))
        ramp_placement_fn = [tri_placement(i % 3) for i in range(n_ramps)]
        agent_placement_fn = center_placement if fixed_agent_spawn else \
            (uniform_placement if 'uniform' in scenario else rotate_tri_placement)
        box_placement_fn = uniform_placement if 'uniform' in scenario else rotate_tri_placement
    else:
        raise ValueError(f"Scenario {scenario} not supported.")

    env.add_module(Agents(n_agents,
                          placement_fn=agent_placement_fn,
                          color=[np.array((66., 235., 244., 255.)) / 255] * n_agents,
                          friction=other_friction,
                          polar_obs=polar_obs))
    if np.max(n_boxes) > 0:
        env.add_module(Boxes(n_boxes=n_boxes, placement_fn=box_placement_fn,
                             friction=box_floor_friction, polar_obs=polar_obs,
                             n_elongated_boxes=0,
                             boxid_obs=boxid_obs, box_only_z_rot=box_only_z_rot,
                             boxsize_obs=boxsize_obs))
    if n_ramps > 0:
        env.add_module(Ramps(n_ramps=n_ramps, placement_fn=ramp_placement_fn,
                             friction=other_friction, polar_obs=polar_obs,
                             pad_ramp_size=pad_ramp_size))
    if n_lidar_per_agent > 0 and visualize_lidar:
        env.add_module(LidarSites(n_agents=n_agents, n_lidar_per_agent=n_lidar_per_agent))
    if np.max(n_boxes) > 0 and grab_box:
        env.add_module(AgentManipulation())
    if box_floor_friction is not None:
        env.add_module(FloorAttributes(friction=box_floor_friction))
    env.add_module(WorldConstants(gravity=gravity))
    env.reset()

    keys_self = ['agent_qpos_qvel', 'hider', 'prep_obs']
    keys_mask_self = ['mask_aa_obs']
    keys_external = ['agent_qpos_qvel']
    keys_copy = ['you_lock', 'team_lock']
    keys_mask_external = []

    env = SplitMultiAgentActions(env)
    env = TeamMembership(env, np.zeros((n_agents,)))
    env = AgentAgentObsMask2D(env)
    env = DiscretizeActionWrapper(env, 'action_movement')
    env = NumpyArrayRewardWrapper(env)
    if np.max(n_boxes) > 0:
        env = AgentGeomObsMask2D(env, pos_obs_key='box_pos',
                                 mask_obs_key='mask_ab_obs',
                                 geom_idxs_obs_key='box_geom_idxs')
        keys_external += ['mask_ab_obs', 'box_obs']
        keys_mask_external.append('mask_ab_obs')
    if lock_box and np.max(n_boxes) > 0:
        env = LockObjWrapper(env, body_names=[f'moveable_box{i}' for i in range(n_boxes)],
                             agent_idx_allowed_to_lock=np.arange(n_agents),
                             lock_type=lock_type,
                             radius_multiplier=lock_radius_multiplier,
                             obj_in_game_metadata_keys=["curr_n_boxes"],
                             agent_allowed_to_lock_keys=None if lock_out_of_vision else ["mask_ab_obs"])
    if n_ramps > 0:
        env = AgentGeomObsMask2D(env, pos_obs_key='ramp_pos',
                                 mask_obs_key='mask_ar_obs',
                                 geom_idxs_obs_key='ramp_geom_idxs')
        env = LockObjWrapper(env, body_names=[f"ramp{i}:ramp" for i in range(n_ramps)],
                             agent_idx_allowed_to_lock=np.arange(n_agents),
                             lock_type=lock_type, ac_obs_prefix='ramp_',
                             radius_multiplier=lock_radius_multiplier,
                             agent_allowed_to_lock_keys=None if lock_out_of_vision else ["mask_ar_obs"])
        keys_external += ['ramp_obs']
        keys_mask_external += ['mask_ar_obs']
        keys_copy += ['ramp_you_lock', 'ramp_team_lock']
    if grab_box and np.max(n_boxes) > 0:
        body_names = ([f'moveable_box{i}' for i in range(n_boxes)] +
                      [f"ramp{i}:ramp" for i in range(n_ramps)])
        obj_in_game_meta_keys = ['curr_n_boxes'] + (['curr_n_ramps'] if n_ramps > 0 else [])
        env = GrabObjWrapper(env, body_names=body_names,
                             radius_multiplier=grab_radius_multiplier,
                             grab_exclusive=grab_exclusive,
                             obj_in_game_metadata_keys=obj_in_game_meta_keys)

    if n_lidar_per_agent > 0:
        env = Lidar(env, n_lidar_per_agent=n_lidar_per_agent,
                    visualize_lidar=visualize_lidar,
                    compress_lidar_scale=compress_lidar_scale)
        keys_copy += ['lidar']
        keys_external += ['lidar']

    env = AddConstantObservationsWrapper(env, new_obs=additional_obs)
    keys_external += list(additional_obs)
    keys_mask_external += [ob for ob in additional_obs if 'mask' in ob]

    #############################################
    # Lock-box task reward
    env = LockObjectsTask(env, n_objs=n_boxes, task=task_type, fixed_order=True,
                          obj_lock_obs_key='obj_lock', obj_pos_obs_key='box_pos',
                          act_lock_key='action_glue', agent_pos_key='agent_pos',
                          lock_reward=lock_reward, unlock_penalty=unlock_penalty,
                          shaped_reward_scale=shaped_reward_scale,
                          return_threshold=return_threshold)
    #############################################

    env = SplitObservations(env, keys_self + keys_mask_self, keys_copy=keys_copy)
    env = SpoofEntityWrapper(env, n_boxes,
                             ['box_obs', 'you_lock', 'team_lock', 'obj_lock'],
                             ['mask_ab_obs'])
    keys_mask_external += ['mask_ab_obs_spoof']
    if n_agents < 2:
        env = SpoofEntityWrapper(env, 1, ['agent_qpos_qvel', 'hider', 'prep_obs'],
                                 ['mask_aa_obs'])
    env = LockAllWrapper(env, remove_object_specific_lock=True)
    if not grab_out_of_vision and grab_box:
        # Can only pull if in vision
        mask_keys = ['mask_ab_obs'] + (['mask_ar_obs'] if n_ramps > 0 else [])
        env = MaskActionWrapper(env, action_key='action_pull', mask_keys=mask_keys)
    if not grab_selective and grab_box:
        env = GrabClosestWrapper(env)
    env = DiscardMujocoExceptionEpisodes(env)
    env = ConcatenateObsWrapper(env, {'agent_qpos_qvel': ['agent_qpos_qvel', 'hider', 'prep_obs'],
                                      'box_obs': ['box_obs', 'you_lock', 'team_lock', 'obj_lock'],
                                      'ramp_obs': ['ramp_obs', 'ramp_you_lock', 'ramp_team_lock',
                                                   'ramp_obj_lock']})
    env = SelectKeysWrapper(env, keys_self=keys_self,
                            keys_other=keys_external + keys_mask_self + keys_mask_external)
    return env
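

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the original module):
# builds the single-agent box-locking environment with make_env and runs a
# short random rollout. It assumes the fully wrapped env exposes a gym Dict
# action space (so action_space.sample() returns a dict keyed by
# 'action_movement', 'action_pull', 'action_glue'); adapt the loop to your
# own training harness as needed.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    env = make_env(n_boxes=2, task_type='all', horizon=80)
    obs = env.reset()
    for _ in range(10):
        # Random actions for illustration only; a trained policy would go here.
        action = env.action_space.sample()
        obs, rew, done, info = env.step(action)
        if done:
            obs = env.reset()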