in lerobot/scripts/rl/gym_manipulator.py [0:0]
def make_robot_env(cfg: EnvConfig) -> gym.Env:
"""
Factory function to create a robot environment.
This function builds a robot environment with all necessary wrappers
based on the provided configuration.
Args:
cfg: Configuration object containing environment parameters.
Returns:
A gym environment with all necessary wrappers applied.
"""
if cfg.type == "hil":
import gym_hil # noqa: F401
# TODO (azouitine)
env = gym.make(
f"gym_hil/{cfg.task}",
image_obs=True,
render_mode="human",
use_gripper=cfg.wrapper.use_gripper,
gripper_penalty=cfg.wrapper.gripper_penalty,
)
env = GymHilObservationProcessorWrapper(env=env)
env = GymHilDeviceWrapper(env=env, device=cfg.device)
env = BatchCompatibleWrapper(env=env)
env = TorchActionWrapper(env=env, device=cfg.device)
return env
if not hasattr(cfg, "robot") or not hasattr(cfg, "teleop"):
raise ValueError(
"Configuration for 'gym_manipulator' must be HILSerlRobotEnvConfig with robot and teleop."
)
if cfg.robot is None:
raise ValueError("RobotConfig (cfg.robot) must be provided for gym_manipulator environment.")
robot = make_robot_from_config(cfg.robot)
teleop_device = make_teleoperator_from_config(cfg.teleop)
teleop_device.connect()
# Create base environment
env = RobotEnv(
robot=robot,
use_gripper=cfg.wrapper.use_gripper,
display_cameras=cfg.wrapper.display_cameras if cfg.wrapper else False,
)
# Add observation and image processing
if cfg.wrapper:
if cfg.wrapper.add_joint_velocity_to_observation:
env = AddJointVelocityToObservation(env=env, fps=cfg.fps)
if cfg.wrapper.add_current_to_observation:
env = AddCurrentToObservation(env=env)
if cfg.wrapper.add_ee_pose_to_observation:
env = EEObservationWrapper(env=env, ee_pose_limits=robot.end_effector_bounds)
env = ConvertToLeRobotObservation(env=env, device=cfg.device)
if cfg.wrapper and cfg.wrapper.crop_params_dict is not None:
env = ImageCropResizeWrapper(
env=env,
crop_params_dict=cfg.wrapper.crop_params_dict,
resize_size=cfg.wrapper.resize_size,
)
# Add reward computation and control wrappers
reward_classifier = init_reward_classifier(cfg)
if reward_classifier is not None:
env = RewardWrapper(env=env, reward_classifier=reward_classifier, device=cfg.device)
env = TimeLimitWrapper(env=env, control_time_s=cfg.wrapper.control_time_s, fps=cfg.fps)
if cfg.wrapper.use_gripper and cfg.wrapper.gripper_penalty is not None:
env = GripperPenaltyWrapper(
env=env,
penalty=cfg.wrapper.gripper_penalty,
)
# Control mode specific wrappers
control_mode = cfg.wrapper.control_mode
if control_mode == "gamepad":
assert isinstance(teleop_device, GamepadTeleop), (
"teleop_device must be an instance of GamepadTeleop for gamepad control mode"
)
env = GamepadControlWrapper(
env=env,
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "keyboard_ee":
assert isinstance(teleop_device, KeyboardEndEffectorTeleop), (
"teleop_device must be an instance of KeyboardEndEffectorTeleop for keyboard control mode"
)
env = KeyboardControlWrapper(
env=env,
teleop_device=teleop_device,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "leader":
env = GearedLeaderControlWrapper(
env=env,
teleop_device=teleop_device,
end_effector_step_sizes=cfg.robot.end_effector_step_sizes,
use_gripper=cfg.wrapper.use_gripper,
)
elif control_mode == "leader_automatic":
env = GearedLeaderAutomaticControlWrapper(
env=env,
teleop_device=teleop_device,
end_effector_step_sizes=cfg.robot.end_effector_step_sizes,
use_gripper=cfg.wrapper.use_gripper,
)
else:
raise ValueError(f"Invalid control mode: {control_mode}")
env = ResetWrapper(
env=env,
reset_pose=cfg.wrapper.fixed_reset_joint_positions,
reset_time_s=cfg.wrapper.reset_time_s,
)
env = BatchCompatibleWrapper(env=env)
env = TorchActionWrapper(env=env, device=cfg.device)
return env