openrl.envs.vec_env package¶
Subpackages¶
Submodules¶
openrl.envs.vec_env.async_venv module¶
An async vector environment.
- enum openrl.envs.vec_env.async_venv.AsyncState(value)[source]¶
Bases: enum.Enum
Valid values are as follows:
- DEFAULT = <AsyncState.DEFAULT: 'default'>¶
- WAITING_RESET = <AsyncState.WAITING_RESET: 'reset'>¶
- WAITING_STEP = <AsyncState.WAITING_STEP: 'step'>¶
- WAITING_CALL = <AsyncState.WAITING_CALL: 'call'>¶
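The four states above suggest a guard that allows only one outstanding send at a time. The following is a minimal sketch of that idea, not OpenRL's implementation: `PendingCallGuard`, `begin`, and `finish` are hypothetical names, and `RuntimeError` stands in for the library's own exception types.

```python
from enum import Enum


class AsyncState(Enum):
    # Values mirror the members listed above.
    DEFAULT = "default"
    WAITING_RESET = "reset"
    WAITING_STEP = "step"
    WAITING_CALL = "call"


class PendingCallGuard:
    """Hypothetical helper: tracks which kind of call is outstanding."""

    def __init__(self):
        self.state = AsyncState.DEFAULT

    def begin(self, new_state):
        # A second send before the matching fetch is rejected.
        if self.state is not AsyncState.DEFAULT:
            raise RuntimeError(
                f"already waiting for a pending '{self.state.value}' call"
            )
        self.state = new_state

    def finish(self, expected_state):
        # A fetch is only valid after the matching send.
        if self.state is not expected_state:
            raise RuntimeError(
                f"no pending '{expected_state.value}' call to fetch"
            )
        self.state = AsyncState.DEFAULT


guard = PendingCallGuard()
guard.begin(AsyncState.WAITING_STEP)   # plays the role of step_send()
guard.finish(AsyncState.WAITING_STEP)  # plays the role of step_fetch()
```

Under this sketch, a second `begin` before the matching `finish` fails, which corresponds to the AlreadyPendingCallError case described for the send methods.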
- class openrl.envs.vec_env.async_venv.AsyncVectorEnv(env_fns: Sequence[Callable[[], gymnasium.core.Env]], observation_space: Optional[gymnasium.spaces.space.Space] = None, action_space: Optional[gymnasium.spaces.space.Space] = None, shared_memory: bool = False, copy: bool = True, context: Optional[str] = None, daemon: bool = True, worker: Optional[Callable] = None, render_mode: Optional[str] = None, auto_reset: bool = True)[source]¶
Bases: openrl.envs.vec_env.base_venv.BaseVecEnv
Vectorized environment that runs multiple environments in parallel.
It uses multiprocessing processes and pipes for communication.
- call_fetch(timeout: Optional[Union[int, float]] = None) list[source]¶
Calls all parent pipes and waits for the results.
- Args:
- timeout: Number of seconds before the call to call_fetch times out.
If None (default), the call to call_fetch never times out.
- Returns:
List of the results of the individual calls to the method or property for each environment.
- Raises:
NoAsyncCallError: Calling call_fetch without any prior call to call_send.
TimeoutError: The call to call_fetch has timed out after timeout second(s).
- call_send(name: str, *args, **kwargs)[source]¶
Calls the method given by name asynchronously and applies args and kwargs to it.
- Args:
name: Name of the method or property to call.
*args: Arguments to apply to the method call.
**kwargs: Keyword arguments to apply to the method call.
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
AlreadyPendingCallError: Calling call_send while waiting for a pending call to complete.
- close_extras(timeout: Optional[Union[int, float]] = None, terminate: bool = False)[source]¶
Close the environments & clean up the extra resources (processes and pipes).
- Args:
timeout: Number of seconds before the call to close() times out. If None, the call to close() never times out. If the call to close() times out, then all processes are terminated.
terminate: If True, then the close() operation is forced and all processes are terminated.
- Raises:
TimeoutError: If close() timed out.
- property env_name¶
- exec_func_fetch(timeout: Optional[Union[int, float]] = None) list[source]¶
Calls all parent pipes and waits for the results.
- Args:
- timeout: Number of seconds before the call to exec_func_fetch times out.
If None (default), the call to exec_func_fetch never times out.
- Returns:
List of the results of the individual calls to the method or property for each environment.
- Raises:
NoAsyncCallError: Calling exec_func_fetch without any prior call to exec_func_send.
TimeoutError: The call to exec_func_fetch has timed out after timeout second(s).
- exec_func_send(func: Callable, indices, *args, **kwargs)[source]¶
Calls func asynchronously on the selected environments and applies args and kwargs to the call.
- Args:
func: The function to call.
indices: Indices of the environments to call the function on.
*args: Arguments to apply to the function call.
**kwargs: Keyword arguments to apply to the function call.
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
AlreadyPendingCallError: Calling exec_func_send while waiting for a pending call to complete.
- get_attr(name: str)[source]¶
Get a property from each parallel environment.
- Args:
name (str): Name of the property to get from each individual environment.
- Returns:
The property with name
- reset_fetch(timeout: Optional[Union[int, float]] = None) Union[gymnasium.core.ObsType, Tuple[gymnasium.core.ObsType, dict]][source]¶
Waits for the calls triggered by reset_send() to finish and returns the results.
- Args:
timeout: Number of seconds before the call to reset_fetch times out. If None, the call to reset_fetch never times out.
seed: ignored
options: ignored
- Returns:
A tuple of batched observations and a list of dictionaries.
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
NoAsyncCallError: If reset_fetch() was called without any prior call to reset_send().
TimeoutError: If reset_fetch() timed out.
- reset_send(seed: Optional[Union[int, List[int]]] = None, options: Optional[dict] = None)[source]¶
Send calls to the reset methods of the sub-environments.
To get the results of these calls, you may invoke reset_fetch().
- Args:
seed: A single seed, or a list of seeds with one entry for each environment.
options: The reset options.
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
AlreadyPendingCallError: If the environment is already waiting for a pending call to another method (e.g. step_send()). This can be caused by two consecutive calls to reset_send(), with no call to reset_fetch() in between.
- set_attr(name: str, values: Union[List[Any], Tuple[Any], object])[source]¶
Sets an attribute of the sub-environments.
- Args:
name: Name of the property to be set in each individual environment.
values: Values of the property to be set to. If values is a list or tuple, then it corresponds to the values for each individual environment, otherwise a single value is set for all environments.
- Raises:
ValueError: Values must be a list or tuple with length equal to the number of environments.
AlreadyPendingCallError: Calling set_attr while waiting for a pending call to complete.
- step_fetch(timeout: Optional[Union[int, float]] = None) Union[Tuple[Any, numpy.ndarray[Any, numpy.dtype[Any]], numpy.ndarray[Any, numpy.dtype[Any]], List[Dict[str, Any]]], Tuple[Any, numpy.ndarray[Any, numpy.dtype[Any]], numpy.ndarray[Any, numpy.dtype[Any]], numpy.ndarray[Any, numpy.dtype[Any]], List[Dict[str, Any]]]][source]¶
Wait for the calls to step in each sub-environment to finish.
- Args:
timeout: Number of seconds before the call to step_fetch() times out. If None, the call to step_fetch() never times out.
- Returns:
The batched environment step information, (obs, reward, terminated, truncated, info)
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
NoAsyncCallError: If step_fetch() was called without any prior call to step_send().
TimeoutError: If step_fetch() timed out.
- step_send(actions: numpy.ndarray)[source]¶
Send the calls to step to each sub-environment.
- Args:
actions: Batch of actions; an element of action_space.
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
AlreadyPendingCallError: If the environment is already waiting for a pending call to another method (e.g. reset_send()). This can be caused by two consecutive calls to step_send(), with no call to step_fetch() in between.
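The send/fetch protocol described above (one send, then one fetch, with an optional timeout) can be sketched with pipes from the standard library. This is an illustration under stated assumptions, not OpenRL code: `TinyAsyncVec` and `worker` are hypothetical names, the workers run in threads rather than processes to keep the example self-contained, the "environment" is just a running total, and `RuntimeError` stands in for the library's own exceptions.

```python
import multiprocessing as mp
import threading


def worker(conn):
    """Serve ("step", value) requests until a None sentinel arrives."""
    total = 0
    while True:
        msg = conn.recv()
        if msg is None:
            break
        name, value = msg
        if name == "step":
            total += value
            conn.send(total)
    conn.close()


class TinyAsyncVec:
    """Hypothetical stand-in for the send/fetch protocol."""

    def __init__(self, n):
        self.parent_conns = []
        self.threads = []
        for _ in range(n):
            parent, child = mp.Pipe()
            t = threading.Thread(target=worker, args=(child,), daemon=True)
            t.start()
            self.parent_conns.append(parent)
            self.threads.append(t)
        self.pending = False

    def step_send(self, actions):
        if self.pending:
            raise RuntimeError("a call is already pending")
        for conn, action in zip(self.parent_conns, actions):
            conn.send(("step", action))
        self.pending = True

    def step_fetch(self, timeout=None):
        if not self.pending:
            raise RuntimeError("step_fetch called without step_send")
        results = []
        for conn in self.parent_conns:
            # poll(None) blocks until data arrives; poll(t) waits up to
            # t seconds. The timeout here applies to each pipe in turn.
            if not conn.poll(timeout):
                raise TimeoutError(
                    f"step_fetch timed out after {timeout} second(s)"
                )
            results.append(conn.recv())
        self.pending = False
        return results

    def close(self):
        for conn in self.parent_conns:
            conn.send(None)
        for t in self.threads:
            t.join()


venv = TinyAsyncVec(3)
venv.step_send([1, 2, 3])
first = venv.step_fetch(timeout=5.0)
venv.step_send([10, 10, 10])
second = venv.step_fetch(timeout=5.0)
venv.close()
```

Splitting the call in two lets the caller do other work between `step_send` and `step_fetch` while the workers run.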
openrl.envs.vec_env.base_venv module¶
- class openrl.envs.vec_env.base_venv.BaseVecEnv(parallel_env_num: int, observation_space: gymnasium.spaces.space.Space, action_space: gymnasium.spaces.space.Space, render_mode: Optional[str] = None, auto_reset: bool = True)[source]¶
Bases: abc.ABC
An abstract vectorized environment.
- Parameters
parallel_env_num – Number of environments
observation_space – Observation space
action_space – Action space
- action_space: gymnasium.spaces.space.Space¶
- property agent_num¶
- call(name: str, *args, **kwargs) List[Any][source]¶
Call a method, or get a property, from each parallel environment.
- call_fetch(**kwargs) List[Any][source]¶
After calling a method in call_send(), this function collects the results.
- call_send(name, *args, **kwargs)[source]¶
Calls the method given by name on each parallel environment asynchronously.
- closed = False¶
- env_is_wrapped(wrapper_class: Type[openrl.envs.wrappers.base_wrapper.BaseWrapper], indices: Union[None, int, Iterable[int]] = None) List[bool][source]¶
Check if worker environments are wrapped with a given wrapper
- abstract property env_name¶
- exec_func(func: Callable, indices: Optional[List[int]] = None, *args, **kwargs) List[Any][source]¶
Call a method, or get a property, from each parallel environment.
- Args:
func: The function to call.
indices: Indices of the environments to call the function on.
*args: Arguments to apply to the function call.
**kwargs: Keyword arguments to apply to the function call.
- Returns:
List of the results of the individual calls to the method or property for each environment.
- exec_func_fetch(timeout: Optional[Union[int, float]] = None) list[source]¶
Calls all parent pipes and waits for the results.
- Args:
- timeout: Number of seconds before the call to exec_func_fetch times out.
If None (default), the call to exec_func_fetch never times out.
- Returns:
List of the results of the individual calls to the method or property for each environment.
- Raises:
NoAsyncCallError: Calling exec_func_fetch without any prior call to exec_func_send.
TimeoutError: The call to exec_func_fetch has timed out after timeout second(s).
- exec_func_send(func: Callable, indices, *args, **kwargs)[source]¶
Calls func asynchronously on the selected environments and applies args and kwargs to the call.
- Args:
func: The function to call.
indices: Indices of the environments to call the function on.
*args: Arguments to apply to the function call.
**kwargs: Keyword arguments to apply to the function call.
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
AlreadyPendingCallError: Calling exec_func_send while waiting for a pending call to complete.
- get_attr(name: str)[source]¶
Get a property from each parallel environment.
- Args:
name (str): Name of the property to get from each individual environment.
- Returns:
The property with name
- metadata = {'render.modes': ['human', 'rgb_array', 'group_human', 'group_rgb_array', 'single_human', 'single_rgb_array']}¶
- observation_space: gymnasium.spaces.space.Space¶
- parallel_env_num: int¶
- random_action(infos: Optional[List[Dict[str, Any]]] = None)[source]¶
Get a random action from the action space
- render(mode: Optional[str] = None) Optional[numpy.ndarray][source]¶
Gym environment rendering
- Parameters
mode – the rendering type
- reset(*, seed: Optional[Union[int, List[int]]] = None, options: Optional[dict] = None)[source]¶
Reset all the environments and return an array of observations, or a tuple of observation arrays.
If step_send is still doing work, that work will be cancelled and step_fetch() should not be called until step_send() is invoked again.
- Returns
observation
- seed(seed: Optional[int] = None) List[Union[None, int]][source]¶
Sets the random seeds for all environments, based on a given seed. Each individual environment will still get its own seed, by incrementing the given seed.
The old .seed() method is being deprecated; in the future, environment seeds can be passed through the seed argument of reset() instead.
- Parameters
seed – The random seed. May be None for completely random seeding.
- Returns
Returns a list containing the seeds for each individual env. Note that all list elements may be None, if the env does not return anything when being seeded.
- set_attr(name: str, values: Union[list, tuple, object])[source]¶
Set a property in each sub-environment.
- Args:
name (str): Name of the property to be set in each individual environment.
values (list, tuple, or object): Values of the property to be set to. If values is a list or tuple, then it corresponds to the values for each individual environment, otherwise a single value is set for all environments.
- step(actions)[source]¶
Step the environments with the given action
- Parameters
actions – the action
- Returns
observation, reward, done, information
- property unwrapped: openrl.envs.vec_env.base_venv.BaseVecEnv¶
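The seed() description above says each environment gets its own seed by incrementing the given one. A minimal sketch of that rule follows; `per_env_seeds` is a hypothetical helper written for illustration, and the exact derivation used by the library is an assumption.

```python
from typing import List, Optional


def per_env_seeds(seed: Optional[int], parallel_env_num: int) -> List[Optional[int]]:
    """Derive one seed per environment from a single base seed."""
    if seed is None:
        # No base seed: leave every environment to seed itself randomly.
        return [None] * parallel_env_num
    # Environment i receives seed + i, so the seeds are distinct
    # and the whole batch is reproducible from one integer.
    return [seed + i for i in range(parallel_env_num)]


seeds = per_env_seeds(42, 4)
unseeded = per_env_seeds(None, 2)
```

With a base seed of 42 and four environments, the environments would receive 42, 43, 44, and 45 under this assumption.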
openrl.envs.vec_env.sync_venv module¶
- class openrl.envs.vec_env.sync_venv.SyncVectorEnv(env_fns: Iterable[Callable[[], gymnasium.core.Env]], observation_space: gymnasium.spaces.space.Space = None, action_space: gymnasium.spaces.space.Space = None, copy: bool = True, render_mode: Optional[str] = None, auto_reset: bool = True)[source]¶
Bases: openrl.envs.vec_env.base_venv.BaseVecEnv
Vectorized environment that serially runs multiple environments.
- property env_name¶
- exec_func(func: Callable, indices: Optional[List[int]] = None, *args, **kwargs) tuple[source]¶
Calls func on the selected sub-environments and applies args and kwargs to the call.
- seed(seed: Optional[Union[int, Sequence[int]]] = None)[source]¶
Sets the seed in all sub-environments.
- Args:
seed: The seed
- set_attr(name: str, values: Union[list, tuple, Any])[source]¶
Sets an attribute of the sub-environments.
- Args:
name: The property name to change.
values: Values of the property to be set to. If values is a list or tuple, then it corresponds to the values for each individual environment, otherwise a single value is set for all environments.
- Raises:
ValueError: Values must be a list or tuple with length equal to the number of environments.
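The set_attr() rule above (a list or tuple supplies one value per environment, anything else is applied to all of them) can be sketched as follows. `expand_values` and `DummyEnv` are hypothetical names used only for this illustration; the real method may differ in detail.

```python
from typing import Any, List


def expand_values(values: Any, num_envs: int) -> List[Any]:
    """Return one value per environment, following the rule above."""
    if isinstance(values, (list, tuple)):
        if len(values) != num_envs:
            raise ValueError(
                "Values must be a list or tuple with length equal to "
                f"the number of environments ({num_envs}), got {len(values)}."
            )
        return list(values)
    # A single value is shared by every environment (same object, not a copy).
    return [values] * num_envs


class DummyEnv:
    """Stand-in for a sub-environment."""


envs = [DummyEnv() for _ in range(3)]

# Broadcast one value to every environment.
for env, value in zip(envs, expand_values(0.5, len(envs))):
    setattr(env, "gravity", value)

# Give each environment its own value.
for env, value in zip(envs, expand_values(["a", "b", "c"], len(envs))):
    setattr(env, "label", value)
```

Note that a list whose length does not match the number of environments is rejected rather than treated as a single value.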
Module contents¶
- class openrl.envs.vec_env.AsyncVectorEnv(env_fns: Sequence[Callable[[], gymnasium.core.Env]], observation_space: Optional[gymnasium.spaces.space.Space] = None, action_space: Optional[gymnasium.spaces.space.Space] = None, shared_memory: bool = False, copy: bool = True, context: Optional[str] = None, daemon: bool = True, worker: Optional[Callable] = None, render_mode: Optional[str] = None, auto_reset: bool = True)[source]¶
Bases: openrl.envs.vec_env.base_venv.BaseVecEnv
Vectorized environment that runs multiple environments in parallel.
It uses multiprocessing processes and pipes for communication.
- call_fetch(timeout: Optional[Union[int, float]] = None) list[source]¶
Calls all parent pipes and waits for the results.
- Args:
- timeout: Number of seconds before the call to call_fetch times out.
If None (default), the call to call_fetch never times out.
- Returns:
List of the results of the individual calls to the method or property for each environment.
- Raises:
NoAsyncCallError: Calling call_fetch without any prior call to call_send.
TimeoutError: The call to call_fetch has timed out after timeout second(s).
- call_send(name: str, *args, **kwargs)[source]¶
Calls the method given by name asynchronously and applies args and kwargs to it.
- Args:
name: Name of the method or property to call.
*args: Arguments to apply to the method call.
**kwargs: Keyword arguments to apply to the method call.
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
AlreadyPendingCallError: Calling call_send while waiting for a pending call to complete.
- close_extras(timeout: Optional[Union[int, float]] = None, terminate: bool = False)[source]¶
Close the environments & clean up the extra resources (processes and pipes).
- Args:
timeout: Number of seconds before the call to close() times out. If None, the call to close() never times out. If the call to close() times out, then all processes are terminated.
terminate: If True, then the close() operation is forced and all processes are terminated.
- Raises:
TimeoutError: If close() timed out.
- property env_name¶
- exec_func_fetch(timeout: Optional[Union[int, float]] = None) list[source]¶
Calls all parent pipes and waits for the results.
- Args:
- timeout: Number of seconds before the call to exec_func_fetch times out.
If None (default), the call to exec_func_fetch never times out.
- Returns:
List of the results of the individual calls to the method or property for each environment.
- Raises:
NoAsyncCallError: Calling exec_func_fetch without any prior call to exec_func_send.
TimeoutError: The call to exec_func_fetch has timed out after timeout second(s).
- exec_func_send(func: Callable, indices, *args, **kwargs)[source]¶
Calls func asynchronously on the selected environments and applies args and kwargs to the call.
- Args:
func: The function to call.
indices: Indices of the environments to call the function on.
*args: Arguments to apply to the function call.
**kwargs: Keyword arguments to apply to the function call.
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
AlreadyPendingCallError: Calling exec_func_send while waiting for a pending call to complete.
- get_attr(name: str)[source]¶
Get a property from each parallel environment.
- Args:
name (str): Name of the property to get from each individual environment.
- Returns:
The property with name
- reset_fetch(timeout: Optional[Union[int, float]] = None) Union[gymnasium.core.ObsType, Tuple[gymnasium.core.ObsType, dict]][source]¶
Waits for the calls triggered by reset_send() to finish and returns the results.
- Args:
timeout: Number of seconds before the call to reset_fetch times out. If None, the call to reset_fetch never times out.
seed: ignored
options: ignored
- Returns:
A tuple of batched observations and a list of dictionaries.
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
NoAsyncCallError: If reset_fetch() was called without any prior call to reset_send().
TimeoutError: If reset_fetch() timed out.
- reset_send(seed: Optional[Union[int, List[int]]] = None, options: Optional[dict] = None)[source]¶
Send calls to the reset methods of the sub-environments.
To get the results of these calls, you may invoke reset_fetch().
- Args:
seed: A single seed, or a list of seeds with one entry for each environment.
options: The reset options.
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
AlreadyPendingCallError: If the environment is already waiting for a pending call to another method (e.g. step_send()). This can be caused by two consecutive calls to reset_send(), with no call to reset_fetch() in between.
- set_attr(name: str, values: Union[List[Any], Tuple[Any], object])[source]¶
Sets an attribute of the sub-environments.
- Args:
name: Name of the property to be set in each individual environment.
values: Values of the property to be set to. If values is a list or tuple, then it corresponds to the values for each individual environment, otherwise a single value is set for all environments.
- Raises:
ValueError: Values must be a list or tuple with length equal to the number of environments.
AlreadyPendingCallError: Calling set_attr while waiting for a pending call to complete.
- step_fetch(timeout: Optional[Union[int, float]] = None) Union[Tuple[Any, numpy.ndarray[Any, numpy.dtype[Any]], numpy.ndarray[Any, numpy.dtype[Any]], List[Dict[str, Any]]], Tuple[Any, numpy.ndarray[Any, numpy.dtype[Any]], numpy.ndarray[Any, numpy.dtype[Any]], numpy.ndarray[Any, numpy.dtype[Any]], List[Dict[str, Any]]]][source]¶
Wait for the calls to step in each sub-environment to finish.
- Args:
timeout: Number of seconds before the call to step_fetch() times out. If None, the call to step_fetch() never times out.
- Returns:
The batched environment step information, (obs, reward, terminated, truncated, info)
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
NoAsyncCallError: If step_fetch() was called without any prior call to step_send().
TimeoutError: If step_fetch() timed out.
- step_send(actions: numpy.ndarray)[source]¶
Send the calls to step to each sub-environment.
- Args:
actions: Batch of actions; an element of action_space.
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
AlreadyPendingCallError: If the environment is already waiting for a pending call to another method (e.g. reset_send()). This can be caused by two consecutive calls to step_send(), with no call to step_fetch() in between.
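The exec_func_send() and exec_func_fetch() pair documented above takes a function and a list of environment indices. The sketch below illustrates that calling pattern with a thread pool from the standard library; it is not how OpenRL dispatches work. The module-level `exec_func_send` and `exec_func_fetch` functions, `Counter`, and `bump` are all hypothetical, and it assumes the function receives the environment as its first argument.

```python
from concurrent.futures import ThreadPoolExecutor


class Counter:
    """Stand-in for a sub-environment with some mutable state."""

    def __init__(self, start):
        self.value = start


def bump(env, amount=1):
    """Example function applied to one environment."""
    env.value += amount
    return env.value


envs = [Counter(0), Counter(10), Counter(20)]
pool = ThreadPoolExecutor(max_workers=len(envs))


def exec_func_send(func, indices, *args, **kwargs):
    # Start func on the selected environments only; returns immediately.
    return [pool.submit(func, envs[i], *args, **kwargs) for i in indices]


def exec_func_fetch(futures, timeout=None):
    # Block until every result is ready. result() raises a timeout
    # error if a call takes longer than `timeout` seconds.
    return [future.result(timeout=timeout) for future in futures]


pending = exec_func_send(bump, [0, 2], amount=5)
results = exec_func_fetch(pending, timeout=5.0)
pool.shutdown()
```

Only the environments at indices 0 and 2 are touched here; the one at index 1 keeps its original state.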
- class openrl.envs.vec_env.BaseVecEnv(parallel_env_num: int, observation_space: gymnasium.spaces.space.Space, action_space: gymnasium.spaces.space.Space, render_mode: Optional[str] = None, auto_reset: bool = True)[source]¶
Bases: abc.ABC
An abstract vectorized environment.
- Parameters
parallel_env_num – Number of environments
observation_space – Observation space
action_space – Action space
- action_space: gymnasium.spaces.space.Space¶
- property agent_num¶
- call(name: str, *args, **kwargs) List[Any][source]¶
Call a method, or get a property, from each parallel environment.
- call_fetch(**kwargs) List[Any][source]¶
After calling a method in call_send(), this function collects the results.
- call_send(name, *args, **kwargs)[source]¶
Calls the method given by name on each parallel environment asynchronously.
- closed = False¶
- env_is_wrapped(wrapper_class: Type[openrl.envs.wrappers.base_wrapper.BaseWrapper], indices: Union[None, int, Iterable[int]] = None) List[bool][source]¶
Check if worker environments are wrapped with a given wrapper
- abstract property env_name¶
- exec_func(func: Callable, indices: Optional[List[int]] = None, *args, **kwargs) List[Any][source]¶
Call a method, or get a property, from each parallel environment.
- Args:
func: The function to call.
indices: Indices of the environments to call the function on.
*args: Arguments to apply to the function call.
**kwargs: Keyword arguments to apply to the function call.
- Returns:
List of the results of the individual calls to the method or property for each environment.
- exec_func_fetch(timeout: Optional[Union[int, float]] = None) list[source]¶
Calls all parent pipes and waits for the results.
- Args:
- timeout: Number of seconds before the call to exec_func_fetch times out.
If None (default), the call to exec_func_fetch never times out.
- Returns:
List of the results of the individual calls to the method or property for each environment.
- Raises:
NoAsyncCallError: Calling exec_func_fetch without any prior call to exec_func_send.
TimeoutError: The call to exec_func_fetch has timed out after timeout second(s).
- exec_func_send(func: Callable, indices, *args, **kwargs)[source]¶
Calls func asynchronously on the selected environments and applies args and kwargs to the call.
- Args:
func: The function to call.
indices: Indices of the environments to call the function on.
*args: Arguments to apply to the function call.
**kwargs: Keyword arguments to apply to the function call.
- Raises:
ClosedEnvironmentError: If the environment was closed (if close() was previously called).
AlreadyPendingCallError: Calling exec_func_send while waiting for a pending call to complete.
- get_attr(name: str)[source]¶
Get a property from each parallel environment.
- Args:
name (str): Name of the property to get from each individual environment.
- Returns:
The property with name
- metadata = {'render.modes': ['human', 'rgb_array', 'group_human', 'group_rgb_array', 'single_human', 'single_rgb_array']}¶
- observation_space: gymnasium.spaces.space.Space¶
- parallel_env_num: int¶
- random_action(infos: Optional[List[Dict[str, Any]]] = None)[source]¶
Get a random action from the action space
- render(mode: Optional[str] = None) Optional[numpy.ndarray][source]¶
Gym environment rendering
- Parameters
mode – the rendering type
- reset(*, seed: Optional[Union[int, List[int]]] = None, options: Optional[dict] = None)[source]¶
Reset all the environments and return an array of observations, or a tuple of observation arrays.
If step_send is still doing work, that work will be cancelled and step_fetch() should not be called until step_send() is invoked again.
- Returns
observation
- seed(seed: Optional[int] = None) List[Union[None, int]][source]¶
Sets the random seeds for all environments, based on a given seed. Each individual environment will still get its own seed, by incrementing the given seed.
The old .seed() method is being deprecated; in the future, environment seeds can be passed through the seed argument of reset() instead.
- Parameters
seed – The random seed. May be None for completely random seeding.
- Returns
Returns a list containing the seeds for each individual env. Note that all list elements may be None, if the env does not return anything when being seeded.
- set_attr(name: str, values: Union[list, tuple, object])[source]¶
Set a property in each sub-environment.
- Args:
name (str): Name of the property to be set in each individual environment.
values (list, tuple, or object): Values of the property to be set to. If values is a list or tuple, then it corresponds to the values for each individual environment, otherwise a single value is set for all environments.
- step(actions)[source]¶
Step the environments with the given action
- Parameters
actions – the action
- Returns
observation, reward, done, information
- property unwrapped: openrl.envs.vec_env.base_venv.BaseVecEnv¶
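The base class above documents call() alongside call_send() and call_fetch(). One plausible way these fit together is for the blocking method to be a send followed by a fetch, with subclasses supplying only the two halves. The sketch below assumes that composition; `SendFetchBase`, `SerialEnvs`, and `Toy` are hypothetical names and this is not OpenRL's source.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class SendFetchBase(ABC):
    """Hypothetical base class; method names mirror those documented above."""

    @abstractmethod
    def call_send(self, name: str, *args, **kwargs) -> None:
        """Start the call on every environment."""

    @abstractmethod
    def call_fetch(self, **kwargs) -> List[Any]:
        """Collect the results of the call started by call_send()."""

    def call(self, name: str, *args, **kwargs) -> List[Any]:
        # Blocking convenience wrapper: send, then immediately fetch.
        self.call_send(name, *args, **kwargs)
        return self.call_fetch()

    def get_attr(self, name: str) -> List[Any]:
        # Reading a property is a call with no arguments.
        return self.call(name)


class SerialEnvs(SendFetchBase):
    """Runs the pending call serially when the results are fetched."""

    def __init__(self, envs):
        self.envs = envs
        self._pending = None

    def call_send(self, name, *args, **kwargs):
        self._pending = (name, args, kwargs)

    def call_fetch(self, **kwargs):
        if self._pending is None:
            raise RuntimeError("call_fetch called without call_send")
        name, args, call_kwargs = self._pending
        self._pending = None
        results = []
        for env in self.envs:
            attr = getattr(env, name)
            # Methods are invoked; plain properties are returned as they are.
            results.append(attr(*args, **call_kwargs) if callable(attr) else attr)
        return results


class Toy:
    def __init__(self, n):
        self.n = n

    def double(self, k=2):
        return self.n * k


venv = SerialEnvs([Toy(1), Toy(2)])
doubled = venv.call("double")
tripled = venv.call("double", k=3)
sizes = venv.get_attr("n")
```

Keeping call() in the base class means a parallel subclass and a serial subclass expose the same blocking interface without duplicating it.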
- class openrl.envs.vec_env.RewardWrapper(env: openrl.envs.vec_env.base_venv.BaseVecEnv, reward_class: openrl.rewards.base_reward.BaseReward)[source]¶
Bases: openrl.envs.vec_env.wrappers.base_wrapper.VecEnvWrapper
- class openrl.envs.vec_env.SyncVectorEnv(env_fns: Iterable[Callable[[], gymnasium.core.Env]], observation_space: gymnasium.spaces.space.Space = None, action_space: gymnasium.spaces.space.Space = None, copy: bool = True, render_mode: Optional[str] = None, auto_reset: bool = True)[source]¶
Bases: openrl.envs.vec_env.base_venv.BaseVecEnv
Vectorized environment that serially runs multiple environments.
- property env_name¶
- exec_func(func: Callable, indices: Optional[List[int]] = None, *args, **kwargs) tuple[source]¶
Calls func on the selected sub-environments and applies args and kwargs to the call.
- seed(seed: Optional[Union[int, Sequence[int]]] = None)[source]¶
Sets the seed in all sub-environments.
- Args:
seed: The seed
- set_attr(name: str, values: Union[list, tuple, Any])[source]¶
Sets an attribute of the sub-environments.
- Args:
name: The property name to change.
values: Values of the property to be set to. If values is a list or tuple, then it corresponds to the values for each individual environment, otherwise a single value is set for all environments.
- Raises:
ValueError: Values must be a list or tuple with length equal to the number of environments.
- class openrl.envs.vec_env.VecMonitorWrapper(vec_info: openrl.envs.vec_env.vec_info.base_vec_info.BaseVecInfo, env: openrl.envs.vec_env.base_venv.BaseVecEnv)[source]¶
Bases: openrl.envs.vec_env.wrappers.base_wrapper.VecEnvWrapper
- step(action: gymnasium.core.ActType, extra_data: Optional[Dict[str, Any]] = None)[source]¶
Step all environments.
- property use_monitor¶