randomized_uncertain_social_preferences/rusp/env_ipd.py
import numpy as np
import gym
from gym.spaces import Tuple, Discrete
from mae_envs.wrappers.util import update_obs_space
from mujoco_worldgen.util.types import store_args
from rusp.abstract_base_env import AbstractBaseEnv
from rusp.wrappers_util import RandomizedHorizonWrapper
from rusp.wrappers_rusp import RUSPWrapper, add_rew_share_observation_keys
from mae_envs.wrappers.util import ConcatenateObsWrapper
from mae_envs.wrappers.multi_agent import (SplitObservations, SelectKeysWrapper)


class IteratedMatrixGameWrapper(gym.Wrapper):
    '''
    2-player matrix game. Each agent has a single binary action, "action_defect",
    and observes the last action every agent took. Agents are rewarded according
    to payoff_matrix.

    Args:
        payoff_matrix (2x2x2 np.ndarray): the payoff matrix, indexed by the two
            agents' actions; the final axis holds the per-agent rewards.

    Observations:
        prev_ac (n_agents, 1): previous action each agent took.
    '''
    @store_args
    def __init__(self, env, payoff_matrix):
        super().__init__(env)
        self.n_agents = self.metadata['n_agents']
        self.action_space.spaces['action_defect'] = Tuple([Discrete(n=2) for _ in range(self.n_agents)])
        self.observation_space = update_obs_space(self, {'prev_ac': [self.n_agents, 1]})

    def reset(self):
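        # -1 is a sentinel meaning "no action taken yet"; real actions are 0 (cooperate) or 1 (defect).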
        self.previous_action = -1 * np.ones(self.n_agents)
        self.num_defects = np.zeros(self.n_agents)
        self.num_coops = np.zeros(self.n_agents)
        return self.observation(self.env.reset())

    def step(self, action):
        self.previous_action = action['action_defect'].copy()
        obs, _, done, info = self.env.step(action)
        self.num_defects += action['action_defect']
        self.num_coops += (1 - action['action_defect'])
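        # Index the payoff matrix by the joint action: a length-2 reward vector, one entry per agent.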
        rew = self.payoff_matrix[action['action_defect'][0], action['action_defect'][1]]
        if done:
            info.update({f'actor{i}_n_defects': n_defects for i, n_defects in enumerate(self.num_defects)})
            info.update({f'actor{i}_n_coops': n_coops for i, n_coops in enumerate(self.num_coops)})
        return self.observation(obs), rew, done, info

    def observation(self, obs):
        obs['prev_ac'] = self.previous_action[:, None]
        return obs


class LastAgentScripted(gym.Wrapper):
    '''
    Replace the last agent with an all-cooperate, all-defect, or tit-for-tat scripted policy.
    The last agent is considered part of the environment, so we remove it from the
    observation and action spaces.

    Args:
        policy_to_play (string): One of "allc", "alld", or "tft"
    '''
    def __init__(self, env, policy_to_play):
        super().__init__(env)
        assert policy_to_play in ['allc', 'alld', 'tft']
        self.policy_to_play = policy_to_play
        self.metadata['n_actors'] -= 1
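        # Drop the scripted (last) agent's slot from every per-agent Tuple action space.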
        for k, v in self.action_space.spaces.items():
            self.action_space.spaces[k] = Tuple(v.spaces[:-1])

    def reset(self):
        self.previous_action = 0
        return self.observation(self.env.reset())

    def step(self, action):
        if self.policy_to_play == 'allc':
            ac_to_play = 0
        elif self.policy_to_play == 'alld':
            ac_to_play = 1
        elif self.policy_to_play == 'tft':
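            # Tit-for-tat: repeat the learning agent's previous move; reset() initializes it to 0 (cooperate).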
            ac_to_play = self.previous_action
        self.previous_action = action['action_defect'][0]
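        # Re-append the scripted move so the underlying 2-player env receives a full joint action.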
        action['action_defect'] = np.concatenate([action['action_defect'], [ac_to_play]])
        obs, rew, done, info = self.env.step(action)
        return self.observation(obs), rew[:-1], done, info

    def observation(self, obs):
        obs = {k: v[:-1] for k, v in obs.items()}
        return obs


def make_env(horizon=10, horizon_lower=None, horizon_upper=None,
             prob_per_step_to_stop=0.1,  # If set, each step ends the episode with this probability, i.e. we play the indefinitely repeated game
             mutual_cooperate=2, defected_against=-2, successful_defect=4, mutual_defect=0,
             # Evals
             against_all_c=False, against_all_d=False, against_tft=False,
             # Random Teams
             rusp_args={}):
    env = AbstractBaseEnv(2)
    env = RandomizedHorizonWrapper(env, lower_lim=horizon_lower or horizon, upper_lim=horizon_upper or horizon,
                                   prob_per_step_to_stop=prob_per_step_to_stop)

    # Construct payoff matrix
    cc = [mutual_cooperate, mutual_cooperate]
    cd = [defected_against, successful_defect]
    dc = list(reversed(cd))
    dd = [mutual_defect, mutual_defect]
    payoff_matrix = np.array([[cc, cd],
                              [dc, dd]])
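    # payoff_matrix[a0, a1] gives the reward pair [r0, r1]. The defaults satisfy the prisoner's
    # dilemma ordering successful_defect > mutual_cooperate > mutual_defect > defected_against (4 > 2 > 0 > -2).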
    env = IteratedMatrixGameWrapper(env, payoff_matrix=payoff_matrix)
    env = RUSPWrapper(env, **rusp_args)
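
    # Observation key groups: 'keys_self' are policy inputs; the '*_vf' groups are
    # value-function-only (privileged) inputs split out by the wrappers below.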
    keys_self = ['prev_ac', 'timestep']
    keys_additional_self_vf = ['fraction_episode_done', 'horizon']
    keys_other_agents = ['prev_ac']
    keys_additional_other_agents_vf = []
    keys_self_matrices = []
    add_rew_share_observation_keys(keys_self=keys_self,
                                   keys_additional_self_vf=keys_additional_self_vf,
                                   keys_other_agents=keys_other_agents,
                                   keys_additional_other_agents_vf=keys_additional_other_agents_vf,
                                   keys_self_matrices=keys_self_matrices,
                                   **rusp_args)
    keys_external = ['other_agents',
                     'other_agents_vf',
                     'additional_self_vf_obs']
    env = SplitObservations(env, keys_self + keys_additional_self_vf,
                            keys_copy=[], keys_self_matrices=keys_self_matrices)
    env = ConcatenateObsWrapper(env, {'other_agents': keys_other_agents,
                                      'other_agents_vf': ['other_agents'] + keys_additional_other_agents_vf,
                                      'additional_self_vf_obs': [k + '_self' for k in keys_additional_self_vf]})
    env = SelectKeysWrapper(env, keys_self=keys_self,
                            keys_other=keys_external)

    if against_all_c or against_all_d or against_tft:
        if against_all_c:
            policy_to_play = 'allc'
        elif against_all_d:
            policy_to_play = 'alld'
        elif against_tft:
            policy_to_play = 'tft'
        env = LastAgentScripted(env, policy_to_play)

    return env
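

# Usage sketch: build the env and roll out one episode with uniformly random actions.
# This assumes the fully wrapped env still accepts a dict action keyed by
# 'action_defect' with one binary entry per learning agent (env.metadata['n_actors']).
if __name__ == '__main__':
    env = make_env(horizon=10, prob_per_step_to_stop=0.1)
    obs = env.reset()
    done = False
    while not done:
        # Sample a random cooperate/defect move for each learning agent.
        ac = {'action_defect': np.random.randint(2, size=env.metadata['n_actors'])}
        obs, rew, done, info = env.step(ac)
        print('rewards:', rew)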