Source code for openrl.envs.mpe.core
import numpy as np
# physical/external base state of all entites
[docs]class EntityState(object):
def __init__(self):
# physical position
self.p_pos = None
# physical velocity
self.p_vel = None
# state of agents (including communication and internal/mental state)
[docs]class AgentState(EntityState):
def __init__(self):
super(AgentState, self).__init__()
# communication utterance
self.c = None
# action of the agent
[docs]class Action(object):
def __init__(self):
# physical action
self.u = None
# communication action
self.c = None
# properties of wall entities
[docs]class Wall(object):
def __init__(
self, orient="H", axis_pos=0.0, endpoints=(-1, 1), width=0.1, hard=True
):
# orientation: 'H'orizontal or 'V'ertical
self.orient = orient
# position along axis which wall lays on (y-axis for H, x-axis for V)
self.axis_pos = axis_pos
# endpoints of wall (x-coords for H, y-coords for V)
self.endpoints = np.array(endpoints)
# width of wall
self.width = width
# whether wall is impassable to all agents
self.hard = hard
# color of wall
self.color = np.array([0.0, 0.0, 0.0])
# properties and state of physical world entity
[docs]class Entity(object):
def __init__(self):
# index among all entities (important to set for distance caching)
self.i = 0
# name
self.name = ""
# properties:
self.size = 0.050
# entity can move / be pushed
self.movable = False
# entity collides with others
self.collide = True
# entity can pass through non-hard walls
self.ghost = False
# material density (affects mass)
self.density = 25.0
# color
self.color = None
# max speed and accel
self.max_speed = None
self.accel = None
# state: including internal/mental state p_pos, p_vel
self.state = EntityState()
# mass
self.initial_mass = 1.0
# commu channel
self.channel = None
@property
def mass(self):
return self.initial_mass
# properties of landmark entities
# properties of agent entities
[docs]class Agent(Entity):
def __init__(self):
super(Agent, self).__init__()
# agent are adversary
self.adversary = False
# agent are dummy
self.dummy = False
# agents are movable by default
self.movable = True
# cannot send communication signals
self.silent = False
# cannot observe the world
self.blind = False
# physical motor noise amount
self.u_noise = None
# communication noise amount
self.c_noise = None
# control range
self.u_range = 1.0
# state: including communication state(communication utterance) c and internal/mental state p_pos, p_vel
self.state = AgentState()
# action: physical action u & communication action c
self.action = Action()
# script behavior to execute
self.action_callback = None
# zoe 20200420
self.goal = None
# multi-agent world
[docs]class World(object):
name: str
def __init__(self):
# list of agents and entities (can change at execution-time!)
self.agents = []
self.landmarks = []
self.walls = []
# communication channel dimensionality
self.dim_c = 0
# position dimensionality
self.dim_p = 2
# color dimensionality
self.dim_color = 3
# simulation timestep
self.dt = 0.1
# physical damping(阻尼)
self.damping = 0.25
# contact response parameters
self.contact_force = 1e2
self.contact_margin = 1e-3
# cache distances between all agents (not calculated by default)
self.cache_dists = False
self.cached_dist_vect = None
self.cached_dist_mag = None
# zoe 20200420
self.world_length = 25
self.world_step = 0
self.num_agents = 0
self.num_landmarks = 0
# return all entities in the world
@property
def entities(self):
return self.agents + self.landmarks
# return all agents controllable by external policies
@property
def policy_agents(self):
return [agent for agent in self.agents if agent.action_callback is None]
# return all agents controlled by world scripts
@property
def scripted_agents(self):
return [agent for agent in self.agents if agent.action_callback is not None]
[docs] def calculate_distances(self):
if self.cached_dist_vect is None:
# initialize distance data structure
self.cached_dist_vect = np.zeros(
(len(self.entities), len(self.entities), self.dim_p)
)
# calculate minimum distance for a collision between all entities (size相加�?
self.min_dists = np.zeros((len(self.entities), len(self.entities)))
for ia, entity_a in enumerate(self.entities):
for ib in range(ia + 1, len(self.entities)):
entity_b = self.entities[ib]
min_dist = entity_a.size + entity_b.size
self.min_dists[ia, ib] = min_dist
self.min_dists[ib, ia] = min_dist
for ia, entity_a in enumerate(self.entities):
for ib in range(ia + 1, len(self.entities)):
entity_b = self.entities[ib]
delta_pos = entity_a.state.p_pos - entity_b.state.p_pos
self.cached_dist_vect[ia, ib, :] = delta_pos
self.cached_dist_vect[ib, ia, :] = -delta_pos
self.cached_dist_mag = np.linalg.norm(self.cached_dist_vect, axis=2)
self.cached_collisions = self.cached_dist_mag <= self.min_dists
[docs] def assign_agent_colors(self):
n_dummies = 0
if hasattr(self.agents[0], "dummy"):
n_dummies = len([a for a in self.agents if a.dummy])
n_adversaries = 0
if hasattr(self.agents[0], "adversary"):
n_adversaries = len([a for a in self.agents if a.adversary])
n_good_agents = len(self.agents) - n_adversaries - n_dummies
# r g b
dummy_colors = [(0.25, 0.75, 0.25)] * n_dummies
# sns.color_palette("OrRd_d", n_adversaries)
adv_colors = [(0.75, 0.25, 0.25)] * n_adversaries
# sns.color_palette("GnBu_d", n_good_agents)
good_colors = [(0.25, 0.25, 0.75)] * n_good_agents
colors = dummy_colors + adv_colors + good_colors
for color, agent in zip(colors, self.agents):
agent.color = color
# landmark color
[docs] def assign_landmark_colors(self):
for landmark in self.landmarks:
landmark.color = np.array([0.25, 0.25, 0.25])
# update state of the world
[docs] def step(self):
# zoe 20200420
self.world_step += 1
# set actions for scripted agents
for agent in self.scripted_agents:
agent.action = agent.action_callback(agent, self)
# gather forces applied to entities
p_force = [None] * len(self.entities)
# apply agent physical controls
p_force = self.apply_action_force(p_force)
# apply environment forces
p_force = self.apply_environment_force(p_force)
# integrate physical state
self.integrate_state(p_force)
# update agent state
for agent in self.agents:
self.update_agent_state(agent)
# calculate and store distances between all entities
if self.cache_dists:
self.calculate_distances()
# gather agent action forces
[docs] def apply_action_force(self, p_force):
# set applied forces
for i, agent in enumerate(self.agents):
if agent.movable:
noise = (
np.random.randn(*agent.action.u.shape) * agent.u_noise
if agent.u_noise
else 0.0
)
# force = mass * a * action + n
p_force[i] = (
agent.mass * agent.accel if agent.accel is not None else agent.mass
) * agent.action.u + noise
return p_force
# gather physical forces acting on entities
[docs] def apply_environment_force(self, p_force):
# simple (but inefficient) collision response
for a, entity_a in enumerate(self.entities):
for b, entity_b in enumerate(self.entities):
if b <= a:
continue
[f_a, f_b] = self.get_entity_collision_force(a, b)
if f_a is not None:
if p_force[a] is None:
p_force[a] = 0.0
p_force[a] = f_a + p_force[a]
if f_b is not None:
if p_force[b] is None:
p_force[b] = 0.0
p_force[b] = f_b + p_force[b]
if entity_a.movable:
for wall in self.walls:
wf = self.get_wall_collision_force(entity_a, wall)
if wf is not None:
if p_force[a] is None:
p_force[a] = 0.0
p_force[a] = p_force[a] + wf
return p_force
[docs] def integrate_state(self, p_force):
for i, entity in enumerate(self.entities):
if not entity.movable:
continue
entity.state.p_vel = entity.state.p_vel * (1 - self.damping)
if p_force[i] is not None:
entity.state.p_vel += (p_force[i] / entity.mass) * self.dt
if entity.max_speed is not None:
speed = np.sqrt(
np.square(entity.state.p_vel[0]) + np.square(entity.state.p_vel[1])
)
if speed > entity.max_speed:
entity.state.p_vel = (
entity.state.p_vel
/ np.sqrt(
np.square(entity.state.p_vel[0])
+ np.square(entity.state.p_vel[1])
)
* entity.max_speed
)
entity.state.p_pos += entity.state.p_vel * self.dt
[docs] def update_agent_state(self, agent):
# set communication state (directly for now)
if agent.silent:
agent.state.c = np.zeros(self.dim_c)
else:
noise = (
np.random.randn(*agent.action.c.shape) * agent.c_noise
if agent.c_noise
else 0.0
)
agent.state.c = agent.action.c + noise
# get collision forces for any contact between two entities
[docs] def get_entity_collision_force(self, ia, ib):
entity_a = self.entities[ia]
entity_b = self.entities[ib]
if (not entity_a.collide) or (not entity_b.collide):
return [None, None] # not a collider
if (not entity_a.movable) and (not entity_b.movable):
return [None, None] # neither entity moves
if entity_a is entity_b:
return [None, None] # don't collide against itself
if self.cache_dists:
delta_pos = self.cached_dist_vect[ia, ib]
dist = self.cached_dist_mag[ia, ib]
dist_min = self.min_dists[ia, ib]
else:
# compute actual distance between entities
delta_pos = entity_a.state.p_pos - entity_b.state.p_pos
dist = np.sqrt(np.sum(np.square(delta_pos)))
# minimum allowable distance
dist_min = entity_a.size + entity_b.size
# softmax penetration
k = self.contact_margin
penetration = np.logaddexp(0, -(dist - dist_min) / k) * k
force = self.contact_force * delta_pos / dist * penetration
if entity_a.movable and entity_b.movable:
# consider mass in collisions
force_ratio = entity_b.mass / entity_a.mass
force_a = force_ratio * force
force_b = -(1 / force_ratio) * force
else:
force_a = +force if entity_a.movable else None
force_b = -force if entity_b.movable else None
return [force_a, force_b]
# get collision forces for contact between an entity and a wall
[docs] def get_wall_collision_force(self, entity, wall):
if entity.ghost and not wall.hard:
return None # ghost passes through soft walls
if wall.orient == "H":
prll_dim = 0
perp_dim = 1
else:
prll_dim = 1
perp_dim = 0
ent_pos = entity.state.p_pos
if (
ent_pos[prll_dim] < wall.endpoints[0] - entity.size
or ent_pos[prll_dim] > wall.endpoints[1] + entity.size
):
return None # entity is beyond endpoints of wall
elif (
ent_pos[prll_dim] < wall.endpoints[0]
or ent_pos[prll_dim] > wall.endpoints[1]
):
# part of entity is beyond wall
if ent_pos[prll_dim] < wall.endpoints[0]:
dist_past_end = ent_pos[prll_dim] - wall.endpoints[0]
else:
dist_past_end = ent_pos[prll_dim] - wall.endpoints[1]
theta = np.arcsin(dist_past_end / entity.size)
dist_min = np.cos(theta) * entity.size + 0.5 * wall.width
else: # entire entity lies within bounds of wall
theta = 0
dist_past_end = 0
dist_min = entity.size + 0.5 * wall.width
# only need to calculate distance in relevant dim
delta_pos = ent_pos[perp_dim] - wall.axis_pos
dist = np.abs(delta_pos)
# softmax penetration
k = self.contact_margin
penetration = np.logaddexp(0, -(dist - dist_min) / k) * k
force_mag = self.contact_force * delta_pos / dist * penetration
force = np.zeros(2)
force[perp_dim] = np.cos(theta) * force_mag
force[prll_dim] = np.sin(theta) * np.abs(force_mag)
return force