Source code for openrl.envs.vec_env.base_venv
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2023 The OpenRL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""""""
import sys
import warnings
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Type, Union
import gymnasium as gym
import numpy as np
from openrl.envs.vec_env.utils.numpy_utils import single_random_action
from openrl.envs.vec_env.utils.util import prepare_action_masks, tile_images
from openrl.envs.wrappers.base_wrapper import BaseWrapper
from openrl.envs.wrappers.util import is_wrapped
IN_COLAB = "google.colab" in sys.modules
# Define type aliases here to avoid circular import
# Used when we want to access one or more VecEnv
VecEnvIndices = Union[None, int, Iterable[int]]
[docs]class BaseVecEnv(
ABC,
):
"""
An abstract vectorized environment.
:param parallel_env_num: Number of environments
:param observation_space: Observation space
:param action_space: Action space
"""
metadata = {
"render.modes": [
"human",
"rgb_array",
"group_human",
"group_rgb_array",
"single_human",
"single_rgb_array",
]
}
observation_space: gym.Space
action_space: gym.Space
parallel_env_num: int
closed = False
_np_random: Optional[np.random.Generator] = None
def __init__(
self,
parallel_env_num: int,
observation_space: gym.Space,
action_space: gym.Space,
render_mode: Optional[str] = None,
auto_reset: bool = True,
):
self.parallel_env_num = parallel_env_num
self.observation_space = observation_space
self.action_space = action_space
self.render_mode = render_mode
self.closed = False
self.viewer = None
self.auto_reset = auto_reset
[docs] def reset(
self,
*,
seed: Optional[Union[int, List[int]]] = None,
options: Optional[dict] = None,
):
"""
Reset all the environments and return an array of
observations, or a tuple of observation arrays.
If step_send is still doing work, that work will
be cancelled and step_fetch() should not be called
until step_send() is invoked again.
:return: observation
"""
results = self._reset(seed=seed, options=options)
self.vector_render()
return results
def _reset(
self,
seed: Optional[Union[int, List[int]]] = None,
options: Optional[dict] = None,
):
raise NotImplementedError
[docs] def step(self, actions):
"""
Step the environments with the given action
:param actions: the action
:return: observation, reward, done, information
"""
results = self._step(actions)
self.vector_render()
return results
def _step(self, actions):
raise NotImplementedError
[docs] def vector_render(self):
if self.render_mode is None or self.render_mode in [
"rgb_array",
"human",
"single_rgb_array",
]:
return
if self.render_mode == "group_human":
self.render("human")
elif self.render_mode == "group_rgb_array":
pass
else:
raise NotImplementedError(
"render_mode {} is not implemented".format(self.render_mode)
)
[docs] def close(self, **kwargs) -> None:
"""
Clean up the environment's resources.
"""
if self.closed:
return
if self.viewer is not None:
self.viewer.close()
self.close_extras(**kwargs)
self.closed = True
[docs] def close_extras(self, **kwargs):
"""Clean up the extra resources e.g. beyond what's in this base class."""
pass
[docs] def render(self, mode: Optional[str] = None) -> Optional[np.ndarray]:
"""
Gym environment rendering
:param mode: the rendering type
"""
try:
imgs = self._get_images()
except NotImplementedError:
warnings.warn(f"Render not defined for {self}")
return
# Create a big image by tiling images from subprocesses
bigimg = tile_images(imgs)
if mode == "human":
if IN_COLAB:
from google.colab.patches import cv2_imshow
cv2_imshow(bigimg[:, :, ::-1])
else:
import cv2 # pytype:disable=import-error
cv2.imshow("Vec_Env:{}".format(self.env_name), bigimg[:, :, ::-1])
cv2.waitKey(1)
elif mode in [None, "rgb_array"]:
return bigimg
else:
raise NotImplementedError(f"Render mode {mode} is not supported by VecEnvs")
def _get_images(self) -> Sequence[np.ndarray]:
"""
Return RGB images from each environment
"""
raise NotImplementedError
[docs] def seed(self, seed: Optional[int] = None) -> List[Union[None, int]]:
"""
Environment seeds can be passed to this reset argument in the future.
The old ``.seed()`` method is being deprecated.
Sets the random seeds for all environments, based on a given seed.
Each individual environment will still get its own seed, by incrementing the given seed.
:param seed: The random seed. May be None for completely random seeding.
:return: Returns a list containing the seeds for each individual env.
Note that all list elements may be None, if the env does not return anything when being seeded.
"""
pass
def __del__(self):
"""Closes the vector environment."""
if not getattr(self, "closed", True):
self.close()
@property
def unwrapped(self) -> "BaseVecEnv":
return self
@property
@abstractmethod
def env_name(self):
return None
@property
def agent_num(self):
return self._agent_num
[docs] def call_send(self, name, *args, **kwargs):
"""Calls a method name for each parallel environment asynchronously."""
[docs] def call_fetch(self, **kwargs) -> List[Any]: # type: ignore
"""After calling a method in :meth:`call_send`, this function collects the results."""
[docs] def call(self, name: str, *args, **kwargs) -> List[Any]:
"""Call a method, or get a property, from each parallel environment.
Args:
name (str): Name of the method or property to call.
*args: Arguments to apply to the method call.
**kwargs: Keyword arguments to apply to the method call.
Returns:
List of the results of the individual calls to the method or property for each environment.
"""
self.call_send(name, *args, **kwargs)
return self.call_fetch()
[docs] def exec_func_send(self, func: Callable, indices, *args, **kwargs):
"""Calls the method with name asynchronously and apply args and kwargs to the method.
Args:
func: a function.
indices: Indices of the environments to call the method on.
*args: Arguments to apply to the method call.
**kwargs: Keyword arguments to apply to the method call.
Raises:
ClosedEnvironmentError: If the environment was closed (if :meth:`close` was previously called).
AlreadyPendingCallError: Calling `call_send` while waiting for a pending call to complete
"""
[docs] def exec_func_fetch(self, timeout: Union[int, float, None] = None) -> list:
"""Calls all parent pipes and waits for the results.
Args:
timeout: Number of seconds before the call to `step_fetch` times out.
If `None` (default), the call to `step_fetch` never times out.
Returns:
List of the results of the individual calls to the method or property for each environment.
Raises:
NoAsyncCallError: Calling `call_fetch` without any prior call to `call_send`.
TimeoutError: The call to `call_fetch` has timed out after timeout second(s).
"""
[docs] def exec_func(
self, func: Callable, indices: Optional[List[int]] = None, *args, **kwargs
) -> List[Any]:
"""Call a method, or get a property, from each parallel environment.
Args:
func : Name of the method to call.
indices: Indices of the environments to call the method on.
*args: Arguments to apply to the method call.
**kwargs: Keyword arguments to apply to the method call.
Returns:
List of the results of the individual calls to the method or property for each environment.
"""
self.exec_func_send(func, indices, *args, **kwargs)
return self.exec_func_fetch()
[docs] def get_attr(self, name: str):
"""Get a property from each parallel environment.
Args:
name (str): Name of the property to be get from each individual environment.
Returns:
The property with name
"""
return self.call(name)
[docs] def set_attr(self, name: str, values: Union[list, tuple, object]):
"""Set a property in each sub-environment.
Args:
name (str): Name of the property to be set in each individual environment.
values (list, tuple, or object): Values of the property to be set to. If `values` is a list or
tuple, then it corresponds to the values for each individual environment, otherwise a single value
is set for all environments.
"""
[docs] def random_action(self, infos: Optional[List[Dict[str, Any]]] = None):
"""
Get a random action from the action space
"""
action_masks = prepare_action_masks(
infos, agent_num=self.agent_num, as_batch=False
)
return np.array(
[
[
single_random_action(
self.action_space,
(
action_masks[env_index][agent_index]
if action_masks is not None
else None
),
)
for agent_index in range(self.agent_num)
]
for env_index in range(self.parallel_env_num)
]
)
[docs] def env_is_wrapped(
self, wrapper_class: Type[BaseWrapper], indices: VecEnvIndices = None
) -> List[bool]:
"""Check if worker environments are wrapped with a given wrapper"""
indices = self._get_indices(indices)
results = self.exec_func(
is_wrapped, indices=indices, wrapper_class=wrapper_class
)
return [results[i] for i in indices]
def _get_indices(self, indices: VecEnvIndices) -> Iterable[int]:
"""
Convert a flexibly-typed reference to environment indices to an implied list of indices.
:param indices: refers to indices of envs.
:return: the implied list of indices.
"""
if indices is None:
indices = range(self.parallel_env_num)
elif isinstance(indices, int):
indices = [indices]
return indices