sonic_on_ray/sonic_on_ray.py:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from collections import deque

import cv2
import gym
import gym.spaces as spaces
import retro
import numpy as np


class LazyFrames(object):
    def __init__(self, frames):
        """
        This object ensures that common frames between the observations are
        only stored once. It exists purely to optimize memory usage, which can
        be huge for DQN's 1M-frame replay buffers. This object should only be
        converted to a numpy array right before being passed to the model.
        """
        self._frames = frames
        self._out = None

    def _force(self):
        # Concatenate the stored frames along the channel axis the first time
        # the array is needed, then drop the per-frame references.
        if self._out is None:
            self._out = np.concatenate(self._frames, axis=2)
            self._frames = None
        return self._out

    def __array__(self, dtype=None):
        out = self._force()
        if dtype is not None:
            out = out.astype(dtype)
        return out

    def __len__(self):
        return len(self._force())

    def __getitem__(self, i):
        return self._force()[i]
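
# Example usage (illustrative): a LazyFrames object only materializes the
# stacked array when it is converted, e.g.
#
#     frames = LazyFrames([obs] * 4)   # four (H, W, 1) observations
#     dense = np.asarray(frames)       # -> (H, W, 4), built via __array__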


class FrameStack(gym.Wrapper):
    def __init__(self, env, k):
        """Stack the k last frames.

        Returns a lazy array, which is much more memory efficient.

        See Also
        --------
        baselines.common.atari_wrappers.LazyFrames
        """
        gym.Wrapper.__init__(self, env)
        self.k = k
        self.frames = deque([], maxlen=k)
        shp = env.observation_space.shape
        # Stacking concatenates along the channel axis, so the observation
        # space grows from (H, W, C) to (H, W, C * k).
        self.observation_space = spaces.Box(low=0, high=255,
                                            shape=(shp[0], shp[1], shp[2] * k),
                                            dtype=np.uint8)

    def reset(self):
        # Fill the frame buffer with k copies of the initial observation.
        ob = self.env.reset()
        for _ in range(self.k):
            self.frames.append(ob)
        return self._get_ob()

    def step(self, action):
        ob, reward, done, info = self.env.step(action)
        self.frames.append(ob)
        return self._get_ob(), reward, done, info

    def _get_ob(self):
        assert len(self.frames) == self.k
        return LazyFrames(list(self.frames))


class WarpFrame(gym.ObservationWrapper):
    def __init__(self, env):
        """Warp frames to 80x80 grayscale, a slightly smaller variant of the
        84x84 preprocessing used in the Nature DQN paper and later work."""
        gym.ObservationWrapper.__init__(self, env)
        self.width = 80
        self.height = 80
        self.observation_space = spaces.Box(low=0, high=255,
                                            shape=(self.height, self.width, 1),
                                            dtype=np.uint8)

    def observation(self, frame):
        # Convert to grayscale, downsample, and keep a trailing channel axis.
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        frame = cv2.resize(frame, (self.width, self.height),
                           interpolation=cv2.INTER_AREA)
        return frame[:, :, None]


class SonicDiscretizer(gym.ActionWrapper):
    """
    Wrap a gym-retro environment and make it use discrete
    actions for the Sonic game.
    """
    def __init__(self, env):
        super(SonicDiscretizer, self).__init__(env)
        # The Genesis controller exposes 12 buttons; each discrete action is
        # encoded as a boolean button-press vector over this layout.
        buttons = ["B", "A", "MODE", "START", "UP", "DOWN", "LEFT", "RIGHT",
                   "C", "Y", "X", "Z"]
        actions = [['LEFT'], ['RIGHT'], ['LEFT', 'DOWN'], ['RIGHT', 'DOWN'],
                   ['DOWN'], ['DOWN', 'B'], ['B']]
        self._actions = []
        for action in actions:
            arr = np.array([False] * 12)
            for button in action:
                arr[buttons.index(button)] = True
            self._actions.append(arr)
        self.action_space = gym.spaces.Discrete(len(self._actions))

    def action(self, a):
        # Return a copy so callers cannot mutate the cached button arrays.
        return self._actions[a].copy()
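
# Example (illustrative): with the action table above, index 1 presses only
# "RIGHT" and index 6 presses only "B" (jump in the Sonic games).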


class RewardScaler(gym.RewardWrapper):
    """
    Bring rewards to a reasonable scale for PPO. This is incredibly important
    and affects performance a lot.
    """
    def reward(self, reward):
        return reward * 0.01


def make(game, state, stack=True, scale_rew=True):
    """
    Create an environment with some standard wrappers.
    """
    env = retro.make(game, state)
    env = SonicDiscretizer(env)
    if scale_rew:
        env = RewardScaler(env)
    env = WarpFrame(env)
    if stack:
        env = FrameStack(env, 4)
    return env
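

# Example usage (illustrative; the game/state identifiers below are the usual
# gym-retro names for the first Sonic level and are not defined by this module):
#
#     env = make('SonicTheHedgehog-Genesis', 'GreenHillZone.Act1')
#     ob = env.reset()                 # LazyFrames, (80, 80, 4) when converted
#     ob, reward, done, info = env.step(env.action_space.sample())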