multiagent/scenarios/simple_push.py (73 lines of code):

import numpy as np
from multiagent.core import World, Agent, Landmark
from multiagent.scenario import BaseScenario


class Scenario(BaseScenario):
    def make_world(self):
        world = World()
        # set any world properties first
        world.dim_c = 2
        num_agents = 2
        num_adversaries = 1
        num_landmarks = 2
        # add agents
        world.agents = [Agent() for i in range(num_agents)]
        for i, agent in enumerate(world.agents):
            agent.name = 'agent %d' % i
            agent.collide = True
            agent.silent = True
            agent.adversary = True if i < num_adversaries else False
        # add landmarks
        world.landmarks = [Landmark() for i in range(num_landmarks)]
        for i, landmark in enumerate(world.landmarks):
            landmark.name = 'landmark %d' % i
            landmark.collide = False
            landmark.movable = False
        # make initial conditions
        self.reset_world(world)
        return world

    def reset_world(self, world):
        # random properties for landmarks
        for i, landmark in enumerate(world.landmarks):
            landmark.color = np.array([0.1, 0.1, 0.1])
            landmark.color[i + 1] += 0.8
            landmark.index = i
        # set goal landmark
        goal = np.random.choice(world.landmarks)
        for i, agent in enumerate(world.agents):
            agent.goal_a = goal
            agent.color = np.array([0.25, 0.25, 0.25])
            if agent.adversary:
                agent.color = np.array([0.75, 0.25, 0.25])
            else:
                j = goal.index
                agent.color[j + 1] += 0.5
        # set random initial states
        for agent in world.agents:
            agent.state.p_pos = np.random.uniform(-1, +1, world.dim_p)
            agent.state.p_vel = np.zeros(world.dim_p)
            agent.state.c = np.zeros(world.dim_c)
        for i, landmark in enumerate(world.landmarks):
            landmark.state.p_pos = np.random.uniform(-1, +1, world.dim_p)
            landmark.state.p_vel = np.zeros(world.dim_p)

    def reward(self, agent, world):
        # reward depends on role: good agents are rewarded for reaching the goal
        # landmark, adversaries for keeping good agents away from it
        return self.adversary_reward(agent, world) if agent.adversary else self.agent_reward(agent, world)

    def agent_reward(self, agent, world):
        # the distance to the goal
        return -np.sqrt(np.sum(np.square(agent.state.p_pos - agent.goal_a.state.p_pos)))

    def adversary_reward(self, agent, world):
        # keep the nearest good agents away from the goal
        agent_dist = [np.sqrt(np.sum(np.square(a.state.p_pos - a.goal_a.state.p_pos))) for a in world.agents if not a.adversary]
        pos_rew = min(agent_dist)
        #nearest_agent = world.good_agents[np.argmin(agent_dist)]
        #neg_rew = np.sqrt(np.sum(np.square(nearest_agent.state.p_pos - agent.state.p_pos)))
        neg_rew = np.sqrt(np.sum(np.square(agent.goal_a.state.p_pos - agent.state.p_pos)))
        #neg_rew = sum([np.sqrt(np.sum(np.square(a.state.p_pos - agent.state.p_pos))) for a in world.good_agents])
        return pos_rew - neg_rew

    def observation(self, agent, world):
        # get positions of all entities in this agent's reference frame
        entity_pos = []
        for entity in world.landmarks:  # world.entities:
            entity_pos.append(entity.state.p_pos - agent.state.p_pos)
        # entity colors
        entity_color = []
        for entity in world.landmarks:  # world.entities:
            entity_color.append(entity.color)
        # communication of all other agents
        comm = []
        other_pos = []
        for other in world.agents:
            if other is agent:
                continue
            comm.append(other.state.c)
            other_pos.append(other.state.p_pos - agent.state.p_pos)
        if not agent.adversary:
            return np.concatenate([agent.state.p_vel] + [agent.goal_a.state.p_pos - agent.state.p_pos] + [agent.color] + entity_pos + entity_color + other_pos)
        else:
            #other_pos = list(reversed(other_pos)) if random.uniform(0,1) > 0.5 else other_pos  # randomize position of other agents in adversary network
            return np.concatenate([agent.state.p_vel] + entity_pos + other_pos)
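

# Minimal usage sketch (not part of the upstream scenario file): assuming this
# module sits in the multiagent-particle-envs repo, MultiAgentEnv from
# multiagent.environment wraps the scenario callbacks into a gym-style env.
# The no-op action format below assumes the default discrete movement action
# space of size world.dim_p * 2 + 1.
if __name__ == "__main__":
    from multiagent.environment import MultiAgentEnv

    scenario = Scenario()
    world = scenario.make_world()
    env = MultiAgentEnv(world,
                        reset_callback=scenario.reset_world,
                        reward_callback=scenario.reward,
                        observation_callback=scenario.observation)
    obs_n = env.reset()  # one observation vector per agent
    # step once with zero (no-op) movement actions for every agent
    act_n = [np.zeros(world.dim_p * 2 + 1) for _ in env.agents]
    obs_n, reward_n, done_n, info_n = env.step(act_n)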