# torchbenchmark/models/soft_actor_critic/envs.py
import argparse
import random
from collections import deque
import math
import gym
import numpy as np
class ActionRepeatWrapper(gym.Wrapper):
    """
    Augment a continuous action space with one extra leading dimension that
    selects how many times the remaining action is repeated (between 1 and
    `repeat_multiplier` env steps).
    """

    def __init__(self, env, repeat_multiplier=8):
        super().__init__(env)
        self.action_space = gym.spaces.Box(
            -1.0, 1.0, shape=(1 + self.env.action_space.shape[0],)
        )
        self.repeat_multiplier = repeat_multiplier / 2.0
    def step(self, action):
        # the first action dimension (in [-1, 1]) maps to a repeat count in [1, repeat_multiplier]
        repeat_action = max(math.floor((action[0] + 1.0) * self.repeat_multiplier), 1)
        main_action = action[1:]
        total_reward = 0
        for _ in range(repeat_action):
            next_state, reward, done, _ = self.env.step(main_action)
            total_reward += reward
            if done:
                # stop repeating once the episode terminates
                break
        return next_state, total_reward, done, {}
class ChannelsFirstWrapper(gym.ObservationWrapper):
"""
Some pixel-based gym environments use a (Height, Width, Channel) image format.
    This wrapper rolls those axes to (Channel, Height, Width) to work with
    PyTorch Conv2d layers.
"""
def __init__(self, env):
super().__init__(env)
self.observation_space.shape = (
env.observation_space.shape[-1],
) + env.observation_space.shape[:-1]
def observation(self, frame):
frame = np.transpose(frame, (2, 0, 1))
return np.ascontiguousarray(frame)
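# Illustrative sketch (not part of the benchmark): what ChannelsFirstWrapper does
# to a single observation. The 84x84x3 frame is just an assumed example.
#
#   frame = np.zeros((84, 84, 3), dtype=np.uint8)               # (H, W, C)
#   chw = np.ascontiguousarray(np.transpose(frame, (2, 0, 1)))
#   assert chw.shape == (3, 84, 84)                              # (C, H, W)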
class NormalizeObservationSpace(gym.ObservationWrapper):
    """Standardize observations with a fixed mean and std (plus a small epsilon)."""

    def __init__(self, env, obs_mean, obs_std):
        super().__init__(env)
        self.mean = obs_mean
        self.std = obs_std + 1e-5
def observation(self, x):
return (x - self.mean) / self.std
class NormalizeContinuousActionSpace(gym.ActionWrapper):
    """
    Present a [-1, 1] action space to the agent and rescale its actions back to
    the environment's true bounds before they are executed.
    """

    def __init__(self, env):
        super().__init__(env)
        self._true_action_space = env.action_space
        self.action_space = gym.spaces.Box(
            low=-1.0, high=1.0, shape=self._true_action_space.shape, dtype=np.float32,
        )

    def action(self, action):
        # rescale from the normalized [-1, 1] box to the env's true [low, high] box
        true_delta = self._true_action_space.high - self._true_action_space.low
        norm_delta = self.action_space.high - self.action_space.low
        action = (action - self.action_space.low) / norm_delta
        action = action * true_delta + self._true_action_space.low
        return action
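# Illustrative sketch (not part of the benchmark): how a normalized action is
# rescaled, for a hypothetical env whose true action bounds are [0, 4].
#
#   low, high = 0.0, 4.0
#   a = 0.5                                  # agent action in [-1, 1]
#   a01 = (a - (-1.0)) / (1.0 - (-1.0))      # -> 0.75, mapped into [0, 1]
#   true_a = a01 * (high - low) + low        # -> 3.0, mapped into [0, 4]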
def robosuite_action_adjustment(robosuite_env, verbose=False):
if verbose:
action_space = robosuite_env.action_space
high = action_space.high
same_high = np.all(high == high[0])
low = action_space.low
same_low = np.all(low == low[0])
shape = action_space.shape[0]
print("RoboSuite Action Space Report:")
if same_high and same_low:
print(f"Uniformly Bounded Action Space in [{low[0]}, {high[0]}]^{shape}")
else:
print(f"Non-uniform Bounded Action Space with elements = {zip(low, high)}")
print("\nAttempting to normalize action space using dc.envs.Normalize...\n")
env = NormalizeContinuousActionSpace(robosuite_env)
if verbose:
action_space = env.action_space
high = action_space.high
same_high = np.all(high == high[0])
low = action_space.low
same_low = np.all(low == low[0])
shape = action_space.shape[0]
print("Normalized RoboSuite Action Space Report:")
if same_high and same_low:
print(f"Uniformly Bounded Action Space in [{low[0]}, {high[0]}]^{shape}")
else:
print(f"Non-uniform Bounded Action Space with elements = {zip(low, high)}")
return env
class FlattenObsWrapper(gym.ObservationWrapper):
"""
Simple wrapper that flattens an image observation
into a state vector when CNNs are overkill.
"""
def __init__(self, env):
super().__init__(env)
self.observation_space.shape = (np.prod(env.observation_space.shape),)
def observation(self, obs):
return obs.flatten()
class ConcatObsWrapper(gym.ObservationWrapper):
    """
    Concatenate a tuple of 1-D observations (e.g. from a Tuple observation
    space) into a single flat vector.
    """

    def __init__(self, env):
        super().__init__(env)
        obs_space_shape = sum(x.shape[0] for x in self.observation_space)
        self.observation_space.shape = (obs_space_shape,)
def observation(self, obs):
return np.concatenate(obs, axis=0)
def highway_env(env_id):
"""
Convenience function to turn all the highway_env
environments into continuous control tasks.
highway_env: https://highway-env.readthedocs.io/en/latest/index.html
"""
import gym
import highway_env
env = gym.make(env_id)
env.configure({"action": {"type": "ContinuousAction"}})
env.reset()
env = NormalizeContinuousActionSpace(env)
env = FlattenObsWrapper(env)
return env
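# Example usage (sketch): requires the optional highway_env package and a
# registered id such as "highway-v0"; not exercised by the benchmark itself.
#
#   env = highway_env("highway-v0")
#   obs = env.reset()                 # flattened 1-D observation
#   obs, reward, done, info = env.step(env.action_space.sample())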
class DiscreteActionWrapper(gym.ActionWrapper):
"""
This is intended to let the action be any scalar
(float or int) or np array (float or int) of size 1.
floats are cast to ints using python's standard rounding.
"""
def __init__(self, env):
super().__init__(env)
self.action_space.shape = (env.action_space.n,)
def action(self, action):
if isinstance(action, np.ndarray):
if len(action.shape) > 0:
action = action[0]
return int(action)
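# Illustrative sketch (not part of the benchmark): how actions are converted,
# for a hypothetical `wrapper = DiscreteActionWrapper(some_discrete_env)`.
#
#   wrapper.action(np.array([1.7]))   # -> int(1.7) == 1 (truncation, not rounding)
#   wrapper.action(2)                 # -> 2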
class FrameStack(gym.Wrapper):
    """Stack the last `num_stack` observations along the channel (first) axis."""

    def __init__(self, env, num_stack):
gym.Wrapper.__init__(self, env)
self._k = num_stack
self._frames = deque([], maxlen=num_stack)
shp = env.observation_space.shape
self.observation_space = gym.spaces.Box(
low=0,
high=1,
shape=((shp[0] * num_stack,) + shp[1:]),
dtype=env.observation_space.dtype,
)
def reset(self):
obs = self.env.reset()
for _ in range(self._k):
self._frames.append(obs)
return self._get_obs()
def step(self, action):
obs, reward, done, info = self.env.step(action)
self._frames.append(obs)
return self._get_obs(), reward, done, info
def _get_obs(self):
assert len(self._frames) == self._k
return np.concatenate(list(self._frames), axis=0)
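# Illustrative sketch (not part of the benchmark): frame stacking on a
# hypothetical env whose observations are (1, 84, 84) grayscale images.
#
#   stacked_env = FrameStack(pixel_env, num_stack=3)
#   obs = stacked_env.reset()
#   assert obs.shape == (3, 84, 84)   # 3 frames concatenated on the channel axis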
class GoalBasedWrapper(gym.ObservationWrapper):
"""
Some goal-based envs (like the Gym Robotics suite) use dictionary observations
with one entry for the current state and another to describe the goal. This
wrapper concatenates those into a single vector so it can be used just like
any other env.
"""
def __init__(self, env):
super().__init__(env)
self.observation_space.shape = (
env.observation_space["observation"].shape[0]
+ env.observation_space["desired_goal"].shape[0],
)
def observation(self, obs_dict):
return self._flatten_obs(obs_dict)
def _flatten_obs(self, obs_dict):
return np.concatenate((obs_dict["observation"], obs_dict["desired_goal"]))
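# Illustrative sketch (not part of the benchmark): a goal-based dict observation
# with hypothetical sizes is flattened into a single vector.
#
#   obs_dict = {"observation": np.zeros(10), "desired_goal": np.zeros(3),
#               "achieved_goal": np.zeros(3)}
#   flat = np.concatenate((obs_dict["observation"], obs_dict["desired_goal"]))
#   assert flat.shape == (13,)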
def add_gym_args(parser):
    """
    Add --env_id and --seed command line flags to an argparser
    """
parser.add_argument("--env_id", type=str, default="Pendulum-v1")
parser.add_argument("--seed", type=int, default=123)
def load_gym(env_id="CartPole-v1", seed=None, normalize_action_space=True, **_):
"""
Load an environment from OpenAI gym (or pybullet_gym, if installed)
"""
# optional pybullet import
try:
import pybullet
import pybulletgym
except ImportError:
pass
env = gym.make(env_id)
if normalize_action_space and isinstance(env.action_space, gym.spaces.Box):
env = NormalizeContinuousActionSpace(env)
if seed is None:
seed = random.randint(1, 100000)
env.seed(seed)
return env
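# Example usage (sketch): wiring the argparse helper into load_gym. Assumes a
# registered gym env id; not part of the benchmark entry point.
#
#   parser = argparse.ArgumentParser()
#   add_gym_args(parser)
#   args = parser.parse_args()
#   env = load_gym(**vars(args))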
def add_dmc_args(parser):
    """
    Add command line flags associated with the DeepMind Control Suite to a parser
    """
parser.add_argument("--domain_name", type=str, default="fish")
parser.add_argument("--task_name", type=str, default="swim")
parser.add_argument(
"--from_pixels", action="store_true", help="Use image observations"
)
parser.add_argument("--height", type=int, default=84)
parser.add_argument("--width", type=int, default=84)
parser.add_argument("--camera_id", type=int, default=0)
parser.add_argument("--frame_skip", type=int, default=1)
parser.add_argument("--frame_stack", type=int, default=3)
parser.add_argument("--channels_last", action="store_true")
parser.add_argument("--rgb", action="store_true")
parser.add_argument("--seed", type=int, default=231)
def add_atari_args(parser):
    """
    Add command line flags associated with the Atari benchmark to a parser
    """
parser.add_argument("--game_id", type=str, default="Boxing-v0")
parser.add_argument("--noop_max", type=int, default=30)
parser.add_argument("--frame_skip", type=int, default=1)
parser.add_argument("--screen_size", type=int, default=84)
parser.add_argument("--terminal_on_life_loss", action="store_true")
parser.add_argument("--rgb", action="store_true")
parser.add_argument("--normalize", action="store_true")
parser.add_argument("--frame_stack", type=int, default=4)
parser.add_argument("--seed", type=int, default=231)
def load_atari(
game_id,
seed=None,
noop_max=30,
frame_skip=1,
screen_size=84,
terminal_on_life_loss=False,
rgb=False,
normalize=False,
frame_stack=4,
clip_reward=True,
**_,
):
"""
Load a game from the Atari benchmark, with the usual settings
Note that the simplest game ids (e.g. Boxing-v0) come with frame
skipping by default, and you'll get an error if the frame_skp arg > 1.
Use `BoxingNoFrameskip-v0` with frame_skip > 1.
"""
env = gym.make(game_id)
if seed is None:
seed = random.randint(1, 100000)
env.seed(seed)
env = gym.wrappers.AtariPreprocessing(
env,
noop_max=noop_max,
frame_skip=frame_skip,
screen_size=screen_size,
terminal_on_life_loss=terminal_on_life_loss,
grayscale_obs=False, # use GrayScale wrapper instead...
scale_obs=normalize,
)
if not rgb:
env = gym.wrappers.GrayScaleObservation(env, keep_dim=True)
if clip_reward:
env = ClipReward(env)
env = ChannelsFirstWrapper(env)
env = FrameStack(env, num_stack=frame_stack)
env = DiscreteActionWrapper(env)
return env
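# Example usage (sketch): the NoFrameskip variant is needed when frame_skip > 1,
# as noted in the docstring above. With the default screen_size=84 and
# frame_stack=4, the stacked grayscale observation has shape (4, 84, 84).
#
#   env = load_atari("BoxingNoFrameskip-v0", frame_skip=4)
#   obs = env.reset()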
class ClipReward(gym.RewardWrapper):
    """Clip rewards to the range [low, high]."""

    def __init__(self, env, low=-1.0, high=1.0):
super().__init__(env)
self._clip_low = low
self._clip_high = high
def reward(self, rew):
return max(min(rew, self._clip_high), self._clip_low)
def load_dmc(
domain_name,
task_name,
seed=None,
from_pixels=False,
frame_stack=1,
height=84,
width=84,
camera_id=0,
frame_skip=1,
channels_last=False,
rgb=False,
**_,
):
"""
Load a task from the deepmind control suite.
Uses dmc2gym (https://github.com/denisyarats/dmc2gym)
Note that setting seed=None (the default) picks a random seed
"""
import dmc2gym
if seed is None:
seed = random.randint(1, 100000)
env = dmc2gym.make(
domain_name=domain_name,
task_name=task_name,
from_pixels=from_pixels,
height=height,
width=width,
camera_id=camera_id,
visualize_reward=False,
frame_skip=frame_skip,
        # if we're using RGB, let dmc2gym set the channel order here;
        # grayscale frames are transposed later by ChannelsFirstWrapper
        channels_first=(not channels_last) if rgb else False,
)
if not rgb and from_pixels:
env = gym.wrappers.GrayScaleObservation(env, keep_dim=True)
env = ChannelsFirstWrapper(env)
if from_pixels:
env = FrameStack(env, num_stack=frame_stack)
return env
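# Example usage (sketch): requires the optional dmc2gym package and mirrors the
# defaults exposed by add_dmc_args; not run by the benchmark at import time.
#
#   env = load_dmc(domain_name="fish", task_name="swim", from_pixels=False)
#   obs = env.reset()                 # low-dimensional state vector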