duo_ai.algorithms.ppo

Classes

PPOAlgorithmConfig

Configuration for the PPOAlgorithm.

PPOAlgorithm

Proximal Policy Optimization (PPO) algorithm implementation.

PPOBatch

Data structure for a batch of PPO training data.

TrainBuffer

Buffer for storing trajectories and training data for PPO.

TensorDict

Utility class for handling dictionary-structured tensors, supporting batch operations.

PPOTrainSummarizer

Summarizer for PPO training statistics and logging.

Module Contents

class duo_ai.algorithms.ppo.PPOAlgorithmConfig

Configuration for the PPOAlgorithm.

Parameters:
  • name (str, optional) – Name of the algorithm.

  • log_freq (int, optional) – Frequency (in iterations) to log training statistics.

  • save_freq (int, optional) – Frequency (in iterations) to save model checkpoints.

  • num_steps (int, optional) – Number of steps to run in each environment per iteration.

  • total_timesteps (int, optional) – Total number of environment steps to train for.

  • update_epochs (int, optional) – Number of epochs to update the policy per iteration.

  • gamma (float, optional) – Discount factor for rewards.

  • gae_lambda (float, optional) – Lambda for Generalized Advantage Estimation.

  • num_minibatches (int, optional) – Number of minibatches for each update epoch.

  • clip_coef (float, optional) – Clipping coefficient for PPO surrogate objective.

  • norm_adv (bool, optional) – Whether to normalize advantages.

  • clip_vloss (bool, optional) – Whether to use clipped value loss.

  • vf_coef (float, optional) – Coefficient for value function loss.

  • ent_coef (float, optional) – Coefficient for entropy bonus.

  • max_grad_norm (float, optional) – Maximum norm for gradient clipping.

  • learning_rate (float, optional) – Learning rate for optimizer.

  • critic_pretrain_steps (int, optional) – Number of steps to pretrain the critic before policy updates.

  • anneal_lr (bool, optional) – Whether to linearly anneal the learning rate.

  • log_action_id (int, optional) – Action ID to log statistics for (e.g., expert action).

Examples

>>> cfg = PPOAlgorithmConfig(num_steps=128, total_timesteps=10000)
name: str = 'ppo'
log_freq: int = 10
save_freq: int = 0
num_steps: int = 256
total_timesteps: int = 1500000
update_epochs: int = 3
gamma: float = 0.999
gae_lambda: float = 0.95
num_minibatches: int = 8
clip_coef: float = 0.2
norm_adv: bool = True
clip_vloss: bool = True
vf_coef: float = 0.5
ent_coef: float = 0.01
max_grad_norm: float = 0.5
learning_rate: float = 0.0005
critic_pretrain_steps: int = 0
anneal_lr: bool = False
log_action_id: int = 1
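
The reference does not spell out how these fields combine into a training schedule. The following is a minimal sketch, assuming the common PPO convention that one iteration collects num_steps transitions from each parallel environment and splits the result into num_minibatches equal parts. The num_envs argument and the ppo_schedule helper are hypothetical and are not part of this module.

```python
def ppo_schedule(total_timesteps, num_steps, num_minibatches, num_envs):
    """Derive per-iteration sizes from config-style values (assumed convention)."""
    batch_size = num_steps * num_envs               # transitions collected per iteration
    minibatch_size = batch_size // num_minibatches  # transitions per gradient step
    num_iterations = total_timesteps // batch_size  # full iterations within the budget
    return batch_size, minibatch_size, num_iterations


# Defaults listed above, with a hypothetical 8 parallel environments.
batch_size, minibatch_size, num_iterations = ppo_schedule(
    total_timesteps=1_500_000, num_steps=256, num_minibatches=8, num_envs=8
)
print(batch_size, minibatch_size, num_iterations)
```

Under these assumptions, log_freq and save_freq would count iterations of this schedule rather than environment steps.
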
class duo_ai.algorithms.ppo.PPOAlgorithm(config: PPOAlgorithmConfig)

Bases: duo_ai.core.Algorithm

Proximal Policy Optimization (PPO) algorithm implementation.

Examples

>>> algo = PPOAlgorithm(PPOAlgorithmConfig())
>>> algo.train(policy, env, validators)
config_cls
config
_initialize() -> None

Initialize PPO training state, buffers, optimizer, and logging.

Return type:

None

train(policy: duo.policies.PPOPolicy, env: gymnasium.Env, validators: Dict[str, duo.core.Evaluator]) -> None

Train the PPO algorithm on the specified environment(s) using the provided policy.

This method performs multiple training iterations, periodically evaluates the policy, logs statistics, and saves checkpoints for the best and last models.

Parameters:
  • policy (duo.policies.PPOPolicy) – The policy to be trained.

  • env (gymnasium.Env) – The environment instance for training.

  • validators (dict of str to duo.core.Evaluator) – Dictionary mapping split names to evaluator instances for evaluation.

Return type:

None

Examples

>>> algo.train(policy, env, validators)
_train_once() -> None

Perform a single training iteration of PPO, including trajectory collection, advantage computation, and policy/value updates.

Return type:

None
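
The policy update in such an iteration is usually driven by the clipped surrogate objective that clip_coef, vf_coef, and ent_coef refer to. The following is a minimal sketch of the standard PPO formulation in plain Python; it is not taken from this module's source, and the function names are hypothetical. Value-loss clipping (clip_vloss) and advantage normalization (norm_adv) are left out.

```python
import math


def clipped_policy_loss(new_log_probs, old_log_probs, advantages, clip_coef=0.2):
    """Mean PPO clipped surrogate loss over a minibatch (a quantity to minimize)."""
    losses = []
    for new_lp, old_lp, adv in zip(new_log_probs, old_log_probs, advantages):
        ratio = math.exp(new_lp - old_lp)  # pi_new(a|s) / pi_old(a|s)
        clipped = max(1.0 - clip_coef, min(ratio, 1.0 + clip_coef))
        # Pessimistic bound: keep the larger of the two negated objectives.
        losses.append(max(-adv * ratio, -adv * clipped))
    return sum(losses) / len(losses)


def total_loss(pg_loss, v_loss, entropy, vf_coef=0.5, ent_coef=0.01):
    """Combine the terms; the entropy bonus is subtracted to encourage exploration."""
    return pg_loss + vf_coef * v_loss - ent_coef * entropy
```

With a positive advantage, the clip removes any incentive to push the probability ratio above 1 + clip_coef; with a negative advantage, the unclipped term dominates and the penalty stays in place.
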

_update_learning_rate() -> None

Update the optimizer's learning rate, linearly annealing it over training when anneal_lr is enabled.

Return type:

None
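
A linear annealing schedule of the kind anneal_lr describes can be sketched as follows. This assumes the rate decays from learning_rate toward zero across iterations numbered from 1; the exact schedule used by this module is not documented here, and annealed_learning_rate is a hypothetical helper.

```python
def annealed_learning_rate(base_lr, iteration, num_iterations, anneal_lr=True):
    """Return the learning rate for a 1-indexed iteration (assumed linear decay)."""
    if not anneal_lr:
        return base_lr
    # Fraction of training remaining at the start of this iteration.
    frac = 1.0 - (iteration - 1) / num_iterations
    return frac * base_lr


# The first iteration uses the full rate; later iterations use less.
print(annealed_learning_rate(0.0005, iteration=1, num_iterations=100))
print(annealed_learning_rate(0.0005, iteration=51, num_iterations=100))
```
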

_compute_advantages_and_returns() -> Tuple[torch.Tensor, torch.Tensor]

Compute advantages and returns using Generalized Advantage Estimation (GAE).

Returns:

  • advantages (torch.Tensor) – Advantage estimates for each step.

  • returns (torch.Tensor) – Computed returns for each step.

Examples

>>> adv, ret = algo._compute_advantages_and_returns()
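
For readers unfamiliar with GAE, the recursion can be sketched for a single environment in plain Python. This illustrates the standard estimator rather than this module's implementation; compute_gae, its arguments, and the convention that dones[t] marks an episode ending after step t are assumptions.

```python
def compute_gae(rewards, values, dones, next_value, gamma=0.999, gae_lambda=0.95):
    """Return (advantages, returns) for one environment's rollout.

    dones[t] is True when the episode ended after step t, so nothing is
    bootstrapped across that boundary (assumed convention).
    """
    num_steps = len(rewards)
    advantages = [0.0] * num_steps
    last_gae = 0.0
    for t in reversed(range(num_steps)):
        # Value of the state that follows step t.
        next_val = next_value if t == num_steps - 1 else values[t + 1]
        not_done = 0.0 if dones[t] else 1.0
        # One-step temporal-difference error.
        delta = rewards[t] + gamma * next_val * not_done - values[t]
        # Exponentially weighted sum of future TD errors.
        last_gae = delta + gamma * gae_lambda * not_done * last_gae
        advantages[t] = last_gae
    returns = [adv + val for adv, val in zip(advantages, values)]
    return advantages, returns
```

Here gamma and gae_lambda play the roles of the config fields of the same names, and the returns are the regression targets for the value function.
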
save_checkpoint(policy: duo.policies.PPOPolicy, name: str) -> None

Save the current policy and optimizer state to a checkpoint file.

Parameters:
  • policy (duo.policies.PPOPolicy) – The policy to save.

  • name (str) – Name for the checkpoint file.

Return type:

None

Examples

>>> algo.save_checkpoint(policy, "last")
load_checkpoint(policy: duo.policies.PPOPolicy, load_path: str) -> None

Load policy and optimizer state from a checkpoint file.

Parameters:
  • policy (duo.policies.PPOPolicy) – The policy to load parameters into.

  • load_path (str) – Path to the checkpoint file.

Return type:

None

Examples

>>> algo.load_checkpoint(policy, "checkpoint.ckpt")
class duo_ai.algorithms.ppo.PPOBatch

Data structure for a batch of PPO training data.

Examples

>>> batch = PPOBatch(obs, actions, log_probs, advantages, returns, values)
obs: TensorDict
actions: torch.Tensor
log_probs: torch.Tensor
advantages: torch.Tensor
returns: torch.Tensor
values: torch.Tensor
class duo_ai.algorithms.ppo.TrainBuffer(data: Dict[str, Any])

Buffer for storing trajectories and training data for PPO.

Examples

>>> buffer = TrainBuffer.new(env, num_steps=128)
data
__getattr__(name: str) -> Any

Retrieve a buffer attribute by key.

Parameters:

name (str) – Name of the buffer key to retrieve.

Returns:

The buffer value for the given key.

Return type:

Any

Raises:

AttributeError – If the key is not found in the buffer.

classmethod new(env: gymnasium.Env, num_steps: int) -> TrainBuffer

Create a new TrainBuffer with zero-initialized arrays for the given environment and number of steps.

Parameters:
  • env (gymnasium.Env) – The environment instance.

  • num_steps (int) – Number of steps to allocate in the buffer.

Returns:

A new buffer instance with allocated arrays.

Return type:

TrainBuffer

Examples

>>> buffer = TrainBuffer.new(env, 128)
add(step: int, new_data: Dict[str, Any]) -> None

Add new data for a given step to the buffer.

Parameters:
  • step (int) – The step index to add data to.

  • new_data (dict) – Dictionary of new data to add for this step.

Return type:

None

flatten() -> TrainBuffer

Flatten the buffer for minibatch training.

Returns:

A new buffer with flattened arrays.

Return type:

TrainBuffer

Examples

>>> flat_buffer = buffer.flatten()
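
What flattening means for rollout data can be illustrated with nested lists. This sketch assumes each buffer array is indexed as [step][env], which the reference does not state; flatten_steps_and_envs is a hypothetical helper, not part of TrainBuffer.

```python
def flatten_steps_and_envs(array):
    """Merge the leading (step, env) axes into one axis, in step-major order."""
    return [item for step_row in array for item in step_row]


# Three steps from two environments become six independent samples.
rewards = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
flat = flatten_steps_and_envs(rewards)
print(flat)
```

After this step the samples no longer carry their time ordering, which is why advantages have to be computed before flattening.
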
__setitem__(name: str, value: Any) -> None

Set a buffer value by key.

Parameters:
  • name (str) – Name of the buffer key to set.

  • value (Any) – Value to assign to the buffer key.

Return type:

None

generate_minibatches(num_epochs: int, minibatch_size: int) -> PPOBatch

Yield minibatches for training.

Parameters:
  • num_epochs (int) – Number of epochs to iterate over the buffer.

  • minibatch_size (int) – Size of each minibatch.

Yields:

PPOBatch – A minibatch of PPO training data.

Examples

>>> for mb in buffer.generate_minibatches(3, 64):
...     pass  # train on mb
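
The iteration pattern behind such a generator can be sketched as follows, assuming each epoch reshuffles the flattened batch and visits every sample once. Whether generate_minibatches shuffles this way is not documented here; minibatch_indices and its seed argument are hypothetical.

```python
import random


def minibatch_indices(batch_size, num_epochs, minibatch_size, seed=None):
    """Yield index lists; each epoch covers every index once in a fresh random order."""
    rng = random.Random(seed)
    indices = list(range(batch_size))
    for _ in range(num_epochs):
        rng.shuffle(indices)
        for start in range(0, batch_size, minibatch_size):
            # Slicing copies, so later shuffles do not alter yielded lists.
            yield indices[start:start + minibatch_size]


for mb in minibatch_indices(batch_size=8, num_epochs=2, minibatch_size=4, seed=0):
    print(mb)
```
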
class duo_ai.algorithms.ppo.TensorDict(data: Dict[str, torch.Tensor] | torch.Tensor)

Utility class for handling dictionary-structured tensors, supporting batch operations.

Examples

>>> td = TensorDict({'obs': torch.zeros(4, 3)})
data
classmethod zeros(shape: Dict[str, Tuple[int, ...]] | Tuple[int, ...]) -> TensorDict

Create a TensorDict of zeros with the given shape.

Parameters:

shape (dict or tuple) – Either a dict mapping each key to its tensor shape, or a single shape tuple for an unkeyed tensor.

Returns:

A TensorDict of zeros.

Return type:

TensorDict

Examples

>>> td = TensorDict.zeros({'obs': (4, 3)})
to(device: torch.device | str) -> TensorDict

Move all tensors in the TensorDict to the specified device.

Parameters:

device (torch.device or str) – The device to move tensors to.

Returns:

A new TensorDict with tensors on the specified device.

Return type:

TensorDict

Examples

>>> td = td.to("cuda")
__setitem__(indices: Any, other: TensorDict) -> None

Set values in the TensorDict at the given indices.

Parameters:
  • indices (Any) – Indices to set.

  • other (TensorDict) – TensorDict containing values to set.

Return type:

None

__getitem__(indices: Any) -> TensorDict

Retrieve values from the TensorDict at the given indices.

Parameters:

indices (Any) – Indices to retrieve.

Returns:

A new TensorDict with the selected values.

Return type:

TensorDict

Examples

>>> td_slice = td[0:2]
flatten(start_dim: int = 0, end_dim: int = -1) -> TensorDict

Flatten tensors in the TensorDict along specified dimensions.

Parameters:
  • start_dim (int, optional) – The first dimension to flatten. Default is 0.

  • end_dim (int, optional) – The last dimension to flatten. Default is -1.

Returns:

A new TensorDict with flattened tensors.

Return type:

TensorDict

Examples

>>> td_flat = td.flatten(0, 1)
classmethod from_numpy(data: Dict[str, numpy.ndarray] | numpy.ndarray) -> TensorDict

Convert numpy arrays to a TensorDict.

Parameters:

data (dict or np.ndarray) – Dictionary of numpy arrays or a single numpy array.

Returns:

A TensorDict with tensors converted from numpy arrays.

Return type:

TensorDict

Examples

>>> td = TensorDict.from_numpy({'obs': np.zeros((4, 3))})
class duo_ai.algorithms.ppo.PPOTrainSummarizer(config: PPOAlgorithmConfig)

Summarizer for PPO training statistics and logging.

Examples

>>> summarizer = PPOTrainSummarizer(config)
log_action_id
clear() -> None

Clear the summary statistics log.

Return type:

None

initialize_iteration(env: gymnasium.Env) -> None

Initialize logging for a new training iteration.

Parameters:

env (gymnasium.Env) – The environment instance for the iteration.

Return type:

None

finalize_iteration() -> None

Finalize and aggregate statistics for the iteration.

Return type:

None

add_episode_step(action: torch.Tensor, log_prob: torch.Tensor, reward: numpy.ndarray | torch.Tensor, done: numpy.ndarray | torch.Tensor, info: List[dict]) -> None

Log statistics for each episode step.

Parameters:
  • action (torch.Tensor) – Actions taken at this step.

  • log_prob (torch.Tensor) – Log probabilities of the actions.

  • reward (np.ndarray or torch.Tensor) – Rewards received at this step.

  • done (np.ndarray or torch.Tensor) – Done flags for each environment.

  • info (list of dict) – Additional info for each environment.

Return type:

None

add_training_iteration(value: torch.Tensor, advantage: torch.Tensor, pg_loss: torch.Tensor, v_loss: torch.Tensor, entropy_loss: torch.Tensor, loss: torch.Tensor) -> None

Log statistics for each training minibatch.

Parameters:
  • value (torch.Tensor) – Value function predictions.

  • advantage (torch.Tensor) – Advantage estimates.

  • pg_loss (torch.Tensor) – Policy gradient loss.

  • v_loss (torch.Tensor) – Value loss.

  • entropy_loss (torch.Tensor) – Entropy loss.

  • loss (torch.Tensor) – Total loss.

Return type:

None

summarize() -> Dict[str, float]

Compute summary statistics for the current log.

Returns:

Dictionary of summary statistics.

Return type:

dict

Examples

>>> stats = summarizer.summarize()
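
The accumulate-then-summarize pattern can be sketched with a small stand-in class. Which statistics PPOTrainSummarizer reports, and how it aggregates them, is not stated in this reference. The RunningSummary class and its use of per-key means are assumptions made for illustration.

```python
from collections import defaultdict


class RunningSummary:
    """Hypothetical stand-in: collects scalars per key and averages them."""

    def __init__(self):
        self.log = defaultdict(list)

    def add(self, **stats):
        # Record one scalar per named statistic, e.g. once per minibatch.
        for key, value in stats.items():
            self.log[key].append(float(value))

    def summarize(self):
        # Mean of everything recorded since the last clear().
        return {key: sum(vals) / len(vals) for key, vals in self.log.items() if vals}

    def clear(self):
        self.log.clear()


summary = RunningSummary()
summary.add(loss=1.0, v_loss=2.0)
summary.add(loss=3.0, v_loss=4.0)
print(summary.summarize())
```
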
write(summary: Dict[str, float] | None = None) -> Dict[str, float]

Pretty-print and log the summary statistics.

Parameters:

summary (dict, optional) – Precomputed summary statistics. If None, the summary is computed from the current log.

Returns:

The summary statistics that were logged.

Return type:

dict

Examples

>>> summarizer.write()