gym_hil/envs/panda_arrange_boxes_gym_env.py
#!/usr/bin/env python
from pathlib import Path
from typing import Any, Dict, Literal, Tuple
import mujoco
import numpy as np
from gymnasium import spaces
from gym_hil.mujoco_gym_env import FrankaGymEnv, GymRenderingSpec
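
# Constants used below: _PANDA_HOME is the joint configuration the arm is reset to,
# _CARTESIAN_BOUNDS is the min/max xyz workspace passed to FrankaGymEnv, and
# _SAMPLING_BOUNDS is the xy region the blocks are expected to stay within (used by
# the out-of-bounds termination check in step()).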
_PANDA_HOME = np.asarray((0, -0.785, 0, -2.35, 0, 1.57, np.pi / 4))
_CARTESIAN_BOUNDS = np.asarray([[0.2, -0.5, 0], [0.6, 0.5, 0.5]])
_SAMPLING_BOUNDS = np.asarray([[0.3, -0.15], [0.5, 0.15]])
class PandaArrangeBoxesGymEnv(FrankaGymEnv):
"""Environment for a Panda robot picking up a cube."""
def __init__(
self,
seed: int = 0,
control_dt: float = 0.1,
physics_dt: float = 0.002,
render_spec: GymRenderingSpec = GymRenderingSpec(), # noqa: B008
render_mode: Literal["rgb_array", "human"] = "rgb_array",
image_obs: bool = False,
reward_type: str = "sparse",
):
self.reward_type = reward_type
super().__init__(
seed=seed,
control_dt=control_dt,
physics_dt=physics_dt,
render_spec=render_spec,
render_mode=render_mode,
image_obs=image_obs,
home_position=_PANDA_HOME,
cartesian_bounds=_CARTESIAN_BOUNDS,
xml_path=Path(__file__).parent.parent / "assets" / "arrange_boxes_scene.xml",
)
# Task-specific setup
self._block_z = self._model.geom("block1").size[2]
        # Set up the observation space to match what _compute_observation returns.
        # Observation space design:
        # - "agent_pos": agent (robot) configuration as a single Box
        # - "environment_state": world position of the first block ("block1") as a single Box
        # - "pixels": (optional) dict of camera views if image observations are enabled
agent_dim = self.get_robot_state().shape[0]
agent_box = spaces.Box(-np.inf, np.inf, (agent_dim,), dtype=np.float32)
env_box = spaces.Box(-np.inf, np.inf, (3,), dtype=np.float32)
        self.no_blocks = self._get_no_boxes()  # number of "block*" joints defined in the scene
        self.block_range = 0.3  # half-width (m) of the y-span the blocks are scattered over at reset
if self.image_obs:
self.observation_space = spaces.Dict(
{
"pixels": spaces.Dict(
{
"front": spaces.Box(
0,
255,
(self._render_specs.height, self._render_specs.width, 3),
dtype=np.uint8,
),
"wrist": spaces.Box(
0,
255,
(self._render_specs.height, self._render_specs.width, 3),
dtype=np.uint8,
),
}
),
"agent_pos": agent_box,
}
)
else:
self.observation_space = spaces.Dict(
{
"agent_pos": agent_box,
"environment_state": env_box,
}
)
    def _get_no_boxes(self):
        """Count the "block*" joints defined in the model."""
        joint_names = [
            mujoco.mj_id2name(self._model, mujoco.mjtObj.mjOBJ_JOINT, i) for i in range(self._model.njnt)
        ]
        block_names = [name for name in joint_names if name and "block" in name]
        return len(block_names)
def reset(self, seed=None, **kwargs) -> Tuple[Dict[str, np.ndarray], Dict[str, Any]]:
"""Reset the environment."""
# Ensure gymnasium internal RNG is initialized when a seed is provided
super().reset(seed=seed)
mujoco.mj_resetData(self._model, self._data)
# Reset the robot to home position
self.reset_robot()
        # Scatter the blocks: each block gets a distinct y coordinate drawn from an
        # evenly spaced grid over [-block_range, block_range]. Use the gymnasium-seeded
        # RNG so resets are reproducible when a seed is provided.
        positions_coords = np.linspace(-self.block_range, self.block_range, self.no_blocks)
        self.np_random.shuffle(positions_coords)
        blocks = [f"block{i}" for i in range(1, self.no_blocks + 1)]
        self.np_random.shuffle(blocks)
        for block, pos in zip(blocks, positions_coords, strict=False):
            block_x_coord = self._data.joint(block).qpos[0]
            block_coords = np.array([block_x_coord, pos])
            self._data.joint(block).qpos[:3] = (*block_coords, self._block_z)
mujoco.mj_forward(self._model, self._data)
obs = self._compute_observation()
return obs, {}
def step(self, action: np.ndarray) -> Tuple[Dict[str, np.ndarray], float, bool, bool, Dict[str, Any]]:
"""Take a step in the environment."""
# Apply the action to the robot
self.apply_action(action)
# Compute observation, reward and termination
obs = self._compute_observation()
rew = self._compute_reward()
success = self._is_success()
if self.reward_type == "sparse":
success = rew == 1.0
# Check if block is outside bounds
block_pos = self._data.sensor("block1_pos").data
exceeded_bounds = np.any(block_pos[:2] < (_SAMPLING_BOUNDS[0] - self.block_range - 0.05)) or np.any(
block_pos[:2] > (_SAMPLING_BOUNDS[1] + self.block_range + 0.05)
)
terminated = bool(success or exceeded_bounds)
return obs, rew, terminated, False, {"succeed": success}
def _compute_observation(self) -> dict:
"""Compute the current observation."""
        # Build the dictionary structure that matches the declared observation space
# Get robot state
robot_state = self.get_robot_state().astype(np.float32)
# Assemble observation respecting the newly defined observation_space
block_pos = self._data.sensor("block1_pos").data.astype(np.float32)
if self.image_obs:
# Image observations
front_view, wrist_view = self.render()
observation = {
"pixels": {"front": front_view, "wrist": wrist_view},
"agent_pos": robot_state,
}
else:
# State-only observations
observation = {
"agent_pos": robot_state,
"environment_state": block_pos,
}
return observation
def _get_sensors(self) -> Tuple[list, list]:
"""Retrieve block and target positions."""
return (
[self._data.sensor(f"block{i}_pos") for i in range(1, self.no_blocks + 1)],
[self._data.sensor(f"target{i}_pos") for i in range(1, self.no_blocks + 1)],
)
def _compute_reward(self) -> float:
"""Compute the current reward based on block-target distances."""
block_sensors, target_sensors = self._get_sensors()
distances = [
np.linalg.norm(block.data - target.data)
for block, target in zip(block_sensors, target_sensors, strict=False)
]
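        # Dense reward: each block contributes exp(-20 * distance) to the sum, i.e. 1.0
        # when exactly on its target, decaying to roughly 0.55 at the 3 cm success
        # threshold. Sparse reward: 1.0 only once every block is within 3 cm of its target.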
if self.reward_type == "dense":
return sum(np.exp(-20 * d) for d in distances)
else:
return float(all(d < 0.03 for d in distances))
def _is_success(self) -> bool:
"""Check if the task is successfully completed."""
block_sensors, target_sensors = self._get_sensors()
distances = [
np.linalg.norm(block.data - target.data)
for block, target in zip(block_sensors, target_sensors, strict=False)
]
return all(dist < 0.03 for dist in distances)
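

# Minimal usage sketch (illustrative, not part of the environment API): build the
# environment directly and run a short random rollout. No Gymnasium registration ID
# is assumed here, so the class is instantiated directly; actions are drawn from the
# declared action space rather than assuming a particular action layout.
if __name__ == "__main__":
    env = PandaArrangeBoxesGymEnv(seed=0, image_obs=False, reward_type="dense")
    obs, info = env.reset(seed=0)
    for _ in range(20):
        action = env.action_space.sample()  # random action within the declared space
        obs, reward, terminated, truncated, info = env.step(action)
        if terminated or truncated:
            obs, info = env.reset()
    env.close()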