mae_envs/modules/walls.py (421 lines of code) (raw):

import numpy as np from mujoco_worldgen.util.types import store_args from mujoco_worldgen import Geom from mujoco_worldgen.transforms import set_geom_attr_transform from mae_envs.modules import EnvModule class Wall: ''' Defines a wall object which is essentially a pair of points on a grid with some useful helper functions for creating randomized rooms. Args: pt1, pt2 (float tuple): points defining the wall height (float): wall height rgba (float tuple): wall rgba ''' def __init__(self, pt1, pt2, height=0.5, rgba=(0, 1, 0, 1)): assert pt1[0] == pt2[0] or pt1[1] == pt2[1], ( "Currently only horizontal and vertical walls are supported") self.is_vertical = pt1[0] == pt2[0] # Make sure pt2 is top right of pt1 if np.any(np.array(pt2) - np.array(pt1) < 0): self.pt1 = np.array(pt2) self.pt2 = np.array(pt1) else: self.pt1 = np.array(pt1) self.pt2 = np.array(pt2) self.length = int(np.linalg.norm(np.array(pt1) - np.array(pt2))) self.height = height self.rgba = rgba # Variables defining where other walls split from this wall on the left and right. # For horizontal walls, left means below, right means above self.left_edges = [self.pt1, self.pt2] self.right_edges = [self.pt1, self.pt2] def is_touching(self, pt): ''' Is pt (tuple) touching this wall ''' if self.is_vertical: return pt[0] == self.pt1[0] and pt[1] >= self.pt1[1] and pt[1] <= self.pt2[1] else: return pt[1] == self.pt1[1] and pt[0] >= self.pt1[0] and pt[0] <= self.pt2[0] def maybe_add_edge(self, wall): ''' Check if wall is originating from this wall. If so add it to the list of edges. ''' if self.is_vertical == wall.is_vertical: return if self.is_touching(wall.pt1): self.right_edges.append(wall.pt1) elif self.is_touching(wall.pt2): self.left_edges.append(wall.pt2) def intersects(self, wall): ''' Check if intersects with wall. ''' if self.is_vertical == wall.is_vertical: return False return np.all(np.logical_and(self.pt1 < wall.pt2, wall.pt1 < self.pt2)) def split_for_doors(self, num_doors=1, door_size=1, all_connect=False, random_state=np.random.RandomState()): ''' Split this wall into many walls with 'doors' in between. Args: num_doors (int): upper bound of number of doors to create door_size (int): door size in grid cells all_connect (bool): create a door in every wall segment between pairs of points where other walls connect with this wall random_state (np.random.RandomState): random state to use for sampling ''' edges = np.unique(self.left_edges + self.right_edges, axis=0) edges = np.array(sorted(edges, key=lambda x: x[1] if self.is_vertical else x[0])) rel_axis = edges[:, 1] if self.is_vertical else edges[:, 0] diffs = np.diff(rel_axis) possible_doors = diffs >= door_size + 1 # Door regions are stretches on the wall where we could create a door. door_regions = np.arange(len(edges) - 1) door_regions = door_regions[possible_doors] # The number of doors on this wall we want to/can create num_doors = len(edges) - 1 if all_connect else num_doors num_doors = min(num_doors, len(door_regions)) if num_doors == 0 or door_size == 0: return [self], [] # Sample num_doors regions to which we will add doors. door_regions = np.sort(random_state.choice(door_regions, num_doors, replace=False)) new_walls = [] doors = [] new_wall_start = edges[0] for door in door_regions: # door_start and door_end are the first and last point on the wall bounding the door # (inclusive boundary) door_start = random_state.randint(1, diffs[door] - door_size + 1) door_end = door_start + door_size - 1 # Because door boundaries are inclusive, we add 1 to the door_end to get next wall # start cell and subtract one from the door_start to get the current wall end cell. if self.is_vertical: new_wall_end = [edges[door][0], edges[door][1] + door_start - 1] next_new_wall_start = [new_wall_start[0], edges[door][1] + door_end + 1] door_start_cell = [edges[door][0], edges[door][1] + door_start] door_end_cell = [new_wall_start[0], edges[door][1] + door_end] else: new_wall_end = [edges[door][0] + door_start - 1, edges[door][1]] next_new_wall_start = [edges[door][0] + door_end + 1, edges[door][1]] door_start_cell = [edges[door][0] + door_start, edges[door][1]] door_end_cell = [new_wall_start[0] + door_end, edges[door][1]] # Store doors as inclusive boundaries. doors.append([door_start_cell, door_end_cell]) # Check that the new wall isn't size 0 if np.linalg.norm(np.array(new_wall_start) - np.array(new_wall_end)) > 0: new_walls.append(Wall(new_wall_start, new_wall_end)) new_wall_start = next_new_wall_start if np.linalg.norm(np.array(new_wall_start) - np.array(edges[-1])) > 0: new_walls.append(Wall(new_wall_start, edges[-1])) return new_walls, doors def connect_walls(wall1, wall2, min_dist_between, random_state=np.random.RandomState()): ''' Draw a random new wall connecting wall1 and wall2. Return None if the drawn wall was closer than min_dist_between to another wall or the wall wasn't valid. NOTE: This DOES NOT check if the created wall overlaps with any existing walls, that should be done outside of this function Args: wall1, wall2 (Wall): walls to draw a new wall between min_dist_between (int): closest another parallel wall can be to the new wall in grid cells. random_state (np.random.RandomState): random state to use for sampling ''' if wall1.is_vertical != wall2.is_vertical: return None length = random_state.randint(1, wall1.length) if wall1.is_vertical: pt1 = [wall1.pt1[0], wall1.pt1[1] + length] pt2 = [wall2.pt1[0], wall1.pt1[1] + length] else: pt1 = [wall1.pt1[0] + length, wall1.pt1[1]] pt2 = [wall1.pt1[0] + length, wall2.pt1[1]] # Make sure that the new wall actually touches both walls # and there is no wall close to this new wall wall1_right_of_wall2 = np.any(np.array(pt2) - np.array(pt1) < 0) if wall1_right_of_wall2: dists = np.array(pt1)[None, :] - np.array(wall1.left_edges) else: dists = np.array(pt1)[None, :] - np.array(wall1.right_edges) min_dist = np.linalg.norm(dists, axis=1).min() if wall2.is_touching(pt2) and min_dist > min_dist_between: return Wall(pt1, pt2) return None def choose_new_split(walls, min_dist_between, num_tries=10, random_state=np.random.RandomState()): ''' Given a list of walls, choose a random wall and draw a new wall perpendicular to it. NOTE: Right now this O(n_walls^2). We could probably get this to linear if we did something smarter with the occupancy grid. Until n_walls gets way bigger this should be fine though. Args: walls (Wall list): walls to possibly draw a new wall from min_dist_between (int): closest another parallel wall can be to the new wall in grid cells. num_tries (int): number of times before we can fail in placing a wall before giving up random_state (np.random.RandomState): random state to use for sampling ''' for i in range(num_tries): wall1 = random_state.choice(walls) proposed_walls = [connect_walls(wall1, wall2, min_dist_between, random_state=random_state) for wall2 in walls if wall2 != wall1] proposed_walls = [wall for wall in proposed_walls if wall is not None and not np.any([wall.intersects(_wall) for _wall in walls])] if len(proposed_walls): new_wall = random_state.choice(proposed_walls) for wall in walls: wall.maybe_add_edge(new_wall) new_wall.maybe_add_edge(wall) return new_wall return None def split_walls(walls, door_size, random_state=np.random.RandomState()): ''' Add a door to each wall in walls. Return the new walls and doors. Args: walls (Wall list): walls door_size (int): door size in grid cells random_state (np.random.RandomState): random state to use for sampling ''' split_walls = [] doors = [] for wall in walls: new_walls, new_doors = wall.split_for_doors(door_size=door_size, random_state=random_state) split_walls += new_walls doors += new_doors return split_walls, doors def construct_door_obs(doors, floor_size, grid_size): ''' Construct door observations in mujoco frame from door positions in grid frame. Args: doors ((n_doors, 2, 2) array): list of pairs of points of door edges. floor_size (float): size of floor grid_size (int): size of placement grid ''' _doors = doors + 0.5 scaling = floor_size / grid_size _door_sizes = np.array([np.linalg.norm(door[1] - door[0]) * scaling for door in _doors]) _doors = np.array([(door[0] + (door[1] - door[0]) / 2) * scaling for door in _doors]) return np.concatenate([_doors, _door_sizes[:, None]], -1) def add_walls_to_grid(grid, walls): ''' Draw walls onto a grid. Args: grid (np.ndarray): 2D occupancy grid walls (Wall list): walls ''' for wall in walls: if wall.is_vertical: grid[wall.pt1[0], wall.pt1[1]:wall.pt2[1] + 1] = 1 else: grid[wall.pt1[0]:wall.pt2[0] + 1, wall.pt1[1]] = 1 def walls_to_mujoco(floor, floor_size, grid_size, walls, friction=None): ''' Take a list of walls in grid frame and add them to the floor in the worldgen frame. Args: floor (worldgen.Floor): floor floor_size (float): size of floor grid_size (int): size of placement grid walls (Wall list): list of walls friction (float): wall friction ''' wall_width = floor_size / grid_size / 2 grid_cell_length = floor_size / grid_size for i, wall in enumerate(walls): if wall.is_vertical: wall_length_grid = (wall.pt2[1] - wall.pt1[1] + 1) offset = np.array([-1, 1]) else: wall_length_grid = (wall.pt2[0] - wall.pt1[0] + 1) offset = np.array([1, -1]) # Convert to mujoco frame wall_length = wall_length_grid * grid_cell_length # Subtract 1 grid_cell_length such that walls originate and end in the center of a grid cell # Subtract 1 wall_width such that perpendicular walls do not intersect at the center of a # grid cell wall_length -= grid_cell_length + wall_width if wall.is_vertical: size = (wall_width, wall_length, wall.height) else: size = (wall_length, wall_width, wall.height) # Position of object should be in the middle of a grid cell (add 0.5) shifted by # the wall width such that corners don't overlap pos = np.array([wall.pt1[0] + 0.5, wall.pt1[1] + 0.5]) / grid_size pos += offset * wall_width / floor_size / 2 # Convert from mujoco to worldgen scale scale_x = (floor_size - size[0]) / floor_size scale_y = (floor_size - size[1]) / floor_size pos = pos / np.array([scale_x, scale_y]) geom = Geom('box', size, name=f"wall{i}") geom.mark_static() geom.add_transform(set_geom_attr_transform('rgba', wall.rgba)) geom.add_transform(set_geom_attr_transform('group', 1)) if friction is not None: geom.add_transform(set_geom_attr_transform('friction', friction)) floor.append(geom, placement_xy=pos) def outside_walls(grid_size, rgba=(0, 1, 0, 0.1), use_low_wall_height=False): height = 0.5 if use_low_wall_height else 4.0 return [Wall([0, 0], [0, grid_size - 1], height=height, rgba=rgba), Wall([0, 0], [grid_size - 1, 0], height=height, rgba=rgba), Wall([grid_size - 1, 0], [grid_size - 1, grid_size - 1], height=height, rgba=rgba), Wall([0, grid_size - 1], [grid_size - 1, grid_size - 1], height=height, rgba=rgba)] class RandomWalls(EnvModule): ''' Add random walls to the environment. This must be the first module added to the environment Args: grid_size (int): grid size to place walls on num_rooms (int): number of rooms to create min_room_size (int): minimum size of a room in grid cells door_size (int): size of doors in grid cells friction (float): wall friction outside_walls (bool): If false, don't add outside walls to mujoco outside_wall_rgba (array): RGBA color of outside walls random_room_number (bool): If true, the actual number of rooms is sampled uniformly between 1 and num_rooms gen_door_obs (bool): If true, generate door observation (currently does not work with random room number) prob_outside_walls (float): probability that outside walls are used low_outside_walls (bool): If true, outside walls are the same height as inside walls. This is just used for pretty rendering ''' @store_args def __init__(self, grid_size, num_rooms, min_room_size, door_size, friction=None, num_tries=10, outside_wall_rgba=(0, 1, 0, 0.1), random_room_number=False, gen_door_obs=True, prob_outside_walls=1.0, low_outside_walls=False): pass def build_world_step(self, env, floor, floor_size): # Create rooms walls = outside_walls(self.grid_size, rgba=self.outside_wall_rgba, use_low_wall_height=self.low_outside_walls) failures = 0 if self.random_room_number: self.num_actual_rooms = env._random_state.randint(self.num_rooms) + 1 else: self.num_actual_rooms = self.num_rooms while len(walls) < self.num_actual_rooms + 3: new_wall = choose_new_split(walls, self.min_room_size, random_state=env._random_state) if new_wall is None: walls = outside_walls(self.grid_size, rgba=self.outside_wall_rgba, use_low_wall_height=self.low_outside_walls) failures += 1 else: walls.append(new_wall) if failures == self.num_tries: return False # Add doors new_walls, doors = split_walls(walls[4:], self.door_size, random_state=env._random_state) if env._random_state.uniform() < self.prob_outside_walls: walls = walls[:4] + new_walls else: walls = new_walls # Convert doors into mujoco frame if self.gen_door_obs: self.door_obs = construct_door_obs(np.array(doors), floor_size, self.grid_size) walls_to_mujoco(floor, floor_size, self.grid_size, walls, friction=self.friction) add_walls_to_grid(env.placement_grid, walls) return True def observation_step(self, env, sim): if self.gen_door_obs: obs = {'door_obs': self.door_obs} else: obs = {} return obs class WallScenarios(EnvModule): ''' Add a wall scenario to the environment. This must be the first module added to the environment. Args: grid_size (int): grid size to place walls on door_size (int): size of doors in grid cells scenario (string): Options: 'empty': no walls 'half': one wall in the middle with a random door 'quadrant': one quadrant is walled off with random door(s) 'var_quadrant': same as 'quadrant' but the room size is also randomized 'var_tri': three rooms, one taking about half of the area and the other two taking about a quarter of the area. Random doors friction (float): wall friction p_door_dropout (float): probability we don't place one of the doors either quadrant scenario low_outside_walls (bool): If true, outside walls are the same height as inside walls. This is just used for pretty rendering ''' @store_args def __init__(self, grid_size, door_size, scenario, friction=None, p_door_dropout=0.0, low_outside_walls=False): assert scenario in ['var_quadrant', 'quadrant', 'half', 'var_tri', 'empty'] def build_world_step(self, env, floor, floor_size): # Outside walls walls = outside_walls(self.grid_size, use_low_wall_height=self.low_outside_walls) if self.scenario in ['quadrant', 'var_quadrant']: q_size = env._random_state.uniform(0.3, 0.6) if self.scenario == 'var_quadrant' else 0.5 q_size = int(q_size * self.grid_size) env.metadata['quadrant_size'] = q_size new_walls = [ Wall([self.grid_size - q_size, 0], [self.grid_size - q_size, q_size]), Wall([self.grid_size - q_size, q_size], [self.grid_size - 1, q_size])] if env._random_state.uniform(0, 1) < self.p_door_dropout: wall_to_split = env._random_state.randint(0, 2) walls += [new_walls[(1 - wall_to_split)]] walls_to_split = [new_walls[wall_to_split]] else: walls_to_split = new_walls elif self.scenario == 'half': walls_to_split += [Wall([self.grid_size - 1, self.grid_size // 2], [0, self.grid_size // 2])] elif self.scenario == 'var_tri': wall1_splitoff_point, wall2_splitoff_point = [ int(self.grid_size * env._random_state.uniform(0.4, 0.6)) for _ in range(2) ] wall1_orientation = 'vertical' if env._random_state.uniform() < 0.5 else 'horizontal' # if first wall is horizontal, 'left' means below and 'right' means above wall2_orientation = 'left' if env._random_state.uniform() < 0.5 else 'right' env.metadata['tri_wall_splitoff_points'] = [wall1_splitoff_point, wall2_splitoff_point] env.metadata['tri_wall_orientations'] = [wall1_orientation, wall2_orientation] if wall1_orientation == 'horizontal': walls_to_split = [Wall([self.grid_size - 1, wall1_splitoff_point], [0, wall1_splitoff_point])] if wall2_orientation == 'left': walls_to_split += [Wall([wall2_splitoff_point, wall1_splitoff_point], [wall2_splitoff_point, 0])] rooms = [[(1, self.grid_size - 1), (wall1_splitoff_point + 1, self.grid_size - 1)], [(1, wall2_splitoff_point - 1), (1, wall1_splitoff_point - 1)], [(wall2_splitoff_point + 1, self.grid_size - 1), (1, wall1_splitoff_point - 1)]] elif wall2_orientation == 'right': walls_to_split += [Wall([wall2_splitoff_point, self.grid_size - 1], [wall2_splitoff_point, wall1_splitoff_point])] rooms = [[(1, self.grid_size - 1), (0, wall1_splitoff_point - 1)], [(1, wall2_splitoff_point - 1), (wall1_splitoff_point + 1, self.grid_size - 1)], [(wall2_splitoff_point + 1, self.grid_size - 1), (wall1_splitoff_point + 1, self.grid_size - 1)]] elif wall1_orientation == 'vertical': walls_to_split = [Wall([wall1_splitoff_point, self.grid_size - 1], [wall1_splitoff_point, 0])] if wall2_orientation == 'left': walls_to_split += [Wall([wall1_splitoff_point, wall2_splitoff_point], [0, wall2_splitoff_point])] rooms = [[(wall1_splitoff_point + 1, self.grid_size - 1), (1, self.grid_size - 1)], [(1, wall1_splitoff_point - 1), (1, wall2_splitoff_point - 1)], [(1, wall1_splitoff_point - 1), (wall2_splitoff_point + 1, self.grid_size - 1)]] elif wall2_orientation == 'right': walls_to_split += [Wall([self.grid_size - 1, wall2_splitoff_point], [wall1_splitoff_point, wall2_splitoff_point])] rooms = [[(0, wall1_splitoff_point - 1), (1, self.grid_size - 1)], [(wall1_splitoff_point + 1, self.grid_size - 1), (1, wall2_splitoff_point - 1)], [(wall1_splitoff_point + 1, self.grid_size - 1), (wall2_splitoff_point + 1, self.grid_size - 1)]] env.metadata['tri_room_grid_cell_range'] = rooms # this is used when we want to consecutively place objects in every room # e.g. if we want object i to go in room (i % 3) env.metadata['tri_placement_rotation'] = [] elif self.scenario == 'empty': walls_to_split = [] # Add doors new_walls, doors = split_walls(walls_to_split, self.door_size, random_state=env._random_state) walls += new_walls env.metadata['doors'] = np.array(doors) # Convert doors into mujoco frame if len(doors) > 0: self.door_obs = construct_door_obs(np.array(doors), floor_size, self.grid_size) else: self.door_obs = None walls_to_mujoco(floor, floor_size, self.grid_size, walls, friction=self.friction) add_walls_to_grid(env.placement_grid, walls) return True def observation_step(self, env, sim): if self.door_obs is not None: obs = {'door_obs': self.door_obs} else: obs = {} return obs