# gym-compete/gym_compete/new_envs/multi_agent_env.py
import numpy as np
from gym import Env, spaces
from .multi_agent_scene import MultiAgentScene
from .agents import *
from .utils import create_multiagent_xml
import os
import six


class MultiAgentEnv(Env):
    '''
    A multi-agent environment consists of some number of Agents and
    a MultiAgentScene.
    The supported agents and their classes are defined in AGENT_MAP,
    a dictionary mapping {agent_name: (xml_path, class)}.
    Agents with initial x coordinate < 0 have their goal on the right
    and vice versa.
    '''
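
    # Illustrative sketch (not part of the original file): a new agent type
    # would be registered with an entry of the same shape as those below,
    # e.g. AGENT_MAP['my_agent'] = (path_to_body_xml, MyAgentClass), where
    # MyAgentClass is a hypothetical Agent subclass.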
    AGENT_MAP = {
        'ant': (
            os.path.join(os.path.dirname(__file__), "assets", "ant_body.xml"),
            Ant
        ),
        'humanoid': (
            os.path.join(os.path.dirname(__file__), "assets", "humanoid_body.xml"),
            Humanoid
        ),
        'humanoid_blocker': (
            os.path.join(os.path.dirname(__file__), "assets", "humanoid_body.xml"),
            HumanoidBlocker
        ),
        'humanoid_fighter': (
            os.path.join(os.path.dirname(__file__), "assets", "humanoid_body.xml"),
            HumanoidFighter
        ),
        'ant_fighter': (
            os.path.join(os.path.dirname(__file__), "assets", "ant_body.xml"),
            AntFighter
        ),
        'humanoid_kicker': (
            os.path.join(os.path.dirname(__file__), "assets", "humanoid_body.xml"),
            HumanoidKicker
        ),
        'humanoid_goalkeeper': (
            os.path.join(os.path.dirname(__file__), "assets", "humanoid_body.xml"),
            HumanoidGoalKeeper
        ),
    }
    WORLD_XML = os.path.join(os.path.dirname(__file__), "assets", "world_body.xml")
    GOAL_REWARD = 1000

    def __init__(
            self, agent_names,
            world_xml_path=WORLD_XML, agent_map=AGENT_MAP,
            scene_xml_path=None, move_reward_weight=1.0,
            init_pos=None, rgb=None, agent_args=None
    ):
        '''
        agent_args is a list of kwargs dicts, one per agent
        '''
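        # Example (illustrative): two agents with no extra constructor
        # options would pass agent_args=[{}, {}], matching the default
        # built below.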
        self.n_agents = len(agent_names)
        self.agents = {}
        all_agent_xml_paths = []
        if not agent_args:
            agent_args = [{} for _ in range(self.n_agents)]
        assert len(agent_args) == self.n_agents, "Incorrect length of agent_args"
        for i, name in enumerate(agent_names):
            print("Creating agent", name)
            agent_xml_path, agent_class = agent_map[name]
            self.agents[i] = agent_class(i, agent_xml_path, **agent_args[i])
            all_agent_xml_paths.append(agent_xml_path)
        # Scope names prefix each agent's bodies and joints in the merged
        # scene XML so identical body files do not produce name clashes.
        agent_scopes = ['agent' + str(i) for i in range(self.n_agents)]
        if scene_xml_path is not None and os.path.exists(scene_xml_path):
            self._env_xml_path = scene_xml_path
        else:
            print("Creating Scene XML")
            print(init_pos)
            _, self._env_xml_path = create_multiagent_xml(
                world_xml_path, all_agent_xml_paths, agent_scopes,
                outpath=scene_xml_path,
                ini_pos=init_pos, rgb=rgb
            )
        print("Scene XML path:", self._env_xml_path)
        self.env_scene = MultiAgentScene(self._env_xml_path, self.n_agents)
        print("Created Scene with agents")
        for i, agent in self.agents.items():
            agent.set_env(self.env_scene)
        self._set_observation_space()
        self._set_action_space()
        self.metadata = self.env_scene.metadata
        self.move_reward_weight = move_reward_weight
        gid = self.env_scene.model.geom_names.index(six.b('rightgoal'))
        self.RIGHT_GOAL = self.env_scene.model.geom_pos[gid][0]
        gid = self.env_scene.model.geom_names.index(six.b('leftgoal'))
        self.LEFT_GOAL = self.env_scene.model.geom_pos[gid][0]
        # Agents starting on the right half (x > 0) aim for the left goal
        # and vice versa.
        for i in range(self.n_agents):
            if self.agents[i].get_qpos()[0] > 0:
                self.agents[i].set_goal(self.LEFT_GOAL)
            else:
                self.agents[i].set_goal(self.RIGHT_GOAL)

    def _set_observation_space(self):
        self.observation_space = spaces.Tuple(
            [self.agents[i].observation_space for i in range(self.n_agents)]
        )

    def _set_action_space(self):
        self.action_space = spaces.Tuple(
            [self.agents[i].action_space for i in range(self.n_agents)]
        )
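
    # Note (illustrative): both spaces are per-agent tuples, e.g. with two
    # agents observation_space is spaces.Tuple((obs_space_0, obs_space_1)),
    # and the actions passed to _step must use the same per-agent layout.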

    def goal_rewards(self, infos=None, agent_dones=None):
        # Zero-sum terminal reward: awarded only when exactly one agent has
        # reached its goal; the winner gets +GOAL_REWARD, every other agent
        # gets -GOAL_REWARD.
        touchdowns = [self.agents[i].reached_goal()
                      for i in range(self.n_agents)]
        num_reached_goal = sum(touchdowns)
        goal_rews = [0. for _ in range(self.n_agents)]
        if num_reached_goal != 1:
            return goal_rews, num_reached_goal > 0
        for i in range(self.n_agents):
            if touchdowns[i]:
                goal_rews[i] = self.GOAL_REWARD
                if infos:
                    infos[i]['winner'] = True
            else:
                goal_rews[i] = -self.GOAL_REWARD
        return goal_rews, True
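
    # Worked example (illustrative): with GOAL_REWARD = 1000 and agent 0 the
    # only agent to reach its goal, this returns ([1000, -1000], True); if no
    # agent has scored yet it returns ([0., 0.], False).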

    def _get_done(self, dones, game_done):
        # The episode ends for all agents at once: when every agent is done,
        # when the game has been decided, or when the simulator state is no
        # longer finite.
        done = np.all(dones)
        done = game_done or not np.isfinite(self.state_vector()).all() or done
        dones = tuple(done for _ in range(self.n_agents))
        return dones

    def _step(self, actions):
        for i in range(self.n_agents):
            self.agents[i].before_step()
        self.env_scene.simulate(actions)
        move_rews = []
        infos = []
        dones = []
        for i in range(self.n_agents):
            move_r, agent_done, rinfo = self.agents[i].after_step(actions[i])
            move_rews.append(move_r)
            dones.append(agent_done)
            rinfo['agent_done'] = agent_done
            infos.append(rinfo)
        goal_rews, game_done = self.goal_rewards(infos=infos, agent_dones=dones)
        # Per-agent reward = terminal goal reward + weighted movement reward.
        rews = []
        for i, info in enumerate(infos):
            info['reward_remaining'] = float(goal_rews[i])
            rews.append(float(goal_rews[i] + self.move_reward_weight * move_rews[i]))
        rews = tuple(rews)
        done = self._get_done(dones, game_done)
        infos = tuple(infos)
        obses = self._get_obs()
        return obses, rews, done, infos

    def _get_obs(self):
        return tuple(self.agents[i]._get_obs() for i in range(self.n_agents))

    '''
    The following methods remap the standard mujoco-env calls to the
    underlying scene.
    '''

    def _seed(self, seed=None):
        return self.env_scene._seed(seed)

    def _reset(self):
        ob = self.reset_model()
        return ob

    def set_state(self, qpos, qvel):
        self.env_scene.set_state(qpos, qvel)

    @property
    def dt(self):
        return self.env_scene.dt

    def _render(self, mode='human', close=False):
        return self.env_scene._render(mode, close)

    def state_vector(self):
        return self.env_scene.state_vector()

    def reset_model(self):
        # Reset the shared scene first, then let each agent re-initialize
        # its own pose within it.
        _ = self.env_scene.reset()
        for i in range(self.n_agents):
            self.agents[i].reset_agent()
        return self._get_obs()

    def viewer_setup(self):
        self.env_scene.viewer_setup()
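

# Hedged usage sketch (not part of the original module): a minimal smoke test
# that builds a two-ant environment and steps it with random actions. It
# assumes mujoco-py and the bundled asset XMLs are available, and it uses the
# old gym API (_reset/_step) that this class implements.
if __name__ == "__main__":
    env = MultiAgentEnv(['ant', 'ant'])
    obses = env._reset()
    for _ in range(10):
        # Sample one action per agent, matching the Tuple action space layout.
        actions = tuple(space.sample() for space in env.action_space.spaces)
        obses, rews, dones, infos = env._step(actions)
        print("rewards:", rews, "all done:", dones[0])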