randomized_uncertain_social_preferences/rusp/env_prisoners_buddy.py
import numpy as np
from collections import defaultdict
from mujoco_worldgen.util.types import store_args
from mae_envs.wrappers.util import update_obs_space
from mae_envs.wrappers.util import ConcatenateObsWrapper
from mae_envs.wrappers.multi_agent import (SplitObservations, SelectKeysWrapper)
from rusp.wrappers_rusp import RUSPWrapper, add_rew_share_observation_keys
from rusp.wrappers_util import RandomIdentityVector, RandomizedHorizonWrapper, OtherActorAttentionAction, ActionOptionsWrapper
from rusp.abstract_base_env import AbstractBaseEnv
class PrisonersBuddy(OtherActorAttentionAction):
    '''
    Agents must mutually choose each other to get reward (mutual_cooperate_rew). They can choose to sit out
    and receive zero reward. If an agent makes an unreciprocated choice, the targeted agent receives a defection
    reward (successful_defect_rew). We call it a defection reward because the targeted agent could have
    reciprocated the choice. The agent making the unreciprocated choice receives a penalty (defected_against_rew).
    For example, with the constructor defaults, if agents 0 and 1 choose each other and agent 2 chooses agent 0,
    then agent 0 receives 2 + 1 = 3, agent 1 receives 2, and agent 2 receives -1.
    Agents get a chance to "communicate" in that their choices only elicit rewards every few timesteps
    (choosing_period). This gives them time to break symmetry.

    Observations:
        chose_me (n_agents, n_agents, 1): which other agents (column) chose me (row agent) last step
        i_chose (n_agents, n_agents, 1): which other agents (column) I (row agent) chose last step
        chose_me_rew (n_agents, n_agents, 1): which other agents (column) chose me (row agent) on the last step reward was given
        i_chose_rew (n_agents, n_agents, 1): which other agents (column) I (row agent) chose on the last step reward was given
        i_chose_any_rew (n_agents, 1): whether I chose someone (1) or sat out (0) on the last step reward was given
        previous_choice_identity (n_agents, agent_identity_dim): identity vector of the agent I previously chose
        next_choice_is_real (n_agents, 1): whether the next timestep is one in which reward will be given
    '''
@store_args
def __init__(self, env, choosing_period,
agent_identity_dim=4,
mutual_cooperate_rew=2,
defected_against_rew=-1,
successful_defect_rew=1):
super().__init__(env, 'action_choose_agent')
self.observation_space = update_obs_space(self, {
'chose_me': [self.n_agents, self.n_agents, 1],
'i_chose': [self.n_agents, self.n_agents, 1],
'chose_me_rew': [self.n_agents, self.n_agents, 1],
'i_chose_rew': [self.n_agents, self.n_agents, 1],
'i_chose_any_rew': [self.n_agents, 1],
'previous_choice_identity': [self.n_agents, agent_identity_dim],
'next_choice_is_real': [self.n_agents, 1],
})
def reset(self):
self._t = 1 # Start t at 1 such that first round is not a reward round
self._chose_me = np.zeros((self.n_agents, self.n_agents))
self._chose_me_rew = np.zeros((self.n_agents, self.n_agents))
self._n_times_not_chosen = np.zeros((self.n_agents))
self._n_times_team_changed = np.zeros((self.n_agents))
self._n_agents_on_team = []
self._previous_choice_identity = np.zeros((self.n_agents, self.agent_identity_dim))
self._i_chose_any_rew_obs = np.zeros((self.n_agents, 1))
self._team_lengths = []
self._n_successful_defections = 0
self._current_team_lengths = defaultdict(lambda: 0)
self._previous_teams = np.ones(self.n_agents, dtype=int) * -1
self._both_chose = np.zeros((self.n_agents, self.n_agents), dtype=bool)
self._perfect_game = True
self._first_choice = True
return self.observation(self.env.reset())
def step(self, action):
obs, rew, done, info = self.env.step(action)
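        # Record this step's choices. _chose_me[j, i] = 1 means agent i chose agent j;
        # an agent that sits out leaves its column empty.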
self._chose_me = np.zeros((self.n_agents, self.n_agents), dtype=bool)
targets = np.ones(self.n_agents, dtype=int) * -1
for i in range(self.n_agents):
target = self._get_target_actor(i, action)
if len(target):
targets[i] = target[0]
self._chose_me[target[0], i] = 1
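        # Identity vectors of each agent's chosen target; agents that sat out (target == -1) get a zero vector.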
self._previous_choice_identity = obs['agent_identity'][targets]
self._previous_choice_identity[targets == -1] = 0
# Reward rounds
if self._t % self.choosing_period == 0:
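            # both_chose is symmetric: both_chose[i, j] = 1 iff agents i and j chose each other this round.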
self._both_chose = self._chose_me * self._chose_me.T
self._chose_me_rew = self._chose_me.copy()
            self._teams = np.argmax(self._both_chose, axis=1)  # Index of each agent's teammate
self._teams[np.all(self._both_chose == 0, axis=1)] = -1 # Make sure those without team are set to -1 instead of 0
rew = self._prisoners_buddy_reward_update(rew)
# Track stats
self._n_times_not_chosen[np.sum(self._chose_me, 1) == 0] += 1
# Since both_chose is symmetric, just get the index of nonzero entry in upper triangle
current_team_indices = np.c_[np.nonzero(np.triu(self._both_chose))]
current_team_tuples = list(map(tuple, current_team_indices))
teams_done = [k for k in self._current_team_lengths.keys() if k not in current_team_tuples]
for team_done in teams_done:
self._team_lengths.append(self._current_team_lengths[team_done])
del self._current_team_lengths[team_done]
for current_team_tuple in current_team_tuples:
self._current_team_lengths[current_team_tuple] += 1
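            # Column-wise any: 1 if agent i chose anyone (rather than sitting out) on this reward round.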
self._i_chose_any_rew_obs = np.any(self._chose_me_rew, 0)[:, None]
if self._first_choice:
self._first_choice = False
else:
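                # The game stays "perfect" only if no team ever changes and the maximum
                # possible number of agents are paired on every reward round after the first.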
all_teams_didnt_change = np.all(self._previous_teams == self._teams)
max_number_of_teams_filled = np.sum(self._teams != -1) == ((self.n_agents // 2) * 2)
self._perfect_game = self._perfect_game and all_teams_didnt_change and max_number_of_teams_filled
self._previous_teams = self._teams
self._t += 1
if done:
self._team_lengths += list(self._current_team_lengths.values())
info['average_team_length'] = np.mean(self._team_lengths) if len(self._team_lengths) else 0
info['n_times_team_changed'] = np.sum(self._n_times_team_changed)
info['n_agents_on_team_per_step'] = np.mean(self._n_agents_on_team)
info['number_decisions'] = self._t / self.choosing_period
info['n_unique_not_chosen'] = np.sum(self._n_times_not_chosen > 0)
info['n_successful_defections'] = self._n_successful_defections
info['perfect_game'] = self._perfect_game
return self.observation(obs), rew, done, info
def observation(self, obs):
obs['chose_me'] = self._chose_me[:, :, None]
obs['i_chose'] = self._chose_me.T[:, :, None]
obs['chose_me_rew'] = self._chose_me_rew[:, :, None]
obs['i_chose_rew'] = self._chose_me_rew.T[:, :, None]
obs['i_chose_any_rew'] = self._i_chose_any_rew_obs
obs['previous_choice_identity'] = self._previous_choice_identity
        # Assumes this is called after self._t has been incremented for the current step
obs['next_choice_is_real'] = np.ones((self.n_agents, 1)) if self._t % self.choosing_period == 0 else np.zeros((self.n_agents, 1))
return obs
def _prisoners_buddy_reward_update(self, rew):
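        # Mutual choices earn both agents mutual_cooperate_rew. An unreciprocated choice
        # gives successful_defect_rew to the targeted agent and defected_against_rew
        # (a penalty) to the agent that made the choice.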
on_team = np.any(self._both_chose, axis=1)
chose_me_oneway = (self._chose_me & ~self._both_chose)
num_chose_me_oneway = np.sum(chose_me_oneway, axis=1)
i_chose_one_way = np.any(chose_me_oneway, axis=0)
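        # Each agent chooses at most one target, and a one-way chooser cannot also be on a team.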
assert np.all(np.sum(chose_me_oneway, axis=0) <= 1)
assert np.all((i_chose_one_way & on_team) == 0)
previous_has_team = (self._previous_teams != -1)
your_team_changed = (self._teams != self._previous_teams)
rew[on_team] += self.mutual_cooperate_rew
rew[i_chose_one_way] += self.defected_against_rew
rew += num_chose_me_oneway * self.successful_defect_rew
# Stats
self._n_successful_defections += np.sum(i_chose_one_way)
self._n_times_team_changed += (previous_has_team & your_team_changed)
self._n_agents_on_team.append(np.sum(on_team))
return rew
def make_env(n_agents=5, horizon=50, horizon_lower=None, horizon_upper=None,
prob_per_step_to_stop=0.02,
choosing_period=5,
mutual_cooperate_rew=2, defected_against_rew=-2, successful_defect_rew=1,
agent_identity_dim=16,
rusp_args={}):
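    '''Construct the full Prisoner's Buddy environment: the abstract base env wrapped with
    horizon randomization, random agent identities, the agent-choice action (with a sit-out
    option), RUSP reward sharing, and observation splitting/selection for the policy and
    value function.'''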
env = AbstractBaseEnv(n_agents)
env = RandomizedHorizonWrapper(env, lower_lim=horizon_lower or horizon, upper_lim=horizon_upper or horizon,
prob_per_step_to_stop=prob_per_step_to_stop)
env = RandomIdentityVector(env, vector_dim=agent_identity_dim)
env = PrisonersBuddy(env, choosing_period=choosing_period,
agent_identity_dim=agent_identity_dim,
mutual_cooperate_rew=mutual_cooperate_rew, defected_against_rew=defected_against_rew,
successful_defect_rew=successful_defect_rew)
env = ActionOptionsWrapper(env, ['action_choose_agent'], {'action_choose_agent': -1})
env = RUSPWrapper(env, **rusp_args)
keys_self = ['previous_choice',
'next_choice_is_real',
'i_chose_any_rew',
'agent_identity',
'previous_choice_identity',
'timestep']
keys_additional_self_vf = ['fraction_episode_done', 'horizon']
keys_other_agents = [
'previous_choice',
'chose_me',
'i_chose',
'chose_me_rew',
'i_chose_rew',
'i_chose_any_rew',
'agent_identity',
'previous_choice_identity'
]
keys_additional_other_agents_vf = []
keys_self_matrices = ['chose_me',
'i_chose',
'chose_me_rew',
'i_chose_rew']
keys_external = ['other_agents',
'other_agents_vf',
'additional_self_vf_obs']
add_rew_share_observation_keys(keys_self=keys_self,
keys_additional_self_vf=keys_additional_self_vf,
keys_other_agents=keys_other_agents,
keys_additional_other_agents_vf=keys_additional_other_agents_vf,
keys_self_matrices=keys_self_matrices,
**rusp_args)
env = SplitObservations(env, keys_self + keys_additional_self_vf,
keys_copy=[], keys_self_matrices=keys_self_matrices)
env = ConcatenateObsWrapper(env, {'other_agents': keys_other_agents,
'other_agents_vf': ['other_agents'] + keys_additional_other_agents_vf,
'additional_self_vf_obs': [k + '_self' for k in keys_additional_self_vf]})
env = SelectKeysWrapper(env, keys_self=keys_self,
keys_other=keys_external)
return env
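

if __name__ == '__main__':
    # A minimal, self-contained sketch (not used by the environment above): it applies the
    # pairing reward rule from _prisoners_buddy_reward_update to a hand-built choice matrix,
    # using the convention chose_me[j, i] = 1 iff agent i chose agent j. Reward values assume
    # the PrisonersBuddy constructor defaults (mutual_cooperate_rew=2, defected_against_rew=-1,
    # successful_defect_rew=1); the toy choice matrix is purely illustrative.
    chose_me = np.zeros((3, 3), dtype=bool)
    chose_me[1, 0] = True  # agent 0 chose agent 1
    chose_me[0, 1] = True  # agent 1 chose agent 0 (mutual with agent 0)
    chose_me[0, 2] = True  # agent 2 chose agent 0 (unreciprocated)

    both_chose = chose_me & chose_me.T
    on_team = np.any(both_chose, axis=1)
    chose_me_oneway = chose_me & ~both_chose
    num_chose_me_oneway = np.sum(chose_me_oneway, axis=1)
    i_chose_one_way = np.any(chose_me_oneway, axis=0)

    rew = np.zeros(3)
    rew[on_team] += 2                # mutual_cooperate_rew
    rew[i_chose_one_way] += -1       # defected_against_rew
    rew += num_chose_me_oneway * 1   # successful_defect_rew
    print(rew)                       # expected: [ 3.  2. -1.]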