import logging
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union
# Import gym or gymnasium based on environment variable
if os.environ.get("GYM_BACKEND", "gym") == "gymnasium":
import gymnasium as gym
else:
import gym
import numpy as np
import torch
import torch.optim as optim
import wandb
from torch.distributions.categorical import Categorical
from duo_ai.core import Algorithm
from duo_ai.utils.global_variables import get_global_variable
from duo_ai.utils.logging import configure_logging
from duo_ai.utils.wandb import WandbLogger
[docs]@dataclass
class PPOAlgorithmConfig:
"""
Configuration for the PPOAlgorithm.
Parameters
----------
name : str, optional
Name of the algorithm.
log_freq : int, optional
Frequency (in iterations) to log training statistics.
save_freq : int, optional
Frequency (in iterations) to save model checkpoints.
num_steps : int, optional
Number of steps to run in each environment per iteration.
total_timesteps : int, optional
Total number of environment steps to train for.
update_epochs : int, optional
Number of epochs to update the policy per iteration.
gamma : float, optional
Discount factor for rewards.
gae_lambda : float, optional
Lambda for Generalized Advantage Estimation.
num_minibatches : int, optional
Number of minibatches for each update epoch.
clip_coef : float, optional
Clipping coefficient for PPO surrogate objective.
norm_adv : bool, optional
Whether to normalize advantages.
clip_vloss : bool, optional
Whether to use clipped value loss.
vf_coef : float, optional
Coefficient for value function loss.
ent_coef : float, optional
Coefficient for entropy bonus.
max_grad_norm : float, optional
Maximum norm for gradient clipping.
learning_rate : float, optional
Learning rate for optimizer.
critic_pretrain_steps : int, optional
Number of steps to pretrain the critic before policy updates.
anneal_lr : bool, optional
Whether to linearly anneal the learning rate.
log_action_id : int, optional
Action ID to log statistics for (e.g., expert action).
Examples
--------
>>> cfg = PPOAlgorithmConfig(num_steps=128, total_timesteps=10000)
"""
name: str = "ppo"
log_freq: int = 10
save_freq: int = 0
num_steps: int = 256
total_timesteps: int = 1_500_000
update_epochs: int = 3
gamma: float = 0.999
gae_lambda: float = 0.95
num_minibatches: int = 8
clip_coef: float = 0.2
norm_adv: bool = True
clip_vloss: bool = True
vf_coef: float = 0.5
ent_coef: float = 0.01
max_grad_norm: float = 0.5
learning_rate: float = 0.0005
critic_pretrain_steps: int = 0
anneal_lr: bool = False
log_action_id: int = 1
[docs]class PPOAlgorithm(Algorithm):
"""
Proximal Policy Optimization (PPO) algorithm implementation.
Examples
--------
>>> algo = PPOAlgorithm(PPOAlgorithmConfig())
>>> algo.train(policy, env, validators)
"""
config_cls = PPOAlgorithmConfig
def __init__(self, config: PPOAlgorithmConfig) -> None:
"""
Initialize the PPOAlgorithm.
Parameters
----------
config : PPOAlgorithmConfig
Configuration object containing PPO hyperparameters.
Returns
-------
None
Examples
--------
>>> algo = PPOAlgorithm(PPOAlgorithmConfig())
"""
self.config = config
[docs] def _initialize(self) -> None:
"""
Initialize PPO training state, buffers, optimizer, and logging.
Returns
-------
None
"""
config = self.config
env = self.env
policy = self.policy
self.num_envs = env.num_envs
self.batch_size = int(self.num_envs * config.num_steps)
self.minibatch_size = int(self.batch_size // config.num_minibatches)
self.num_iterations = config.total_timesteps // self.batch_size
self.save_dir = get_global_variable("experiment_dir")
self.buffer = TrainBuffer.new(env, config.num_steps)
self.global_step = 0
self.summarizer = PPOTrainSummarizer(config)
self.optim = optim.Adam(
policy.model.parameters(), lr=config.learning_rate, eps=1e-5
)
# NOTE: weird bug, torch.optim messes up logging, so we need to reconfigure
configure_logging(get_global_variable("log_file"))
self.wandb_logger = WandbLogger()
self.last_obs = env.reset()
[docs] def train(
self,
policy: "duo.policies.PPOPolicy",
env: "gym.Env",
validators: Dict[str, "duo.core.Evaluator"],
) -> None:
"""
Train the PPO algorithm on the specified environment(s) using the provided policy.
This method performs multiple training iterations, periodically evaluates the policy,
logs statistics, and saves checkpoints for the best and last models.
Parameters
----------
policy : duo.policies.PPOPolicy
The policy to be trained.
env : gym.Env
The environment instance for training.
validators : dict of str to duo.core.Evaluator
Dictionary mapping split names to evaluator instances for evaluation.
Returns
-------
None
Examples
--------
>>> algorithm.train(policy, env, validators)
"""
config = self.config
self.env = env
self.policy = policy
self._initialize()
best_result = {split: {"reward_mean": -float("inf")} for split in validators}
for iteration in range(self.num_iterations):
# save checkpoint
if config.save_freq > 0 and iteration % config.save_freq == 0:
self.save_checkpoint(policy, f"step_{self.global_step}")
# evaluation
if iteration % config.log_freq == 0:
if iteration > 0:
logging.info(f"Iteration {iteration}")
logging.info(f"Train {self.global_step} steps:")
train_summary = self.summarizer.write()
self.save_checkpoint(policy, "last")
eval_result = {}
for split, validator in validators.items():
logging.info(f"Evaluating on {split} split")
eval_result[split] = validator.evaluate(policy)
if (
eval_result[split]["reward_mean"]
> best_result[split]["reward_mean"]
):
best_result[split] = eval_result[split]
self.save_checkpoint(policy, f"best_{split}")
for split, validator in validators.items():
logging.info(f"BEST {split} so far")
validator.summarizer.write(best_result[split])
# wandb logging
self.wandb_logger.clear()
self.wandb_logger.log["step"] = self.global_step
if iteration > 0:
self.wandb_logger.add("train", train_summary)
for split in validators:
self.wandb_logger.add(split, eval_result[split])
self.wandb_logger.add(f"best_{split}", best_result[split])
wandb.log(self.wandb_logger.get())
# training
self._train_once()
# close env after training
env.close()
[docs] def _train_once(self) -> None:
"""
Perform a single training iteration of PPO, including trajectory collection,
advantage computation, and policy/value updates.
Returns
-------
None
"""
config = self.config
env = self.env
policy = self.policy
buffer = self.buffer
device = get_global_variable("device")
self.summarizer.initialize_iteration(env)
# NOTE: set policy to eval mode when collecting trajectories
policy.eval()
next_done = np.zeros((self.num_envs,))
next_obs = self.last_obs
# NOTE: not tested on recurrent policies. This might be suboptimal for them.
policy.reset(np.ones_like(next_done))
for step in range(config.num_steps):
self.global_step += self.num_envs
done = torch.from_numpy(next_done).to(device).float()
obs = TensorDict.from_numpy(next_obs).to(device)
with torch.no_grad():
action, cur_model_output = policy.act(
obs.data, return_model_output=True
)
log_prob = Categorical(logits=cur_model_output.logits).log_prob(action)
next_obs, reward, next_done, info = env.step(action.cpu().numpy())
# Correctly handling boostrapping for truncation
# Issue: https://github.com/DLR-RM/stable-baselines3/issues/633
# Solution: https://github.com/DLR-RM/stable-baselines3/blob/master/stable_baselines3/common/on_policy_algorithm.py#L234
for i, d in enumerate(done):
if (
d
and info[i].get("terminal_observation") is not None
and info[i].get("TimeLimit.truncated", False)
):
terminal_obs = TensorDict.from_numpy(
info[i]["terminal_observation"]
).to(device)
with torch.no_grad():
terminal_value = policy.model(terminal_obs).value
reward[i] += config.gamma * terminal_value
policy.reset(next_done)
buffer.add(
step,
{
"obs": obs,
"actions": action,
"dones": done,
"values": cur_model_output.value,
"log_probs": log_prob,
"rewards": torch.from_numpy(reward).to(device).float(),
},
)
self.summarizer.add_episode_step(
action,
log_prob,
reward,
next_done,
info,
)
# Keep track of the last observation
self.last_obs = next_obs
# NOTE: don't forget to add done and value of last step
done = torch.from_numpy(next_done).to(device).float()
obs = TensorDict.from_numpy(next_obs).to(device)
with torch.no_grad():
buffer.add(step, {"dones": done, "values": policy.model(obs.data).value})
# conpute returns and advantages
buffer["advantages"], buffer["returns"] = self._compute_advantages_and_returns()
# flatten buffer
buffer = buffer.flatten()
# NOTE: set policy to training mode
policy.train()
self._update_learning_rate()
for mb in buffer.generate_minibatches(
config.update_epochs, self.minibatch_size
):
cur_model_output = policy.model(mb.obs.data)
cur_dist = Categorical(logits=cur_model_output.logits)
ref_log_prob = mb.log_probs
cur_log_prob = cur_dist.log_prob(mb.actions)
ratio = (cur_log_prob - ref_log_prob).exp()
adv = mb.advantages
if config.norm_adv:
adv = (mb.advantages - mb.advantages.mean()) / (
mb.advantages.std() + 1e-8
)
# Policy loss
pg_loss1 = -adv * ratio
pg_loss2 = -adv * torch.clamp(
ratio, 1 - config.clip_coef, 1 + config.clip_coef
)
pg_loss = torch.max(pg_loss1, pg_loss2).mean()
# Value loss
ref_value = mb.values
cur_value = cur_model_output.value
if config.clip_vloss:
v_loss_unclipped = (cur_value - mb.returns) ** 2
v_clipped = ref_value + torch.clamp(
cur_value - ref_value,
-config.clip_coef,
config.clip_coef,
)
v_loss_clipped = (v_clipped - mb.returns) ** 2
v_loss_max = torch.max(v_loss_unclipped, v_loss_clipped)
v_loss = 0.5 * v_loss_max.mean()
else:
v_loss = 0.5 * ((cur_value - mb.returns) ** 2).mean()
entropy_loss = -cur_dist.entropy().mean()
if self.global_step < config.critic_pretrain_steps:
loss = v_loss
else:
loss = (
pg_loss + config.vf_coef * v_loss + config.ent_coef * entropy_loss
)
loss.backward()
if config.max_grad_norm is not None:
torch.nn.utils.clip_grad_norm_(
policy.model.parameters(), config.max_grad_norm
)
self.optim.step()
self.optim.zero_grad()
self.summarizer.add_training_iteration(
cur_value,
adv,
pg_loss,
v_loss,
entropy_loss,
loss,
)
self.summarizer.finalize_iteration()
[docs] def _update_learning_rate(self) -> None:
"""
Update the learning rate for the optimizer, optionally annealing it over time.
Returns
-------
None
"""
config = self.config
lrnow = config.learning_rate
if config.anneal_lr:
lrnow *= 1 - self.global_step / config.total_timesteps
self.optim.param_groups[0]["lr"] = lrnow
# kepp track of the learning rate in the summarizer
self.summarizer.log["lr"] = lrnow
[docs] def _compute_advantages_and_returns(self) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Compute advantages and returns using Generalized Advantage Estimation (GAE).
Returns
-------
advantages : torch.Tensor
Advantage estimates for each step.
returns : torch.Tensor
Computed returns for each step.
Examples
--------
>>> adv, ret = algo._compute_advantages_and_returns()
"""
buffer = self.buffer
config = self.config
advantages = torch.zeros_like(buffer.rewards)
last_gaelam = 0
for t in reversed(range(config.num_steps)):
next_nonterminal = 1.0 - buffer.dones[t + 1]
next_values = buffer.values[t + 1]
delta = (
buffer.rewards[t]
+ config.gamma * next_values * next_nonterminal
- buffer.values[t]
)
advantages[t] = last_gaelam = (
delta
+ config.gamma * config.gae_lambda * next_nonterminal * last_gaelam
)
returns = advantages + buffer.values[:-1]
return advantages, returns
[docs] def save_checkpoint(self, policy: "duo.policies.PPOPolicy", name: str) -> None:
"""
Save the current policy and optimizer state to a checkpoint file.
Parameters
----------
policy : duo.policies.PPOPolicy
The policy to save.
name : str
Name for the checkpoint file.
Returns
-------
None
Examples
--------
>>> algo.save_checkpoint(policy, "last")
"""
save_path = f"{self.save_dir}/{name}.ckpt"
torch.save(
{
"policy_config": policy.config,
"model_state_dict": policy.get_params(),
"optim_state_dict": self.optim.state_dict(),
"global_step": self.global_step,
},
save_path,
)
logging.info(f"Saved checkpoint to {save_path}")
[docs] def load_checkpoint(self, policy: "duo.policies.PPOPolicy", load_path: str) -> None:
"""
Load policy and optimizer state from a checkpoint file.
Parameters
----------
policy : duo.policies.PPOPolicy
The policy to load parameters into.
load_path : str
Path to the checkpoint file.
Returns
-------
None
Examples
--------
>>> algo.load_checkpoint(policy, "checkpoint.ckpt")
"""
ckpt = torch.load(load_path, map_location=get_global_variable("device"))
policy.set_params(ckpt["model_state_dict"])
self.optim.load_state_dict(ckpt["optim_state_dict"])
self.global_step = ckpt["global_step"]
logging.info(
f"Loaded checkpoint from {load_path}, global step: {self.global_step}"
)
[docs]@dataclass
class PPOBatch:
"""
Data structure for a batch of PPO training data.
Examples
--------
>>> batch = PPOBatch(obs, actions, log_probs, advantages, returns, values)
"""
obs: "TensorDict"
actions: torch.Tensor
log_probs: torch.Tensor
advantages: torch.Tensor
returns: torch.Tensor
values: torch.Tensor
[docs]class TrainBuffer:
"""
Buffer for storing trajectories and training data for PPO.
Examples
--------
>>> buffer = TrainBuffer.new(env, num_steps=128)
"""
def __init__(self, data: Dict[str, Any]) -> None:
"""
Initialize the TrainBuffer.
Parameters
----------
data : dict
Dictionary containing buffer arrays for each key.
"""
self.data = data
[docs] def __getattr__(self, name: str) -> Any:
"""
Retrieve a buffer attribute by key.
Parameters
----------
name : str
Name of the buffer key to retrieve.
Returns
-------
Any
The buffer value for the given key.
Raises
------
AttributeError
If the key is not found in the buffer.
"""
if name in self.data:
return self.data[name]
raise AttributeError(f"'TrainBuffer' object has no attribute '{name}'")
[docs] @classmethod
def new(cls, env: "gym.Env", num_steps: int) -> "TrainBuffer":
"""
Create a new TrainBuffer with zero-initialized arrays for the given environment and number of steps.
Parameters
----------
env : gym.Env
The environment instance.
num_steps : int
Number of steps to allocate in the buffer.
Returns
-------
TrainBuffer
A new buffer instance with allocated arrays.
Examples
--------
>>> buffer = TrainBuffer.new(env, 128)
"""
device = get_global_variable("device")
num_envs = env.num_envs
if isinstance(env.observation_space, gym.spaces.Dict):
obs_shape = {
k: space.shape for k, space in env.observation_space.spaces.items()
}
else:
obs_shape = env.observation_space.shape
action_shape = env.action_space.shape
if isinstance(obs_shape, dict):
obs_buffer_shape = {
k: (num_steps, num_envs) + shape for k, shape in obs_shape.items()
}
else:
obs_buffer_shape = (num_steps, num_envs) + obs_shape
new_data = {}
new_data["obs"] = TensorDict.zeros(obs_buffer_shape).to(device)
new_data["actions"] = torch.zeros((num_steps, num_envs) + action_shape).to(
device
)
new_data["log_probs"] = torch.zeros((num_steps, num_envs)).to(device)
new_data["rewards"] = torch.zeros((num_steps, num_envs)).to(device)
new_data["dones"] = torch.zeros((num_steps + 1, num_envs)).to(device)
new_data["values"] = torch.zeros((num_steps + 1, num_envs)).to(device)
return cls(new_data)
[docs] def add(self, step: int, new_data: Dict[str, Any]) -> None:
"""
Add new data for a given step to the buffer.
Parameters
----------
step : int
The step index to add data to.
new_data : dict
Dictionary of new data to add for this step.
Returns
-------
None
"""
for k, v in new_data.items():
assert k in self.data, f"Key {k} not found in buffer"
self.data[k][step] = v
[docs] def flatten(self) -> "TrainBuffer":
"""
Flatten the buffer for minibatch training.
Returns
-------
TrainBuffer
A new buffer with flattened arrays.
Examples
--------
>>> flat_buffer = buffer.flatten()
"""
new_data = {}
for k, v in self.data.items():
new_data[k] = v.flatten(0, 1)
return TrainBuffer(new_data)
[docs] def __setitem__(self, name: str, value: Any) -> None:
"""
Set a buffer value by key.
Parameters
----------
name : str
Name of the buffer key to set.
value : Any
Value to assign to the buffer key.
Returns
-------
None
"""
self.data[name] = value
[docs] def generate_minibatches(self, num_epochs: int, minibatch_size: int) -> "PPOBatch":
"""
Yield minibatches for training.
Parameters
----------
num_epochs : int
Number of epochs to iterate over the buffer.
minibatch_size : int
Size of each minibatch.
Yields
------
PPOBatch
A minibatch of PPO training data.
Examples
--------
>>> for mb in buffer.generate_minibatches(3, 64):
... # train on mb
"""
batch_size = self.actions.shape[0]
b_inds = np.arange(batch_size)
for _ in range(num_epochs):
np.random.shuffle(b_inds)
for start in range(0, batch_size, minibatch_size):
end = start + minibatch_size
mb_inds = b_inds[start:end]
yield PPOBatch(
obs=self.obs[mb_inds],
actions=self.actions[mb_inds],
log_probs=self.log_probs[mb_inds],
advantages=self.advantages[mb_inds],
returns=self.returns[mb_inds],
values=self.values[mb_inds],
)
[docs]class TensorDict:
"""
Utility class for handling dictionary-structured tensors, supporting batch operations.
Examples
--------
>>> td = TensorDict({'obs': torch.zeros(4, 3)})
"""
def __init__(self, data: Union[Dict[str, torch.Tensor], torch.Tensor]) -> None:
"""
Initialize a TensorDict.
Parameters
----------
data : dict or torch.Tensor
Dictionary of tensors or a single tensor.
Returns
-------
None
"""
self.data = data
[docs] @classmethod
def zeros(
cls, shape: Union[Dict[str, Tuple[int, ...]], Tuple[int, ...]]
) -> "TensorDict":
"""
Create a TensorDict of zeros with the given shape.
Parameters
----------
shape : dict or tuple
Shape for each tensor or the single tensor.
Returns
-------
TensorDict
A TensorDict of zeros.
Examples
--------
>>> td = TensorDict.zeros({'obs': (4, 3)})
"""
if isinstance(shape, dict):
data = {}
for k, shape in shape.items():
data[k] = torch.zeros(shape)
else:
data = torch.zeros(shape)
return TensorDict(data)
[docs] def to(self, device: Union[torch.device, str]) -> "TensorDict":
"""
Move all tensors in the TensorDict to the specified device.
Parameters
----------
device : torch.device or str
The device to move tensors to.
Returns
-------
TensorDict
A new TensorDict with tensors on the specified device.
Examples
--------
>>> td = td.to("cuda")
"""
if isinstance(self.data, dict):
data = {}
for k in self.data:
data[k] = self.data[k].to(device)
else:
data = self.data.to(device)
return TensorDict(data)
[docs] def __setitem__(self, indices: Any, other: "TensorDict") -> None:
"""
Set values in the TensorDict at the given indices.
Parameters
----------
indices : Any
Indices to set.
other : TensorDict
TensorDict containing values to set.
Returns
-------
None
"""
if isinstance(self.data, dict):
for k in self.data:
self.data[k][indices] = other.data[k]
else:
self.data[indices] = other.data
[docs] def __getitem__(self, indices: Any) -> "TensorDict":
"""
Retrieve values from the TensorDict at the given indices.
Parameters
----------
indices : Any
Indices to retrieve.
Returns
-------
TensorDict
A new TensorDict with the selected values.
Examples
--------
>>> td_slice = td[0:2]
"""
if isinstance(self.data, dict):
data = {}
for k in self.data:
data[k] = self.data[k][indices]
else:
data = self.data[indices]
return TensorDict(data)
[docs] def flatten(self, start_dim: int = 0, end_dim: int = -1) -> "TensorDict":
"""
Flatten tensors in the TensorDict along specified dimensions.
Parameters
----------
start_dim : int, optional
The first dimension to flatten. Default is 0.
end_dim : int, optional
The last dimension to flatten. Default is -1.
Returns
-------
TensorDict
A new TensorDict with flattened tensors.
Examples
--------
>>> td_flat = td.flatten(0, 1)
"""
if isinstance(self.data, dict):
data = {}
for k in self.data:
data[k] = self.data[k].flatten(start_dim, end_dim)
else:
data = self.data.flatten(start_dim, end_dim)
return TensorDict(data)
[docs] @classmethod
def from_numpy(cls, data: Union[Dict[str, np.ndarray], np.ndarray]) -> "TensorDict":
"""
Convert numpy arrays to a TensorDict.
Parameters
----------
data : dict or np.ndarray
Dictionary of numpy arrays or a single numpy array.
Returns
-------
TensorDict
A TensorDict with tensors converted from numpy arrays.
Examples
--------
>>> td = TensorDict.from_numpy({'obs': np.zeros((4, 3))})
"""
if isinstance(data, dict):
data = data.copy()
for k in data:
data[k] = torch.from_numpy(data[k]).float()
else:
data = torch.from_numpy(data).float()
return TensorDict(data)
[docs]class PPOTrainSummarizer:
"""
Summarizer for PPO training statistics and logging.
Examples
--------
>>> summarizer = PPOTrainSummarizer(config)
"""
def __init__(self, config: PPOAlgorithmConfig) -> None:
"""
Initialize the PPOTrainSummarizer.
Parameters
----------
config : PPOAlgorithmConfig
Configuration object for the summarizer.
Returns
-------
None
"""
self.log_action_id = config.log_action_id
self.clear()
[docs] def clear(self) -> None:
"""
Clear the summary statistics log.
Returns
-------
None
"""
self.log = {}
[docs] def initialize_iteration(self, env: "gym.Env") -> None:
"""
Initialize logging for a new training iteration.
Parameters
----------
env : gym.Env
The environment instance for the iteration.
Returns
-------
None
"""
keys = [
"reward",
"base_reward",
f"action_{self.log_action_id}",
"action_prob",
"pg_loss",
"v_loss",
"ent_loss",
"loss",
"advantage",
"value",
]
self.iter_log = {k: [] for k in keys}
self.episode_total_reward = {
"reward": [0.0] * env.num_envs,
"base_reward": [0.0] * env.num_envs,
}
[docs] def finalize_iteration(self) -> None:
"""
Finalize and aggregate statistics for the iteration.
Returns
-------
None
"""
for k, v in self.iter_log.items():
if isinstance(v, list):
self.log.setdefault(k, []).extend(v)
else:
raise NotImplementedError
[docs] def add_episode_step(
self,
action: torch.Tensor,
log_prob: torch.Tensor,
reward: Union[np.ndarray, torch.Tensor],
done: Union[np.ndarray, torch.Tensor],
info: List[dict],
) -> None:
"""
Log statistics for each episode step.
Parameters
----------
action : torch.Tensor
Actions taken at this step.
log_prob : torch.Tensor
Log probabilities of the actions.
reward : np.ndarray or torch.Tensor
Rewards received at this step.
done : np.ndarray or torch.Tensor
Done flags for each environment.
info : list of dict
Additional info for each environment.
Returns
-------
None
"""
self.iter_log[f"action_{self.log_action_id}"].extend(
(action == self.log_action_id).long().tolist()
)
self.iter_log["action_prob"].extend(log_prob.exp().tolist())
for i in range(action.shape[0]):
self.episode_total_reward["reward"][i] += reward[i]
if "base_reward" in info[i]:
self.episode_total_reward["base_reward"][i] += info[i]["base_reward"]
if done[i]:
self.iter_log["reward"].append(self.episode_total_reward["reward"][i])
self.iter_log["base_reward"].append(
self.episode_total_reward["base_reward"][i]
)
self.episode_total_reward["reward"][i] = 0
self.episode_total_reward["base_reward"][i] = 0
[docs] def add_training_iteration(
self,
value: torch.Tensor,
advantage: torch.Tensor,
pg_loss: torch.Tensor,
v_loss: torch.Tensor,
entropy_loss: torch.Tensor,
loss: torch.Tensor,
) -> None:
"""
Log statistics for each training minibatch.
Parameters
----------
value : torch.Tensor
Value function predictions.
advantage : torch.Tensor
Advantage estimates.
pg_loss : torch.Tensor
Policy gradient loss.
v_loss : torch.Tensor
Value loss.
entropy_loss : torch.Tensor
Entropy loss.
loss : torch.Tensor
Total loss.
Returns
-------
None
"""
self.iter_log["value"].extend(value.tolist())
self.iter_log["advantage"].extend(advantage.tolist())
self.iter_log["pg_loss"].append(pg_loss.item())
self.iter_log["v_loss"].append(v_loss.item())
self.iter_log["ent_loss"].append(entropy_loss.item())
self.iter_log["loss"].append(loss.item())
[docs] def summarize(self) -> Dict[str, float]:
"""
Compute summary statistics for the current log.
Returns
-------
dict
Dictionary of summary statistics.
Examples
--------
>>> stats = summarizer.summarize()
"""
log = self.log
return {
"lr": log["lr"],
"reward_mean": float(np.mean(log["reward"])),
"reward_std": float(np.std(log["reward"])),
"base_reward_mean": float(np.mean(log["base_reward"])),
"base_reward_std": float(np.std(log["base_reward"])),
"pg_loss": float(np.mean(log["pg_loss"])),
"v_loss": float(np.mean(log["v_loss"])),
"ent_loss": float(np.mean(log["ent_loss"])),
"loss": float(np.mean(log["loss"])),
"advantage_mean": float(np.mean(log["advantage"])),
"advantage_std": float(np.std(log["advantage"])),
"value_mean": float(np.mean(log["value"])),
"value_std": float(np.std(log["value"])),
f"action_{self.log_action_id}": float(
np.mean(log[f"action_{self.log_action_id}"])
),
"action_prob": float(np.mean(log["action_prob"])),
}
[docs] def write(self, summary: Optional[Dict[str, float]] = None) -> Dict[str, float]:
"""
Pretty-print and log the summary statistics.
Parameters
----------
summary : dict, optional
Precomputed summary statistics. If None, will compute from log.
Returns
-------
dict
The summary statistics that were logged.
Examples
--------
>>> summarizer.write()
"""
if summary is None:
summary = self.summarize()
log_str = (
"\n"
f" Reward: mean {summary['reward_mean']:7.2f} ± {summary['reward_std']:7.2f}\n"
f" Base Reward: mean {summary['base_reward_mean']:7.2f} ± {summary['base_reward_std']:7.2f}\n"
f" Loss: pg_loss {summary['pg_loss']:7.4f} "
f"v_loss {summary['v_loss']:7.4f} "
f"ent_loss {summary['ent_loss']:7.4f} "
f"loss {summary['loss']:7.4f}\n"
f" Others: advantage {summary['advantage_mean']:7.4f} ± {summary['advantage_std']:7.4f} "
f"value {summary['value_mean']:7.4f} ± {summary['value_std']:7.4f}\n"
f" Action {self.log_action_id} frac: {summary[f'action_{self.log_action_id}']:7.2f}\n"
f" Action prob: {summary['action_prob']:7.2f}"
)
logging.info(log_str)
return summary