in lerobot/scripts/rl/gym_manipulator.py [0:0]
def step(self, action):
    """
    Advance the wrapped environment by one step, letting the gamepad take over when active.

    The teleop commands are polled first. While an intervention is active, the
    gamepad action is sent to the environment in place of the agent's action.
    A manual end-of-episode request marks the step as terminated, and a
    success signal overrides the reward with 1.0.

    Args:
        action: Action proposed by the agent for this step.

    Returns:
        Tuple of (observation, reward, terminated, truncated, info). ``info``
        additionally carries ``is_intervention``, ``action_intervention`` and
        ``rerecord_episode``, plus ``next.success`` once the episode is over.
    """
    # Poll the teleop device once per step.
    commands = self.get_teleop_commands()
    is_intervention, gamepad_action, terminate_episode, success, rerecord_episode = commands

    if terminate_episode:
        logging.info(f"Episode manually ended: {'SUCCESS' if success else 'FAILURE'}")

    # The agent's action is kept unless the operator is intervening.
    if is_intervention:
        action = gamepad_action

    obs, reward, terminated, truncated, info = self.env.step(action)

    # A truncation or a manual stop request is also reported as `terminated`.
    terminated = terminated or truncated or terminate_episode

    if success:
        reward = 1.0
        logging.info("Episode ended successfully with reward 1.0")

    # The environment received the raw action; only the copy stored in `info`
    # is converted to a tensor.
    if isinstance(action, np.ndarray):
        action = torch.from_numpy(action)

    # `action_intervention` holds the action that was executed this step: the
    # gamepad action during an intervention, otherwise the agent's action.
    info.update(
        {
            "is_intervention": is_intervention,
            "action_intervention": action,
            "rerecord_episode": rerecord_episode,
        }
    )

    episode_over = terminated or truncated
    if not episode_over:
        return obs, reward, terminated, truncated, info

    # Episode finished: report the outcome and, if configured, reset right away
    # so the returned observation is the first one of the next episode.
    info["next.success"] = success
    if self.auto_reset:
        obs, reset_info = self.reset()
        info.update(reset_info)

    return obs, reward, terminated, truncated, info