randomized_uncertain_social_preferences/rusp/env_indirect_reciprocity.py

import numpy as np
import gym
from copy import deepcopy
from gym.spaces import Tuple, Discrete
from mae_envs.wrappers.util import update_obs_space
from mujoco_worldgen.util.types import store_args
from mae_envs.wrappers.util import ConcatenateObsWrapper
from mae_envs.wrappers.multi_agent import (SplitObservations, SelectKeysWrapper)
from rusp.wrappers_rusp import RUSPWrapper, add_rew_share_observation_keys
from rusp.wrappers_util import RandomIdentityVector, RandomizedHorizonWrapper
from rusp.abstract_base_env import AbstractBaseEnv


class MaskYourePlaying(gym.ObservationWrapper):
    '''
        Construct a binary mask depending on who's playing. This can be used to mask the policy
            gradient on steps an agent isn't playing.
    '''
    def __init__(self, env):
        super().__init__(env)
        self.observation_space.spaces['mask'] = gym.spaces.Dict({
            'action_defect': gym.spaces.Box(-np.inf, np.inf, (self.metadata['n_actors'], 2), float),
        })

    def observation(self, obs):
        obs['mask'] = {'action_defect': np.zeros((self.metadata['n_actors'], 2), dtype=bool)}
        obs['mask']['action_defect'][np.squeeze(obs['youre_playing_self'], -1)] = 1
        return obs


class LastAgentScripted(gym.Wrapper):
    '''
        Replace the last agent with either an all-cooperate, all-defect, or tit-for-tat scripted policy.
        The last agent is considered part of the environment, so we remove them from the action space
            and do not return an observation for them.
        In this setting tit-for-tat remembers the action for each agent separately.

        Args:
            policy_to_play (string): One of "allc", "alld", or "tft"
    '''
    def __init__(self, env, policy_to_play):
        super().__init__(env)
        assert policy_to_play in ['allc', 'alld', 'tft']
        self.n_agents = self.unwrapped.n_agents
        self.policy_to_play = policy_to_play
        self.metadata['n_actors'] -= 1
        for k, v in self.action_space.spaces.items():
            self.action_space.spaces[k] = Tuple(v.spaces[:-1])

    def reset(self):
        self.previous_action_against_me = np.zeros(self.n_agents - 1, dtype=int)
        obs = self.env.reset()
        self._youre_playing = np.squeeze(obs['youre_playing_self'].copy(), -1)
        return self.observation(obs)

    def step(self, action):
        action = deepcopy(action)
        if self.policy_to_play == 'allc':
            ac_to_play = 0
        elif self.policy_to_play == 'alld':
            ac_to_play = 1
        elif self.policy_to_play == 'tft':
            # Take the zeroth index in case this agent isn't currently playing
            ac_to_play = self.previous_action_against_me[self._youre_playing[:-1]][0]

        # Only update previous action against me if the scripted agent is playing
        if self._youre_playing[-1]:
            self.previous_action_against_me[self._youre_playing[:-1]] = action['action_defect'][self._youre_playing[:-1]]

        action['action_defect'] = np.concatenate([action['action_defect'], [ac_to_play]])
        obs, rew, done, info = self.env.step(action)
        self._youre_playing = np.squeeze(obs['youre_playing_self'].copy(), -1)
        return self.observation(obs), rew[:-1], done, info

    def observation(self, obs):
        obs = {k: v[:-1] for k, v in obs.items()}
        return obs
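

# Illustrative sketch, not part of the original environment code: how the per-opponent
# tit-for-tat memory used by LastAgentScripted behaves. The helper name and array below are
# hypothetical stand-ins for `previous_action_against_me` and the `youre_playing` mask.
def _tft_memory_sketch():
    # Three learning opponents; the scripted agent starts off cooperating with everyone.
    previous_action_against_me = np.zeros(3, dtype=int)
    # Suppose opponent 1 defected (1) against the scripted agent the last time they played.
    previous_action_against_me[1] = 1
    # When paired with opponent 1 again, tit-for-tat replays that action back (defect).
    youre_playing = np.array([False, True, False])
    return previous_action_against_me[youre_playing][0]  # -> 1 (defect)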


class MultiPlayerIteratedMatrixGame(gym.Wrapper):
    '''
        N player matrix game. Agents get a single binary action "action_defect".
        Agents get to observe the last action each agent took.
        Agents are rewarded according to payoff_matrix.

        Args:
            payoff_matrix (2x2x2 np.ndarray): the payoff matrix. We index into this payoff_matrix
                according to agent actions.

        Observations:
            prev_ac (n_agents, 1): previous action each agent took. If an agent wasn't playing that
                timestep we return -1 for this observation.
            prev_ac_while_playing (n_agents, 1): previous action each agent took when they were playing.
    '''
    @store_args
    def __init__(self, env, payoff_matrix):
        super().__init__(env)
        self.n_agents = self.unwrapped.n_agents
        # 0 means to cooperate, 1 means to defect
        self.action_space.spaces['action_defect'] = Tuple([Discrete(n=2) for _ in range(self.n_agents)])
        self.observation_space = update_obs_space(self, {
            'prev_ac': [self.n_agents, 1],
            'prev_ac_while_playing': [self.n_agents, 1]
        })

    def reset(self):
        self.previous_action = -1 * np.ones(self.n_agents)
        self.previous_action_while_playing = -1 * np.ones(self.n_agents)
        self.num_defects = np.zeros((self.n_agents, self.n_agents))
        self.num_coops = np.zeros((self.n_agents, self.n_agents))
        # sls stands for "since last started". This is useful for evaluation settings
        #   where we want agents to gain rapport. Since last started means since the
        #   last agent (index n-1) took its first action
        self.num_defects_sls = np.zeros((self.n_agents, self.n_agents))
        self.num_coops_sls = np.zeros((self.n_agents, self.n_agents))
        self.last_started = False

        obs = self.env.reset()
        # This comes from ChooseAgentsToPlay wrapper
        self._youre_playing = np.squeeze(obs['youre_playing'].copy(), -1)
        return self.observation(obs)

    def step(self, action):
        obs, _, done, info = self.env.step(action)

        # Update statistics for agents that are playing (p1 and p2)
        p1, p2 = np.where(self._youre_playing)[0]
        self.num_defects[p1, p2] += action['action_defect'][p1]
        self.num_defects[p2, p1] += action['action_defect'][p2]
        self.num_coops[p1, p2] += 1 - action['action_defect'][p1]
        self.num_coops[p2, p1] += 1 - action['action_defect'][p2]
        if p1 == self.n_agents - 1 or p2 == self.n_agents - 1 or self.last_started:
            self.last_started = True
            self.num_defects_sls[p1, p2] += action['action_defect'][p1]
            self.num_defects_sls[p2, p1] += action['action_defect'][p2]
            self.num_coops_sls[p1, p2] += 1 - action['action_defect'][p1]
            self.num_coops_sls[p2, p1] += 1 - action['action_defect'][p2]

        self.previous_action = action['action_defect'].copy()
        self.previous_action[~self._youre_playing] = -1  # if you weren't playing don't give info on what you chose
        self.previous_action_while_playing[self._youre_playing] = action['action_defect'][self._youre_playing].copy()

        rew = np.zeros(self.n_agents)
        rew[[p1, p2]] = self.payoff_matrix[action['action_defect'][p1], action['action_defect'][p2]]
        assert np.all(rew[~self._youre_playing] == 0)

        # Calling step will update the next players, so update this after computing reward.
        self._youre_playing = np.squeeze(obs['youre_playing'].copy(), -1)

        if done:
            info.update({f'actor{i}_n_coops': n_coops for i, n_coops in enumerate(np.sum(self.num_coops, 1))})
            info.update({f'actor{i}_n_defects': n_defects for i, n_defects in enumerate(np.sum(self.num_defects, 1))})

            # Compute fraction of actions that were defects against
            #   (a) all agents as compared to the last agent, i.e. the difference in these fractions
            #   (b) the last agent
            # We compute these statistics because for evaluation the last agent may be scripted, and so they are useful
            #   in comparing e.g. the difference in fraction of defects against an all-defect versus all-cooperate policy
            num_actions = self.num_coops + self.num_defects
            frac_defects_against_each_other = np.sum(self.num_defects[:-1, :-1]) / np.sum(num_actions[:-1, :-1])
            frac_defects_against_last = np.sum(self.num_defects[:-1, -1]) / np.sum(num_actions[:-1, -1])
            info.update({'frac_defects_all_minus_last': frac_defects_against_each_other})
            info.update({'frac_defects_against_last': frac_defects_against_last})

            # In the Prior Rapport setting (see paper), we want to measure the fraction of defects against the last agent
            #   AFTER the last agent has started acting, which is after the period in which the other agents have been
            #   able to gain rapport.
            num_actions_sls = self.num_coops_sls + self.num_defects_sls
            frac_defects_against_each_other_sls = np.sum(self.num_defects_sls[:-1, :-1]) / np.sum(num_actions_sls[:-1, :-1])
            frac_defects_against_last_sls = np.sum(self.num_defects_sls[:-1, -1]) / np.sum(num_actions_sls[:-1, -1])
            if not (np.isnan(frac_defects_against_each_other_sls) or np.isnan(frac_defects_against_last_sls)):
                info.update({'frac_defects_all_minus_last_sls': frac_defects_against_each_other_sls})
                info.update({'frac_defects_against_last_sls': frac_defects_against_last_sls})

        return self.observation(obs), rew, done, info

    def observation(self, obs):
        obs['prev_ac'] = self.previous_action[:, None]
        obs['prev_ac_while_playing'] = self.previous_action_while_playing[:, None]
        return obs
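

# Illustrative sketch, not part of the original environment code: how the 2x2x2 payoff_matrix is
# indexed in MultiPlayerIteratedMatrixGame.step. The values mirror the defaults in make_env below
# (mutual_cooperate=2, defected_against=-2, successful_defect=4, mutual_defect=0); the helper name
# is hypothetical.
def _payoff_matrix_sketch():
    cc = [2, 2]    # both cooperate
    cd = [-2, 4]   # p1 cooperates, p2 defects
    dc = [4, -2]   # p1 defects, p2 cooperates
    dd = [0, 0]    # both defect
    payoff_matrix = np.array([[cc, cd],
                              [dc, dd]])
    # payoff_matrix[a_p1, a_p2] is the reward pair (rew_p1, rew_p2);
    # e.g. p1 cooperates (0) while p2 defects (1) -> p1 gets -2, p2 gets 4.
    return payoff_matrix[0, 1]  # array([-2,  4])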


class ChooseAgentsToPlay(gym.Wrapper):
    '''
        Pick which 2 agents will play on each timestep.

        Args:
            last_step_first_agent_vs_last_agent (bool): On the last step of the game, the first and
                last agent will play
            last_agent_always_plays (bool): Last agent will play every round
            last_doesnt_play_until_t (int): The last agent will only get to play after this many rounds
            last_must_play_at_t (bool): Only active when last_doesnt_play_until_t is not None, but makes
                the last agent play at that round instead of just including them as an option to play
                at that round
    '''
    @store_args
    def __init__(self, env,
                 last_step_first_agent_vs_last_agent: bool,
                 last_agent_always_plays: bool,
                 last_doesnt_play_until_t: int = None,
                 last_must_play_at_t: bool = False):
        super().__init__(env)
        self.n_agents = self.unwrapped.n_agents
        self.observation_space = update_obs_space(self, {'you_played': [self.n_agents, 1],
                                                          'youre_playing': [self.n_agents, 1]})

    def step(self, action):
        obs, rew, done, info = self.env.step(action)
        self._t = obs['timestep'].copy()[0, 0]  # Comes from RandomizedHorizonWrapper
        self._you_played = self._youre_playing.copy()
        self._sample_new_players()
        return self.observation(obs), rew, done, info

    def reset(self):
        obs = self.env.reset()
        self._t = obs['timestep'].copy()[0, 0]
        self._horizon = obs['horizon'].copy()[0, 0]
        self._you_played = np.zeros(self.n_agents).astype(bool)
        self._sample_new_players()
        return self.observation(obs)

    def observation(self, obs):
        # We call observation after resetting players, so the mask should be for the current teams.
        obs['you_played'] = self._you_played[:, None]
        obs['youre_playing'] = self._youre_playing[:, None]
        return obs

    def _sample_new_players(self):
        exclude_first = self.last_step_first_agent_vs_last_agent and self._t < self._horizon - 1
        must_include_first_last = (self.last_step_first_agent_vs_last_agent and self._t == self._horizon - 1)
        exclude_last = (self.last_doesnt_play_until_t is not None and self._t < self.last_doesnt_play_until_t)

        p1_options = np.arange(self.n_agents)
        p2_options = np.arange(self.n_agents)
        if self.last_agent_always_plays and not exclude_last:
            p2_options = np.array([self.n_agents - 1])
            p1_options = p1_options[p1_options != p2_options]
        if exclude_last:
            p1_options = p1_options[p1_options != self.n_agents - 1]
            p2_options = p2_options[p2_options != self.n_agents - 1]
        if must_include_first_last:
            p1_options = np.array([0])
            p2_options = np.array([self.n_agents - 1])
        if exclude_first:
            p1_options = p1_options[p1_options != 0]
            p2_options = p2_options[p2_options != 0]
        if self.last_doesnt_play_until_t is not None and self.last_doesnt_play_until_t == self._t and self.last_must_play_at_t:
            p1_options = p1_options[p1_options != self.n_agents - 1]
            p2_options = np.array([self.n_agents - 1])

        p1 = np.random.choice(p1_options)
        p2_options = p2_options[p2_options != p1]
        p2 = np.random.choice(p2_options)

        self._youre_playing = np.zeros((self.n_agents,), dtype=bool)
        self._youre_playing[[p1, p2]] = 1
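

# Illustrative sketch, not part of the original environment code: the 'youre_playing' observation
# produced by ChooseAgentsToPlay is a boolean column vector with exactly two True entries, one per
# sampled player. The helper name and the default n_agents are hypothetical.
def _youre_playing_sketch(n_agents=3):
    p1, p2 = np.random.choice(n_agents, size=2, replace=False)
    youre_playing = np.zeros((n_agents,), dtype=bool)
    youre_playing[[p1, p2]] = True
    return youre_playing[:, None]  # shape (n_agents, 1), matching the observation space above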


def make_env(n_agents=3,
             # Horizon
             horizon=20, horizon_lower=None, horizon_upper=None,
             prob_per_step_to_stop=0.05,
             # Matrix Payouts
             mutual_cooperate=2, defected_against=-2, successful_defect=4, mutual_defect=0,
             # Agent IDs
             agent_identity_dim=16,
             # Evals
             against_all_c=False, against_all_d=False, against_tft=False,
             last_step_first_agent_vs_last_agent=False,
             last_agent_always_plays=False,
             last_doesnt_play_until_t=None,
             last_must_play_at_t=False,
             # RUSP
             rusp_args={}):
    env = AbstractBaseEnv(n_agents)
    env = RandomizedHorizonWrapper(env, lower_lim=horizon_lower or horizon, upper_lim=horizon_upper or horizon,
                                   prob_per_step_to_stop=prob_per_step_to_stop)
    env = RandomIdentityVector(env, vector_dim=agent_identity_dim)
    env = ChooseAgentsToPlay(env,
                             last_step_first_agent_vs_last_agent=last_step_first_agent_vs_last_agent,
                             last_agent_always_plays=last_agent_always_plays,
                             last_doesnt_play_until_t=last_doesnt_play_until_t,
                             last_must_play_at_t=last_must_play_at_t)

    # Construct Payoff Matrix
    cc = [mutual_cooperate, mutual_cooperate]
    cd = [defected_against, successful_defect]
    dc = list(reversed(cd))
    dd = [mutual_defect, mutual_defect]
    payoff_matrix = np.array([[cc, cd],
                              [dc, dd]])
    env = MultiPlayerIteratedMatrixGame(env, payoff_matrix=payoff_matrix)

    env = RUSPWrapper(env, **rusp_args)

    keys_self = ['prev_ac', 'you_played', 'youre_playing', 'agent_identity', 'timestep']
    keys_additional_self_vf = ['fraction_episode_done', 'horizon']
    keys_other_agents = ['prev_ac', 'youre_playing', 'agent_identity']
    keys_additional_other_agents_vf = []
    keys_self_matrices = []
    add_rew_share_observation_keys(keys_self=keys_self,
                                   keys_additional_self_vf=keys_additional_self_vf,
                                   keys_other_agents=keys_other_agents,
                                   keys_additional_other_agents_vf=keys_additional_other_agents_vf,
                                   keys_self_matrices=keys_self_matrices,
                                   **rusp_args)
    keys_external = ['other_agents', 'other_agents_vf', 'additional_self_vf_obs']
    keys_copy = []

    env = SplitObservations(env, keys_self + keys_additional_self_vf,
                            keys_copy=keys_copy, keys_self_matrices=keys_self_matrices)
    env = ConcatenateObsWrapper(env, {'other_agents': keys_other_agents,
                                      'other_agents_vf': ['other_agents'] + keys_additional_other_agents_vf,
                                      'additional_self_vf_obs': [k + '_self' for k in keys_additional_self_vf]})
    env = SelectKeysWrapper(env, keys_self=keys_self,
                            keys_other=keys_external + keys_copy + ['youre_playing_self'])  # need to copy youre_playing_self through for the LastAgentScripted wrapper

    if against_all_c or against_all_d or against_tft:
        if against_all_c:
            policy_to_play = 'allc'
        elif against_all_d:
            policy_to_play = 'alld'
        elif against_tft:
            policy_to_play = 'tft'
        env = LastAgentScripted(env, policy_to_play)

    env = MaskYourePlaying(env)
    return env
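

# Illustrative usage sketch, not part of the original file: construct the environment and roll out
# one episode with random actions against a scripted tit-for-tat opponent. This assumes the repo's
# dependencies (gym, mae_envs, mujoco_worldgen, rusp) are importable; flag names mirror make_env above.
if __name__ == '__main__':
    env = make_env(n_agents=3, against_tft=True)
    obs = env.reset()
    done = False
    while not done:
        # Sample a random cooperate/defect action for each learning agent
        # (the scripted last agent is handled inside LastAgentScripted).
        ac = {'action_defect': np.array(env.action_space.spaces['action_defect'].sample())}
        obs, rew, done, info = env.step(ac)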