def make_robot_env()

in lerobot/scripts/rl/gym_manipulator.py [0:0]


def make_robot_env(cfg: EnvConfig) -> gym.Env:
    """
    Factory function to create a robot environment.

    This function builds a robot environment with all necessary wrappers
    based on the provided configuration.

    Args:
        cfg: Configuration object containing environment parameters.

    Returns:
        A gym environment with all necessary wrappers applied.
    """
    if cfg.type == "hil":
        import gym_hil  # noqa: F401

        # TODO (azouitine)
        env = gym.make(
            f"gym_hil/{cfg.task}",
            image_obs=True,
            render_mode="human",
            use_gripper=cfg.wrapper.use_gripper,
            gripper_penalty=cfg.wrapper.gripper_penalty,
        )
        env = GymHilObservationProcessorWrapper(env=env)
        env = GymHilDeviceWrapper(env=env, device=cfg.device)
        env = BatchCompatibleWrapper(env=env)
        env = TorchActionWrapper(env=env, device=cfg.device)
        return env

    if not hasattr(cfg, "robot") or not hasattr(cfg, "teleop"):
        raise ValueError(
            "Configuration for 'gym_manipulator' must be HILSerlRobotEnvConfig with robot and teleop."
        )

    if cfg.robot is None:
        raise ValueError("RobotConfig (cfg.robot) must be provided for gym_manipulator environment.")
    robot = make_robot_from_config(cfg.robot)

    teleop_device = make_teleoperator_from_config(cfg.teleop)
    teleop_device.connect()

    # Create base environment
    env = RobotEnv(
        robot=robot,
        use_gripper=cfg.wrapper.use_gripper,
        display_cameras=cfg.wrapper.display_cameras if cfg.wrapper else False,
    )

    # Add observation and image processing
    if cfg.wrapper:
        if cfg.wrapper.add_joint_velocity_to_observation:
            env = AddJointVelocityToObservation(env=env, fps=cfg.fps)
        if cfg.wrapper.add_current_to_observation:
            env = AddCurrentToObservation(env=env)
        if cfg.wrapper.add_ee_pose_to_observation:
            env = EEObservationWrapper(env=env, ee_pose_limits=robot.end_effector_bounds)

    env = ConvertToLeRobotObservation(env=env, device=cfg.device)

    if cfg.wrapper and cfg.wrapper.crop_params_dict is not None:
        env = ImageCropResizeWrapper(
            env=env,
            crop_params_dict=cfg.wrapper.crop_params_dict,
            resize_size=cfg.wrapper.resize_size,
        )

    # Add reward computation and control wrappers
    reward_classifier = init_reward_classifier(cfg)
    if reward_classifier is not None:
        env = RewardWrapper(env=env, reward_classifier=reward_classifier, device=cfg.device)

    env = TimeLimitWrapper(env=env, control_time_s=cfg.wrapper.control_time_s, fps=cfg.fps)
    if cfg.wrapper.use_gripper and cfg.wrapper.gripper_penalty is not None:
        env = GripperPenaltyWrapper(
            env=env,
            penalty=cfg.wrapper.gripper_penalty,
        )

    # Control mode specific wrappers
    control_mode = cfg.wrapper.control_mode
    if control_mode == "gamepad":
        assert isinstance(teleop_device, GamepadTeleop), (
            "teleop_device must be an instance of GamepadTeleop for gamepad control mode"
        )
        env = GamepadControlWrapper(
            env=env,
            teleop_device=teleop_device,
            use_gripper=cfg.wrapper.use_gripper,
        )
    elif control_mode == "keyboard_ee":
        assert isinstance(teleop_device, KeyboardEndEffectorTeleop), (
            "teleop_device must be an instance of KeyboardEndEffectorTeleop for keyboard control mode"
        )
        env = KeyboardControlWrapper(
            env=env,
            teleop_device=teleop_device,
            use_gripper=cfg.wrapper.use_gripper,
        )
    elif control_mode == "leader":
        env = GearedLeaderControlWrapper(
            env=env,
            teleop_device=teleop_device,
            end_effector_step_sizes=cfg.robot.end_effector_step_sizes,
            use_gripper=cfg.wrapper.use_gripper,
        )
    elif control_mode == "leader_automatic":
        env = GearedLeaderAutomaticControlWrapper(
            env=env,
            teleop_device=teleop_device,
            end_effector_step_sizes=cfg.robot.end_effector_step_sizes,
            use_gripper=cfg.wrapper.use_gripper,
        )
    else:
        raise ValueError(f"Invalid control mode: {control_mode}")

    env = ResetWrapper(
        env=env,
        reset_pose=cfg.wrapper.fixed_reset_joint_positions,
        reset_time_s=cfg.wrapper.reset_time_s,
    )

    env = BatchCompatibleWrapper(env=env)
    env = TorchActionWrapper(env=env, device=cfg.device)

    return env