gym_hil/envs/panda_pick_gym_env.py
#!/usr/bin/env python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict, Literal, Tuple

import mujoco
import numpy as np
from gymnasium import spaces

from gym_hil.mujoco_gym_env import FrankaGymEnv, GymRenderingSpec
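
# Joint configuration (radians) used as the arm's home/reset pose.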
_PANDA_HOME = np.asarray((0, 0.195, 0, -2.43, 0, 2.62, 0.785))
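# Cartesian workspace bounds in meters: [[x, y, z] lower corner, [x, y, z] upper corner].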
_CARTESIAN_BOUNDS = np.asarray([[0.2, -0.3, 0], [0.6, 0.3, 0.5]])
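# XY region, in meters, from which the block's initial position may be sampled.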
_SAMPLING_BOUNDS = np.asarray([[0.3, -0.15], [0.5, 0.15]])


class PandaPickCubeGymEnv(FrankaGymEnv):
    """Environment for a Panda robot picking up a cube."""

def __init__(
self,
seed: int = 0,
control_dt: float = 0.1,
physics_dt: float = 0.002,
render_spec: GymRenderingSpec = GymRenderingSpec(), # noqa: B008
render_mode: Literal["rgb_array", "human"] = "rgb_array",
image_obs: bool = False,
reward_type: str = "sparse",
random_block_position: bool = False,
):
self.reward_type = reward_type
super().__init__(
seed=seed,
control_dt=control_dt,
physics_dt=physics_dt,
render_spec=render_spec,
render_mode=render_mode,
image_obs=image_obs,
home_position=_PANDA_HOME,
cartesian_bounds=_CARTESIAN_BOUNDS,
)
# Task-specific setup
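        # geom("block").size[2] is the half-extent of the cube along z, used to rest it on the table.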
self._block_z = self._model.geom("block").size[2]
self._random_block_position = random_block_position
        # Observation space (must match what _compute_observation returns):
        #   - "agent_pos": robot (agent) configuration as a single Box
        #   - "environment_state": block position in the world as a single Box (state-only mode)
        #   - "pixels": dict of camera views when image observations are enabled
agent_dim = self.get_robot_state().shape[0]
agent_box = spaces.Box(-np.inf, np.inf, (agent_dim,), dtype=np.float32)
env_box = spaces.Box(-np.inf, np.inf, (3,), dtype=np.float32)
if self.image_obs:
self.observation_space = spaces.Dict(
{
"pixels": spaces.Dict(
{
"front": spaces.Box(
0,
255,
(self._render_specs.height, self._render_specs.width, 3),
dtype=np.uint8,
),
"wrist": spaces.Box(
0,
255,
(self._render_specs.height, self._render_specs.width, 3),
dtype=np.uint8,
),
}
),
"agent_pos": agent_box,
}
)
else:
self.observation_space = spaces.Dict(
{
"agent_pos": agent_box,
"environment_state": env_box,
}
)

    def reset(self, seed=None, **kwargs) -> Tuple[Dict[str, np.ndarray], Dict[str, Any]]:
"""Reset the environment."""
# Ensure gymnasium internal RNG is initialized when a seed is provided
super().reset(seed=seed)
mujoco.mj_resetData(self._model, self._data)
# Reset the robot to home position
self.reset_robot()
        # Sample a new block position
        if self._random_block_position:
            # Use the seeded gymnasium RNG so resets are reproducible for a given seed.
            block_xy = self.np_random.uniform(*_SAMPLING_BOUNDS)
        else:
            block_xy = np.asarray([0.5, 0.0])
        self._data.jnt("block").qpos[:3] = (*block_xy, self._block_z)
mujoco.mj_forward(self._model, self._data)
# Cache the initial block height
self._z_init = self._data.sensor("block_pos").data[2]
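        # The task counts as solved once the block rises 10 cm above this initial height.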
self._z_success = self._z_init + 0.1
obs = self._compute_observation()
return obs, {}

    def step(self, action: np.ndarray) -> Tuple[Dict[str, np.ndarray], float, bool, bool, Dict[str, Any]]:
"""Take a step in the environment."""
# Apply the action to the robot
self.apply_action(action)
# Compute observation, reward and termination
obs = self._compute_observation()
rew = self._compute_reward()
success = self._is_success()
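        # In sparse mode, success follows the reward signal directly (block lifted above the
        # threshold); _is_success() additionally requires the gripper to be near the block.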
if self.reward_type == "sparse":
success = rew == 1.0
# Check if block is outside bounds
block_pos = self._data.sensor("block_pos").data
exceeded_bounds = np.any(block_pos[:2] < (_SAMPLING_BOUNDS[0] - 0.05)) or np.any(
block_pos[:2] > (_SAMPLING_BOUNDS[1] + 0.05)
)
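        # End the episode when the task is solved or the block leaves the workspace (5 cm margin).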
terminated = bool(success or exceeded_bounds)
return obs, rew, terminated, False, {"succeed": success}

    def _compute_observation(self) -> dict:
        """Compute the current observation, matching the structure of observation_space."""
        # Robot (agent) proprioceptive state
        robot_state = self.get_robot_state().astype(np.float32)
        # Block position in the world frame
        block_pos = self._data.sensor("block_pos").data.astype(np.float32)
if self.image_obs:
# Image observations
front_view, wrist_view = self.render()
observation = {
"pixels": {"front": front_view, "wrist": wrist_view},
"agent_pos": robot_state,
}
else:
# State-only observations
observation = {
"agent_pos": robot_state,
"environment_state": block_pos,
}
return observation

    def _compute_reward(self) -> float:
"""Compute reward based on current state."""
block_pos = self._data.sensor("block_pos").data
if self.reward_type == "dense":
tcp_pos = self._data.sensor("2f85/pinch_pos").data
dist = np.linalg.norm(block_pos - tcp_pos)
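            # Reaching term: exp(-20 * dist) is ~1 when the gripper touches the block and decays toward 0.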
r_close = np.exp(-20 * dist)
r_lift = (block_pos[2] - self._z_init) / (self._z_success - self._z_init)
r_lift = np.clip(r_lift, 0.0, 1.0)
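            # Weighted sum: lifting (0.7) is rewarded more heavily than reaching (0.3).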
            return float(0.3 * r_close + 0.7 * r_lift)
else:
lift = block_pos[2] - self._z_init
return float(lift > 0.1)

    def _is_success(self) -> bool:
"""Check if the task is successfully completed."""
block_pos = self._data.sensor("block_pos").data
tcp_pos = self._data.sensor("2f85/pinch_pos").data
dist = np.linalg.norm(block_pos - tcp_pos)
lift = block_pos[2] - self._z_init
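        # Stricter than the sparse reward: the gripper must also be within 5 cm of the block.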
return dist < 0.05 and lift > 0.1


if __name__ == "__main__":
from gym_hil import PassiveViewerWrapper
env = PandaPickCubeGymEnv(render_mode="human")
env = PassiveViewerWrapper(env)
env.reset()
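    # Drive the environment with random actions in [-1, 1] just to exercise the simulation.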
for _ in range(100):
env.step(np.random.uniform(-1, 1, 7))
env.close()