mae_envs/wrappers/lidar.py (83 lines of code) (raw):
import gym
import numpy as np
from mujoco_worldgen.util.rotation import quat_from_angle_and_axis
from mujoco_worldgen.util.geometry import raycast
from mae_envs.wrappers.util import update_obs_space
class Lidar(gym.ObservationWrapper):
'''
Creates LIDAR-type observations based on Mujoco raycast
Args:
n_lidar_per_agent (int): Number of concentric lidar rays per agent
lidar_range (float): Maximum range of lidar
compress_lidar_scale (float): Scale for non-linear compression of
lidar range
visualize_lidar (bool): If true, visualize lidar using thin cylinders
representing lidar rays (requires environment to create corresponding
sites)
'''
def __init__(self, env, n_lidar_per_agent=30, lidar_range=6.0,
compress_lidar_scale=None, visualize_lidar=False):
super().__init__(env)
self.n_lidar_per_agent = n_lidar_per_agent
self.lidar_range = lidar_range
self.compress_lidar_scale = compress_lidar_scale
self.visualize_lidar = visualize_lidar
self.n_agents = self.unwrapped.n_agents
self.observation_space = update_obs_space(
env, {'lidar': (self.n_agents, self.n_lidar_per_agent, 1)})
# generate concentric lidar rays centered at origin
self.lidar_angles = np.linspace(0, 2*np.pi, num=self.n_lidar_per_agent, endpoint=False)
self.lidar_rays = self.lidar_range * np.array([np.cos(self.lidar_angles),
np.sin(self.lidar_angles),
np.zeros_like(self.lidar_angles)]).T
self.lidar_rays = self.lidar_rays[None, :]
def reset(self):
obs = self.env.reset()
sim = self.unwrapped.sim
# Cache ids
self.agent_body_ids = np.array([sim.model.body_name2id(f"agent{i}:particle")
for i in range(self.n_agents)])
self.agent_geom_ids = np.array([sim.model.geom_name2id(f'agent{i}:agent')
for i in range(self.n_agents)])
if self.visualize_lidar:
self.lidar_ids = np.array([[sim.model.site_name2id(f"agent{i}:lidar{j}")
for j in range(self.n_lidar_per_agent)]
for i in range(self.n_agents)])
return self.observation(obs)
def place_lidar_ray_markers(self, agent_pos, lidar_endpoints):
sim = self.unwrapped.sim
site_offset = sim.data.site_xpos[self.lidar_ids, :] - sim.model.site_pos[self.lidar_ids, :]
# compute location of lidar rays
sim.model.site_pos[self.lidar_ids, :] = .5 * (agent_pos[:, None, :] + lidar_endpoints) - site_offset
# compute length of lidar rays
rel_vec = lidar_endpoints - agent_pos[:, None, :]
rel_vec_length = np.linalg.norm(rel_vec, axis=-1)
sim.model.site_size[self.lidar_ids, 1] = rel_vec_length / 2
# compute rotation of lidar rays
# normalize relative vector
rel_vec_norm = rel_vec / rel_vec_length[:, :, None]
# set small relative vectors to zero instead
rel_vec_norm[rel_vec_length <= 1e-8, :] = 0.0
# start vector
start_vec = np.array([0.0, 0.0, 1.0])
# calculate rotation axis: cross product between start and goal vector
rot_axis = np.cross(start_vec, rel_vec_norm)
norm_rot_axis = np.linalg.norm(rot_axis, axis=-1)
# calculate rotation angle and quaternion
rot_angle = np.arctan2(norm_rot_axis, np.dot(rel_vec_norm, start_vec))
quat = quat_from_angle_and_axis(rot_angle, rot_axis)
# if norm of cross product is very small, set rotation to identity
eps = 1e-3
quat[norm_rot_axis <= eps, :] = np.array([1.0, 0.0, 0.0, 0.0])
sim.model.site_quat[self.lidar_ids, :] = quat
def observation(self, obs):
sim = self.unwrapped.sim
agent_pos = sim.data.body_xpos[self.agent_body_ids]
lidar_endpoints = agent_pos[:, None, :] + self.lidar_rays
# Would be nice to vectorize in the future with better mujoco-py interface
lidar = np.zeros((self.n_agents, self.n_lidar_per_agent))
for i in range(self.n_agents):
for j in range(self.n_lidar_per_agent):
lidar[i, j] = raycast(sim, geom1_id=self.agent_geom_ids[i],
pt2=lidar_endpoints[i, j], geom_group=None)[0]
lidar[lidar < 0.0] = self.lidar_range
if self.compress_lidar_scale is not None:
obs['lidar'] = (self.compress_lidar_scale *
np.tanh(lidar[..., None] / self.compress_lidar_scale))
else:
obs['lidar'] = lidar[..., None]
if self.visualize_lidar:
# recalculate lidar endpoints
lidar_endpoints = agent_pos[:, None, :] + \
lidar[:, :, None] / self.lidar_range * self.lidar_rays
self.place_lidar_ray_markers(agent_pos, lidar_endpoints)
sim.model.site_rgba[self.lidar_ids, :] = np.array([0.0, 0.0, 1.0, 0.2])
sim.forward()
return obs