# Source code for openrl.utils.callbacks.eval_callback
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2023 The OpenRL Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""""""
import os
import warnings
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import gymnasium as gym
import numpy as np
import openrl.utils.callbacks.callbacks_factory as callbacks_factory
from openrl.envs.common import make
from openrl.envs.vec_env import BaseVecEnv, SyncVectorEnv
from openrl.envs.wrappers.monitor import Monitor
from openrl.utils.callbacks.callbacks import BaseCallback, EventCallback
from openrl.utils.evaluation import evaluate_policy
# Wrappers that ``_make_env`` hands to ``make`` when it builds an evaluation
# environment from an id or a config dict.
env_wrappers = [Monitor]
def _make_env(
    env: Union[str, Dict[str, Any]], render: bool, asynchronous: bool
) -> BaseVecEnv:
    """
    Build the evaluation environment from an environment id or a config dict.

    :param env: Either an environment id, or a dict holding the id under
        ``"id"`` and, optionally, the ``env_num`` value forwarded to ``make``
        under ``"env_num"``. When ``"env_num"`` is absent it defaults to 1,
        the same value used when a plain id is given.
    :param render: When true, ``render_mode="group_human"`` is passed to
        ``make``; otherwise ``render_mode=None``.
    :param asynchronous: Forwarded unchanged to ``make``.
    :return: Whatever ``make`` returns for the requested configuration.
    :raises KeyError: If ``env`` is a dict without an ``"id"`` entry.
    """
    if isinstance(env, str):
        env = {"id": env}
    return make(
        env["id"],
        # A dict config may omit "env_num"; fall back to a single environment
        # instead of failing with a KeyError.
        env_num=env.get("env_num", 1),
        render_mode="group_human" if render else None,
        env_wrappers=env_wrappers,
        asynchronous=asynchronous,
    )
class EvalCallback(EventCallback):
    """
    Callback for evaluating an agent.

    .. warning::

      When using multiple environments, each call to ``env.step()``
      will effectively correspond to ``n_envs`` steps.
      To account for that, you can use ``eval_freq = max(eval_freq // n_envs, 1)``

    :param eval_env: The environment used for evaluation. A ``str`` id or a
        config dict (with ``"id"`` and ``"env_num"`` entries) is built through
        ``_make_env``; anything that is not a ``BaseVecEnv`` is wrapped in a
        ``SyncVectorEnv``.
    :param callbacks_on_new_best: Callback to trigger
        when there is a new best model according to the ``mean_reward``.
        A config dict or a list of config dicts is turned into a callback via
        ``CallbackFactory.get_callbacks``.
    :param callbacks_after_eval: Callback to trigger after every evaluation.
        Accepts the same forms as ``callbacks_on_new_best``.
    :param n_eval_episodes: The number of episodes to test the agent
    :param eval_freq: Evaluate the agent every ``eval_freq`` call of the callback.
        Values ``<= 0`` disable evaluation.
    :param log_path: Path to a folder where the evaluations (``evaluations.npz``)
        will be saved. It will be updated at each evaluation.
    :param best_model_save_path: Path to a folder where the best model
        according to performance on the eval env will be saved.
    :param deterministic: Whether the evaluation should
        use a stochastic or deterministic actions.
    :param render: Whether to render or not the environment during evaluation
    :param asynchronous: Forwarded to ``_make_env``; only used when ``eval_env``
        is given as a ``str`` or a ``dict``.
    :param verbose: Verbosity level: 0 for no output, 1 for indicating information about evaluation results
    :param warn: Passed to ``evaluate_policy`` (warns if ``eval_env`` has not been
        wrapped with a Monitor wrapper)
    :param stop_logic: Forwarded to ``CallbackFactory.get_callbacks`` when
        callbacks are built from configs.
    :param close_env_at_end: Whether ``eval_env`` is closed when training ends.
    """

    def __init__(
        self,
        eval_env: Union[str, Dict[str, Any], gym.Env, BaseVecEnv],
        callbacks_on_new_best: Optional[
            Union[List[Dict[str, Any]], Dict[str, Any], BaseCallback]
        ] = None,
        callbacks_after_eval: Optional[
            Union[List[Dict[str, Any]], Dict[str, Any], BaseCallback]
        ] = None,
        n_eval_episodes: int = 5,
        eval_freq: int = 10000,
        log_path: Optional[Union[str, Path]] = None,
        best_model_save_path: Optional[Union[str, Path]] = None,
        deterministic: bool = True,
        render: bool = False,
        asynchronous: bool = True,
        verbose: int = 1,
        warn: bool = True,
        stop_logic: str = "OR",
        close_env_at_end: bool = True,
    ):
        callbacks_after_eval = self._resolve_callbacks(
            callbacks_after_eval, stop_logic
        )
        super().__init__(callbacks_after_eval, verbose=verbose)
        self.stop_logic = stop_logic

        self.callbacks_on_new_best = self._resolve_callbacks(
            callbacks_on_new_best, stop_logic
        )
        if self.callbacks_on_new_best is not None:
            # Give access to the parent
            self.callbacks_on_new_best.set_parent(self)

        self.n_eval_episodes = n_eval_episodes
        self.eval_freq = eval_freq
        self.best_mean_reward = -np.inf
        self.last_mean_reward = -np.inf
        self.deterministic = deterministic
        self.render = render
        self.warn = warn
        self.close_env_at_end = close_env_at_end

        if isinstance(eval_env, (str, dict)):
            eval_env = _make_env(eval_env, render, asynchronous)

        # Convert to BaseVecEnv for consistency
        if not isinstance(eval_env, BaseVecEnv):
            # Keep the wrapped env under its own name: the factory below must
            # not close over ``eval_env``, which is rebound to the vector env.
            single_env = eval_env
            eval_env = SyncVectorEnv([lambda: single_env])

        self.eval_env = eval_env
        self.best_model_save_path = best_model_save_path

        # Logs will be written in ``evaluations.npz``
        if log_path is not None:
            log_path = os.path.join(log_path, "evaluations")
        self.log_path = log_path

        self.evaluations_results = []
        self.evaluations_time_steps = []
        self.evaluations_length = []
        # For computing success rate
        self._is_success_buffer = []
        self.evaluations_successes = []

    @staticmethod
    def _resolve_callbacks(
        callbacks: Optional[
            Union[List[Dict[str, Any]], Dict[str, Any], BaseCallback]
        ],
        stop_logic: str,
    ) -> Optional[BaseCallback]:
        """
        Turn callback configs into a callback object.

        :param callbacks: ``None``, an already-built callback, a single config
            dict, or a list of config dicts.
        :param stop_logic: Forwarded to ``CallbackFactory.get_callbacks``.
        :return: The result of ``CallbackFactory.get_callbacks`` for config
            input; otherwise ``callbacks`` unchanged.
        """
        if isinstance(callbacks, dict):
            # A single config is handled like a one-element list of configs.
            callbacks = [callbacks]
        if isinstance(callbacks, list):
            return callbacks_factory.CallbackFactory.get_callbacks(
                callbacks, stop_logic=stop_logic
            )
        return callbacks

    def _init_callback(self) -> None:
        """
        Warn on mismatching env types, create output folders and initialise
        the new-best callback with the agent.
        """
        # Does not work in some corner cases, where the wrapper is not the same
        if not isinstance(self.training_env, type(self.eval_env)):
            warnings.warn(
                "Training and eval env are not of the same type: "
                f"{self.training_env} != {self.eval_env}"
            )

        # Create folders if needed
        if self.best_model_save_path is not None:
            os.makedirs(self.best_model_save_path, exist_ok=True)
        if self.log_path is not None:
            # ``log_path`` ends with the "evaluations" file stem, so its
            # parent is the folder that has to exist.
            os.makedirs(os.path.dirname(self.log_path), exist_ok=True)

        # Init callback called on new best model
        if self.callbacks_on_new_best is not None:
            self.callbacks_on_new_best.init_callback(self.agent)

    def _log_success_callback(
        self, locals_: Dict[str, Any], globals_: Dict[str, Any]
    ) -> None:
        """
        Callback passed to the ``evaluate_policy`` function
        in order to log the success rate (when applicable),
        for instance when using HER.

        When ``locals_["done"]`` is truthy and ``locals_["info"]`` carries a
        ``"final_info"`` entry with a non-``None`` ``"is_success"`` value, that
        value is appended to the success buffer. If ``"final_info"`` is not a
        dict, only its first element is inspected.

        :param locals_: local variables of ``evaluate_policy``; the ``"info"``
            and ``"done"`` entries are read
        :param globals_: unused
        """
        info = locals_["info"]
        if not locals_["done"]:
            return

        maybe_final_info = info.get("final_info")
        if maybe_final_info is None:
            return

        if isinstance(maybe_final_info, dict):
            maybe_is_success = maybe_final_info.get("is_success")
        else:
            maybe_is_success = maybe_final_info[0].get("is_success")

        if maybe_is_success is not None:
            self._is_success_buffer.append(maybe_is_success)

    def _on_step(self) -> bool:
        """
        Run an evaluation every ``eval_freq`` calls and report its results.

        On an evaluation step this evaluates the agent, optionally appends the
        results to the ``.npz`` log, records the statistics through the agent's
        logger, saves the agent when the mean reward improves, and triggers
        the configured callbacks.

        :return: ``True`` on non-evaluation steps; otherwise ``True`` unless a
            callback triggered by the evaluation returned a falsy value.
        """
        if not (self.eval_freq > 0 and self.n_calls % self.eval_freq == 0):
            return True

        continue_training = True
        eval_info = {}
        # Reset success rate buffer
        self._is_success_buffer = []

        episode_rewards, episode_lengths = evaluate_policy(
            self.agent,
            self.eval_env,
            n_eval_episodes=self.n_eval_episodes,
            render=self.render,
            deterministic=self.deterministic,
            return_episode_rewards=True,
            warn=self.warn,
            callback=self._log_success_callback,
        )

        if self.log_path is not None:
            self.evaluations_time_steps.append(self.num_time_steps)
            self.evaluations_results.append(episode_rewards)
            self.evaluations_length.append(episode_lengths)

            kwargs = {}
            # Save success log if present
            if self._is_success_buffer:
                self.evaluations_successes.append(self._is_success_buffer)
                kwargs = {"successes": self.evaluations_successes}

            # The whole history is rewritten on every evaluation.
            np.savez(
                self.log_path,
                timesteps=self.evaluations_time_steps,
                results=self.evaluations_results,
                ep_lengths=self.evaluations_length,
                **kwargs,
            )

        mean_reward, std_reward = np.mean(episode_rewards), np.std(episode_rewards)
        mean_ep_length, std_ep_length = np.mean(episode_lengths), np.std(
            episode_lengths
        )
        self.last_mean_reward = mean_reward

        eval_info["Eval/episode_reward"] = mean_reward
        eval_info["Eval/episode_reward_std"] = std_reward
        eval_info["Eval/episode_length"] = mean_ep_length
        eval_info["Eval/episode_length_std"] = std_ep_length

        if self.verbose >= 1:
            print(
                f"Eval num_timesteps={self.num_time_steps}, "
                f"episode_reward={mean_reward:.2f} +/- {std_reward:.2f}"
            )
            print(f"Episode length: {mean_ep_length:.2f} +/- {std_ep_length:.2f}")

        if self._is_success_buffer:
            success_rate = np.mean(self._is_success_buffer)
            eval_info["Eval/success_rate"] = success_rate
            if self.verbose >= 1:
                print(f"Success rate: {100 * success_rate:.2f}%")

        if mean_reward > self.best_mean_reward:
            if self.verbose >= 1:
                print("New best mean reward!")
            if self.best_model_save_path is not None:
                self.agent.save(
                    os.path.join(self.best_model_save_path, "best_model")
                )
                info_file = os.path.join(
                    self.best_model_save_path, "best_model_info.txt"
                )
                with open(info_file, "w", encoding="utf-8") as f:
                    f.write(f"best model at step: {self.num_time_steps}\n")
                    f.write(f"best model reward: {mean_reward}\n")
            self.best_mean_reward = mean_reward
            # Trigger callback on new best model, if needed
            if self.callbacks_on_new_best is not None:
                continue_training = self.callbacks_on_new_best.on_step()

        # Trigger callback after every evaluation, if needed
        # (skipped by short-circuit when the new-best callback returned False)
        if self.callback is not None:
            continue_training = continue_training and self._on_event()

        self.agent.logger.log_info(eval_info, self.num_time_steps)
        return continue_training

    def update_child_locals(self, locals_: Dict[str, Any]) -> None:
        """
        Update the references to the local variables.

        :param locals_: the local variables during rollout collection
        """
        if self.callback:
            self.callback.update_locals(locals_)

    def _on_training_end(self) -> None:
        """Close the evaluation environment if ``close_env_at_end`` is set."""
        if self.close_env_at_end:
            self.eval_env.close()