Shortcuts

Source code for openrl.modules.common.ppo_net

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2023 The OpenRL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

""""""
import copy
from typing import Any, Dict, Optional, Tuple, Union

import gymnasium as gym
import numpy as np
import torch

from openrl.configs.config import create_config_parser
from openrl.envs.vec_env.base_venv import BaseVecEnv
from openrl.modules.base_module import BaseModule
from openrl.modules.common.base_net import BaseNet
from openrl.modules.ppo_module import PPOModule
from openrl.utils.util import set_seed


[docs]def reset_rnn_states( rnn_states, episode_starts, env_num, agent_num, rnn_layers, hidden_size ): # First we reshape the episode_starts to match the rnn_states shape # Since episode_starts affects all agents in the environment, we repeat it agent_num times episode_starts = np.repeat(copy.copy(episode_starts), agent_num) # We then need to expand the dimensions of episode_starts to match rnn_states # The new shape of episode_starts should be (env_num * agent_num, 1, 1) to broadcast correctly episode_starts = episode_starts[:, None, None] # Now, episode_starts should broadcast over the last two dimensions of rnn_states when multiplied # We want to set rnn_states to zero where episode_starts is 1, so we invert the episode_starts as a mask mask = 1 - episode_starts # Apply the mask to rnn_states, setting the appropriate states to zero rnn_states *= mask return rnn_states
[docs]class PPONet(BaseNet): def __init__( self, env: Union[BaseVecEnv, gym.Env, str], cfg=None, device: Union[torch.device, str] = "cpu", n_rollout_threads: int = 1, model_dict: Optional[Dict[str, Any]] = None, module_class: BaseModule = PPOModule, ) -> None: super().__init__() if cfg is None: cfg_parser = create_config_parser() cfg = cfg_parser.parse_args([]) set_seed(cfg.seed) env.reset(seed=cfg.seed) cfg.num_agents = env.agent_num cfg.n_rollout_threads = n_rollout_threads cfg.learner_n_rollout_threads = cfg.n_rollout_threads if cfg.rnn_type == "gru": rnn_hidden_size = cfg.hidden_size elif cfg.rnn_type == "lstm": rnn_hidden_size = cfg.hidden_size * 2 else: raise NotImplementedError( f"RNN type {cfg.rnn_type} has not been implemented." ) cfg.rnn_hidden_size = rnn_hidden_size if isinstance(device, str): device = torch.device(device) self.module = module_class( cfg=cfg, policy_input_space=env.observation_space, critic_input_space=env.observation_space, act_space=env.action_space, share_model=cfg.use_share_model, device=device, rank=0, world_size=1, model_dict=model_dict, ) self.cfg = cfg self.env = env self.device = device self.rnn_states_actor = None self.masks = None
[docs] def act( self, observation: Union[np.ndarray, Dict[str, np.ndarray]], action_masks: Optional[np.ndarray] = None, deterministic: bool = False, episode_starts: Optional[np.ndarray] = None, ) -> Tuple[np.ndarray, Optional[Tuple[np.ndarray, ...]]]: if episode_starts is not None: self.rnn_states_actor = reset_rnn_states( self.rnn_states_actor, episode_starts, self.env.parallel_env_num, self.env.agent_num, self.rnn_states_actor.shape[1], self.rnn_states_actor.shape[2], ) actions, self.rnn_states_actor = self.module.act( obs=observation, rnn_states_actor=self.rnn_states_actor, masks=self.masks, action_masks=action_masks, deterministic=deterministic, ) return actions, self.rnn_states_actor
[docs] def reset(self, env: Optional[gym.Env] = None) -> None: if env is not None: self.env = env self.first_reset = False self.rnn_states_actor, self.masks = self.module.init_rnn_states( rollout_num=self.env.parallel_env_num, agent_num=self.env.agent_num, rnn_layers=self.cfg.recurrent_N, hidden_size=self.cfg.rnn_hidden_size, )
[docs] def load_policy(self, path: str) -> None: self.module.load_policy(path)