multiagent/scenarios/simple_tag.py (113 lines of code) (raw):

import numpy as np from multiagent.core import World, Agent, Landmark from multiagent.scenario import BaseScenario class Scenario(BaseScenario): def make_world(self): world = World() # set any world properties first world.dim_c = 2 num_good_agents = 1 num_adversaries = 3 num_agents = num_adversaries + num_good_agents num_landmarks = 2 # add agents world.agents = [Agent() for i in range(num_agents)] for i, agent in enumerate(world.agents): agent.name = 'agent %d' % i agent.collide = True agent.silent = True agent.adversary = True if i < num_adversaries else False agent.size = 0.075 if agent.adversary else 0.05 agent.accel = 3.0 if agent.adversary else 4.0 #agent.accel = 20.0 if agent.adversary else 25.0 agent.max_speed = 1.0 if agent.adversary else 1.3 # add landmarks world.landmarks = [Landmark() for i in range(num_landmarks)] for i, landmark in enumerate(world.landmarks): landmark.name = 'landmark %d' % i landmark.collide = True landmark.movable = False landmark.size = 0.2 landmark.boundary = False # make initial conditions self.reset_world(world) return world def reset_world(self, world): # random properties for agents for i, agent in enumerate(world.agents): agent.color = np.array([0.35, 0.85, 0.35]) if not agent.adversary else np.array([0.85, 0.35, 0.35]) # random properties for landmarks for i, landmark in enumerate(world.landmarks): landmark.color = np.array([0.25, 0.25, 0.25]) # set random initial states for agent in world.agents: agent.state.p_pos = np.random.uniform(-1, +1, world.dim_p) agent.state.p_vel = np.zeros(world.dim_p) agent.state.c = np.zeros(world.dim_c) for i, landmark in enumerate(world.landmarks): if not landmark.boundary: landmark.state.p_pos = np.random.uniform(-0.9, +0.9, world.dim_p) landmark.state.p_vel = np.zeros(world.dim_p) def benchmark_data(self, agent, world): # returns data for benchmarking purposes if agent.adversary: collisions = 0 for a in self.good_agents(world): if self.is_collision(a, agent): collisions += 1 return collisions else: return 0 def is_collision(self, agent1, agent2): delta_pos = agent1.state.p_pos - agent2.state.p_pos dist = np.sqrt(np.sum(np.square(delta_pos))) dist_min = agent1.size + agent2.size return True if dist < dist_min else False # return all agents that are not adversaries def good_agents(self, world): return [agent for agent in world.agents if not agent.adversary] # return all adversarial agents def adversaries(self, world): return [agent for agent in world.agents if agent.adversary] def reward(self, agent, world): # Agents are rewarded based on minimum agent distance to each landmark main_reward = self.adversary_reward(agent, world) if agent.adversary else self.agent_reward(agent, world) return main_reward def agent_reward(self, agent, world): # Agents are negatively rewarded if caught by adversaries rew = 0 shape = False adversaries = self.adversaries(world) if shape: # reward can optionally be shaped (increased reward for increased distance from adversary) for adv in adversaries: rew += 0.1 * np.sqrt(np.sum(np.square(agent.state.p_pos - adv.state.p_pos))) if agent.collide: for a in adversaries: if self.is_collision(a, agent): rew -= 10 # agents are penalized for exiting the screen, so that they can be caught by the adversaries def bound(x): if x < 0.9: return 0 if x < 1.0: return (x - 0.9) * 10 return min(np.exp(2 * x - 2), 10) for p in range(world.dim_p): x = abs(agent.state.p_pos[p]) rew -= bound(x) return rew def adversary_reward(self, agent, world): # Adversaries are rewarded for collisions with agents rew = 0 shape = False agents = self.good_agents(world) adversaries = self.adversaries(world) if shape: # reward can optionally be shaped (decreased reward for increased distance from agents) for adv in adversaries: rew -= 0.1 * min([np.sqrt(np.sum(np.square(a.state.p_pos - adv.state.p_pos))) for a in agents]) if agent.collide: for ag in agents: for adv in adversaries: if self.is_collision(ag, adv): rew += 10 return rew def observation(self, agent, world): # get positions of all entities in this agent's reference frame entity_pos = [] for entity in world.landmarks: if not entity.boundary: entity_pos.append(entity.state.p_pos - agent.state.p_pos) # communication of all other agents comm = [] other_pos = [] other_vel = [] for other in world.agents: if other is agent: continue comm.append(other.state.c) other_pos.append(other.state.p_pos - agent.state.p_pos) if not other.adversary: other_vel.append(other.state.p_vel) return np.concatenate([agent.state.p_vel] + [agent.state.p_pos] + entity_pos + other_pos + other_vel)