Source code for openrl.runners.common.rl_agent
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2023 The OpenRL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""""""
import io
import pathlib
import time
from abc import abstractmethod
from typing import Optional, Tuple, Union
import gym
import torch
from openrl.modules.common import BaseNet
from openrl.runners.common.base_agent import BaseAgent, SelfAgent
from openrl.utils.callbacks import CallbackFactory
from openrl.utils.callbacks.callbacks import BaseCallback, CallbackList, ConvertCallback
from openrl.utils.callbacks.processbar_callback import ProgressBarCallback
from openrl.utils.type_aliases import MaybeCallback
[docs]class RLAgent(BaseAgent):
def __init__(
self,
net: Optional[Union[torch.nn.Module, BaseNet]] = None,
env: Union[gym.Env, str] = None,
run_dir: Optional[str] = None,
env_num: Optional[int] = None,
rank: int = 0,
world_size: int = 1,
use_wandb: bool = False,
use_tensorboard: bool = False,
project_name: str = "RLAgent",
) -> None:
self.net = net
if self.net is not None:
self.net.reset()
self._cfg = net.cfg
self._use_wandb = use_wandb
self._use_tensorboard = not use_wandb and use_tensorboard
self.project_name = project_name
if env is not None:
self._env = env
elif hasattr(net, "env") and net.env is not None:
self._env = net.env
else:
raise ValueError("env is None")
if env_num is not None:
self.env_num = env_num
else:
self.env_num = self._env.parallel_env_num
# current number of timesteps
self.num_time_steps = 0
self._episode_num = 0
self._total_time_steps = 0
self._cfg.n_rollout_threads = self.env_num
self._cfg.learner_n_rollout_threads = self._cfg.n_rollout_threads
self.rank = rank
self.world_size = world_size
self.client = None
self.agent_num = self._env.agent_num
if run_dir is None:
self.run_dir = self._cfg.run_dir
else:
self.run_dir = run_dir
if self.run_dir is None:
assert (not self._use_wandb) and (not self._use_tensorboard), (
"run_dir must be set when using wandb or tensorboard. Please set"
" run_dir in the config file or pass run_dir in the"
" command line."
)
if self._cfg.experiment_name == "":
self.exp_name = "rl"
else:
self.exp_name = self._cfg.experiment_name
[docs] @abstractmethod
def train(
self: SelfAgent,
total_time_steps: int,
callback: MaybeCallback = None,
) -> None:
raise NotImplementedError
def _setup_train(
self,
total_time_steps: int,
callback: MaybeCallback = None,
reset_num_time_steps: bool = True,
progress_bar: bool = False,
) -> Tuple[int, BaseCallback]:
"""
Initialize different variables needed for training.
:param total_time_steps: The total number of samples (env steps) to train on
:param callback: Callback(s) called at every step with state of the algorithm.
:param reset_num_time_steps: Whether to reset or not the ``num_time_steps`` attribute
:param progress_bar: Display a progress bar using tqdm and rich.
:return: Total time_steps and callback(s)
"""
self.start_time = time.time_ns()
if reset_num_time_steps:
self.num_time_steps = 0
self._episode_num = 0
else:
# Make sure training timesteps are ahead of the internal counter
total_time_steps += self.num_time_steps
self._total_time_steps = total_time_steps
# Create eval callback if needed
callback = self._init_callback(callback, progress_bar)
return total_time_steps, callback
def _init_callback(
self,
callback: MaybeCallback,
progress_bar: bool = False,
) -> BaseCallback:
"""
:param callback: Callback(s) called at every step with state of the algorithm.
:param progress_bar: Display a progress bar using tqdm and rich.
:return: A hybrid callback calling `callback` and performing evaluation.
"""
# Convert a list of callbacks into a callback
if isinstance(callback, list):
callback = CallbackList(callback)
# Convert functional callback to object
if not isinstance(callback, BaseCallback):
callback = ConvertCallback(callback)
# Add progress bar callback
if progress_bar:
callback = CallbackList([callback, ProgressBarCallback()])
if self._cfg.callbacks:
cfg_callback = CallbackFactory.get_callbacks(self._cfg.callbacks)
callback = CallbackList([callback, cfg_callback])
callback.init_callback(self)
return callback
[docs] def set_env(
self,
env: Union[gym.Env, str],
):
self.net.reset()
if env is not None:
self._env = env
self.env_num = env.parallel_env_num
self.agent_num = env.agent_num
env.reset(seed=self._cfg.seed)
self.net.reset(env)
[docs] def save(self, path: Union[str, pathlib.Path, io.BufferedIOBase]) -> None:
if isinstance(path, str):
path = pathlib.Path(path)
path.mkdir(parents=True, exist_ok=True)
torch.save(self.net.module, path / "module.pt")
[docs] def load(self, path: Union[str, pathlib.Path, io.BufferedIOBase]) -> None:
if isinstance(path, str):
path = pathlib.Path(path)
assert path.exists(), f"{path} does not exist"
if path.is_dir():
path = path / "module.pt"
assert path.exists(), f"{path} does not exist"
if not torch.cuda.is_available():
self.net.module = torch.load(path, map_location=torch.device("cpu"))
self.net.module.device = torch.device("cpu")
for key in self.net.module.models:
self.net.module.models[key].tpdv = dict(
dtype=torch.float32, device=torch.device("cpu")
)
else:
self.net.module = torch.load(path)
self.net.reset()
[docs] def load_policy(self, path: Union[str, pathlib.Path, io.BufferedIOBase]) -> None:
self.net.load_policy(path)