duo_ai.core

Submodules

Classes

Algorithm

Abstract base class for all algorithms in the Duo framework.

CoordEnv

Environment for coordinating between novice and expert policies.

Evaluator

Evaluator for running policy evaluation on environments and summarizing results.

Policy

Abstract base class for all policies in the Duo framework.

Package Contents

class duo_ai.core.Algorithm[source]

Bases: abc.ABC

Abstract base class for all algorithms in the Duo framework.

This class defines the interface that all algorithm implementations must follow.

Examples

>>> class MyAlgorithm(Algorithm):
...     def train(self, *args, **kwargs):
...         pass
abstract train(*args, **kwargs) None[source]

Train the model or algorithm using the provided arguments.

Parameters:
  • *args – Variable length argument list.

  • **kwargs – Arbitrary keyword arguments.

Return type:

None

Examples

>>> algo.train(data)
class duo_ai.core.CoordEnv(config: CoordinationConfig, base_env: gymnasium.Env, novice: duo.core.Policy, expert: duo.core.Policy, open_novice: bool = True, open_expert: bool = False)[source]

Bases: gymnasium.Env

Environment for coordinating between novice and expert policies.

This class wraps a base environment and enables switching between a novice and expert policy, applying costs for expert queries and agent switching.

Examples

>>> config = CoordinationConfig()
>>> base_env = gym.make(...)
>>> novice = ...
>>> expert = ...
>>> env = CoordEnv(config, base_env, novice, expert)
config_cls
NOVICE = 0
EXPERT = 1
config
base_env
novice
expert
open_novice = True
open_expert = False
action_space
observation_space
expert_query_cost_per_action = None
switch_agent_cost_per_action = None
property num_envs: int

Number of parallel environments.

Returns:

Number of parallel environments.

Return type:

int

Examples

>>> n = env.num_envs
set_costs(base_penalty: float) None[source]

Set the cost per action for expert queries and agent switching.

Parameters:

base_penalty (float) – The reward value per action.

Return type:

None

Examples

>>> env.set_costs(0.05)
reset() Dict[str, Any][source]

Reset the coordination environment to an initial state.

Returns:

The initial observation of the environment, including:
  • ”base_obs”: The initial observation from the base environment.

  • ”novice_hidden”: Numpy array of hidden features from the novice policy.

  • ”novice_logits”: Numpy array of output logits from the novice policy.

  • ”expert_hidden”: Numpy array of hidden features from the expert policy (if open_expert).

  • ”expert_logits”: Numpy array of output logits from the expert policy (if open_expert).

Return type:

dict

Examples

>>> obs = env.reset()
_reset_agents(done: numpy.ndarray) None[source]

Reset the internal state of the novice and expert agents.

Parameters:

done (numpy.ndarray) – Boolean array indicating which episodes in a batch require a reset.

Return type:

None

Examples

>>> env._reset_agents(np.array([True, False]))
step(action: numpy.ndarray) Tuple[Dict[str, Any], numpy.ndarray, numpy.ndarray, List[Dict[str, Any]]][source]

Advance the environment by one step using the provided action.

Parameters:

action (numpy.ndarray) – The action(s) to take in the environment. Should be a numpy array indicating which agent acts.

Returns:

  • obs (dict) –

    The next observation of the environment, including:
    • ”base_obs”: The observation from the base environment.

    • ”novice_hidden”: Numpy array of hidden features from the novice policy.

    • ”novice_logits”: Numpy array of output logits from the novice policy.

    • ”expert_hidden”: Numpy array of hidden features from the expert policy (if open_expert).

    • ”expert_logits”: Numpy array of output logits from the expert policy (if open_expert).

  • reward (numpy.ndarray) – The reward(s) obtained from the environment after taking the action.

  • done (numpy.ndarray) – Boolean flag(s) indicating whether the episode has ended for each environment.

  • info (list of dict) – Additional information from the environment for each agent or environment instance.

Raises:

Exception – Propagates any exceptions raised by the underlying environment’s step method.

Examples

>>> obs, reward, done, info = env.step(action)
_compute_base_action(action: numpy.ndarray) numpy.ndarray[source]

Compute the environment-specific action for each agent.

Parameters:

action (numpy.ndarray) – Array indicating which agent (novice or expert) acts for each environment.

Returns:

Array of actions to be passed to the base environment.

Return type:

numpy.ndarray

Examples

>>> base_action = env._compute_base_action(action)
_get_obs() Dict[str, Any][source]

Return the current observation for the coordination environment.

Returns:

A dictionary containing:
  • ”base_obs”: The current observation from the base environment.

  • ”novice_hidden”: Numpy array of hidden features from the novice policy (if open_novice).

  • ”novice_logits”: Numpy array of output logits from the novice policy (if open_novice).

  • ”expert_hidden”: Numpy array of hidden features from the expert policy (if open_expert).

  • ”expert_logits”: Numpy array of output logits from the expert policy (if open_expert).

Return type:

dict

Examples

>>> obs = env._get_obs()
_get_reward(base_reward: numpy.ndarray, action: numpy.ndarray, done: numpy.ndarray) numpy.ndarray[source]

Compute the reward for the current step, including costs for expert queries and agent switching.

Parameters:
  • base_reward (numpy.ndarray) – The base reward from the environment.

  • action (numpy.ndarray) – The action(s) taken (novice or expert).

  • done (numpy.ndarray) – Boolean flag(s) indicating whether the episode has ended for each environment.

Returns:

The computed reward(s) after applying costs.

Return type:

numpy.ndarray

Examples

>>> reward = env._get_reward(base_reward, action, done)
close() None[source]

Close the coordination environment and release any resources held.

Return type:

None

Examples

>>> env.close()
class duo_ai.core.Evaluator(config: EvaluatorConfig, env: gym.Env)[source]

Evaluator for running policy evaluation on environments and summarizing results.

Examples

>>> evaluator = Evaluator(EvaluatorConfig(), env)
>>> summary = evaluator.evaluate(policy)
config_cls
config
env
evaluate(policy: duo_ai.core.Policy, num_episodes: int | None = None) Dict[str, Any][source]

Evaluate a policy on the environment and summarize the results.

Parameters:
  • policy (duo.core.Policy) – The policy to evaluate. Must implement an act method and have a .model attribute.

  • num_episodes (int, optional) – Number of episodes to run. If None, uses value from config.

Returns:

A dictionary mapping split names to summary statistics for each evaluation.

Return type:

dict

Examples

>>> summary = evaluator.evaluate(policy, num_episodes=100)
>>> print(summary['reward_mean'])
_eval_one_iteration(policy: duo_ai.core.Policy, env: gym.Env) None[source]

Run a single evaluation iteration for the policy on the environment.

Parameters:
  • policy (duo.core.Policy) – The policy to evaluate.

  • env (gym.Env) – The environment instance to evaluate on.

Return type:

None

class duo_ai.core.Policy[source]

Bases: abc.ABC

Abstract base class for all policies in the Duo framework.

This class defines the interface that all policy implementations must follow.

Examples

>>> class MyPolicy(Policy):
...     def act(self, obs):
...         return ...
...     def reset(self, done):
...         pass
...     def set_params(self, params):
...         pass
...     def get_params(self):
...         return {}
...     def train(self):
...         pass
...     def eval(self):
...         pass
abstract act(obs: Any, *args: Any, **kwargs: Any) torch.Tensor[source]

Select an action based on the given observation.

Parameters:
  • obs (Any) – The current observation from the environment.

  • *args (Any) – Additional positional arguments.

  • **kwargs (Any) – Additional keyword arguments.

Returns:

The selected action. The format depends on the policy implementation.

Return type:

torch.Tensor

Examples

>>> action = policy.act(obs)
abstract reset(done: numpy.ndarray) None[source]

Reset the internal state of the policy.

This method should be overridden by subclasses to implement any necessary logic for resetting the policy’s state to its initial configuration, such as clearing hidden states or episode-specific variables.

Parameters:

done (numpy.ndarray) – Boolean array indicating which episodes in a batch require a reset.

Return type:

None

Examples

>>> policy.reset(done)
abstract set_params(params: Dict[str, Any]) None[source]

Set the parameters of the policy.

This method should be overridden by subclasses to update the policy’s parameters based on the provided dictionary, such as loading model weights or hyperparameters.

Parameters:

params (dict) – A dictionary containing the new parameters for the policy.

Return type:

None

Examples

>>> policy.set_params(params)
abstract get_params() Dict[str, Any][source]

Returns the current parameters of the policy.

This method should be overridden by subclasses to return the relevant parameters of the policy, such as model weights or hyperparameters.

Returns:

A dictionary containing the current parameters of the policy.

Return type:

dict

Examples

>>> params = policy.get_params()
abstract train() None[source]

Set the policy to training mode.

This method should be overridden by subclasses to implement any necessary logic for preparing the policy for training, such as setting dropout or batch normalization layers.

Return type:

None

Examples

>>> policy.train()
abstract eval() None[source]

Set the policy to evaluation mode.

This method should be overridden by subclasses to implement any necessary logic for preparing the policy for evaluation, such as disabling dropout or batch normalization layers.

Return type:

None

Examples

>>> policy.eval()