import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import numpy as np
import duo_ai
from duo_ai.core.environment import CoordEnv
@dataclass
class EvaluatorConfig:
    """
    Settings that control how an ``Evaluator`` runs a policy.

    Attributes
    ----------
    num_episodes : int
        How many episodes are evaluated when the caller does not override
        the count. Defaults to 256.
    max_num_steps : int
        Upper bound on the number of environment steps taken per episode.
        Defaults to 256.
    temperature : float
        Temperature forwarded to the policy when it selects actions.
        Defaults to 1.0.
    log_action_id : int
        Index of the action whose usage is counted and reported in the
        evaluation summary. Defaults to ``CoordEnv.EXPERT``.

    Examples
    --------
    >>> config = EvaluatorConfig(num_episodes=100, temperature=0.5)
    """

    # Episode budget used when `Evaluator.evaluate` receives no explicit count.
    num_episodes: int = 256
    # Hard cap on the length of a single rollout.
    max_num_steps: int = 256
    # Passed through to `policy.act(..., temperature=...)`.
    temperature: float = 1.0
    # Action id whose frequency is tracked in the summary.
    log_action_id: int = CoordEnv.EXPERT
class Evaluator:
    """
    Run a policy on an environment and summarize the resulting episodes.

    The environment is used as a batch of parallel environments: the code
    reads ``env.num_envs`` and expects ``env.step`` to return per-environment
    ``reward``, ``done`` and ``info`` values.

    Examples
    --------
    >>> evaluator = Evaluator(EvaluatorConfig(), env)
    >>> summary = evaluator.evaluate(policy)
    """

    config_cls = EvaluatorConfig

    def __init__(self, config: EvaluatorConfig, env: "gym.Env") -> None:
        """
        Initialize the Evaluator.

        Parameters
        ----------
        config : EvaluatorConfig
            Configuration object for the evaluator.
        env : gym.Env
            The environment instance to evaluate on. Must expose ``num_envs``.
        """
        self.config = config
        self.env = env

    def evaluate(
        self,
        policy: "duo_ai.core.Policy",
        num_episodes: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Evaluate a policy on the environment and summarize the results.

        The policy is switched to evaluation mode via ``policy.eval()`` and is
        left in that mode when this method returns. A fresh
        ``EvaluationSummarizer`` is created on every call and stored as
        ``self.summarizer``.

        Parameters
        ----------
        policy : duo_ai.core.Policy
            The policy to evaluate. Must provide ``eval()``, ``reset(mask)``
            and ``act(obs, temperature=...)``.
        num_episodes : int, optional
            Number of episodes to run. If None, uses ``config.num_episodes``.

        Returns
        -------
        dict
            Flat dictionary of summary statistics (for example
            ``"reward_mean"``, ``"episode_length_mean"``, ``"steps"``), as
            produced by ``EvaluationSummarizer.write``.

        Raises
        ------
        ValueError
            If ``num_episodes`` is not positive.
        AssertionError
            If ``num_episodes`` is not a multiple of ``env.num_envs``.

        Examples
        --------
        >>> summary = evaluator.evaluate(policy, num_episodes=100)
        >>> print(summary['reward_mean'])
        """
        config = self.config
        env = self.env

        if num_episodes is None:
            num_episodes = config.num_episodes

        # Zero iterations would leave the summarizer with nothing to report
        # and fail later with an opaque error, so reject it up front.
        if num_episodes <= 0:
            raise ValueError(
                f"Number of episodes must be positive, got {num_episodes}."
            )

        # Raised explicitly instead of through an `assert` statement so the
        # check is not stripped under `python -O`; the exception type is kept
        # as AssertionError for backward compatibility.
        if num_episodes % env.num_envs != 0:
            raise AssertionError(
                "Number of episodes must be divisible by the number of environments in each split."
            )

        policy.eval()
        num_iterations = num_episodes // env.num_envs

        self.summarizer = EvaluationSummarizer(config)
        for _ in range(num_iterations):
            self._eval_one_iteration(policy, env)

        return self.summarizer.write()

    def _eval_one_iteration(self, policy: "duo_ai.core.Policy", env: "gym.Env") -> None:
        """
        Roll out one batch of ``env.num_envs`` episodes and log their statistics.

        Relies on ``self.summarizer``, which ``evaluate`` creates before
        calling this method.

        Parameters
        ----------
        policy : duo_ai.core.Policy
            The policy to evaluate.
        env : gym.Env
            The environment instance to evaluate on.
        """
        self.summarizer.initialize_episode(env)

        obs = env.reset()
        # Tracks which of the parallel environments have already finished.
        has_done = np.zeros(env.num_envs, dtype=bool)
        # Every environment starts a new episode, so reset the policy for all.
        policy.reset(np.ones_like(has_done))

        for _ in range(self.config.max_num_steps):
            action = policy.act(obs, temperature=self.config.temperature)
            obs, reward, done, info = env.step(action.cpu().numpy())

            # NOTE: log before updating has_done so that the terminal step of
            # each episode is still included in the summary.
            self.summarizer.add_episode_step(env, action, reward, info, has_done)

            has_done |= done
            if has_done.all():
                break

        self.summarizer.finalize_episode()
class EvaluationSummarizer:
    """
    Accumulate per-episode evaluation statistics and report a summary.

    Typical call order: ``initialize_episode`` once per batch of parallel
    episodes, ``add_episode_step`` after every environment step,
    ``finalize_episode`` when the batch ends, then ``summarize`` or ``write``.

    Attributes
    ----------
    log_action_id : int
        Action index whose occurrences are counted.
    episode_log : dict
        Statistics of the batch currently being recorded.
    log : dict
        Statistics aggregated over all finalized batches.

    Examples
    --------
    >>> summarizer = EvaluationSummarizer(EvaluatorConfig())
    """

    def __init__(self, config: "EvaluatorConfig") -> None:
        """
        Initialize the EvaluationSummarizer.

        Parameters
        ----------
        config : EvaluatorConfig
            Configuration object; only ``log_action_id`` is read.
        """
        self.log_action_id = config.log_action_id
        # Defined up front so finalize_episode() is harmless before the first
        # initialize_episode() call.
        self.episode_log: Dict[str, Any] = {}
        self.clear()

    @property
    def _action_key(self) -> str:
        """Return the log key under which the tracked action is counted."""
        return f"action_{self.log_action_id}"

    def clear(self) -> None:
        """Discard all aggregated statistics."""
        self.log = {}

    def initialize_episode(self, env: "gym.Env") -> None:
        """
        Start recording a new batch of ``env.num_envs`` parallel episodes.

        Parameters
        ----------
        env : gym.Env
            The environment instance; only ``num_envs`` is read.
        """
        num_envs = env.num_envs
        self.episode_log = {
            "reward": [0.0] * num_envs,
            "base_reward": [0.0] * num_envs,
            "episode_length": [0] * num_envs,
            self._action_key: 0,
        }

    def finalize_episode(self) -> None:
        """
        Merge the current batch's statistics into the aggregate log.

        List-valued entries are appended and scalar entries are summed. The
        values are copied, so later changes to ``episode_log`` never alter
        what has already been aggregated.
        """
        for key, value in self.episode_log.items():
            if isinstance(value, list):
                self.log.setdefault(key, []).extend(value)
            else:
                self.log[key] = self.log.get(key, 0) + value

    def add_episode_step(
        self,
        env: "gym.Env",
        action: "torch.Tensor",
        reward: np.ndarray,
        info: List[Dict[str, Any]],
        has_done: np.ndarray,
    ) -> None:
        """
        Record one environment step for every episode that is still running.

        Environments whose episode had already finished before this step are
        skipped entirely, so whatever they keep emitting (including NaN or
        infinite rewards) cannot leak into the statistics.

        Parameters
        ----------
        env : gym.Env
            The environment instance; only ``num_envs`` is read.
        action : torch.Tensor
            Actions taken at this step, indexed by environment. Array-likes
            are accepted as well.
        reward : np.ndarray
            Rewards received at this step, indexed by environment.
        info : list of dict
            Per-environment info dicts; an optional ``"base_reward"`` entry is
            accumulated when present.
        has_done : np.ndarray
            Boolean array marking episodes that finished before this step.
        """
        # Convert the actions to a host array once instead of accumulating
        # tensor scalars environment by environment.
        if hasattr(action, "detach"):
            action = action.detach().cpu().numpy()
        else:
            action = np.asarray(action)

        episode_log = self.episode_log
        action_key = self._action_key

        for i in range(env.num_envs):
            if has_done[i]:
                continue

            step_info = info[i]
            if "base_reward" in step_info:
                episode_log["base_reward"][i] += float(step_info["base_reward"])
            episode_log["reward"][i] += float(reward[i])
            episode_log["episode_length"][i] += 1
            # np.sum also covers actions with more than one component per
            # environment.
            episode_log[action_key] += int(
                np.sum(action[i] == self.log_action_id)
            )

    def summarize(self) -> Dict[str, Any]:
        """
        Compute summary statistics over all finalized episodes.

        The result is also stored as ``self.summary``.

        Returns
        -------
        dict
            Dictionary of summary statistics.

        Raises
        ------
        ValueError
            If no episode has been finalized yet.

        Examples
        --------
        >>> summary = summarizer.summarize()
        """
        log = self.log
        if not log or not log["episode_length"]:
            raise ValueError(
                "Cannot summarize: no episodes have been logged. "
                "Call finalize_episode() after recording at least one episode."
            )

        lengths = log["episode_length"]
        total_steps = int(sum(lengths))
        action_key = self._action_key
        # No steps recorded means the tracked action was never taken.
        action_frac = float(log[action_key]) / total_steps if total_steps else 0.0

        self.summary = {
            "steps": total_steps,
            # Copied so that later episodes do not mutate a returned summary.
            "all_rewards": list(log["reward"]),
            "episode_length_mean": float(np.mean(lengths)),
            "episode_length_min": int(np.min(lengths)),
            "episode_length_max": int(np.max(lengths)),
            "reward_mean": float(np.mean(log["reward"])),
            "reward_std": float(np.std(log["reward"])),
            "base_reward_mean": float(np.mean(log["base_reward"])),
            "base_reward_std": float(np.std(log["base_reward"])),
            f"{action_key}_frac": action_frac,
        }
        return self.summary

    def write(self, summary: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Format the summary statistics and emit them through ``logging.info``.

        Rewards are reported as ``mean ± 1.96 * std / sqrt(n)``, where ``n`` is
        the number of logged episodes.

        Parameters
        ----------
        summary : dict, optional
            Precomputed summary statistics. If None, ``summarize()`` is called.

        Returns
        -------
        dict
            The summary statistics that were logged.

        Examples
        --------
        >>> logged_summary = summarizer.write()
        """
        if summary is None:
            summary = self.summarize()

        sqrt_n = len(summary["all_rewards"]) ** 0.5

        def half_width(std: float) -> float:
            # Undefined without samples; report NaN instead of dividing by zero.
            return (1.96 * std) / sqrt_n if sqrt_n else float("nan")

        reward_ci = half_width(summary["reward_std"])
        base_reward_ci = half_width(summary["base_reward_std"])
        action_frac = summary[f"action_{self.log_action_id}_frac"]

        log_str = (
            f" Steps: {summary['steps']}\n"
            f" Episode length: mean {summary['episode_length_mean']:7.2f} "
            f"min {summary['episode_length_min']:7.2f} "
            f"max {summary['episode_length_max']:7.2f}\n"
            f" Reward: mean {summary['reward_mean']:.2f} "
            f"± {reward_ci:.2f}\n"
            f" Base Reward: mean {summary['base_reward_mean']:.2f} "
            f"± {base_reward_ci:.2f}\n"
            f" Action {self.log_action_id} fraction: {action_frac:7.2f}\n"
        )
        logging.info(log_str)
        return summary