torchbenchmark/models/soft_actor_critic/utils.py (125 lines of code) (raw):
import math
import os
import random
from collections import namedtuple
import gym
import numpy as np
import torch
def clean_hparams_dict(hparams_dict):
    """Return a new dict holding only the truthy-valued items of ``hparams_dict``.

    Any entry whose value evaluates as false (``None``, ``0``, ``False``,
    an empty string or container, ...) is left out.  The surviving items
    keep the iteration order of the input mapping.
    """
    cleaned = {}
    for name, value in hparams_dict.items():
        if not value:
            continue
        cleaned[name] = value
    return cleaned
def get_grad_norm(model):
    """Return the overall L2 norm of the gradients held by ``model``'s parameters.

    The 2-norm of each parameter's ``grad.data`` is squared and accumulated;
    the square root of that sum is returned as a Python float.  Parameters
    for which ``p.grad.data`` raises ``AttributeError`` (e.g. ``p.grad`` is
    ``None``) are skipped.
    """
    squared_sum = 0.0
    for weight in model.parameters():
        try:
            grad = weight.grad.data
        except AttributeError:
            # No usable gradient on this parameter; it contributes nothing.
            continue
        squared_sum += grad.norm(2).item() ** 2
    return squared_sum ** (1.0 / 2)
def torch_and_pad(x):
    """Convert ``x`` into a float32 torch tensor with an extra leading axis.

    ``x`` is first turned into a numpy array (unless it already is one),
    cast to ``np.float32``, wrapped with ``torch.from_numpy`` and finally
    given a new dimension of size 1 at position 0.
    """
    array = x if isinstance(x, np.ndarray) else np.array(x)
    as_float32 = array.astype(np.float32)
    tensor = torch.from_numpy(as_float32)
    return tensor.unsqueeze(0)
def mean(lst):
    """Return the arithmetic mean of ``lst`` as a float.

    An empty ``lst`` raises ``ZeroDivisionError`` because the sum is divided
    by ``len(lst)``.
    """
    total = float(sum(lst))
    count = len(lst)
    return total / count
def make_process_dirs(run_name, base_path="dc_saves"):
    """Create a fresh, numbered directory for a run and return its path.

    Candidate paths have the form ``<base_path>/<run_name>_<i>`` with ``i``
    counting up from 0; the first candidate that does not exist yet is
    created with ``os.makedirs`` and returned.
    """
    prefix = os.path.join(base_path, run_name)
    index = 0
    candidate = prefix + f"_{index}"
    # Probe successive suffixes until an unused path is found.
    while os.path.exists(candidate):
        index += 1
        candidate = prefix + f"_{index}"
    os.makedirs(candidate)
    return candidate
def compute_conv_output(
    inp_shape, kernel_size, padding=(0, 0), dilation=(1, 1), stride=(1, 1)
):
    """
    Compute the output (height, width) of a torch Conv2d layer using the
    formula from the docs.

    Every argument is a tuple indexed as (height, width), e.g.
    kernel_size=(3, 4).  Returns the pair ``(height_out, width_out)``.
    """

    def _axis_out(axis):
        # out = floor((in + 2*pad - dilation*(kernel - 1) - 1) / stride + 1)
        kernel_extent = dilation[axis] * (kernel_size[axis] - 1)
        numerator = inp_shape[axis] + 2 * padding[axis] - kernel_extent - 1
        return math.floor(numerator / stride[axis] + 1)

    height_out = _axis_out(0)
    width_out = _axis_out(1)
    return height_out, width_out
def soft_update(target, source, tau):
    """Blend ``source``'s parameters into ``target``'s parameters in place.

    Parameters are paired in ``parameters()`` order, and each target value
    becomes ``target * (1 - tau) + source * tau``.  ``source`` is not
    modified and nothing is returned.
    """
    for dst, src in zip(target.parameters(), source.parameters()):
        blended = dst.data * (1.0 - tau) + src.data * tau
        dst.data.copy_(blended)
def hard_update(target, source):
    """Overwrite ``target``'s parameter values with those of ``source`` in place.

    Parameters are paired in ``parameters()`` order; ``source`` is left
    untouched and nothing is returned.
    """
    param_pairs = zip(target.parameters(), source.parameters())
    for dst, src in param_pairs:
        dst.data.copy_(src.data)
""" This is all from: https://github.com/matthiasplappert/keras-rl/blob/master/rl/random.py """
class AnnealedGaussianProcess:
    """Base for noise processes whose sigma is linearly annealed over steps.

    ``current_sigma`` follows the line ``m * n_steps + c`` and is clamped
    from below by ``sigma_min``.  ``n_steps`` starts at 0; this class itself
    never increments it.
    """

    def __init__(self, mu, sigma, sigma_min, n_steps_annealing):
        self.mu = mu
        self.sigma = sigma
        self.n_steps = 0
        # The annealing line always starts from the initial sigma.
        self.c = sigma
        if sigma_min is None:
            # No annealing requested: flat line, floor equal to sigma.
            self.m = 0.0
            self.sigma_min = sigma
            return
        # Slope that takes sigma down to sigma_min over n_steps_annealing steps.
        self.m = -float(sigma - sigma_min) / float(n_steps_annealing)
        self.sigma_min = sigma_min

    @property
    def current_sigma(self):
        """Sigma for the current ``n_steps``, never below ``sigma_min``."""
        annealed = self.m * float(self.n_steps) + self.c
        return max(self.sigma_min, annealed)
class OrnsteinUhlenbeckProcess(AnnealedGaussianProcess):
    """Noise process with mean reversion toward ``mu`` and annealed sigma.

    Each ``sample()`` call computes

        x = x_prev + theta * (mu - x_prev) * dt
                   + current_sigma * sqrt(dt) * N(0, 1)

    stores it as the new ``x_prev`` and increments ``n_steps``.
    """

    def __init__(
        self,
        theta,
        mu=0.0,
        sigma=1.0,
        dt=1e-2,
        x0=None,
        size=1,
        sigma_min=None,
        n_steps_annealing=1000,
    ):
        super().__init__(
            mu=mu,
            sigma=sigma,
            sigma_min=sigma_min,
            n_steps_annealing=n_steps_annealing,
        )
        self.theta = theta
        self.mu = mu
        self.dt = dt
        self.x0 = x0
        self.size = size
        # Initialise x_prev so that sample() can be called right away.
        self.reset_states()

    def sample(self):
        """Advance the process by one step and return the new state."""
        pull_to_mean = self.theta * (self.mu - self.x_prev) * self.dt
        gaussian_kick = (
            self.current_sigma
            * np.sqrt(self.dt)
            * np.random.normal(size=self.size)
        )
        state = self.x_prev + pull_to_mean + gaussian_kick
        self.x_prev = state
        self.n_steps += 1
        return state

    def reset_states(self):
        """Reset ``x_prev`` to ``x0``, or to zeros of ``size`` when ``x0`` is None."""
        if self.x0 is None:
            self.x_prev = np.zeros(self.size)
        else:
            self.x_prev = self.x0
class GaussianExplorationNoise:
    """Zero-mean Gaussian noise whose scale decays linearly per sample.

    The scale starts at ``start_scale`` and is reduced by
    ``(start_scale - final_scale) / steps_annealed`` after every
    ``sample()`` call, never dropping below ``final_scale``.

    Args:
        size: shape of each noise sample.  Either an iterable of ints
            (e.g. ``(2, 3)``) or a single int for a 1-D sample.
        start_scale: initial multiplier applied to the standard-normal draw.
        final_scale: lower bound of the multiplier; must not exceed
            ``start_scale``.
        steps_annealed: number of ``sample()`` calls over which the scale
            goes from ``start_scale`` to ``final_scale``.
    """

    def __init__(self, size, start_scale=1.0, final_scale=0.1, steps_annealed=1000):
        assert start_scale >= final_scale
        self.size = size
        self.start_scale = start_scale
        self.final_scale = final_scale
        self.steps_annealed = steps_annealed
        self._current_scale = start_scale
        self._scale_slope = (start_scale - final_scale) / steps_annealed

    def sample(self):
        """Draw one noise sample as a numpy array and decay the scale one step."""
        # Accept a bare int as well as an iterable of ints, so that
        # ``size=3`` works instead of failing on ``*self.size``.
        try:
            shape = tuple(self.size)
        except TypeError:
            shape = (self.size,)
        noise = self._current_scale * torch.randn(*shape)
        self._current_scale = max(
            self._current_scale - self._scale_slope, self.final_scale
        )
        return noise.numpy()

    def reset_states(self):
        """No-op: the annealed scale is intentionally not reset here."""
        pass