Shortcuts

Source code for openrl.envs.mpe.scenarios.simple_spread

from typing import Optional

import numpy as np

from openrl.envs.mpe.core import Agent, Landmark, World
from openrl.envs.mpe.scenario import BaseScenario


[docs]class Scenario(BaseScenario): def __init__(self): self.render_mode = None
[docs] def make_world( self, render_mode: Optional[str] = None, world_length: int = 25, num_agents: int = 3, num_landmarks: int = 3, ): self.render_mode = render_mode world = World() world.name = "simple_spread" world.world_length = world_length # set any world properties first world.dim_c = 2 world.num_agents = num_agents world.num_landmarks = num_landmarks # 3 world.collaborative = True # add agents world.agents = [Agent() for i in range(world.num_agents)] for i, agent in enumerate(world.agents): agent.name = "agent %d" % i agent.collide = True agent.silent = True agent.size = 0.15 # add landmarks world.landmarks = [Landmark() for i in range(world.num_landmarks)] for i, landmark in enumerate(world.landmarks): landmark.name = "landmark %d" % i landmark.collide = False landmark.movable = False # make initial conditions self.reset_world(world) return world
[docs] def reset_world(self, world, np_random: Optional[np.random.Generator] = None): # random properties for agents if np_random is None: np_random = np.random.default_rng() world.assign_agent_colors() world.assign_landmark_colors() # set random initial states for agent in world.agents: agent.state.p_pos = np_random.uniform(-1, +1, world.dim_p) agent.state.p_vel = np.zeros(world.dim_p) agent.state.c = np.zeros(world.dim_c) for i, landmark in enumerate(world.landmarks): landmark.state.p_pos = 0.8 * np_random.uniform(-1, +1, world.dim_p) landmark.state.p_vel = np.zeros(world.dim_p)
[docs] def benchmark_data(self, agent, world): rew = 0 collisions = 0 occupied_landmarks = 0 min_dists = 0 for landmark in world.landmarks: dists = [ np.sqrt(np.sum(np.square(a.state.p_pos - landmark.state.p_pos))) for a in world.agents ] min_dists += min(dists) rew -= min(dists) if min(dists) < 0.1: occupied_landmarks += 1 if agent.collide: for a in world.agents: if self.is_collision(a, agent): rew -= 1 collisions += 1 return (rew, collisions, min_dists, occupied_landmarks)
[docs] def is_collision(self, agent1, agent2): delta_pos = agent1.state.p_pos - agent2.state.p_pos dist = np.sqrt(np.sum(np.square(delta_pos))) dist_min = agent1.size + agent2.size return True if dist < dist_min else False
[docs] def reward(self, agent, world): # Agents are rewarded based on minimum agent distance to each landmark, penalized for collisions rew = 0 for landmark in world.landmarks: dists = [ np.sqrt(np.sum(np.square(a.state.p_pos - landmark.state.p_pos))) for a in world.agents ] rew -= min(dists) if agent.collide: for a in world.agents: if self.is_collision(a, agent): rew -= 1 return rew
[docs] def observation(self, agent, world): # get positions of all entities in this agent's reference frame entity_pos = [] for entity in world.landmarks: # world.entities: entity_pos.append(entity.state.p_pos - agent.state.p_pos) # entity colors entity_color = [] for entity in world.landmarks: # world.entities: entity_color.append(entity.color) # communication of all other agents comm = [] other_pos = [] for other in world.agents: if other is agent: continue comm.append(other.state.c) other_pos.append(other.state.p_pos - agent.state.p_pos) return np.concatenate( [agent.state.p_vel] + [agent.state.p_pos] + entity_pos + other_pos + comm )