duo_ai.algorithms.ppo
=====================

.. py:module:: duo_ai.algorithms.ppo


Classes
-------

.. autoapisummary::

   duo_ai.algorithms.ppo.PPOAlgorithmConfig
   duo_ai.algorithms.ppo.PPOAlgorithm
   duo_ai.algorithms.ppo.PPOBatch
   duo_ai.algorithms.ppo.TrainBuffer
   duo_ai.algorithms.ppo.TensorDict
   duo_ai.algorithms.ppo.PPOTrainSummarizer


Module Contents
---------------

.. py:class:: PPOAlgorithmConfig

   Configuration for the PPOAlgorithm.

   :param name: Name of the algorithm.
   :type name: str, optional
   :param log_freq: Frequency (in iterations) to log training statistics.
   :type log_freq: int, optional
   :param save_freq: Frequency (in iterations) to save model checkpoints.
   :type save_freq: int, optional
   :param num_steps: Number of steps to run in each environment per iteration.
   :type num_steps: int, optional
   :param total_timesteps: Total number of environment steps to train for.
   :type total_timesteps: int, optional
   :param update_epochs: Number of epochs to update the policy per iteration.
   :type update_epochs: int, optional
   :param gamma: Discount factor for rewards.
   :type gamma: float, optional
   :param gae_lambda: Lambda for Generalized Advantage Estimation.
   :type gae_lambda: float, optional
   :param num_minibatches: Number of minibatches for each update epoch.
   :type num_minibatches: int, optional
   :param clip_coef: Clipping coefficient for PPO surrogate objective.
   :type clip_coef: float, optional
   :param norm_adv: Whether to normalize advantages.
   :type norm_adv: bool, optional
   :param clip_vloss: Whether to use clipped value loss.
   :type clip_vloss: bool, optional
   :param vf_coef: Coefficient for value function loss.
   :type vf_coef: float, optional
   :param ent_coef: Coefficient for entropy bonus.
   :type ent_coef: float, optional
   :param max_grad_norm: Maximum norm for gradient clipping.
   :type max_grad_norm: float, optional
   :param learning_rate: Learning rate for optimizer.
   :type learning_rate: float, optional
   :param critic_pretrain_steps: Number of steps to pretrain the critic before policy updates.
   :type critic_pretrain_steps: int, optional
   :param anneal_lr: Whether to linearly anneal the learning rate.
   :type anneal_lr: bool, optional
   :param log_action_id: Action ID to log statistics for (e.g., expert action).
   :type log_action_id: int, optional

   .. rubric:: Examples

   >>> cfg = PPOAlgorithmConfig(num_steps=128, total_timesteps=10000)


   .. py:attribute:: name
      :type: str
      :value: 'ppo'


   .. py:attribute:: log_freq
      :type: int
      :value: 10


   .. py:attribute:: save_freq
      :type: int
      :value: 0


   .. py:attribute:: num_steps
      :type: int
      :value: 256


   .. py:attribute:: total_timesteps
      :type: int
      :value: 1500000


   .. py:attribute:: update_epochs
      :type: int
      :value: 3


   .. py:attribute:: gamma
      :type: float
      :value: 0.999


   .. py:attribute:: gae_lambda
      :type: float
      :value: 0.95


   .. py:attribute:: num_minibatches
      :type: int
      :value: 8


   .. py:attribute:: clip_coef
      :type: float
      :value: 0.2


   .. py:attribute:: norm_adv
      :type: bool
      :value: True


   .. py:attribute:: clip_vloss
      :type: bool
      :value: True


   .. py:attribute:: vf_coef
      :type: float
      :value: 0.5


   .. py:attribute:: ent_coef
      :type: float
      :value: 0.01


   .. py:attribute:: max_grad_norm
      :type: float
      :value: 0.5


   .. py:attribute:: learning_rate
      :type: float
      :value: 0.0005


   .. py:attribute:: critic_pretrain_steps
      :type: int
      :value: 0


   .. py:attribute:: anneal_lr
      :type: bool
      :value: False


   .. py:attribute:: log_action_id
      :type: int
      :value: 1


.. py:class:: PPOAlgorithm(config: PPOAlgorithmConfig)

   Bases: :py:obj:`duo_ai.core.Algorithm`

   Proximal Policy Optimization (PPO) algorithm implementation.

   .. rubric:: Examples

   >>> algo = PPOAlgorithm(PPOAlgorithmConfig())
   >>> algo.train(policy, env, validators)


   .. py:attribute:: config_cls


   .. py:attribute:: config


   .. py:method:: _initialize() -> None

      Initialize PPO training state, buffers, optimizer, and logging.

      :rtype: None


   .. py:method:: train(policy: duo.policies.PPOPolicy, env: gymnasium.Env, validators: Dict[str, duo.core.Evaluator]) -> None

      Train the PPO algorithm on the specified environment(s) using the provided policy.

      This method performs multiple training iterations, periodically evaluates the
      policy, logs statistics, and saves checkpoints for the best and last models.

      :param policy: The policy to be trained.
      :type policy: duo.policies.PPOPolicy
      :param env: The environment instance for training.
      :type env: gymnasium.Env
      :param validators: Dictionary mapping split names to evaluator instances for evaluation.
      :type validators: dict of str to duo.core.Evaluator

      :rtype: None

      .. rubric:: Examples

      >>> algorithm.train(policy, env, validators)


   .. py:method:: _train_once() -> None

      Perform a single training iteration of PPO, including trajectory collection,
      advantage computation, and policy/value updates.

      :rtype: None


   .. py:method:: _update_learning_rate() -> None

      Update the learning rate for the optimizer, optionally annealing it over time.

      :rtype: None


   .. py:method:: _compute_advantages_and_returns() -> Tuple[torch.Tensor, torch.Tensor]

      Compute advantages and returns using Generalized Advantage Estimation (GAE).

      :returns: * **advantages** (*torch.Tensor*) -- Advantage estimates for each step.
                * **returns** (*torch.Tensor*) -- Computed returns for each step.

      .. rubric:: Examples

      >>> adv, ret = algo._compute_advantages_and_returns()


   .. py:method:: save_checkpoint(policy: duo.policies.PPOPolicy, name: str) -> None

      Save the current policy and optimizer state to a checkpoint file.

      :param policy: The policy to save.
      :type policy: duo.policies.PPOPolicy
      :param name: Name for the checkpoint file.
      :type name: str

      :rtype: None

      .. rubric:: Examples

      >>> algo.save_checkpoint(policy, "last")


   .. py:method:: load_checkpoint(policy: duo.policies.PPOPolicy, load_path: str) -> None

      Load policy and optimizer state from a checkpoint file.

      :param policy: The policy to load parameters into.
      :type policy: duo.policies.PPOPolicy
      :param load_path: Path to the checkpoint file.
      :type load_path: str

      :rtype: None

      .. rubric:: Examples

      >>> algo.load_checkpoint(policy, "checkpoint.ckpt")


.. py:class:: PPOBatch

   Data structure for a batch of PPO training data.

   .. rubric:: Examples

   >>> batch = PPOBatch(obs, actions, log_probs, advantages, returns, values)


   .. py:attribute:: obs
      :type: TensorDict


   .. py:attribute:: actions
      :type: torch.Tensor


   .. py:attribute:: log_probs
      :type: torch.Tensor


   .. py:attribute:: advantages
      :type: torch.Tensor


   .. py:attribute:: returns
      :type: torch.Tensor


   .. py:attribute:: values
      :type: torch.Tensor


.. py:class:: TrainBuffer(data: Dict[str, Any])

   Buffer for storing trajectories and training data for PPO.

   .. rubric:: Examples

   >>> buffer = TrainBuffer.new(env, num_steps=128)


   .. py:attribute:: data


   .. py:method:: __getattr__(name: str) -> Any

      Retrieve a buffer attribute by key.

      :param name: Name of the buffer key to retrieve.
      :type name: str

      :returns: The buffer value for the given key.
      :rtype: Any

      :raises AttributeError: If the key is not found in the buffer.


   .. py:method:: new(env: gymnasium.Env, num_steps: int) -> TrainBuffer
      :classmethod:

      Create a new TrainBuffer with zero-initialized arrays for the given
      environment and number of steps.

      :param env: The environment instance.
      :type env: gymnasium.Env
      :param num_steps: Number of steps to allocate in the buffer.
      :type num_steps: int

      :returns: A new buffer instance with allocated arrays.
      :rtype: TrainBuffer

      .. rubric:: Examples

      >>> buffer = TrainBuffer.new(env, 128)


   .. py:method:: add(step: int, new_data: Dict[str, Any]) -> None

      Add new data for a given step to the buffer.

      :param step: The step index to add data to.
      :type step: int
      :param new_data: Dictionary of new data to add for this step.
      :type new_data: dict

      :rtype: None


   .. py:method:: flatten() -> TrainBuffer

      Flatten the buffer for minibatch training.

      :returns: A new buffer with flattened arrays.
      :rtype: TrainBuffer

      .. rubric:: Examples

      >>> flat_buffer = buffer.flatten()


   .. py:method:: __setitem__(name: str, value: Any) -> None

      Set a buffer value by key.

      :param name: Name of the buffer key to set.
      :type name: str
      :param value: Value to assign to the buffer key.
      :type value: Any

      :rtype: None


   .. py:method:: generate_minibatches(num_epochs: int, minibatch_size: int) -> PPOBatch

      Yield minibatches for training.

      :param num_epochs: Number of epochs to iterate over the buffer.
      :type num_epochs: int
      :param minibatch_size: Size of each minibatch.
      :type minibatch_size: int

      :Yields: *PPOBatch* -- A minibatch of PPO training data.

      .. rubric:: Examples

      >>> for mb in buffer.generate_minibatches(3, 64):
      ...     pass  # train on mb


.. py:class:: TensorDict(data: Union[Dict[str, torch.Tensor], torch.Tensor])

   Utility class for handling dictionary-structured tensors, supporting batch operations.

   .. rubric:: Examples

   >>> td = TensorDict({'obs': torch.zeros(4, 3)})


   .. py:attribute:: data


   .. py:method:: zeros(shape: Union[Dict[str, Tuple[int, Ellipsis]], Tuple[int, Ellipsis]]) -> TensorDict
      :classmethod:

      Create a TensorDict of zeros with the given shape.

      :param shape: Shape for each tensor or the single tensor.
      :type shape: dict or tuple

      :returns: A TensorDict of zeros.
      :rtype: TensorDict

      .. rubric:: Examples

      >>> td = TensorDict.zeros({'obs': (4, 3)})


   .. py:method:: to(device: Union[torch.device, str]) -> TensorDict

      Move all tensors in the TensorDict to the specified device.

      :param device: The device to move tensors to.
      :type device: torch.device or str

      :returns: A new TensorDict with tensors on the specified device.
      :rtype: TensorDict

      .. rubric:: Examples

      >>> td = td.to("cuda")


   .. py:method:: __setitem__(indices: Any, other: TensorDict) -> None

      Set values in the TensorDict at the given indices.

      :param indices: Indices to set.
      :type indices: Any
      :param other: TensorDict containing values to set.
      :type other: TensorDict

      :rtype: None


   .. py:method:: __getitem__(indices: Any) -> TensorDict

      Retrieve values from the TensorDict at the given indices.

      :param indices: Indices to retrieve.
      :type indices: Any

      :returns: A new TensorDict with the selected values.
      :rtype: TensorDict

      .. rubric:: Examples

      >>> td_slice = td[0:2]


   .. py:method:: flatten(start_dim: int = 0, end_dim: int = -1) -> TensorDict

      Flatten tensors in the TensorDict along specified dimensions.

      :param start_dim: The first dimension to flatten. Default is 0.
      :type start_dim: int, optional
      :param end_dim: The last dimension to flatten. Default is -1.
      :type end_dim: int, optional

      :returns: A new TensorDict with flattened tensors.
      :rtype: TensorDict

      .. rubric:: Examples

      >>> td_flat = td.flatten(0, 1)


   .. py:method:: from_numpy(data: Union[Dict[str, numpy.ndarray], numpy.ndarray]) -> TensorDict
      :classmethod:

      Convert numpy arrays to a TensorDict.

      :param data: Dictionary of numpy arrays or a single numpy array.
      :type data: dict or np.ndarray

      :returns: A TensorDict with tensors converted from numpy arrays.
      :rtype: TensorDict

      .. rubric:: Examples

      >>> td = TensorDict.from_numpy({'obs': np.zeros((4, 3))})


.. py:class:: PPOTrainSummarizer(config: PPOAlgorithmConfig)

   Summarizer for PPO training statistics and logging.

   .. rubric:: Examples

   >>> summarizer = PPOTrainSummarizer(config)


   .. py:attribute:: log_action_id


   .. py:method:: clear() -> None

      Clear the summary statistics log.

      :rtype: None


   .. py:method:: initialize_iteration(env: gymnasium.Env) -> None

      Initialize logging for a new training iteration.

      :param env: The environment instance for the iteration.
      :type env: gymnasium.Env

      :rtype: None


   .. py:method:: finalize_iteration() -> None

      Finalize and aggregate statistics for the iteration.

      :rtype: None


   .. py:method:: add_episode_step(action: torch.Tensor, log_prob: torch.Tensor, reward: Union[numpy.ndarray, torch.Tensor], done: Union[numpy.ndarray, torch.Tensor], info: List[dict]) -> None

      Log statistics for each episode step.

      :param action: Actions taken at this step.
      :type action: torch.Tensor
      :param log_prob: Log probabilities of the actions.
      :type log_prob: torch.Tensor
      :param reward: Rewards received at this step.
      :type reward: np.ndarray or torch.Tensor
      :param done: Done flags for each environment.
      :type done: np.ndarray or torch.Tensor
      :param info: Additional info for each environment.
      :type info: list of dict

      :rtype: None


   .. py:method:: add_training_iteration(value: torch.Tensor, advantage: torch.Tensor, pg_loss: torch.Tensor, v_loss: torch.Tensor, entropy_loss: torch.Tensor, loss: torch.Tensor) -> None

      Log statistics for each training minibatch.

      :param value: Value function predictions.
      :type value: torch.Tensor
      :param advantage: Advantage estimates.
      :type advantage: torch.Tensor
      :param pg_loss: Policy gradient loss.
      :type pg_loss: torch.Tensor
      :param v_loss: Value loss.
      :type v_loss: torch.Tensor
      :param entropy_loss: Entropy loss.
      :type entropy_loss: torch.Tensor
      :param loss: Total loss.
      :type loss: torch.Tensor

      :rtype: None


   .. py:method:: summarize() -> Dict[str, float]

      Compute summary statistics for the current log.

      :returns: Dictionary of summary statistics.
      :rtype: dict

      .. rubric:: Examples

      >>> stats = summarizer.summarize()


   .. py:method:: write(summary: Optional[Dict[str, float]] = None) -> Dict[str, float]

      Pretty-print and log the summary statistics.

      :param summary: Precomputed summary statistics. If None, will compute from log.
      :type summary: dict, optional

      :returns: The summary statistics that were logged.
      :rtype: dict

      .. rubric:: Examples

      >>> summarizer.write()
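
.. rubric:: Sketch: Generalized Advantage Estimation

``PPOAlgorithm._compute_advantages_and_returns`` is documented as returning GAE advantages and returns, controlled by ``gamma`` and ``gae_lambda`` in ``PPOAlgorithmConfig``. This reference does not show how the library computes them, so the block below is only an illustrative sketch of the standard GAE recursion. It is not duo_ai's implementation. The function name ``gae_sketch``, its arguments, the use of plain Python lists instead of ``torch.Tensor``, the single-environment layout, and the meaning of ``dones[t]`` (the episode ended at step ``t``) are all assumptions made for the example.

```python
from typing import List, Sequence, Tuple


def gae_sketch(
    rewards: Sequence[float],
    values: Sequence[float],
    dones: Sequence[bool],
    last_value: float,
    gamma: float = 0.999,
    gae_lambda: float = 0.95,
) -> Tuple[List[float], List[float]]:
    """Hypothetical single-environment GAE; not taken from duo_ai."""
    num_steps = len(rewards)
    advantages = [0.0] * num_steps
    last_gae = 0.0
    # Walk the rollout backwards so each step can reuse the next step's estimate.
    for t in reversed(range(num_steps)):
        # Value of the state reached after step t; bootstrap from last_value
        # at the end of the rollout.
        next_value = last_value if t == num_steps - 1 else values[t + 1]
        # Assumption: dones[t] means the episode ended at step t, so nothing is
        # bootstrapped or propagated across that boundary.
        not_done = 0.0 if dones[t] else 1.0
        # One-step temporal-difference error.
        delta = rewards[t] + gamma * next_value * not_done - values[t]
        # Exponentially weighted sum of future TD errors.
        last_gae = delta + gamma * gae_lambda * not_done * last_gae
        advantages[t] = last_gae
    # Returns are the regression targets for the value function.
    returns = [adv + val for adv, val in zip(advantages, values)]
    return advantages, returns
```

With all value estimates at zero and ``gae_lambda=1.0``, the advantages reduce to discounted reward sums: ``gae_sketch([1.0, 1.0, 1.0], [0.0, 0.0, 0.0], [False, False, True], 5.0, gamma=0.5, gae_lambda=1.0)`` gives advantages ``[1.75, 1.5, 1.0]``, and ``last_value`` is ignored because the final step is terminal.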