duo_ai.algorithms.ppo
Classes
PPOAlgorithmConfig | Configuration for the PPOAlgorithm.
PPOAlgorithm | Proximal Policy Optimization (PPO) algorithm implementation.
PPOBatch | Data structure for a batch of PPO training data.
TrainBuffer | Buffer for storing trajectories and training data for PPO.
TensorDict | Utility class for handling dictionary-structured tensors, supporting batch operations.
PPOTrainSummarizer | Summarizer for PPO training statistics and logging.
Module Contents
- class duo_ai.algorithms.ppo.PPOAlgorithmConfig
Configuration for the PPOAlgorithm.
- Parameters:
name (str, optional) – Name of the algorithm.
log_freq (int, optional) – Frequency (in iterations) to log training statistics.
save_freq (int, optional) – Frequency (in iterations) to save model checkpoints.
num_steps (int, optional) – Number of steps to run in each environment per iteration.
total_timesteps (int, optional) – Total number of environment steps to train for.
update_epochs (int, optional) – Number of epochs to update the policy per iteration.
gamma (float, optional) – Discount factor for rewards.
gae_lambda (float, optional) – Lambda for Generalized Advantage Estimation.
num_minibatches (int, optional) – Number of minibatches for each update epoch.
clip_coef (float, optional) – Clipping coefficient for PPO surrogate objective.
norm_adv (bool, optional) – Whether to normalize advantages.
clip_vloss (bool, optional) – Whether to use clipped value loss.
vf_coef (float, optional) – Coefficient for value function loss.
ent_coef (float, optional) – Coefficient for entropy bonus.
max_grad_norm (float, optional) – Maximum norm for gradient clipping.
learning_rate (float, optional) – Learning rate for optimizer.
critic_pretrain_steps (int, optional) – Number of steps to pretrain the critic before policy updates.
anneal_lr (bool, optional) – Whether to linearly anneal the learning rate.
log_action_id (int, optional) – Action ID to log statistics for (e.g., expert action).
Examples
>>> cfg = PPOAlgorithmConfig(num_steps=128, total_timesteps=10000)
- name: str = 'ppo'
- log_freq: int = 10
- save_freq: int = 0
- num_steps: int = 256
- total_timesteps: int = 1500000
- update_epochs: int = 3
- gamma: float = 0.999
- gae_lambda: float = 0.95
- num_minibatches: int = 8
- clip_coef: float = 0.2
- norm_adv: bool = True
- clip_vloss: bool = True
- vf_coef: float = 0.5
- ent_coef: float = 0.01
- max_grad_norm: float = 0.5
- learning_rate: float = 0.0005
- critic_pretrain_steps: int = 0
- anneal_lr: bool = False
- log_action_id: int = 1
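The step-count fields above interact. The following is a minimal sketch of that arithmetic, not the library's own code. It assumes the environment is vectorized with `num_envs` parallel copies; `num_envs`, `RolloutSizes`, and `rollout_sizes` are hypothetical names that do not appear in the documented API.

```python
from dataclasses import dataclass


@dataclass
class RolloutSizes:
    """Hypothetical helper for illustration; not part of duo_ai."""
    batch_size: int
    minibatch_size: int
    num_iterations: int


def rollout_sizes(num_envs: int, num_steps: int = 256,
                  num_minibatches: int = 8,
                  total_timesteps: int = 1_500_000) -> RolloutSizes:
    # Transitions gathered per iteration across all parallel environments
    # (num_envs is an assumption; it is not a documented config field).
    batch_size = num_envs * num_steps
    # Each update epoch splits the batch into equally sized minibatches.
    minibatch_size = batch_size // num_minibatches
    # Number of whole iterations that fit in the total step budget.
    num_iterations = total_timesteps // batch_size
    return RolloutSizes(batch_size, minibatch_size, num_iterations)


sizes = rollout_sizes(num_envs=4)
print(sizes)
```

With the default values and four environments, each iteration would gather 1024 transitions and split them into minibatches of 128.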
- class duo_ai.algorithms.ppo.PPOAlgorithm(config: PPOAlgorithmConfig)
Bases:
duo_ai.core.Algorithm
Proximal Policy Optimization (PPO) algorithm implementation.
Examples
>>> algo = PPOAlgorithm(PPOAlgorithmConfig())
>>> algo.train(policy, env, validators)
- config_cls
- config
- _initialize() -> None
Initialize PPO training state, buffers, optimizer, and logging.
- Return type:
None
- train(policy: duo.policies.PPOPolicy, env: gymnasium.Env, validators: Dict[str, duo.core.Evaluator]) -> None
Train the PPO algorithm on the specified environment(s) using the provided policy.
This method performs multiple training iterations, periodically evaluates the policy, logs statistics, and saves checkpoints for the best and last models.
- Parameters:
policy (duo.policies.PPOPolicy) – The policy to be trained.
env (gymnasium.Env) – The environment instance for training.
validators (dict of str to duo.core.Evaluator) – Dictionary mapping split names to evaluator instances for evaluation.
- Return type:
None
Examples
>>> algorithm.train(policy, env, validators)
- _train_once() -> None
Perform a single training iteration of PPO, including trajectory collection, advantage computation, and policy/value updates.
- Return type:
None
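The policy/value update in a PPO iteration is usually driven by the clipped surrogate objective, which is what `clip_coef`, `clip_vloss`, `vf_coef`, and `ent_coef` control. The sketch below shows the standard per-sample formulation with plain floats. It is an assumption that `_train_once` follows this form; the function names `ppo_losses` and `total_loss` are hypothetical.

```python
import math


def ppo_losses(new_logp, old_logp, adv, new_value, old_value, ret,
               clip_coef=0.2, clip_vloss=True):
    """Per-sample PPO policy and value losses (illustrative, scalar inputs)."""
    # Probability ratio between the updated and the data-collecting policy.
    ratio = math.exp(new_logp - old_logp)
    clipped_ratio = max(1.0 - clip_coef, min(ratio, 1.0 + clip_coef))
    # Take the more pessimistic of the unclipped and clipped losses.
    pg_loss = max(-adv * ratio, -adv * clipped_ratio)

    v_unclipped = (new_value - ret) ** 2
    if clip_vloss:
        # Limit how far the new value estimate may move from the old one.
        delta = max(-clip_coef, min(new_value - old_value, clip_coef))
        v_clipped = (old_value + delta - ret) ** 2
        v_loss = 0.5 * max(v_unclipped, v_clipped)
    else:
        v_loss = 0.5 * v_unclipped
    return pg_loss, v_loss


def total_loss(pg_loss, v_loss, entropy, vf_coef=0.5, ent_coef=0.01):
    # The entropy term is subtracted so that higher entropy lowers the loss.
    return pg_loss - ent_coef * entropy + vf_coef * v_loss


pg, v = ppo_losses(new_logp=math.log(1.5), old_logp=0.0, adv=1.0,
                   new_value=2.0, old_value=1.0, ret=1.0)
loss = total_loss(pg, v, entropy=0.0)
```

In a real implementation these quantities would be tensors averaged over a minibatch, and gradients would be clipped to `max_grad_norm` before the optimizer step.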
- _update_learning_rate() -> None
Update the learning rate for the optimizer, optionally annealing it over time.
- Return type:
None
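A common way to implement linear annealing is to scale the base learning rate by the fraction of training that remains. This sketch assumes a schedule that decays toward zero over a known number of iterations; the exact schedule used by `_update_learning_rate` is not documented here, and `annealed_lr` is a hypothetical name.

```python
def annealed_lr(iteration: int, num_iterations: int,
                learning_rate: float = 0.0005,
                anneal_lr: bool = True) -> float:
    """Return the learning rate for a 1-based iteration index."""
    if not anneal_lr:
        # Annealing disabled: keep the configured rate constant.
        return learning_rate
    # Fraction of training remaining: 1.0 at the first iteration.
    frac = 1.0 - (iteration - 1) / num_iterations
    return frac * learning_rate


lr_start = annealed_lr(1, 10)
lr_mid = annealed_lr(6, 10)
lr_fixed = annealed_lr(6, 10, anneal_lr=False)
```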
- _compute_advantages_and_returns() -> Tuple[torch.Tensor, torch.Tensor]
Compute advantages and returns using Generalized Advantage Estimation (GAE).
- Returns:
advantages (torch.Tensor) – Advantage estimates for each step.
returns (torch.Tensor) – Computed returns for each step.
Examples
>>> adv, ret = algo._compute_advantages_and_returns()
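For reference, GAE can be computed with a single backward pass over a trajectory. The sketch below uses plain Python lists for one environment and assumes `dones[t]` is 1.0 when the episode ended on step `t`. The library's own method takes no arguments and reads from its internal buffer, so `compute_gae` and its parameters are hypothetical.

```python
def compute_gae(rewards, values, dones, last_value,
                gamma=0.999, gae_lambda=0.95):
    """Return (advantages, returns) for one trajectory of length T."""
    num_steps = len(rewards)
    advantages = [0.0] * num_steps
    last_adv = 0.0
    for t in reversed(range(num_steps)):
        # Bootstrap from last_value after the final stored step.
        next_value = last_value if t == num_steps - 1 else values[t + 1]
        # Mask out bootstrapping and accumulation across episode ends.
        not_done = 1.0 - dones[t]
        delta = rewards[t] + gamma * next_value * not_done - values[t]
        last_adv = delta + gamma * gae_lambda * not_done * last_adv
        advantages[t] = last_adv
    # Returns are the advantage estimates added back onto the values.
    returns = [a + v for a, v in zip(advantages, values)]
    return advantages, returns


# With gamma = lambda = 1 and zero value estimates, the advantages
# reduce to undiscounted reward-to-go within the episode.
adv, ret = compute_gae(rewards=[1.0, 1.0], values=[0.0, 0.0],
                       dones=[0.0, 1.0], last_value=5.0,
                       gamma=1.0, gae_lambda=1.0)
```

Because the episode ends on the second step, `last_value` is masked out and does not leak into the estimates.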
- save_checkpoint(policy: duo.policies.PPOPolicy, name: str) -> None
Save the current policy and optimizer state to a checkpoint file.
- Parameters:
policy (duo.policies.PPOPolicy) – The policy to save.
name (str) – Name for the checkpoint file.
- Return type:
None
Examples
>>> algo.save_checkpoint(policy, "last")
- load_checkpoint(policy: duo.policies.PPOPolicy, load_path: str) -> None
Load policy and optimizer state from a checkpoint file.
- Parameters:
policy (duo.policies.PPOPolicy) – The policy to load parameters into.
load_path (str) – Path to the checkpoint file.
- Return type:
None
Examples
>>> algo.load_checkpoint(policy, "checkpoint.ckpt")
- class duo_ai.algorithms.ppo.PPOBatch
Data structure for a batch of PPO training data.
Examples
>>> batch = PPOBatch(obs, actions, log_probs, advantages, returns, values)
- obs: TensorDict
- actions: torch.Tensor
- log_probs: torch.Tensor
- advantages: torch.Tensor
- returns: torch.Tensor
- values: torch.Tensor
- class duo_ai.algorithms.ppo.TrainBuffer(data: Dict[str, Any])
Buffer for storing trajectories and training data for PPO.
Examples
>>> buffer = TrainBuffer.new(env, num_steps=128)
- data
- __getattr__(name: str) -> Any
Retrieve a buffer attribute by key.
- Parameters:
name (str) – Name of the buffer key to retrieve.
- Returns:
The buffer value for the given key.
- Return type:
Any
- Raises:
AttributeError – If the key is not found in the buffer.
- classmethod new(env: gymnasium.Env, num_steps: int) -> TrainBuffer
Create a new TrainBuffer with zero-initialized arrays for the given environment and number of steps.
- Parameters:
env (gymnasium.Env) – The environment instance.
num_steps (int) – Number of steps to allocate in the buffer.
- Returns:
A new buffer instance with allocated arrays.
- Return type:
TrainBuffer
Examples
>>> buffer = TrainBuffer.new(env, 128)
- add(step: int, new_data: Dict[str, Any]) -> None
Add new data for a given step to the buffer.
- Parameters:
step (int) – The step index to add data to.
new_data (dict) – Dictionary of new data to add for this step.
- Return type:
None
- flatten() -> TrainBuffer
Flatten the buffer for minibatch training.
- Returns:
A new buffer with flattened arrays.
- Return type:
TrainBuffer
Examples
>>> flat_buffer = buffer.flatten()
- __setitem__(name: str, value: Any) -> None
Set a buffer value by key.
- Parameters:
name (str) – Name of the buffer key to set.
value (Any) – Value to assign to the buffer key.
- Return type:
None
- generate_minibatches(num_epochs: int, minibatch_size: int) -> PPOBatch
Yield minibatches for training.
- Parameters:
num_epochs (int) – Number of epochs to iterate over the buffer.
minibatch_size (int) – Size of each minibatch.
- Yields:
PPOBatch – A minibatch of PPO training data.
Examples
>>> for mb in buffer.generate_minibatches(3, 64):
...     pass  # train on mb
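Minibatch generation of this kind is typically implemented by reshuffling the flattened sample indices once per epoch and slicing them into chunks. The sketch below yields index lists only and is a guess at the general pattern, not the library's code; `minibatch_indices`, `batch_size`, and `seed` are hypothetical names.

```python
import random


def minibatch_indices(batch_size, num_epochs, minibatch_size, seed=None):
    """Yield lists of sample indices, reshuffled at the start of each epoch."""
    rng = random.Random(seed)
    indices = list(range(batch_size))
    for _ in range(num_epochs):
        rng.shuffle(indices)
        for start in range(0, batch_size, minibatch_size):
            # Slicing copies, so later shuffles do not alter yielded lists.
            yield indices[start:start + minibatch_size]


batches = list(minibatch_indices(batch_size=8, num_epochs=2,
                                 minibatch_size=4, seed=0))
```

Each epoch visits every sample exactly once, so two epochs over eight samples with a minibatch size of four produce four minibatches in total.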
- class duo_ai.algorithms.ppo.TensorDict(data: Dict[str, torch.Tensor] | torch.Tensor)
Utility class for handling dictionary-structured tensors, supporting batch operations.
Examples
>>> td = TensorDict({'obs': torch.zeros(4, 3)})
- data
- classmethod zeros(shape: Dict[str, Tuple[int, ...]] | Tuple[int, ...]) -> TensorDict
Create a TensorDict of zeros with the given shape.
- Parameters:
shape (dict or tuple) – Shape for each tensor or the single tensor.
- Returns:
A TensorDict of zeros.
- Return type:
TensorDict
Examples
>>> td = TensorDict.zeros({'obs': (4, 3)})
- to(device: torch.device | str) -> TensorDict
Move all tensors in the TensorDict to the specified device.
- Parameters:
device (torch.device or str) – The device to move tensors to.
- Returns:
A new TensorDict with tensors on the specified device.
- Return type:
TensorDict
Examples
>>> td = td.to("cuda")
- __setitem__(indices: Any, other: TensorDict) -> None
Set values in the TensorDict at the given indices.
- Parameters:
indices (Any) – Indices to set.
other (TensorDict) – TensorDict containing values to set.
- Return type:
None
- __getitem__(indices: Any) -> TensorDict
Retrieve values from the TensorDict at the given indices.
- Parameters:
indices (Any) – Indices to retrieve.
- Returns:
A new TensorDict with the selected values.
- Return type:
TensorDict
Examples
>>> td_slice = td[0:2]
- flatten(start_dim: int = 0, end_dim: int = -1) -> TensorDict
Flatten tensors in the TensorDict along specified dimensions.
- Parameters:
start_dim (int, optional) – The first dimension to flatten. Default is 0.
end_dim (int, optional) – The last dimension to flatten. Default is -1.
- Returns:
A new TensorDict with flattened tensors.
- Return type:
TensorDict
Examples
>>> td_flat = td.flatten(0, 1)
- classmethod from_numpy(data: Dict[str, numpy.ndarray] | numpy.ndarray) -> TensorDict
Convert numpy arrays to a TensorDict.
- Parameters:
data (dict or np.ndarray) – Dictionary of numpy arrays or a single numpy array.
- Returns:
A TensorDict with tensors converted from numpy arrays.
- Return type:
TensorDict
Examples
>>> td = TensorDict.from_numpy({'obs': np.zeros((4, 3))})
- class duo_ai.algorithms.ppo.PPOTrainSummarizer(config: PPOAlgorithmConfig)
Summarizer for PPO training statistics and logging.
Examples
>>> summarizer = PPOTrainSummarizer(config)
- log_action_id
- initialize_iteration(env: gymnasium.Env) -> None
Initialize logging for a new training iteration.
- Parameters:
env (gymnasium.Env) – The environment instance for the iteration.
- Return type:
None
- finalize_iteration() -> None
Finalize and aggregate statistics for the iteration.
- Return type:
None
- add_episode_step(action: torch.Tensor, log_prob: torch.Tensor, reward: numpy.ndarray | torch.Tensor, done: numpy.ndarray | torch.Tensor, info: List[dict]) -> None
Log statistics for each episode step.
- Parameters:
action (torch.Tensor) – Actions taken at this step.
log_prob (torch.Tensor) – Log probabilities of the actions.
reward (np.ndarray or torch.Tensor) – Rewards received at this step.
done (np.ndarray or torch.Tensor) – Done flags for each environment.
info (list of dict) – Additional info for each environment.
- Return type:
None
- add_training_iteration(value: torch.Tensor, advantage: torch.Tensor, pg_loss: torch.Tensor, v_loss: torch.Tensor, entropy_loss: torch.Tensor, loss: torch.Tensor) -> None
Log statistics for each training minibatch.
- Parameters:
value (torch.Tensor) – Value function predictions.
advantage (torch.Tensor) – Advantage estimates.
pg_loss (torch.Tensor) – Policy gradient loss.
v_loss (torch.Tensor) – Value loss.
entropy_loss (torch.Tensor) – Entropy loss.
loss (torch.Tensor) – Total loss.
- Return type:
None
- summarize() -> Dict[str, float]
Compute summary statistics for the current log.
- Returns:
Dictionary of summary statistics.
- Return type:
dict
Examples
>>> stats = summarizer.summarize()
- write(summary: Dict[str, float] | None = None) -> Dict[str, float]
Pretty-print and log the summary statistics.
- Parameters:
summary (dict, optional) – Precomputed summary statistics. If None, the summary is computed from the current log.
- Returns:
The summary statistics that were logged.
- Return type:
dict
Examples
>>> summarizer.write()