Shortcuts

Source code for openrl.envs.mpe.core

import numpy as np


# physical/external base state of all entites
[docs]class EntityState(object): def __init__(self): # physical position self.p_pos = None # physical velocity self.p_vel = None
# state of agents (including communication and internal/mental state)
[docs]class AgentState(EntityState): def __init__(self): super(AgentState, self).__init__() # communication utterance self.c = None
# action of the agent
[docs]class Action(object): def __init__(self): # physical action self.u = None # communication action self.c = None
# properties of wall entities
[docs]class Wall(object): def __init__( self, orient="H", axis_pos=0.0, endpoints=(-1, 1), width=0.1, hard=True ): # orientation: 'H'orizontal or 'V'ertical self.orient = orient # position along axis which wall lays on (y-axis for H, x-axis for V) self.axis_pos = axis_pos # endpoints of wall (x-coords for H, y-coords for V) self.endpoints = np.array(endpoints) # width of wall self.width = width # whether wall is impassable to all agents self.hard = hard # color of wall self.color = np.array([0.0, 0.0, 0.0])
# properties and state of physical world entity
[docs]class Entity(object): def __init__(self): # index among all entities (important to set for distance caching) self.i = 0 # name self.name = "" # properties: self.size = 0.050 # entity can move / be pushed self.movable = False # entity collides with others self.collide = True # entity can pass through non-hard walls self.ghost = False # material density (affects mass) self.density = 25.0 # color self.color = None # max speed and accel self.max_speed = None self.accel = None # state: including internal/mental state p_pos, p_vel self.state = EntityState() # mass self.initial_mass = 1.0 # commu channel self.channel = None @property def mass(self): return self.initial_mass
# properties of landmark entities
[docs]class Landmark(Entity): def __init__(self): super(Landmark, self).__init__()
# properties of agent entities
[docs]class Agent(Entity): def __init__(self): super(Agent, self).__init__() # agent are adversary self.adversary = False # agent are dummy self.dummy = False # agents are movable by default self.movable = True # cannot send communication signals self.silent = False # cannot observe the world self.blind = False # physical motor noise amount self.u_noise = None # communication noise amount self.c_noise = None # control range self.u_range = 1.0 # state: including communication state(communication utterance) c and internal/mental state p_pos, p_vel self.state = AgentState() # action: physical action u & communication action c self.action = Action() # script behavior to execute self.action_callback = None # zoe 20200420 self.goal = None
# multi-agent world
[docs]class World(object): name: str def __init__(self): # list of agents and entities (can change at execution-time!) self.agents = [] self.landmarks = [] self.walls = [] # communication channel dimensionality self.dim_c = 0 # position dimensionality self.dim_p = 2 # color dimensionality self.dim_color = 3 # simulation timestep self.dt = 0.1 # physical damping(阻尼) self.damping = 0.25 # contact response parameters self.contact_force = 1e2 self.contact_margin = 1e-3 # cache distances between all agents (not calculated by default) self.cache_dists = False self.cached_dist_vect = None self.cached_dist_mag = None # zoe 20200420 self.world_length = 25 self.world_step = 0 self.num_agents = 0 self.num_landmarks = 0 # return all entities in the world @property def entities(self): return self.agents + self.landmarks # return all agents controllable by external policies @property def policy_agents(self): return [agent for agent in self.agents if agent.action_callback is None] # return all agents controlled by world scripts @property def scripted_agents(self): return [agent for agent in self.agents if agent.action_callback is not None]
[docs] def calculate_distances(self): if self.cached_dist_vect is None: # initialize distance data structure self.cached_dist_vect = np.zeros( (len(self.entities), len(self.entities), self.dim_p) ) # calculate minimum distance for a collision between all entities (size相加�? self.min_dists = np.zeros((len(self.entities), len(self.entities))) for ia, entity_a in enumerate(self.entities): for ib in range(ia + 1, len(self.entities)): entity_b = self.entities[ib] min_dist = entity_a.size + entity_b.size self.min_dists[ia, ib] = min_dist self.min_dists[ib, ia] = min_dist for ia, entity_a in enumerate(self.entities): for ib in range(ia + 1, len(self.entities)): entity_b = self.entities[ib] delta_pos = entity_a.state.p_pos - entity_b.state.p_pos self.cached_dist_vect[ia, ib, :] = delta_pos self.cached_dist_vect[ib, ia, :] = -delta_pos self.cached_dist_mag = np.linalg.norm(self.cached_dist_vect, axis=2) self.cached_collisions = self.cached_dist_mag <= self.min_dists
[docs] def assign_agent_colors(self): n_dummies = 0 if hasattr(self.agents[0], "dummy"): n_dummies = len([a for a in self.agents if a.dummy]) n_adversaries = 0 if hasattr(self.agents[0], "adversary"): n_adversaries = len([a for a in self.agents if a.adversary]) n_good_agents = len(self.agents) - n_adversaries - n_dummies # r g b dummy_colors = [(0.25, 0.75, 0.25)] * n_dummies # sns.color_palette("OrRd_d", n_adversaries) adv_colors = [(0.75, 0.25, 0.25)] * n_adversaries # sns.color_palette("GnBu_d", n_good_agents) good_colors = [(0.25, 0.25, 0.75)] * n_good_agents colors = dummy_colors + adv_colors + good_colors for color, agent in zip(colors, self.agents): agent.color = color
# landmark color
[docs] def assign_landmark_colors(self): for landmark in self.landmarks: landmark.color = np.array([0.25, 0.25, 0.25])
# update state of the world
[docs] def step(self): # zoe 20200420 self.world_step += 1 # set actions for scripted agents for agent in self.scripted_agents: agent.action = agent.action_callback(agent, self) # gather forces applied to entities p_force = [None] * len(self.entities) # apply agent physical controls p_force = self.apply_action_force(p_force) # apply environment forces p_force = self.apply_environment_force(p_force) # integrate physical state self.integrate_state(p_force) # update agent state for agent in self.agents: self.update_agent_state(agent) # calculate and store distances between all entities if self.cache_dists: self.calculate_distances()
# gather agent action forces
[docs] def apply_action_force(self, p_force): # set applied forces for i, agent in enumerate(self.agents): if agent.movable: noise = ( np.random.randn(*agent.action.u.shape) * agent.u_noise if agent.u_noise else 0.0 ) # force = mass * a * action + n p_force[i] = ( agent.mass * agent.accel if agent.accel is not None else agent.mass ) * agent.action.u + noise return p_force
# gather physical forces acting on entities
[docs] def apply_environment_force(self, p_force): # simple (but inefficient) collision response for a, entity_a in enumerate(self.entities): for b, entity_b in enumerate(self.entities): if b <= a: continue [f_a, f_b] = self.get_entity_collision_force(a, b) if f_a is not None: if p_force[a] is None: p_force[a] = 0.0 p_force[a] = f_a + p_force[a] if f_b is not None: if p_force[b] is None: p_force[b] = 0.0 p_force[b] = f_b + p_force[b] if entity_a.movable: for wall in self.walls: wf = self.get_wall_collision_force(entity_a, wall) if wf is not None: if p_force[a] is None: p_force[a] = 0.0 p_force[a] = p_force[a] + wf return p_force
[docs] def integrate_state(self, p_force): for i, entity in enumerate(self.entities): if not entity.movable: continue entity.state.p_vel = entity.state.p_vel * (1 - self.damping) if p_force[i] is not None: entity.state.p_vel += (p_force[i] / entity.mass) * self.dt if entity.max_speed is not None: speed = np.sqrt( np.square(entity.state.p_vel[0]) + np.square(entity.state.p_vel[1]) ) if speed > entity.max_speed: entity.state.p_vel = ( entity.state.p_vel / np.sqrt( np.square(entity.state.p_vel[0]) + np.square(entity.state.p_vel[1]) ) * entity.max_speed ) entity.state.p_pos += entity.state.p_vel * self.dt
[docs] def update_agent_state(self, agent): # set communication state (directly for now) if agent.silent: agent.state.c = np.zeros(self.dim_c) else: noise = ( np.random.randn(*agent.action.c.shape) * agent.c_noise if agent.c_noise else 0.0 ) agent.state.c = agent.action.c + noise
# get collision forces for any contact between two entities
[docs] def get_entity_collision_force(self, ia, ib): entity_a = self.entities[ia] entity_b = self.entities[ib] if (not entity_a.collide) or (not entity_b.collide): return [None, None] # not a collider if (not entity_a.movable) and (not entity_b.movable): return [None, None] # neither entity moves if entity_a is entity_b: return [None, None] # don't collide against itself if self.cache_dists: delta_pos = self.cached_dist_vect[ia, ib] dist = self.cached_dist_mag[ia, ib] dist_min = self.min_dists[ia, ib] else: # compute actual distance between entities delta_pos = entity_a.state.p_pos - entity_b.state.p_pos dist = np.sqrt(np.sum(np.square(delta_pos))) # minimum allowable distance dist_min = entity_a.size + entity_b.size # softmax penetration k = self.contact_margin penetration = np.logaddexp(0, -(dist - dist_min) / k) * k force = self.contact_force * delta_pos / dist * penetration if entity_a.movable and entity_b.movable: # consider mass in collisions force_ratio = entity_b.mass / entity_a.mass force_a = force_ratio * force force_b = -(1 / force_ratio) * force else: force_a = +force if entity_a.movable else None force_b = -force if entity_b.movable else None return [force_a, force_b]
# get collision forces for contact between an entity and a wall
[docs] def get_wall_collision_force(self, entity, wall): if entity.ghost and not wall.hard: return None # ghost passes through soft walls if wall.orient == "H": prll_dim = 0 perp_dim = 1 else: prll_dim = 1 perp_dim = 0 ent_pos = entity.state.p_pos if ( ent_pos[prll_dim] < wall.endpoints[0] - entity.size or ent_pos[prll_dim] > wall.endpoints[1] + entity.size ): return None # entity is beyond endpoints of wall elif ( ent_pos[prll_dim] < wall.endpoints[0] or ent_pos[prll_dim] > wall.endpoints[1] ): # part of entity is beyond wall if ent_pos[prll_dim] < wall.endpoints[0]: dist_past_end = ent_pos[prll_dim] - wall.endpoints[0] else: dist_past_end = ent_pos[prll_dim] - wall.endpoints[1] theta = np.arcsin(dist_past_end / entity.size) dist_min = np.cos(theta) * entity.size + 0.5 * wall.width else: # entire entity lies within bounds of wall theta = 0 dist_past_end = 0 dist_min = entity.size + 0.5 * wall.width # only need to calculate distance in relevant dim delta_pos = ent_pos[perp_dim] - wall.axis_pos dist = np.abs(delta_pos) # softmax penetration k = self.contact_margin penetration = np.logaddexp(0, -(dist - dist_min) / k) * k force_mag = self.contact_force * delta_pos / dist * penetration force = np.zeros(2) force[perp_dim] = np.cos(theta) * force_mag force[prll_dim] = np.sin(theta) * np.abs(force_mag) return force