# multiagent/scenarios/simple_crypto.py
"""
Scenario:
1 speaker (agent 2) and 2 listeners, one of which is an adversary (agent 0). No agent moves;
the game is purely communicative. The speaker observes the goal landmark's color and a
private key (a randomly chosen landmark color); the good listener observes the key plus the
speaker's message; the adversary observes only the message. Good agents are rewarded when
the good listener's communication reconstructs the goal color and when the adversary's does
not; the adversary is rewarded for reconstructing the goal color itself.
"""
import numpy as np
from multiagent.core import World, Agent, Landmark
from multiagent.scenario import BaseScenario
import random
class CryptoAgent(Agent):
    """Particle agent extended with a private ``key`` used for covert communication.

    The key is a landmark color vector shared between the speaker and the good
    listener; it is assigned in ``Scenario.reset_world`` and starts out unset.
    """

    def __init__(self):
        # Initialize all base Agent state first, then add the crypto-specific slot.
        super(CryptoAgent, self).__init__()
        self.key = None  # populated with a landmark color at episode reset
class Scenario(BaseScenario):
    """Covert-communication scenario.

    Agent roles are fixed by index in ``world.agents``:
      0 -- adversarial listener (``adversary=True``): hears only the speaker's message.
      1 -- good listener: observes the shared key plus the speaker's message.
      2 -- speaker: holds the key and observes the goal color.

    No agent is movable; rewards depend only on how well each listener's
    communication vector reconstructs the goal landmark's color.
    """

    def make_world(self):
        """Build the world: 3 immobile agents (1 adversary, 1 good listener,
        1 speaker) and 2 landmarks, with a 4-dimensional communication channel.

        Returns the fully constructed (and reset) ``World``.
        """
        world = World()
        # set any world properties first
        num_agents = 3
        num_adversaries = 1
        num_landmarks = 2
        world.dim_c = 4
        # add agents
        world.agents = [CryptoAgent() for i in range(num_agents)]
        for i, agent in enumerate(world.agents):
            agent.name = 'agent %d' % i
            agent.collide = False
            # the first num_adversaries agents are adversaries; agent 2 is the speaker
            agent.adversary = True if i < num_adversaries else False
            agent.speaker = True if i == 2 else False
            # nobody moves in this scenario -- only communication matters
            agent.movable = False
        # add landmarks
        world.landmarks = [Landmark() for i in range(num_landmarks)]
        for i, landmark in enumerate(world.landmarks):
            landmark.name = 'landmark %d' % i
            landmark.collide = False
            landmark.movable = False
        # make initial conditions
        self.reset_world(world)
        return world

    def reset_world(self, world):
        """Reset colors, goal, key, and physical/communication state.

        Each landmark gets a one-hot color of length ``world.dim_c``; one
        landmark becomes the shared goal (``agent.goal_a`` for every agent),
        and the color of a randomly chosen landmark (possibly the goal itself)
        becomes the speaker's private key.
        """
        # random properties for agents (adversary tinted red, others grey)
        for i, agent in enumerate(world.agents):
            agent.color = np.array([0.25, 0.25, 0.25])
            if agent.adversary:
                agent.color = np.array([0.75, 0.25, 0.25])
            agent.key = None
        # landmark colors: one-hot vectors, one channel per landmark
        color_list = [np.zeros(world.dim_c) for i in world.landmarks]
        for i, color in enumerate(color_list):
            color[i] += 1
        for color, landmark in zip(color_list, world.landmarks):
            landmark.color = color
        # set goal landmark; the good listener (agents[1]) is tinted with the goal color
        goal = np.random.choice(world.landmarks)
        world.agents[1].color = goal.color
        # the key is an independently sampled landmark color, held only by the speaker
        world.agents[2].key = np.random.choice(world.landmarks).color
        for agent in world.agents:
            agent.goal_a = goal
        # set random initial states (positions are irrelevant to reward here)
        for agent in world.agents:
            agent.state.p_pos = np.random.uniform(-1, +1, world.dim_p)
            agent.state.p_vel = np.zeros(world.dim_p)
            agent.state.c = np.zeros(world.dim_c)
        for i, landmark in enumerate(world.landmarks):
            landmark.state.p_pos = np.random.uniform(-1, +1, world.dim_p)
            landmark.state.p_vel = np.zeros(world.dim_p)

    def benchmark_data(self, agent, world):
        # returns data for benchmarking purposes: the agent's emitted message
        # alongside the goal color it should encode/decode
        return (agent.state.c, agent.goal_a.color)

    # return all good (non-adversary) agents that are not the speaker
    def good_listeners(self, world):
        return [agent for agent in world.agents if not agent.adversary and not agent.speaker]

    # return all agents that are not adversaries
    def good_agents(self, world):
        return [agent for agent in world.agents if not agent.adversary]

    # return all adversarial agents
    def adversaries(self, world):
        return [agent for agent in world.agents if agent.adversary]

    def reward(self, agent, world):
        # dispatch to the role-specific reward function
        return self.adversary_reward(agent, world) if agent.adversary else self.agent_reward(agent, world)

    def agent_reward(self, agent, world):
        # Agents rewarded if Bob can reconstruct message, but adversary (Eve) cannot:
        # reward = -(good listener's squared error) + (adversary's squared error).
        # An all-zero communication vector means "has not spoken yet" and is skipped.
        good_listeners = self.good_listeners(world)
        adversaries = self.adversaries(world)
        good_rew = 0
        adv_rew = 0
        for a in good_listeners:
            if (a.state.c == np.zeros(world.dim_c)).all():
                continue
            else:
                good_rew -= np.sum(np.square(a.state.c - agent.goal_a.color))
        for a in adversaries:
            if (a.state.c == np.zeros(world.dim_c)).all():
                continue
            else:
                # adversary's squared reconstruction error (larger error => better for us)
                adv_l1 = np.sum(np.square(a.state.c - agent.goal_a.color))
                adv_rew += adv_l1
        return adv_rew + good_rew

    def adversary_reward(self, agent, world):
        # Adversary (Eve) is rewarded if it can reconstruct the original goal:
        # negative squared error, maximized by accurate reconstruction; zero until it speaks.
        rew = 0
        if not (agent.state.c == np.zeros(world.dim_c)).all():
            rew -= np.sum(np.square(agent.state.c - agent.goal_a.color))
        return rew

    def observation(self, agent, world):
        """Per-role observation.

        speaker        -> concat([goal_color, key])
        good listener  -> concat([key] + speaker messages)
        adversary      -> concat(speaker messages)

        Before the speaker's key is assigned, key and goal_color fall back to
        zero vectors of length ``world.dim_c``.
        """
        # goal color (zeros of length world.dim_color when no goal is set)
        goal_color = np.zeros(world.dim_color)
        if agent.goal_a is not None:
            goal_color = agent.goal_a.color
        # get positions of all entities in this agent's reference frame
        # NOTE(review): entity_pos is computed but not included in any returned
        # observation below -- confirm whether it was meant to be appended.
        entity_pos = []
        for entity in world.landmarks:
            entity_pos.append(entity.state.p_pos - agent.state.p_pos)
        # communication of all other agents (only speakers with a non-None message)
        comm = []
        for other in world.agents:
            if other is agent or (other.state.c is None) or not other.speaker: continue
            comm.append(other.state.c)
        # confer flags "key not yet set"; it only appears in the debug prints,
        # never in the returned observation vectors
        confer = np.array([0])
        if world.agents[2].key is None:
            confer = np.array([1])
            key = np.zeros(world.dim_c)
            goal_color = np.zeros(world.dim_c)
        else:
            key = world.agents[2].key

        prnt = False  # flip on for debug printing of each role's observation
        # speaker
        if agent.speaker:
            if prnt:
                print('speaker')
                print(agent.state.c)
                print(np.concatenate([goal_color] + [key] + [confer] + [np.random.randn(1)]))
            return np.concatenate([goal_color] + [key])
        # listener
        if not agent.speaker and not agent.adversary:
            if prnt:
                print('listener')
                print(agent.state.c)
                print(np.concatenate([key] + comm + [confer]))
            return np.concatenate([key] + comm)
        if not agent.speaker and agent.adversary:
            if prnt:
                print('adversary')
                print(agent.state.c)
                print(np.concatenate(comm + [confer]))
            return np.concatenate(comm)