from __future__ import division, print_function, absolute_import

import os

from import LoggingMgr
from ...planners.explorers.discrete import DiscreteExplorer
from ...mdp.discrete import DiscreteModel
from . import IOnlineLearner

__all__ = ['RLLearner', 'QLearner', 'RLDTLearner']

# noinspection PyAbstractClass
[docs]class RLLearner(IOnlineLearner): """The reinforcement learning learner interface. Parameters ---------- max_steps : int, optional The maximum number of steps in an iteration. Default is 100. filename : str, optional The name of the file to save the learner state to after each iteration. If None is given, the learner state is not saved. Default is None. profile : bool, optional Turn on profiling at which point profiling data is collected and saved to a text file. Default is False. """ def __init__(self, max_steps=None, filename=None, profile=False): super(RLLearner, self).__init__(filename) self._logger = LoggingMgr().get_logger(self._mid) self._step_iter = 0 self._episode_cntr = 1 self._cum_reward = 0 self._num_wins = 0 self._max_steps = max_steps if max_steps is not None else 100 self._profile = profile def __getstate__(self): data = super(RLLearner, self).__getstate__() data.update(self.__dict__.copy()) remove_list = ('_id', '_logger') for key in remove_list: if key in data: del data[key] return data def __setstate__(self, d): super(RLLearner, self).__setstate__(d) for name, value in d.iteritems(): setattr(self, name, value) self._logger = LoggingMgr().get_logger(self._mid) self._logger.debug("Episode=%d", self._episode_cntr)
[docs] def reset(self, t, **kwargs): """Reset reinforcement learner. Parameters ---------- t : float The current time (sec) kwargs : dict, optional Non-positional parameters, optional. """ super(RLLearner, self).reset(t, **kwargs) self._step_iter = 0
[docs] def save(self, filename): """Save the learners state. If profiling is turned on, profile information is saved to a `txt` file with the same name. Parameters ---------- filename : str The filename to save the information to. """ super(RLLearner, self).save(filename) if self._profile: filename = os.path.splitext(self._filename)[0] with open(filename + ".txt", "a") as f: win_ratio = float(self._num_wins) / float(self._episode_cntr) f.write("%d, %d, %.2f, %.2f\n" % (self._episode_cntr, self._num_wins, self._cum_reward, win_ratio))
[docs] def learn(self, experience=None): """Learn a policy from the experience. Parameters ---------- experience : Experience The agent's experience consisting of the previous state, the action performed in that state, the current state and the reward awarded. """ if self._profile and experience.reward is not None: if experience.reward > 0.0: self._num_wins += 1 self._cum_reward += experience.reward self._logger.debug("cumReward: %.2f", self._cum_reward)
[docs]class QLearner(RLLearner): """Performs q-learning. Q-learning is a reinforcement learning variant. Parameters ---------- explorer : Explorer, optional The exploration strategy used. Default is no exploration. max_steps : int, optional The maximum number of steps in an iteration. Default is 100 alpha : float, optional The learning rate. Default is 0.5. gamma : float, optional The discounting factor. Default is 0.9. filename : str, optional The name of the file to save the learner state to after each iteration. If None is given, the learner state is not saved. Default is None. profile : bool, optional Turn on profiling at which point profiling data is collected and saved to a text file. Default is False. """ def __init__(self, explorer=None, max_steps=None, alpha=None, gamma=None, filename=None, profile=False): super(QLearner, self).__init__(max_steps, filename, profile) self._model = DiscreteModel() self._explorer = explorer if explorer is not None else DiscreteExplorer() """:type: Explorer""" self._alpha = alpha if alpha is not None else 0.5 self._gamma = gamma if gamma is not None else 0.9
[docs] def execute(self, experience): """Execute learning specific updates. Learning specific updates are performed, e.g. model updates. Parameters ---------- experience : Experience The actor's current experience consisting of previous state, the action performed in that state, the current state, and the reward awarded. """ self._model.update(experience)
[docs] def learn(self, experience=None): """ Learn a policy from the experience. By updating the Q table according to the experience a policy is learned. Parameters ---------- experience : Experience The actor's current experience consisting of previous state, the action performed in that state, the current state, and the reward awarded. """ super(QLearner, self).learn(experience) info = self._model.statespace[experience.state] info2 = self._model.statespace[experience.next_state] qvalue = info.q[experience.action] maxq = max([info2.q[a] for a in self._model.get_actions(experience.next_state)]) delta = experience.reward + self._gamma * maxq - qvalue info.q[experience.action] = qvalue + self._alpha * delta self._logger.debug("%s action=%s reward=%.2f %s d=%.2f", experience.state, experience.action, experience.reward, experience.next_state, delta) self._logger.debug("\tq_old=%.2f visits=%d", qvalue, info.models[experience.action].visits) self._logger.debug("\tq_new=%.2f", info.q[experience.action])
[docs] def choose_action(self, state): """Choose the next action The next action is chosen according to the current policy and the selected exploration strategy. Parameters ---------- state : State The current state. Returns ------- Action : The chosen action. """ self._model.add_state(state) action = None if self._step_iter < self._max_steps: actions = self._model.get_actions(state) info = self._model.statespace[state] action = self._explorer.choose_action(actions, [info.q[a] for a in actions]) self._logger.debug("state=%s act=%s value=%.2f", state, action, self._model.statespace[state].q[action]) return action
[docs]class RLDTLearner(RLLearner): """Performs reinforcement learning using decision trees. Reinforcement learning using decision trees (RL-DT) use decision trees to build the transition and reward models as described by Todd Hester and Peter Stone [1]_. Parameters ---------- planner : IPlanner The planner to use to determine the best action. max_steps : int, optional The maximum number of steps in an iteration. Default is 100. filename : str, optional The name of the file to save the learner state to after each iteration. If None is given, the learner state is not saved. Default is None. profile : bool, optional Turn on profiling at which point profiling data is collected and saved to a text file. Default is False. References ---------- .. [1] Hester, Todd, and Peter Stone. "Generalized model learning for reinforcement learning in factored domains." Proceedings of The 8th International Conference on Autonomous Agents and Multiagent Systems-Volume 2. International Foundation for Autonomous Agents and Multiagent Systems, 2009. """ def __init__(self, planner, max_steps=None, filename=None, profile=False): super(RLDTLearner, self).__init__(max_steps, filename, profile) self._do_plan = True self._planner = planner def __getstate__(self): data = super(RLDTLearner, self).__getstate__() data.update({'_planner': self._planner}) return data def __setstate__(self, d): super(RLDTLearner, self).__setstate__(d) for name, value in d.iteritems(): setattr(self, name, value) self._do_plan = False
[docs] def execute(self, experience): """Execute learning specific updates. Learning specific updates are performed, e.g. model updates. Parameters ---------- experience : Experience The actor's current experience consisting of previous state, the action performed in that state, the current state, and the reward awarded. """ self._do_plan = self._planner.model.update(experience)
[docs] def learn(self, experience=None): """Learn a policy from the experience. A policy is learned from the experience by building the MDP model. Parameters ---------- experience : Experience The actor's current experience consisting of previous state, the action performed in that state, the current state, and the reward awarded. """ super(RLDTLearner, self).learn(experience) if self._do_plan: self._planner.plan()
[docs] def choose_action(self, state): """Choose the next action The next action is chosen according to the current policy and the selected exploration strategy. Parameters ---------- state : State The current state. Returns ------- Action : The chosen action. """ action = None if self._step_iter < self._max_steps: action = self._planner.get_next_action(state) self._step_iter += 1 return action