gym_pusht/envs/pusht.py (348 lines of code) (raw):

import collections import os import warnings import cv2 import gymnasium as gym import numpy as np with warnings.catch_warnings(): # Filter out DeprecationWarnings raised from pkg_resources warnings.filterwarnings("ignore", "pkg_resources is deprecated as an API", category=DeprecationWarning) import pygame import pymunk import pymunk.pygame_util import shapely.geometry as sg from gymnasium import spaces from pymunk.vec2d import Vec2d from .pymunk_override import DrawOptions RENDER_MODES = ["rgb_array"] if os.environ.get("MUJOCO_GL") != "egl": RENDER_MODES.append("human") def pymunk_to_shapely(body, shapes): geoms = [] for shape in shapes: if isinstance(shape, pymunk.shapes.Poly): verts = [body.local_to_world(v) for v in shape.get_vertices()] verts += [verts[0]] geoms.append(sg.Polygon(verts)) else: raise RuntimeError(f"Unsupported shape type {type(shape)}") geom = sg.MultiPolygon(geoms) return geom class PushTEnv(gym.Env): """ ## Description PushT environment. The goal of the agent is to push the block to the goal zone. The agent is a circle and the block is a tee shape. ## Action Space The action space is continuous and consists of two values: [x, y]. The values are in the range [0, 512] and represent the target position of the agent. ## Observation Space If `obs_type` is set to `state`, the observation space is a 5-dimensional vector representing the state of the environment: [agent_x, agent_y, block_x, block_y, block_angle]. The values are in the range [0, 512] for the agent and block positions and [0, 2*pi] for the block angle. If `obs_type` is set to `environment_state_agent_pos` the observation space is a dictionary with: - `environment_state`: 16-dimensional vector representing the keypoint locations of the T (in [x0, y0, x1, y1, ...] format). The values are in the range [0, 512]. See `get_keypoints` for a diagram showing the location of the keypoint indices. - `agent_pos`: A 2-dimensional vector representing the position of the robot end-effector. If `obs_type` is set to `pixels`, the observation space is a 96x96 RGB image of the environment. ## Rewards The reward is the coverage of the block in the goal zone. The reward is 1.0 if the block is fully in the goal zone. ## Success Criteria The environment is considered solved if the block is at least 95% in the goal zone. ## Starting State The agent starts at a random position and the block starts at a random position and angle. ## Episode Termination The episode terminates when the block is at least 95% in the goal zone. ## Arguments ```python >>> import gymnasium as gym >>> import gym_pusht >>> env = gym.make("gym_pusht/PushT-v0", obs_type="state", render_mode="rgb_array") >>> env <TimeLimit<OrderEnforcing<PassiveEnvChecker<PushTEnv<gym_pusht/PushT-v0>>>>> ``` * `obs_type`: (str) The observation type. Can be either `state`, `keypoints`, `pixels` or `pixels_agent_pos`. Default is `state`. * `block_cog`: (tuple) The center of gravity of the block if different from the center of mass. Default is `None`. * `damping`: (float) The damping factor of the environment if different from 0. Default is `None`. * `observation_width`: (int) The width of the observed image. Default is `96`. * `observation_height`: (int) The height of the observed image. Default is `96`. * `visualization_width`: (int) The width of the visualized image. Default is `680`. * `visualization_height`: (int) The height of the visualized image. Default is `680`. ## Reset Arguments Passing the option `options["reset_to_state"]` will reset the environment to a specific state. > [!WARNING] > For legacy compatibility, the inner fonctionning has been preserved, and the state set is not the same as the > the one passed in the argument. ```python >>> import gymnasium as gym >>> import gym_pusht >>> env = gym.make("gym_pusht/PushT-v0") >>> state, _ = env.reset(options={"reset_to_state": [0.0, 10.0, 20.0, 30.0, 1.0]}) >>> state array([ 0. , 10. , 57.866196, 50.686398, 1. ], dtype=float32) ``` ## Version History * v0: Original version ## References * TODO: """ metadata = {"render_modes": RENDER_MODES, "render_fps": 10} def __init__( self, obs_type="state", render_mode="rgb_array", block_cog=None, damping=None, observation_width=96, observation_height=96, visualization_width=680, visualization_height=680, ): super().__init__() # Observations self.obs_type = obs_type # Rendering self.render_mode = render_mode self.observation_width = observation_width self.observation_height = observation_height self.visualization_width = visualization_width self.visualization_height = visualization_height # Initialize spaces self._initialize_observation_space() self.action_space = spaces.Box(low=0, high=512, shape=(2,), dtype=np.float32) # Physics self.k_p, self.k_v = 100, 20 # PD control.z self.control_hz = self.metadata["render_fps"] self.dt = 0.01 self.block_cog = block_cog self.damping = damping # If human-rendering is used, `self.window` will be a reference # to the window that we draw to. `self.clock` will be a clock that is used # to ensure that the environment is rendered at the correct framerate in # human-mode. They will remain `None` until human-mode is used for the # first time. self.window = None self.clock = None self.teleop = None self._last_action = None self.success_threshold = 0.95 # 95% coverage def _initialize_observation_space(self): if self.obs_type == "state": # [agent_x, agent_y, block_x, block_y, block_angle] self.observation_space = spaces.Box( low=np.array([0, 0, 0, 0, 0]), high=np.array([512, 512, 512, 512, 2 * np.pi]), dtype=np.float64, ) elif self.obs_type == "environment_state_agent_pos": self.observation_space = spaces.Dict( { "environment_state": spaces.Box( low=np.zeros(16), high=np.full((16,), 512), dtype=np.float64, ), "agent_pos": spaces.Box( low=np.array([0, 0]), high=np.array([512, 512]), dtype=np.float64, ), }, ) elif self.obs_type == "pixels": self.observation_space = spaces.Box( low=0, high=255, shape=(self.observation_height, self.observation_width, 3), dtype=np.uint8 ) elif self.obs_type == "pixels_agent_pos": self.observation_space = spaces.Dict( { "pixels": spaces.Box( low=0, high=255, shape=(self.observation_height, self.observation_width, 3), dtype=np.uint8, ), "agent_pos": spaces.Box( low=np.array([0, 0]), high=np.array([512, 512]), dtype=np.float64, ), } ) else: raise ValueError( f"Unknown obs_type {self.obs_type}. Must be one of [pixels, state, environment_state_agent_pos, " "pixels_agent_pos]" ) def _get_coverage(self): goal_body = self.get_goal_pose_body(self.goal_pose) goal_geom = pymunk_to_shapely(goal_body, self.block.shapes) block_geom = pymunk_to_shapely(self.block, self.block.shapes) intersection_area = goal_geom.intersection(block_geom).area goal_area = goal_geom.area return intersection_area / goal_area def step(self, action): self.n_contact_points = 0 n_steps = int(1 / (self.dt * self.control_hz)) self._last_action = action for _ in range(n_steps): # Step PD control # self.agent.velocity = self.k_p * (act - self.agent.position) # P control works too. acceleration = self.k_p * (action - self.agent.position) + self.k_v * ( Vec2d(0, 0) - self.agent.velocity ) self.agent.velocity += acceleration * self.dt # Step physics self.space.step(self.dt) # Compute reward coverage = self._get_coverage() reward = np.clip(coverage / self.success_threshold, 0.0, 1.0) terminated = is_success = coverage > self.success_threshold observation = self.get_obs() info = self._get_info() info["is_success"] = is_success info["coverage"] = coverage truncated = False return observation, reward, terminated, truncated, info def reset(self, seed=None, options=None): super().reset(seed=seed) self._setup() if options is not None and options.get("reset_to_state") is not None: state = np.array(options.get("reset_to_state")) else: # state = self.np_random.uniform(low=[50, 50, 100, 100, -np.pi], high=[450, 450, 400, 400, np.pi]) rs = np.random.RandomState(seed=seed) state = np.array( [ rs.randint(50, 450), rs.randint(50, 450), rs.randint(100, 400), rs.randint(100, 400), rs.randn() * 2 * np.pi - np.pi, ], # dtype=np.float64 ) self._set_state(state) observation = self.get_obs() info = self._get_info() info["is_success"] = False if self.render_mode == "human": self.render() return observation, info def _draw(self): # Create a screen screen = pygame.Surface((512, 512)) screen.fill((255, 255, 255)) draw_options = DrawOptions(screen) # Draw goal pose goal_body = self.get_goal_pose_body(self.goal_pose) for shape in self.block.shapes: goal_points = [goal_body.local_to_world(v) for v in shape.get_vertices()] goal_points = [pymunk.pygame_util.to_pygame(point, draw_options.surface) for point in goal_points] goal_points += [goal_points[0]] pygame.draw.polygon(screen, pygame.Color("LightGreen"), goal_points) # Draw agent and block self.space.debug_draw(draw_options) return screen def _get_img(self, screen, width, height, render_action=False): img = np.transpose(np.array(pygame.surfarray.pixels3d(screen)), axes=(1, 0, 2)) img = cv2.resize(img, (width, height)) render_size = min(width, height) if render_action and self._last_action is not None: action = np.array(self._last_action) coord = (action / 512 * [height, width]).astype(np.int32) marker_size = int(8 / 96 * render_size) thickness = int(1 / 96 * render_size) cv2.drawMarker( img, coord, color=(255, 0, 0), markerType=cv2.MARKER_CROSS, markerSize=marker_size, thickness=thickness, ) return img def render(self): return self._render(visualize=True) def _render(self, visualize=False): width, height = ( (self.visualization_width, self.visualization_height) if visualize else (self.observation_width, self.observation_height) ) screen = self._draw() # draw the environment on a screen if self.render_mode == "rgb_array": return self._get_img(screen, width=width, height=height, render_action=visualize) elif self.render_mode == "human": if self.window is None: pygame.init() pygame.display.init() self.window = pygame.display.set_mode((512, 512)) if self.clock is None: self.clock = pygame.time.Clock() self.window.blit( screen, screen.get_rect() ) # copy our drawings from `screen` to the visible window pygame.event.pump() self.clock.tick(self.metadata["render_fps"] * int(1 / (self.dt * self.control_hz))) pygame.display.update() else: raise ValueError(self.render_mode) def close(self): if self.window is not None: pygame.display.quit() pygame.quit() def teleop_agent(self): teleop_agent = collections.namedtuple("TeleopAgent", ["act"]) def act(obs): act = None mouse_position = pymunk.pygame_util.from_pygame(Vec2d(*pygame.mouse.get_pos()), self.screen) if self.teleop or (mouse_position - self.agent.position).length < 30: self.teleop = True act = mouse_position return act return teleop_agent(act) def get_obs(self): if self.obs_type == "state": agent_position = np.array(self.agent.position) block_position = np.array(self.block.position) block_angle = self.block.angle % (2 * np.pi) return np.concatenate([agent_position, block_position, [block_angle]], dtype=np.float64) if self.obs_type == "environment_state_agent_pos": return { "environment_state": self.get_keypoints(self._block_shapes).flatten(), "agent_pos": np.array(self.agent.position), } pixels = self._render() if self.obs_type == "pixels": return pixels elif self.obs_type == "pixels_agent_pos": return { "pixels": pixels, "agent_pos": np.array(self.agent.position), } @staticmethod def get_goal_pose_body(pose): mass = 1 inertia = pymunk.moment_for_box(mass, (50, 100)) body = pymunk.Body(mass, inertia) # preserving the legacy assignment order for compatibility # the order here doesn't matter somehow, maybe because CoM is aligned with body origin body.position = pose[:2].tolist() body.angle = pose[2] return body def _get_info(self): n_steps = int(1 / self.dt * self.control_hz) n_contact_points_per_step = int(np.ceil(self.n_contact_points / n_steps)) info = { "pos_agent": np.array(self.agent.position), "vel_agent": np.array(self.agent.velocity), "block_pose": np.array(list(self.block.position) + [self.block.angle]), "goal_pose": self.goal_pose, "n_contacts": n_contact_points_per_step, } return info def _handle_collision(self, arbiter, space, data): self.n_contact_points += len(arbiter.contact_point_set.points) def _setup(self): self.space = pymunk.Space() self.space.gravity = 0, 0 self.space.damping = self.damping if self.damping is not None else 0.0 self.teleop = False # Add walls walls = [ self.add_segment(self.space, (5, 506), (5, 5), 2), self.add_segment(self.space, (5, 5), (506, 5), 2), self.add_segment(self.space, (506, 5), (506, 506), 2), self.add_segment(self.space, (5, 506), (506, 506), 2), ] self.space.add(*walls) # Add agent, block, and goal zone self.agent = self.add_circle(self.space, (256, 400), 15) self.block, self._block_shapes = self.add_tee(self.space, (256, 300), 0) self.goal_pose = np.array([256, 256, np.pi / 4]) # x, y, theta (in radians) if self.block_cog is not None: self.block.center_of_gravity = self.block_cog # Add collision handling self.collision_handeler = self.space.add_collision_handler(0, 0) self.collision_handeler.post_solve = self._handle_collision self.n_contact_points = 0 def _set_state(self, state): self.agent.position = list(state[:2]) # Setting angle rotates with respect to center of mass, therefore will modify the geometric position if not # the same as CoM. Therefore should theoretically set the angle first. But for compatibility with legacy data, # we do the opposite. self.block.position = list(state[2:4]) self.block.angle = state[4] # Run physics to take effect self.space.step(self.dt) @staticmethod def add_segment(space, a, b, radius): # TODO(rcadene): rename add_segment to make_segment, since it is not added to the space shape = pymunk.Segment(space.static_body, a, b, radius) shape.color = pygame.Color("LightGray") # https://htmlcolorcodes.com/color-names return shape @staticmethod def add_circle(space, position, radius): body = pymunk.Body(body_type=pymunk.Body.KINEMATIC) body.position = position body.friction = 1 shape = pymunk.Circle(body, radius) shape.color = pygame.Color("RoyalBlue") space.add(body, shape) return body @staticmethod def add_tee(space, position, angle, scale=30, color="LightSlateGray", mask=None): if mask is None: mask = pymunk.ShapeFilter.ALL_MASKS() mass = 1 length = 4 vertices1 = [ (-length * scale / 2, scale), (length * scale / 2, scale), (length * scale / 2, 0), (-length * scale / 2, 0), ] inertia1 = pymunk.moment_for_poly(mass, vertices=vertices1) vertices2 = [ (-scale / 2, scale), (-scale / 2, length * scale), (scale / 2, length * scale), (scale / 2, scale), ] inertia2 = pymunk.moment_for_poly(mass, vertices=vertices1) body = pymunk.Body(mass, inertia1 + inertia2) shape1 = pymunk.Poly(body, vertices1) shape2 = pymunk.Poly(body, vertices2) shape1.color = pygame.Color(color) shape2.color = pygame.Color(color) shape1.filter = pymunk.ShapeFilter(mask=mask) shape2.filter = pymunk.ShapeFilter(mask=mask) body.center_of_gravity = (shape1.center_of_gravity + shape2.center_of_gravity) / 2 body.angle = angle body.position = position body.friction = 1 space.add(body, shape1, shape2) return body, [shape1, shape2] @staticmethod def get_keypoints(block_shapes): """Get a (8, 2) numpy array with the T keypoints. The T is composed of two rectangles each with 4 keypoints. 0───────────1 │ │ 3───4───5───2 │ │ │ │ │ │ │ │ 7───6 """ keypoints = [] for shape in block_shapes: for v in shape.get_vertices(): v = v.rotated(shape.body.angle) v = v + shape.body.position keypoints.append(np.array(v)) return np.row_stack(keypoints)