Source code for mlpy.agents.modules

from __future__ import division, print_function, absolute_import
# noinspection PyUnresolvedReferences
from six.moves import range

import math
from abc import abstractmethod

import numpy as np

from ..modules import Module
from ..modules.patterns import RegistryInterface
from ..tools.log import LoggingMgr
from ..auxiliary.misc import stdout_redirected, listify
from ..constants import eps
from ..mdp.stateaction import Experience, Action
from ..learners import LearnerFactory


[docs]class AgentModuleFactory(object): # noinspection PyUnusedLocal """The agent module factory. An instance of an agent module can be created by passing the agent module type. The module type is the name of the module. The set of agent modules can be extended by inheriting from :class:`IAgentModule`. However, for the agent module to be registered, the custom module must be imported by the time the agent module factory is called. Examples -------- >>> from mlpy.agents.modules import AgentModuleFactory >>> AgentModuleFactory.create('learningmodule', 'qlearner', max_steps=10) This creates a :class:`.LearningModule` instance performing q-learning with max_steps set to 10. >>> def get_reward(state, action): ... return 1.0 ... >>> AgentModuleFactory().create('learningmodule', 'qlearner', get_reward, ... max_steps=10) This creates a q-learning learning module instance passing a reward callback function and sets max_steps to 10. >>> from mlpy.mdp.discrete import DiscreteModel >>> from mlpy.planners.discrete import ValueIteration >>> >>> planner = ValueIteration(DiscreteModel(['out', 'in', 'kick'])) >>> >>> AgentModuleFactory().create('learningmodule', 'rldtlearner', None, planner, ... max_steps=10) This creates a learning module using the :class:`.RLDTLearner`. The parameters for the learner are appended to the end of the argument list. Notice that since positional arguments are used to pass the planner, the reward callback must be accounted for by setting it to `None`. Alternatively non-positional arguments can be used: >>> AgentModuleFactory().create('learningmodule', 'rldtlearner', planner=planner, ... max_steps=10) Notes ----- The agent module factory is being called by the :class:`.Agent` during initialization to create the agents controller. """ @staticmethod
[docs] def create(_type, *args, **kwargs): """Create an agent module of the given type. Parameters ---------- _type : str The agent module type. Valid agent module types: followpolicymodule The agent follows a given policy: a :class:`.FollowPolicyModule` is created. learningmodule The agent learns according to a specified learner: a :class:`.LearningModule` is created. usermodule The agent is user controlled via keyboard or PS2 controller: a :class:`.UserModule` is created. args : tuple, optional Positional arguments passed to the class of the given type for initialization. kwargs : dict, optional Non-positional arguments passed to the class of the given type for initialization. Returns ------- IAgentModule : Agent model instance of given type. """ # noinspection PyUnresolvedReferences return IAgentModule.registry[_type.lower()](*args, **kwargs)
[docs]class IAgentModule(Module): """Agent module base interface class. The agent (:class:`~mlpy.agents.Agent`) uses an agent module, which specifies how the agent is controlled. Valid agent module types are: followpolicymodule The agent follows a given policy (:class:`FollowPolicyModule`) learningmodule The agent learns according to a specified learner (:class:`LearningModule`). usermodule The agent is user controlled via keyboard or PS2 controller (:class:`UserModule`). Notes ----- Every class inheriting from IAgentModule must implement :meth:`get_next_action`. """ __metaclass__ = RegistryInterface def __init__(self): super(IAgentModule, self).__init__() self._logger = LoggingMgr().get_logger(self._mid) self._completed = False self._state = None
[docs] def reset(self, t, **kwargs): """Reset the agent module. Parameters ---------- t : float The current time (sec) kwargs : dict, optional Non-positional parameters, optional. """ super(IAgentModule, self).reset(t, **kwargs) self._state = None
[docs] def terminate(self, value): """Set the termination flag. Parameters ---------- value : bool The value of the termination flag. """ self._completed = value
[docs] def is_complete(self): """Check if the agent module has completed. Returns ------- bool : Whether the agent module has completed or not. """ return self._completed
# noinspection PyMethodMayBeStatic
[docs] def execute(self, state): """Execute the agent module. This method can optionally be overwritten. Parameters ---------- state : State The current state """ pass
@abstractmethod
[docs] def get_next_action(self): """Return the next action the agent will execute. Returns ------- Action : The next action Raises ------ NotImplementedError If the child class does not implement this function. Notes ----- This is an abstract method and *must* be implemented by its deriving class. """ raise NotImplementedError
[docs]class LearningModule(IAgentModule): # noinspection PyUnusedLocal """Learning agent module. The learning agent module allows the agent to learn from passed experiences. Parameters ---------- learner_type : str The learning type. Based on the type the appropriate learner module is created. Valid learning types are: qlearner The learner performs q-learning, a reinforcement learning variant (:class:`~mlpy.learners.online.rl.QLearner`). rldtlearner The learner performs reinforcement learning with decision trees (RLDT), a method introduced by Hester, Quinlan, and Stone which builds a generalized model for the transitions and rewards of the environment (:class:`~mlpy.learners.online.rl.RLDTLearner`). apprenticeshiplearner The learner performs apprenticeship learning via inverse reinforcement learning, a method introduced by Abbeel and Ng which strives to imitate the demonstrations given by an expert (:class:`~mlpy.learners.offline.irl.ApprenticeshipLearner`). incrapprenticeshiplearner The learner incrementally performs apprenticeship learning via inverse reinforcement learning. Inverse reinforcement learning assumes knowledge of the underlying model. However, this is not always feasible. The incremental apprenticeship learner updates its model after every iteration by executing the current policy (:class:`~mlpy.learners.offline.irl.IncrApprenticeshipLearner`). cb_get_reward : callable, optional A callback function to retrieve the reward based on the current state and action. Default is `None`. The function must be of the following format: >>> def callback(state, action): >>> pass learner_params : dict, optional Parameters passed to the learner for initialization. See the appropriate learner type for more information. Default is None. """ def __init__(self, learner_type, cb_get_reward=None, *args, **kwargs): super(LearningModule, self).__init__() self._learner = LearnerFactory.create(learner_type, *args, **kwargs) self._cb_get_reward = cb_get_reward if self._cb_get_reward is not None and not hasattr(self._cb_get_reward, '__call__'): raise ValueError("Expected a function or None, but got %r" % type(cb_get_reward)) self._state = None self._action = None
[docs] def reset(self, t, **kwargs): """Reset the module for the next iteration. Offline learning is performed at the end of the iteration. Parameters ---------- t : float The current time (sec). kwargs : dict, optional Non-positional parameters. """ super(LearningModule, self).reset(t, **kwargs) if self._learner.type == 'offline': self.terminate(self._learner.learn()) self._learner.reset(t, **kwargs)
[docs] def execute(self, state): """Execute the learner. Update models with the current experience (:class:`~mlpy.mdp.stateaction.Experience`). Additionally, online learning is performed at this point. Parameters ---------- state : State The current state. """ experience = Experience(self._state, self._action, state, self._cb_get_reward(self._state, self._action)) self._learner.execute(experience) if self._state is not None: if self._learner.type == 'online': if self._state is not None: self._learner.learn(experience) self._state = state
[docs] def get_next_action(self): """Return the next action. The next action the agent will execute is selected based on the current state and the policy that has been derived by the learner so far. Returns ------- Action : The next action """ self._action = Action.get_noop_action() if self._state is not None: self._action = self._learner.choose_action(self._state) return self._action
[docs]class FollowPolicyModule(IAgentModule): """The follow policy agent module. The follow policy agent module follows a given policy choosing the next action based on that policy. Parameters ---------- policies : array_like, shape (`n`, `nfeatures`, `ni`) A list of policies (i.e., action sequences), where `n` is the number of policies, `nfeatures` is the number of action features, and `ni` is the sequence length. niter : int, optional The number of times each policy is repeated. Default is 1. start : int, optional The first policy to execute. Default is 0. """ def __init__(self, policies, niter=None, start=None): super(FollowPolicyModule, self).__init__() self._policies = None """:type: ndarray""" self._npolicies = None self._policy_cntr = None self._policy_len = None self._iter = 0 self._current = start if start is not None else 0 self._niter = niter if niter is not None else 1 self.change_policies(policies) self._logger.info("Current policy id: {0}".format(self._current))
[docs] def reset(self, t, **kwargs): """Reset the module for the next iteration. Offline learning is performed at the end of the iteration. Parameters ---------- t : float The current time (sec). kwargs : dict, optional Non-positional parameters. """ super(FollowPolicyModule, self).reset(t, **kwargs) self._iter += 1 if self._iter >= self._niter: self._iter = 0 self._current += 1 if self._current >= self._npolicies: self.terminate(True) return self._policy_len = self._policies[self._current].shape[1] self._policy_cntr = -1 self._logger.info("Current policy id: {0}".format(self._current))
[docs] def change_policies(self, policies): """ Exchange the list of policies. Parameters ---------- policies : array_like, shape (`n`, `nfeatures`, `ni`) A list of policies (i.e., action sequences), where `n` is the number of policies, `nfeatures` is the number of action features, and `ni` is the sequence length. Raises ------ IndexError If the list is empty. """ self._npolicies = policies.shape[0] if self._npolicies < 1: raise IndexError("No policies available.") self._policies = policies self._policy_cntr = -1 self._policy_len = self._policies[self._current].shape[1]
[docs] def get_next_action(self): """Return the next action. The next action the agent will execute is selected based on the current state and the policy that has been derived by the learner so far. Returns ------- Action : The next action """ action = None self._policy_cntr += 1 if self._policy_cntr < self._policy_len: action = Action(self._policies[self._current][:, self._policy_cntr]) self._logger.debug("'%s'" % action) return action
[docs]class UserModule(IAgentModule): """The user agent module. With the user agent module the agent is controlled by the user via the keyboard or a PS2 controller. The mapping of keyboard/joystick keys to events is given through a configuration file. Parameters ---------- events_map : ConfigMgr The configuration mapping keyboard/joystick keys to events that are translated into actions. :Example: :: { "keyboard": { "down": { "pygame.K_ESCAPE": "QUIT", "pygame.K_SPACE": [-1.0], "pygame.K_LEFT" : [-0.004], "pygame.K_RIGHT": [0.004] } } } niter : int The number of episodes to capture.. Notes ----- This agent module is requires the `PyGame <http://www.pygame.org/>`_ library. """ def __init__(self, events_map, niter=None): super(UserModule, self).__init__() import pygame self.pygame = pygame self._niter = niter if niter is not None else 1 """:type: int""" self._iter = 0 self._effectors = ["LArm"] self._effector_iter = 0 self._events_map = events_map """:type: ConfigMgr""" try: self._discretize_joystick_axis = self._events_map.get("joystick.axis.discretize") except KeyError: self._discretize_joystick_axis = False self.pygame.init() self._js = None if self.pygame.joystick.get_count() > 0: self._js = self.pygame.joystick.Joystick(0) self._js.init() # if not self._js: self.pygame.display.set_mode((320, 240)) self.pygame.display.set_caption('Read Keyboard Input') self.pygame.mouse.set_visible(0)
[docs] def reset(self, t, **kwargs): """Reset the module for the next iteration. Offline learning is performed at the end of the iteration. Parameters ---------- t : float The current time (sec). kwargs : dict, optional Non-positional parameters. """ super(UserModule, self).reset(t, **kwargs) self._effector_iter = 0 self._iter += 1 if self._iter >= self._niter: self.terminate(True)
[docs] def exit(self): """ Exit the agent module. """ super(UserModule, self).exit() self.pygame.quit()
[docs] def get_next_action(self): """Return the next action. Return the next action the agent will execute depending on the key/button pressed. Returns ------- Action : The next action """ action = np.zeros(Action.nfeatures) for event in self.pygame.event.get(): if event.type == self.pygame.QUIT: return None if event.type == self.pygame.KEYDOWN: try: button_value = self._events_map.get("keyboard.down." + str(event.key)) if button_value: if button_value == "QUIT": self.pygame.event.post(self.pygame.event.Event(self.pygame.QUIT)) break else: action = np.asarray(button_value) except KeyError: pass if event.type == self.pygame.JOYBUTTONDOWN: try: button_value = self._events_map.get("joystick.button.down." + str(event.dict['button'])) if button_value: if button_value == "QUIT": self.pygame.event.post(self.pygame.event.Event(self.pygame.QUIT)) break else: if isinstance(button_value, dict): operation = button_value["operation"] if operation == "effectors": self._effector_iter += 1 if self._effector_iter >= len(button_value["effectors"]): self._effector_iter = 0 self._effectors = listify(button_value["effectors"][self._effector_iter]) self._logger.info(self._effectors) except KeyError: pass name = Action.get_name(action) if self._js: with stdout_redirected(): num_axis = self._js.get_numaxes() for i in range(num_axis): try: config = self._events_map.get("joystick.axis." + str(i)) if config: with stdout_redirected(): axis = self._js.get_axis(i) axis = axis if abs(axis) > eps else 0.0 if not axis == 0: descr = Action.description[name]["descr"] for e in self._effectors: if e in config["effectors"]: action[descr[e][config["label"]]] = axis * config["scale"] except KeyError: pass with stdout_redirected(): num_hats = self._js.get_numhats() for i in range(num_hats): try: with stdout_redirected(): hat = self._js.get_hat(i) for j, hat in enumerate(hat): config = None if not hat == 0: config = self._events_map.get("joystick.hat." + ("x", "y")[j]) if config: descr = Action.description[name]["descr"] for e in self._effectors: if e in config["effectors"]: action[descr[e][config["label"]]] = hat * config["scale"] except KeyError: pass if self._discretize_joystick_axis: action = self._discretize(action) action = Action(action, name) return action
# noinspection PyMethodMayBeStatic def _discretize(self, vec): new_vec = [0.0 if -0.5 <= vec[0] <= 0.5 else vec[0]/abs(vec[0]), 0.0 if -0.5 <= vec[1] <= 0.5 else vec[1]/abs(vec[1]), 0.0 if -0.5 <= vec[2] <= 0.5 else vec[2]/abs(vec[2])] uvec = [0.0, 0.0, 0.0] mag = math.sqrt(math.pow(new_vec[0], 2) + math.pow(new_vec[1], 2) + math.pow(new_vec[2], 2)) if not mag == 0.0: # noinspection PyTypeChecker uvec = np.true_divide(np.asarray(vec), mag) uvec = np.asarray(uvec) * 0.01 # noinspection PyUnresolvedReferences return uvec.tolist() + [0.0, 0.0, 0.0]