# randomized_uncertain_social_preferences/rusp/env_oasis.py
import gym
import logging
import numpy as np
from collections import OrderedDict
from mae_envs.envs.base import Base
from mae_envs.wrappers.multi_agent import (SplitMultiAgentActions,
SplitObservations, SelectKeysWrapper)
from mae_envs.wrappers.util import (DiscretizeActionWrapper, DiscardMujocoExceptionEpisodes,
ConcatenateObsWrapper)
from mae_envs.wrappers.food import FoodHealthWrapper, AlwaysEatWrapper
from mae_envs.modules.agents import Agents
from mae_envs.modules.food import Food
from mae_envs.modules.world import FloorAttributes, WorldConstants
from mae_envs.modules.util import uniform_placement, close_to_other_object_placement
from mae_envs.wrappers.line_of_sight import (AgentAgentObsMask2D)
from mae_envs.wrappers.util import update_obs_space
from mujoco_worldgen.util.types import store_args
from rusp.wrappers_rusp import RUSPWrapper, add_rew_share_observation_keys
from rusp.wrappers_util import RandomIdentityVector, RandomizedHorizonWrapper, OtherActorAttentionAction, ActionOptionsWrapper
def zero_action(ac_space):
'''
Define default zero action for when an agent dies such that it stays in place and doesn't do anything.
'''
ac = OrderedDict()
for ac_key, s in ac_space.spaces.items():
assert isinstance(s, gym.spaces.Tuple), f"space {s} is not a Tuple"
single_agent_space = s.spaces[0]
if isinstance(single_agent_space, gym.spaces.Box):
ac[ac_key] = np.zeros_like(s.sample())
elif isinstance(single_agent_space, gym.spaces.Discrete):
ac[ac_key] = np.ones_like(s.sample()) * (single_agent_space.n // 2)
elif isinstance(single_agent_space, gym.spaces.MultiDiscrete):
ac[ac_key] = np.ones_like(s.sample(), dtype=int) * (single_agent_space.nvec // 2)
else:
raise NotImplementedError("MultiDiscrete not NotImplementedError")
return ac
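

# Illustrative sketch (not part of the environment): what zero_action produces for a toy
# Dict-of-Tuple action space shaped like the per-agent spaces used below. The key
# 'action_example_discrete' is hypothetical and used only for this example.
def _example_zero_action():
    example_space = gym.spaces.Dict({
        'action_movement': gym.spaces.Tuple([gym.spaces.MultiDiscrete([11, 11, 11])]),
        'action_example_discrete': gym.spaces.Tuple([gym.spaces.Discrete(3)]),
    })
    # MultiDiscrete([11, 11, 11]) maps to the middle bin 5 in each dimension;
    # Discrete(3) maps to the middle option 1.
    return zero_action(example_space)
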
class ZeroRews(gym.Wrapper):
'''
Change reward to a vector such that downstream wrappers do not need to check if it
is already a vector.
'''
def step(self, action):
obs, rew, done, info = self.env.step(action)
return obs, np.zeros((self.metadata['n_agents'],)), done, info
class OasisActionMasks(gym.ObservationWrapper):
'''
Construct masks for all actions in Oasis such that the policy gradient can be masked when
    an agent dies. Unfortunately the training code is not being released, but you can implement this
    simply by setting the logprob of masked-out actions in your policy to a large negative number
    (see the illustrative sketch after this class).
'''
@store_args
def __init__(self, env, mask_all_when_dead=True):
super().__init__(env)
self.observation_space.spaces['mask'] = gym.spaces.Dict({
            'action_movement': gym.spaces.Box(-np.inf, np.inf, (self.metadata['n_actors'], 3, 11), float),
'action_attack_agent': gym.spaces.Box(-np.inf, np.inf, (self.metadata['n_actors'], self.metadata['n_actors'] - 1), float),
'action_choose_option': gym.spaces.Box(-np.inf, np.inf, (self.metadata['n_actors'], 3), float),
})
def observation(self, obs):
obs['mask'] = {
'action_movement': np.ones((self.metadata['n_actors'], 3, 11), dtype=bool),
'action_attack_agent': np.ones((self.metadata['n_actors'], self.metadata['n_actors'] - 1), dtype=bool),
'action_choose_option': np.ones((self.metadata['n_actors'], 3), dtype=bool),
}
if self.mask_all_when_dead:
obs['mask']['action_movement'] *= (1 - obs['mask_is_dead'])[..., None].astype(bool)
obs['mask']['action_attack_agent'] *= (1 - obs['mask_is_dead']).astype(bool)
obs['mask']['action_choose_option'] *= (1 - obs['mask_is_dead']).astype(bool)
return obs
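

# Illustrative sketch only -- the training code is not released and this helper is not used
# by the environment. One way to consume the masks above is to add a large negative number
# to the logits of masked-out actions before the softmax, which drives their probability
# (and hence their policy-gradient contribution) towards zero.
def _example_apply_action_mask(logits, mask, big_negative=-1e9):
    # logits: (n_actors, n_options) unnormalized policy scores (hypothetical policy output)
    # mask:   (n_actors, n_options) boolean array, e.g. obs['mask']['action_choose_option']
    masked_logits = np.where(mask, logits, big_negative)
    # Numerically stable softmax over the option axis
    shifted = masked_logits - masked_logits.max(axis=-1, keepdims=True)
    probs = np.exp(shifted) / np.exp(shifted).sum(axis=-1, keepdims=True)
    return probs
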
class AgentHealthWrapper(gym.Wrapper):
'''
Manages agent health and death. This wrapper keeps track of each agent's health. Upstream wrappers
    can update agent health by assigning "health_delta" in the info dict; health_delta is an
    (n_agents,)-shaped array. If an agent dies (health <= 0) it is sent to the edge of the play area
    and held in place for a number of steps, after which it may re-enter play.
Args:
max_health (float): maximum health an agent can reach. Agents are spawned with maximum health.
death_rew (float): reward for dying (this should be negative)
        steps_freeze_on_death (int): number of timesteps an agent is held in place after dying before it
            may re-enter play
life_rew (float): reward given to an agent on each step for being alive. What other reward is there than life itself!
Observations:
agent_health (n_agents, 1): current health
        is_dead (n_agents, 1): boolean indicating whether each agent is currently dead
        time_to_alive (n_agents, 1): fraction of the post-death freeze period that has elapsed, clipped to 1
            (agents that are currently alive observe 1)
        mask_is_dead (n_agents, 1): boolean copy of is_dead, used downstream for action masking
'''
@store_args
def __init__(self, env, max_health=20, death_rew=-100, steps_freeze_on_death=100, life_rew=1):
super().__init__(env)
assert death_rew <= 0
assert life_rew >= 0
assert steps_freeze_on_death >= 0
self.observation_space = update_obs_space(self, {
'agent_health': [self.metadata['n_agents'], 1],
'is_dead': [self.metadata['n_agents'], 1],
'time_to_alive': [self.metadata['n_agents'], 1]
})
self.zero_action = zero_action(self.action_space)
logging.info(f"Only {self.zero_action.keys()} will be zerod during death")
def reset(self):
self.healths = np.ones(self.metadata['n_agents']) * self.max_health
self.time_since_death = np.ones(self.metadata['n_agents']) * np.inf
self.is_dead = np.zeros(self.metadata['n_agents'])
self.agent_died_count = np.zeros(self.metadata['n_agents'])
return self.observation(self.env.reset())
def step(self, action):
self.is_dead = np.logical_or(self.healths <= 0, self.time_since_death < self.steps_freeze_on_death)
# If an agent just died, its health will be <= 0, update position and health
for i in np.where(self.healths <= 0)[0]:
x_ind = self.unwrapped.sim.model.joint_name2id(f'agent{i}:tx')
y_ind = self.unwrapped.sim.model.joint_name2id(f'agent{i}:ty')
fs = self.unwrapped.floor_size
self.unwrapped.sim.data.qpos[x_ind] = np.random.choice([np.random.uniform(-1, 0), np.random.uniform(fs, fs + 1)])
self.unwrapped.sim.data.qpos[y_ind] = np.random.choice([np.random.uniform(-1, 0), np.random.uniform(fs, fs + 1)])
self.healths[i] = self.max_health
self.time_since_death[i] = 0
self.agent_died_count[i] += 1
self.unwrapped.sim.forward() # Forward the sim so their position gets updated sooner
# Zero out actions for all dead agents
if np.any(self.is_dead):
for ac_key, ac in action.items():
ac[self.is_dead] = self.zero_action[ac_key][self.is_dead]
obs, rew, done, info = self.env.step(action)
# Update healths
self.healths[~self.is_dead] += info['health_delta'][~self.is_dead] # only change health of alive agents
self.healths = np.minimum(self.healths, self.max_health)
self.time_since_death += 1
rew[self.healths <= 0] += self.death_rew
# Reward for living
rew[~self.is_dead] += self.life_rew
# Done stats
if done:
info['n_unique_died'] = np.sum(self.agent_died_count > 0)
info['only_one_died'] = (np.sum(self.agent_died_count > 0) == 1)
info['n_died'] = np.sum(self.agent_died_count)
info['n_died_min'] = np.min(self.agent_died_count)
info['n_died_max'] = np.max(self.agent_died_count)
info['n_died_std'] = np.std(self.agent_died_count)
info['n_died_total_minus_max'] = np.sum(self.agent_died_count) - np.max(self.agent_died_count)
return self.observation(obs), rew, done, info
def observation(self, obs):
obs['agent_health'] = self.healths[:, None]
obs['is_dead'] = self.is_dead[:, None]
obs['mask_is_dead'] = self.is_dead[:, None].astype(bool)
obs['time_to_alive'] = np.minimum(1, self.time_since_death / self.steps_freeze_on_death)[:, None]
return obs
class FoodIncreaseHealth(gym.Wrapper):
'''
Adds a positive health_delta if the agent ate food (eating logic found in mae_envs.wrappers.food).
Args:
        health_per_food_bounds ([float, float]): health gained per food eaten; randomized per episode,
            sampled uniformly within these bounds.
'''
def __init__(self, env, health_per_food_bounds):
super().__init__(env)
self.health_per_food_bounds = health_per_food_bounds
def reset(self):
self.health_per_food = np.random.uniform(self.health_per_food_bounds[0], self.health_per_food_bounds[1])
return self.env.reset()
def step(self, action):
obs, rew, done, info = self.env.step(action)
if 'health_delta' not in info:
info['health_delta'] = np.zeros((self.metadata['n_agents']))
info['health_delta'] += np.sum(info['agents_eat'], 1) * self.health_per_food
info['total_health_gained_from_food'] = np.sum(info['agents_eat']) * self.health_per_food
return obs, rew, done, info
class TimeDecreaseHealth(gym.Wrapper):
'''
Decrease agent health by a constant amount every timestep.
Args:
        health_per_step (float): amount added to each agent's health every timestep (negative values
            decrease health).
'''
@store_args
def __init__(self, env, health_per_step=-1):
super().__init__(env)
def step(self, action):
obs, rew, done, info = self.env.step(action)
if 'health_delta' not in info:
info['health_delta'] = np.zeros((self.metadata['n_agents']))
info['health_delta'] += self.health_per_step
return obs, rew, done, info
class AttackAction(OtherActorAttentionAction):
'''
Attack action wrapper. Agents may attack other agents that are within range. Attacking causes
the targeted agent to lose health (attack damage). If an agent is attacked, we stop them from
eating that round.
Args:
        attack_damage (float): health delta applied to the attacked agent per attack -- must be non-positive
            (e.g. -5 removes 5 health).
        attack_range (float): maximum distance at which another agent can be targeted for an attack.
        mask_eat_if_attacked (bool): if True, an attacked agent is prevented from eating on the timestep
            it is attacked
        only_attack_in_front (bool): if True, agents can only attack another agent that is in their "field of view"
Observations:
        attacked_me (n_agents, n_agents, 1): each row is an agent's binary observation of which other agents attacked them
n_attacked_me (n_agents, 1): this is 'attacked_me' summed over the second dimension -- the total number of agents
that attacked each agent.
'''
@store_args
def __init__(self, env, attack_damage=-5, attack_range=0.7, mask_eat_if_attacked=True,
only_attack_in_front=True):
assert attack_damage <= 0
assert attack_range >= 0
super().__init__(env, 'action_attack_agent')
self.observation_space = update_obs_space(self, {
'attacked_me': [self.n_agents, self.n_agents, 1],
'n_attacked_me': [self.n_agents, 1]
})
def reset(self):
self.attack_counts = np.zeros(self.n_agents)
self.attacked_me = np.zeros((self.n_agents, self.n_agents))
self.previous_obs = self.observation(self.env.reset())
return self.previous_obs
def step(self, action):
attack_matrix = np.zeros((self.n_agents, self.n_agents), dtype=bool)
for i in range(self.n_agents):
target_actors = self._get_target_actor(i, action)
if len(target_actors):
# See if the targeted agent can be attacked (in range and in front)
aa_ranges = np.linalg.norm(self.previous_obs['agent_pos'][i] - self.previous_obs['agent_pos'][target_actors], axis=1)
in_range = aa_ranges < self.attack_range
in_front = self.previous_obs['mask_aa_obs'][i, target_actors]
able_to_attack = np.logical_and(in_range, in_front) if self.only_attack_in_front else in_range
if np.any(able_to_attack):
# Filter down to those that are in range and in front
target_actors = target_actors[able_to_attack]
aa_ranges = aa_ranges[able_to_attack]
# Only attack the closest agent to you
target_actor = target_actors[np.argsort(aa_ranges)[0]]
attack_matrix[i, target_actor] = 1
self.attacked_me = attack_matrix.T
self.attack_counts += np.sum(attack_matrix, 1)
# Compute health updates
health_deltas = np.zeros((self.n_agents, self.n_agents))
health_deltas[self.attacked_me] += self.attack_damage
health_deltas = np.sum(health_deltas, 1)
# Turn off the eat action if you were attacked
if self.mask_eat_if_attacked:
action['action_eat_food'] *= ~np.any(self.attacked_me, 1, keepdims=True)
obs, rew, done, info = self.env.step(action)
info['health_delta'] += health_deltas
self.previous_obs = self.observation(obs)
if done:
info['n_attacks'] = np.sum(self.attack_counts)
info['n_attacks_per_agent'] = np.mean(self.attack_counts)
return self.previous_obs, rew, done, info
def observation(self, obs):
obs['attacked_me'] = self.attacked_me[:, :, None]
obs['n_attacked_me'] = np.sum(self.attacked_me, 1, keepdims=True)
return obs
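

# Illustrative numeric sketch (not used by the environment) of the target-selection rule in
# AttackAction.step: among targets that are in range (and, if only_attack_in_front, also in
# view), only the single closest one is attacked.
def _example_closest_target(attacker_pos, target_positions, attack_range=0.7):
    # attacker_pos: (2,) position of the attacker; target_positions: (k, 2) candidate targets
    dists = np.linalg.norm(target_positions - attacker_pos, axis=1)
    in_range = np.where(dists < attack_range)[0]
    if len(in_range) == 0:
        return None  # nobody is close enough to attack
    return in_range[np.argmin(dists[in_range])]  # index of the closest in-range target
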
class ColorAgentsByOption(gym.Wrapper):
'''
    Purely for visualization purposes. Colors agents red if they are attacking,
    green if they are eating, and cyan if they are doing neither.
'''
@store_args
def __init__(self, env, action_key, options_list):
super().__init__(env)
self.colors = {
'action_attack_agent': np.array([1., 0, 0, 1.0]),
'action_eat_food': np.array([0, 1., 0, 1.0]),
'do_nothing': np.array([0, 1, 1, 1]),
}
def step(self, action):
for i in range(self.unwrapped.n_agents):
self._color_agent(self.options_list[action[self.action_key][i]], i)
return self.env.step(action)
def _color_agent(self, ac_name, agent_ind):
sim = self.unwrapped.sim
geom_ind = sim.model.geom_name2id(f'agent{agent_ind}:agent')
sim.model.geom_rgba[geom_ind] = self.colors[ac_name]
def make_env(n_substeps=15, n_agents=3,
floor_size=[1.5, 6], action_lims=(-0.9, 0.9),
grid_size=60, other_friction=0.01, box_floor_friction=0.2, gravity=[0, 0, -50],
horizon=1000, horizon_lower=None, horizon_upper=None, prob_per_step_to_stop=0.001,
# Food
n_food=1, n_food_cluster=1, food_radius=0.4,
food_respawn_time=0, max_food_health=5, food_together_radius=0.4,
food_rew_type='selfish', food_reward_scale=0.0,
# Health
max_agent_health=20, health_per_food_bounds=[2.1, 2.7], health_per_step=-1.0,
# Attacking
attack_range=0.7, attack_damage=-5.0, only_attack_in_front=True,
# Death
life_rew=1, death_rew=-100, steps_freeze_on_death=100,
# Random Teams
rusp_args={},
# ID
id_dim=16,
# Action Masking
mask_all_when_dead=True):
env = Base(n_agents=n_agents,
n_substeps=n_substeps,
floor_size=floor_size,
horizon=99999999999999, # Just a big number so actual horizon is done by RandomizedHorizonWrapper
action_lims=action_lims,
deterministic_mode=False,
grid_size=grid_size)
if box_floor_friction is not None:
env.add_module(FloorAttributes(friction=box_floor_friction))
env.add_module(WorldConstants(gravity=gravity))
env.add_module(Agents(n_agents,
placement_fn=uniform_placement,
friction=other_friction))
# Food
env.metadata['food_together_radius'] = food_together_radius
assert n_food % n_food_cluster == 0
cluster_assignments = np.repeat(np.arange(0, n_food, n_food // n_food_cluster), n_food // n_food_cluster)
food_placement = [close_to_other_object_placement(
"food", i, "food_together_radius") for i in cluster_assignments]
food_placement[::n_food // n_food_cluster] = [uniform_placement] * n_food_cluster
env.add_module(Food(n_food, placement_fn=food_placement))
env.reset()
keys_self = [
'agent_qpos_qvel',
'agent_identity',
'agent_health',
'is_dead',
'time_to_alive',
'timestep'
]
keys_additional_self_vf = ['fraction_episode_done', 'horizon']
keys_copy = ['mask_is_dead']
keys_other_agents = [
'agent_qpos_qvel',
'agent_identity',
'agent_health',
'is_dead',
'time_to_alive',
]
keys_additional_other_agents_vf = []
keys_self_matrices = []
add_rew_share_observation_keys(keys_self=keys_self,
keys_additional_self_vf=keys_additional_self_vf,
keys_other_agents=keys_other_agents,
keys_additional_other_agents_vf=keys_additional_other_agents_vf,
keys_self_matrices=keys_self_matrices,
**rusp_args)
keys_external = ['other_agents',
'other_agents_vf',
'additional_self_vf_obs']
keys_self_masks = ['mask_aa_obs']
env = SplitMultiAgentActions(env)
env = DiscretizeActionWrapper(env, 'action_movement')
env = AgentAgentObsMask2D(env)
env = ZeroRews(env)
env = RandomizedHorizonWrapper(env, lower_lim=horizon_lower or horizon, upper_lim=horizon_upper or horizon,
prob_per_step_to_stop=prob_per_step_to_stop)
env = FoodHealthWrapper(env, respawn_time=(np.inf if food_respawn_time is None else food_respawn_time),
eat_thresh=(np.inf if food_radius is None else food_radius),
max_food_health=max_food_health, food_rew_type=food_rew_type,
reward_scale=food_reward_scale, split_eat_between_agents=True)
keys_external += ['mask_af_obs', 'food_obs']
keys_copy.append('close_enough_to_food')
env = FoodIncreaseHealth(env, health_per_food_bounds=health_per_food_bounds)
env = TimeDecreaseHealth(env, health_per_step=health_per_step)
    # AttackAction must wrap (sit outside) the FoodHealthWrapper so it can mask the eat action before the food wrapper processes it
env = AttackAction(env, attack_damage=attack_damage, attack_range=attack_range,
only_attack_in_front=only_attack_in_front)
env = ActionOptionsWrapper(env, ['action_attack_agent', 'action_eat_food'], {'action_attack_agent': -1, 'action_eat_food': 0})
env = ColorAgentsByOption(env, 'action_choose_option', ['action_attack_agent', 'action_eat_food', 'do_nothing'])
keys_self.append('previous_choice')
keys_other_agents.append('previous_choice')
keys_self_matrices.append('attacked_me')
keys_self.append('n_attacked_me')
keys_other_agents += ['attacked_me', 'n_attacked_me']
env = AgentHealthWrapper(env, max_health=max_agent_health, death_rew=death_rew,
steps_freeze_on_death=steps_freeze_on_death, life_rew=life_rew)
# This needs to come before options wrapper, so we can't group it above
env = AlwaysEatWrapper(env, agent_idx_allowed=np.arange(n_agents))
env = RUSPWrapper(env, **rusp_args)
env = RandomIdentityVector(env, vector_dim=id_dim)
env = SplitObservations(env, keys_self + keys_additional_self_vf,
keys_copy=keys_copy, keys_self_matrices=keys_self_matrices + keys_self_masks)
env = ConcatenateObsWrapper(env, {'other_agents': keys_other_agents,
'other_agents_vf': ['other_agents'] + keys_additional_other_agents_vf,
'additional_self_vf_obs': [k + '_self' for k in keys_additional_self_vf]})
env = DiscardMujocoExceptionEpisodes(env)
env = SelectKeysWrapper(env, keys_self=keys_self,
keys_other=keys_external + keys_copy + keys_self_masks)
env = OasisActionMasks(env, mask_all_when_dead=mask_all_when_dead)
return env
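

# Illustrative usage sketch (not part of the module's public API). Assumes the mae_envs,
# mujoco_worldgen and rusp dependencies are installed and importable.
if __name__ == '__main__':
    example_env = make_env(n_agents=3)
    example_obs = example_env.reset()
    # Stepping requires a per-agent action dict matching example_env.action_space
    # (movement bins, attack target and option choice); here we only inspect the
    # observation keys exposed after all wrappers are applied.
    print(sorted(example_obs.keys()))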