# gym-compete/gym_compete/new_envs/sumo.py
from .multi_agent_env import MultiAgentEnv
import numpy as np
from gym import spaces
import six
class SumoEnv(MultiAgentEnv):
    '''
    Sumo: two agents start inside a circular arena and try to knock each
    other over or push each other out. An agent that falls or leaves the
    arena is penalised by GOAL_REWARD; if the agents have touched, the
    opponent receives +GOAL_REWARD and is marked as the winner. Running
    out of time penalises both agents.
    '''
def __init__(self, max_episode_steps=500, min_radius=1, max_radius=3.5, **kwargs):
super(SumoEnv, self).__init__(**kwargs)
self._max_episode_steps = max_episode_steps
self._elapsed_steps = 0
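        # Win/loss bonus and the arena-radius curriculum bounds; the playable
        # radius is re-sampled on every reset between MIN_RADIUS and the
        # current (version-dependent) maximum.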
self.GOAL_REWARD = 1000
self.RADIUS = self.MAX_RADIUS = self.current_max_radius = max_radius
self.MIN_RADIUS = min_radius
self.LIM_X = [(-2, 0), (0, 2)]
self.LIM_Y = [(-2, 2), (-2, 2)]
self.RANGE_X = self.LIM_X.copy()
self.RANGE_Y = self.LIM_Y.copy()
self.arena_id = self.env_scene.model.geom_names.index(six.b('arena'))
self.arena_height = self.env_scene.model.geom_size[self.arena_id][1] * 2
self._set_geom_radius()
self.agent_contacts = False
    def _past_limit(self):
        '''True once the episode has used up its step budget.'''
        return self._elapsed_steps >= self._max_episode_steps
    def _past_arena(self, agent_id):
        '''True if the agent's torso has moved outside the arena circle.'''
        xy = self.agents[agent_id].get_qpos()[:2]
        r = np.sqrt(np.sum(xy ** 2))
        # print("Agent", agent_id, "at", r)
        return r > self.RADIUS
    def _is_fallen(self, agent_id, limit=0.5):
        '''True if the agent's torso z-position has dropped to `limit` or
        below (0.3 for ants), offset by the arena height.'''
        if self.agents[agent_id].team == 'ant':
            limit = 0.3
        limit = limit + self.arena_height
        return bool(self.agents[agent_id].get_qpos()[2] <= limit)
    def _is_standing(self, agent_id, limit=0.9):
        '''True if the agent's torso z-position exceeds `limit` plus the arena height.'''
        limit = limit + self.arena_height
        return bool(self.agents[agent_id].get_qpos()[2] > limit)
def get_agent_contacts(self):
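        '''Return (geom1, geom2, dist) for every penetrating contact between
        geoms of agent0 and agent1 in the current MuJoCo state.'''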
mjcontacts = self.env_scene.data._wrapped.contents.contact
ncon = self.env_scene.model.data.ncon
contacts = []
for i in range(ncon):
ct = mjcontacts[i]
            g1, g2 = ct.geom1, ct.geom2
g1 = self.env_scene.model.geom_names[g1]
g2 = self.env_scene.model.geom_names[g2]
if g1.find(six.b('agent')) >= 0 and g2.find(six.b('agent')) >= 0:
if g1.find(six.b('agent0')) >= 0:
if g2.find(six.b('agent1')) >= 0 and ct.dist < 0:
contacts.append((g1, g2, ct.dist))
elif g1.find(six.b('agent1')) >= 0:
if g2.find(six.b('agent0')) >= 0 and ct.dist < 0:
contacts.append((g1, g2, ct.dist))
return contacts
def goal_rewards(self, infos=None, agent_dones=None):
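        '''Sparse win/loss rewards. A fallen or out-of-arena agent is
        penalised by GOAL_REWARD; its opponent is rewarded (and marked as
        winner in `infos`) only if the two agents have touched at some point
        during the episode. On timeout both agents are penalised.'''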
self._elapsed_steps += 1
goal_rews = [0. for _ in range(self.n_agents)]
fallen = [self._is_fallen(i)
for i in range(self.n_agents)]
timeup = self._past_limit()
past_arena = [self._past_arena(i) for i in range(self.n_agents)]
done = False
agent_contacts = self.get_agent_contacts()
if len(agent_contacts) > 0:
# print('Detected contacts:', agent_contacts)
self.agent_contacts = True
if any(fallen):
done = True
for j in range(self.n_agents):
if fallen[j]:
print('Agent', j, 'fallen')
goal_rews[j] -= self.GOAL_REWARD
elif self.agent_contacts:
goal_rews[j] += self.GOAL_REWARD
infos[j]['winner'] = True
# import ipdb; ipdb.set_trace()
elif any(past_arena):
done = True
for j in range(self.n_agents):
if past_arena[j]:
print('Agent', j, 'past arena')
goal_rews[j] -= self.GOAL_REWARD
elif self.agent_contacts:
goal_rews[j] += self.GOAL_REWARD
infos[j]['winner'] = True
elif timeup:
for j in range(self.n_agents):
goal_rews[j] -= self.GOAL_REWARD
done = timeup or done
return goal_rews, done
def _set_observation_space(self):
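        '''Each agent observes its own state plus 4 extra scalars: the arena
        radius, its own distance to the arena edge, the opponent's distance
        to the edge, and the number of remaining steps (see _get_obs).'''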
ob_spaces_limits = []
# nextra = 3 + self.n_agents - 1
nextra = 4
for i in range(self.n_agents):
s = self.agents[i].observation_space.shape[0]
h = np.ones(s+nextra) * np.inf
l = -h
ob_spaces_limits.append((l, h))
self.observation_space = spaces.Tuple(
[spaces.Box(l, h) for l,h in ob_spaces_limits]
)
def _get_obs(self):
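        '''Per-agent observation: the agent's own observation concatenated
        with [arena radius, own distance to the arena edge, other agents'
        distances to the edge, remaining episode steps].'''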
obs = []
dists = []
for i in range(self.n_agents):
xy = self.agents[i].get_qpos()[:2]
r = np.sqrt(np.sum(xy**2))
d = self.RADIUS - r
# print(r, d)
dists.append(d)
for i in range(self.n_agents):
ob = self.agents[i]._get_obs()
mydist = np.asarray(dists[i])
if self.n_agents == 1:
other_dist = np.asarray(self.RADIUS)
elif self.n_agents == 2:
other_dist = np.asarray(dists[1-i])
else:
other_dist = np.asarray([dists[j] for j in range(self.n_agents) if j != i])
ob = np.concatenate(
[ob.flat, np.asarray(self.RADIUS).flat,
mydist.flat, other_dist.flat,
np.asarray(self._max_episode_steps - self._elapsed_steps).flat
]
)
obs.append(ob)
return tuple(obs)
def _reset_max_radius(self, version):
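        '''Curriculum on arena size: the maximum sampling radius grows
        exponentially with `version` (starting near MIN_RADIUS + 0.1) and is
        capped at MAX_RADIUS.'''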
decay_func_r = lambda x: 0.1 * np.exp(0.001 * x)
vr = decay_func_r(version)
self.current_max_radius = min(self.MAX_RADIUS, self.MIN_RADIUS + vr)
# print(self.current_max_radius)
    def _reset_radius(self):
        self.RADIUS = np.random.uniform(self.MIN_RADIUS, self.current_max_radius)
        # print('setting Radius to', self.RADIUS)
def _set_geom_radius(self):
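        '''Write the current RADIUS into the arena geom's size and re-run the
        model's forward pass so the change takes effect.'''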
gs = self.env_scene.model.geom_size.copy()
gs[self.arena_id][0] = self.RADIUS
self.env_scene.model.__setattr__('geom_size', gs)
self.env_scene.model.forward()
def _reset_agents(self):
        # Place agents on opposite halves of the arena: even indices on the
        # negative-x side, odd indices on the positive-x side, keeping a
        # margin from the arena edge and from the centre line.
        min_gap = 0.3 + self.MIN_RADIUS / 2
for i in range(self.n_agents):
if i % 2 == 0:
x = np.random.uniform(-self.RADIUS + min_gap, -0.3)
y_lim = np.sqrt(self.RADIUS**2 - x**2)
y = np.random.uniform(-y_lim + min_gap, y_lim - min_gap)
else:
x = np.random.uniform(0.3, self.RADIUS - min_gap)
y_lim = np.sqrt(self.RADIUS**2 - x**2)
y = np.random.uniform(-y_lim + min_gap, y_lim - min_gap)
self.agents[i].set_xyz((x,y,None))
# print('setting agent', i, 'at', (x,y))
def _reset(self, version=None):
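        '''Reset the episode: optionally update the radius curriculum from
        `version`, re-sample and apply the arena radius, reset the MuJoCo
        scene and reposition the agents.'''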
self._elapsed_steps = 0
self.agent_contacts = False
# self.RADIUS = self.START_RADIUS
if version is not None:
self._reset_max_radius(version)
self._reset_radius()
self._set_geom_radius()
# print("here")
_ = self.env_scene.reset()
self._reset_agents()
ob = self._get_obs()
return ob
def reset(self, margins=None, version=None):
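        '''Public reset; optionally sets per-agent contact margins.'''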
ob = self._reset(version=version)
if margins:
for i in range(self.n_agents):
self.agents[i].set_margin(margins[i])
return ob
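
# Illustrative usage sketch (not part of the original module): it assumes the
# gym_compete package registers a Gym id such as 'sumo-ants-v0' for this
# environment, and that observations, rewards and dones come back as
# per-agent tuples, as the code above suggests.
if __name__ == '__main__':
    import gym
    import gym_compete  # noqa: F401  (importing registers the competition envs)

    env = gym.make('sumo-ants-v0')
    obs = env.reset()
    for _ in range(10):
        # Tuple action space: sample() returns one random action per agent.
        actions = env.action_space.sample()
        obs, rewards, dones, infos = env.step(actions)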