from __future__ import division, print_function, absolute_import
# noinspection PyUnresolvedReferences
from six.moves import range
import sys
from datetime import datetime
import numpy as np
from ...tools.log import LoggingMgr
from ...mdp.stateaction import State, RewardFunction
from . import IOfflineLearner
__all__ = ['ApprenticeshipLearner', 'IncrApprenticeshipLearner']
# noinspection PyAbstractClass
class ApprenticeshipLearner(IOfflineLearner):
    """The apprenticeship learner.

    The apprenticeship learner is an inverse reinforcement learner, a method introduced
    by Abbeel and Ng [1]_ which strives to imitate the demonstrations given by an expert.

    Parameters
    ----------
    obs : array_like, shape (`n`, `nfeatures`, `ni`)
        List of trajectories provided by demonstrator, which the learner
        is trying to emulate, where `n` is the number of sequences, `ni` is
        the length of the i_th demonstration, and each demonstration has
        `nfeatures` features.
    planner : IPlanner
        The planner to use to determine the best action.
    method : {'projection', 'maxmargin'}, optional
        The IRL method to employ. Default is `projection`.
    max_iter : int, optional
        The maximum number of iterations after which learning
        will be terminated. It is assumed that a policy close enough to
        the experts demonstrations was found. Default is `sys.maxsize`.
    thresh : float, optional
        The learning is considered having converged to the
        demonstrations once the threshold has been reached. Default is `eps`.
    gamma : float, optional
        The discount factor. Default is 0.9.
    nsamples : int, optional
        The number of samples taken during Monte Carlo sampling. Default is 100.
    max_steps : int, optional
        The maximum number of steps in an iteration (during Monte Carlo sampling).
        Default is 100.
    filename : str, optional
        The name of the file to save the learner state to after each iteration.
        If None is given, the learner state is not saved. Default is None.

    Other Parameters
    ----------------
    mix_policies : bool
        Whether to create a new policy by mixing from policies seen so far or by
        considering the best valued action. Default is False.
    rescale : bool
        If set to True, the feature expectations are rescaled to be between 0 and 1.
        Default is False.
    visualize : bool
        Visualize each iteration of the IRL step if set to True. Default is False.

    Raises
    ------
    AttributeError
        If no planner was provided.
    ValueError
        If the IRL method is unknown or no demonstrations were given.

    See Also
    --------
    :class:`IncrApprenticeshipLearner`

    Notes
    -----
    Method **maxmargin** uses a QP solver to solve the following equation:

    .. math::

        \\begin{aligned}
        & \\underset{t, w}{\\text{maximize}} & & t \\\\
        & \\text{subject to} & & w^T \\mu_E > w^T \\mu^{(j)} + t, j=0, \\ldots, i-1 \\\\
        & & & ||w||_2 \\le 1.
        \\end{aligned}

    and mixing policies is realized by solving the quadratic problem:

    .. math::

        \\begin{aligned}
        & \\text{minimize} & & ||\\mu_E - \\mu||_2 \\\\
        & \\text{subject to} & & \\mu = \\sum_i (\\lambda_i \\mu^{(i)}) \\\\
        & & & \\lambda_i \\ge 0 \\\\
        & & & \\sum_i \\lambda_i = 1
        \\end{aligned}

    The QP solver used for the implementation is the IBM ILOG CPLEX Optimizer which
    requires a separate license. If you are unable to obtain a license, the 'projection'
    method can be used instead.

    References
    ----------
    .. [1] Abbeel, Pieter, and Andrew Y. Ng. "Apprenticeship learning via inverse
        reinforcement learning." Proceedings of the twenty-first international
        conference on Machine learning. ACM, 2004.

    """

    def __init__(self, obs, planner, method=None, max_iter=None, thresh=None, gamma=None,
                 nsamples=None, max_steps=None, filename=None, **kwargs):
        super(ApprenticeshipLearner, self).__init__(filename)
        self._logger = LoggingMgr().get_logger(self._mid)

        self._iter = 0
        self._i = 0
        self._t = 0.0

        # The reward is modeled as linear in the state features: R(s) = w^T * phi(s),
        # with the weight vector stored on RewardFunction.reward.
        # noinspection PyTypeChecker
        RewardFunction.cb_get = staticmethod(lambda r, s: np.dot(s, RewardFunction.reward))

        self._planner = planner
        if self._planner is None:
            raise AttributeError("The apprenticeship learner requires a planner.")

        self._method = method if method is not None else 'projection'
        if self._method not in ['maxmargin', 'projection']:
            raise ValueError("%s is not a valid IRL method" % self._method)

        # BUG FIX: `sys.maxint` was removed in Python 3; `sys.maxsize` exists in
        # both Python 2 and 3 and serves the same "effectively unbounded" purpose.
        self._max_iter = max_iter if max_iter is not None else sys.maxsize
        self._thresh = thresh if thresh is not None else np.finfo(float).eps
        self._gamma = gamma if gamma is not None else 0.9
        self._nsamples = nsamples if nsamples is not None else 100
        self._max_steps = max_steps if max_steps is not None else 100

        self._mix_policies = kwargs.get("mix_policies", False)
        self._rescale = kwargs.get("rescale", False)
        self._visualize = kwargs.get("visualize", False)

        # Explicit raise instead of `assert`: asserts are stripped under `-O`,
        # and an empty demonstration set is invalid input, not a programming error.
        if len(obs) == 0:
            raise ValueError("At least one demonstration trajectory is required.")

        if self._logger.level <= LoggingMgr.LOG_DEBUG:
            for i, o in enumerate(obs):
                self._logger.debug("Demonstration #{0}:".format(i + 1))
                for j, state in enumerate(o.T):
                    self._logger.debug(" {0}: {1}".format(j + 1, State(state)))

        self._mu_E = self._estimate_expert_mu(obs)
        """:type : ndarray[float]"""

        nfeatures = obs[0].shape[0]
        self._mu = np.empty((0, nfeatures), float)
        """:type : ndarray[ndarray[float]]"""
        self._weights = np.empty((0, nfeatures), float)
        """:type : ndarray[ndarray[float]]"""

        if self._method == 'projection':
            self._mu_bar = np.array([])
            """:type: ndarray[float]"""

        if self._method == 'maxmargin' or self._mix_policies:
            # CPLEX is only needed for max-margin and for policy mixing; import
            # lazily so the 'projection' method works without a CPLEX license.
            import cplex
            from cplex.exceptions import CplexError
            self.cplex = cplex
            self.CplexError = CplexError

    def __getstate__(self):
        """Return the pickled state, dropping unpicklable attributes."""
        data = super(ApprenticeshipLearner, self).__getstate__()
        data.update(self.__dict__.copy())

        # NOTE(review): '_id' is presumably set by the parent class; the logger
        # and the transient loop counter must not be serialized.
        remove_list = ('_id', '_logger', '_i')
        for key in remove_list:
            data.pop(key, None)

        # Persist the current loop counter so learning resumes where it left off.
        data['_iter'] = self._i
        return data

    def __setstate__(self, d):
        """Restore the learner state and re-attach the reward model and logger."""
        super(ApprenticeshipLearner, self).__setstate__(d)

        # BUG FIX: `dict.iteritems()` was removed in Python 3; `items()` works
        # on both Python 2 and 3.
        for name, value in d.items():
            setattr(self, name, value)

        # noinspection PyTypeChecker
        RewardFunction.cb_get = staticmethod(lambda r, s: np.dot(s, RewardFunction.reward))
        RewardFunction.reward = self._weights[self._iter - 1]

        self._logger = LoggingMgr().get_logger(self._mid)

    def learn(self):
        """Learn the optimal policy via apprenticeship learning.

        The apprenticeship learning algorithm for finding a policy :math:`\\tilde{\\pi}`,
        that induces feature expectations :math:`\\mu(\\tilde{\\pi})` close to :math:`\\mu_E`
        is as follows:

        1. Randomly pick some policy :math:`\\pi^{(0)}`, compute (or approximate via Monte Carlo)
           :math:`\\mu^{(0)} = \\mu(\\pi^{(0)})`, and set :math:`i=1`.
        2. Compute :math:`t^{(i)} = \\underset{w:||w||_2 \\le 1}{\\text{max}}\\underset{j \\in {0 \\ldots (i-1)}}{\\text{min}} w^T(\\mu_E - \\mu^{(j)})`,
           and let :math:`w^{(i)}` be the value of :math:`w` that attains this maximum. This can be achieved
           by either the **max-margin** method or by the **projection** method.
        3. If :math:`t^{(i)} \\le \\epsilon`, then terminate.
        4. Using the RL algorithm, compute the optimal policy :math:`\\pi^{(i)}` for the MDP using rewards
           :math:`R = (w^{(i)})^T \\phi`.
        5. Compute (or estimate) :math:`\\mu^{(i)} = \\mu(\\pi^{(i)})`.
        6. Set :math:`i = i + 1`, and go back to step 2.

        """
        for self._i in range(self._iter, self._max_iter):
            self._logger.info(
                "Starting iteration {0}/{1} (error was {2})".format(self._i + 1, self._max_iter, self._t))
            if self._perform_irl(self._i):
                break
            # Checkpoint the learner after every iteration.
            self.save(self._filename)

    def _perform_irl(self, i):
        """Perform one iteration of the inverse reinforcement learning algorithm.

        Parameters
        ----------
        i : int
            The current iteration count.

        Returns
        -------
        bool :
            In the case of the algorithm having converged on the optimal policy,
            True is returned otherwise False. The algorithm is considered to have
            converged to the optimal policy if either the performance is within a
            certain threshold or if the maximum number of iterations has been reached.

        """
        # 2. Estimate mu for the current policy and append it to the history.
        self._logger.info("Estimate mu...")
        self._mu = np.vstack([self._mu, self._estimate_mu()])

        # 3. Compute the margin and the maximizing weight vector.
        if self._method == "projection":
            self._t, weights, self._mu_bar = self._compute_projection(self._mu[i], self._mu_bar)
        else:
            # CONSISTENCY FIX: use the method's own iteration parameter `i`
            # (identical in value to `self._i` at every call site).
            self._t, weights = self._compute_max_margin(i + 1, self._mu)
        self._weights = np.vstack([self._weights, weights])

        self._logger.debug("\nweights = \n{0}\nmu = \n{1}\nmu_E = \n{2}".format(weights, self._mu[i], self._mu_E))
        self._logger.info("Delta error is {0}".format(abs(self._t - self._thresh)))

        # 4. Check termination: margin within epsilon of the expert.
        if self._t <= self._thresh:
            self._logger.info("Converged to optimal solution")
            self.save(self._filename)
            return True

        # 5. Compute the optimal policy \pi^(i) for rewards R = w^T * phi.
        RewardFunction.reward = weights
        self._planner.plan()
        if self._visualize:
            self._planner.visualize()
        return False

    def _estimate_expert_mu(self, obs):
        """Estimate the experts feature expectations.

        Calculate the empirical estimate for the experts feature expectation mu
        from the demonstration trajectories.

        Parameters
        ----------
        obs: array_like, shape (`n`, `nfeatures`, `ni`)
            List of trajectories provided by demonstrator, which the learner
            is trying to emulate, where `n` is the number of sequences, `ni` is
            the length of the i_th demonstration, and each demonstration has
            `nfeatures` features.

        Returns
        -------
        ndarray[float] :
            The experts feature expectations.

        """
        # `len` (rather than `.shape[0]`) accepts plain lists of trajectories
        # as promised by the docstring, not just ndarrays.
        n = len(obs)
        nfeatures = obs[0].shape[0]

        # Discounted sum of features over each trajectory, averaged over all
        # demonstrations: mu_E = (1/n) * sum_o sum_t gamma^t * phi(s_t).
        mu = np.zeros(nfeatures)
        for o in obs:
            for t, sample in enumerate(o.T):
                mu += self._gamma ** t * np.array(sample)
        mu /= n

        if self._rescale:
            # Rescale to [0, 1]: the discounted series sums to 1/(1 - gamma).
            mu *= (1 - self._gamma)
        return mu

    def _estimate_mu(self):
        """Estimate the feature expectations for the current policy.

        Perform Monte Carlo sampling to estimate the feature expectations, mu,
        for the policy.

        Returns
        -------
        ndarray[float] :
            The feature expectations.

        """
        s0 = datetime.now()

        mu = np.zeros(State.nfeatures, float)
        self._planner.create_policy(self._find_closest if self._mix_policies else None)

        for i in range(self._nsamples):
            self._logger.info("Sample #{0}...".format(i + 1))

            # Select an initial state according to the initial state distribution.
            state = self._planner.model.sample()
            mu = np.add(mu, state.get())

            for t in range(2, self._max_steps + 1):
                # Choose the next state according to the chosen policy.
                action = self._planner.get_next_action(state, use_policy=True)
                next_state = self._planner.model.sample(state, action)
                if next_state is None:
                    # A state was reached for which no empirical transition data
                    # exists; it is uncertain where to go from here, so break out
                    # and truncate this sample.
                    break
                self._logger.info("state={0}, act={1}, next= {2}".format(state, action, next_state))

                # Accumulate the discounted feature vector for this step.
                fe = self._gamma ** (t - 1) * np.array(next_state.get())
                mu = np.add(mu, fe)
                state = next_state

        mu /= self._nsamples
        if self._rescale:
            mu *= (1 - self._gamma)

        s1 = datetime.now()
        delta = s1 - s0
        self._logger.info("Estimation of feature expectations in %d:%d\n", delta.seconds, delta.microseconds)
        return mu

    def _compute_projection(self, mu, mu_bar):
        """Inverse reinforcement learning step (projection method).

        Computation of orthogonal projection of mu_E onto the line through
        mu_bar(i-2) and mu(i-1).

        Parameters
        ----------
        mu : array_like, shape (`nfeatures`,)
            Feature expectations found in the previous iteration, with `nfeatures`
            being the number of features.
        mu_bar : array_like, shape (`nfeatures`,)
            Vector with `nfeatures` being the number of features.

        Returns
        -------
        t : float
            The margin.
        w : ndarray[float]
            The feature weights.
        mu_bar : ndarray[float]
            Vector of the current iteration.

        """
        if mu_bar.size == 0:
            # First iteration: there is nothing to project onto yet.
            mu_bar = np.copy(mu)
        else:
            # Orthogonally project mu_E onto the line through mu_bar and mu.
            diff = mu - mu_bar
            mu_bar = mu_bar + (np.dot(diff, self._mu_E - mu_bar) / np.dot(diff, diff)) * diff

        w = self._mu_E - mu_bar
        t = np.linalg.norm(w)
        return t, w, mu_bar

    def _compute_max_margin(self, idx, mu):
        """Inverse reinforcement learning step (max-margin method).

        Guesses the reward function being optimized by the expert; i.e. find the
        reward on which the expert does better by a 'margin' of `t`, than any of
        the policies found previously.

        Parameters
        ----------
        idx : int
            The current iteration step.
        mu : array_like, shape (`n`, `nfeatures`)
            The set of feature expectations, where `n` is the number of iterations
            and `nfeatures` is the number of features.

        Returns
        -------
        t : float
            The margin.
        w : ndarray[float]
            The feature weights.

        Notes
        -----
        Using the QP solver (CPLEX) solve the following equation:

        .. math::

            \\begin{aligned}
            & \\underset{t, w}{\\text{maximize}} & & t \\\\
            & \\text{subject to} & & w^T * mu_E > w^T * mu^j + t, j=0, \\ldots, idx-1 \\\\
            & & & ||w||_2 <= 1.
            \\end{aligned}

        """
        try:
            n = mu[0, :].shape[0]

            cpx = self.cplex.Cplex()
            cpx.objective.set_sense(cpx.objective.sense.maximize)

            # Decision variables: the margin t followed by the n weights w_i.
            obj = [1.0] + [0.0] * n
            lb = [0.0] + [-self.cplex.infinity] * n
            ub = [self.cplex.infinity] * (n + 1)
            names = ["t"]
            for i in range(n):
                names.append("w{0}".format(i))
            cpx.variables.add(obj=obj, lb=lb, ub=ub, names=names)

            # Add linear constraints (populated by row):
            #   w^T * mu_E >= w^T * mu^j + t,       j = 0, ..., idx-1
            #   => -t + w^T * (mu_E - mu^j) >= 0,   j = 0, ..., idx-1
            expr = []
            for j in range(idx):
                # noinspection PyTypeChecker
                row = [names, [-1.0] + (self._mu_E - mu[j, :]).tolist()]
                expr.append(row)
            senses = "G" * idx
            rhs = [0.0] * idx
            cpx.linear_constraints.add(expr, senses, rhs)

            # Add quadratic constraint: w * w^T <= 1 (t is excluded via a 0 coefficient).
            q = self.cplex.SparseTriple(ind1=names, ind2=names, val=[0.0] + [1.0] * n)
            cpx.quadratic_constraints.add(rhs=1.0, quad_expr=q, sense="L")

            cpx.solve()
            if not cpx.solution.get_status() == cpx.solution.status.optimal:
                raise Exception("No optimal solution found")

            # get_values(begin, end) is inclusive: indices 1..n hold the weights.
            t, w = cpx.solution.get_values(0), cpx.solution.get_values(1, n)
            w = np.array(w)
            return t, w
        except self.CplexError as e:
            # BUG FIX: `BaseException.message` was removed in Python 3; `str(e)`
            # is portable.
            self._logger.exception(str(e))
            return None

    def _find_closest(self):
        """Find the point closest to the experts feature expectation.

        Find the point closest to the experts feature expectation in the convex
        closure of the feature expectation by solving the quadratic problem:

        .. math::

            min ||self._mu_E - mu||_2
            s.t. mu = sum(lambda_i * mu_i)
                 lambda >= 0
                 sum(lambda_i) = 1

        Returns
        -------
        lambda_ : array_like, shape (`nfeatures`, `n`)
            Set of feature expectations found by the algorithm, with `nfeatures`
            being the number of features and `n` being the number of iterations
            (until the margin `t` was epsilon close).

        """
        try:
            n = self._mu.shape[0]

            cpx = self.cplex.Cplex()
            cpx.objective.set_sense(cpx.objective.sense.minimize)

            # Solve the non-negative least squares problem
            #     min ||A * x - y||_2
            #     s.t. x >= 0, sum(x_i) = 1
            # as the quadratic program
            #     min 1/2 x^T * Q * x + c^T * x
            #     s.t. x >= 0, sum(x_i) = 1
            # where Q = A^T * A and c = -A^T * y,
            # with A = mu, y = mu_E, and x = lambda.

            # Linear objective terms: c = -mu * mu_E.
            obj = -(np.dot(self._mu, self._mu_E))
            obj = obj.tolist()

            # Linear bounds: lambda >= 0 for every component.
            lb = [0.0] * n
            ub = [self.cplex.infinity] * n

            # Linear constraint: sum_i(lambda_i) = 1; every variable appears in
            # constraint row 0 with coefficient 1.
            cols = []
            for i in range(n):
                cols.append([[0], [1]])
            cpx.linear_constraints.add(rhs=[1.0], senses="E")
            cpx.variables.add(obj=obj, lb=lb, ub=ub, columns=cols)

            # Quadratic objective terms: Q = 2 * mu * mu^T (the factor 2 cancels
            # the 1/2 in the QP formulation).
            qmat = []
            q = 2 * np.dot(self._mu, self._mu.transpose())
            for j in range(n):
                # noinspection PyUnresolvedReferences
                row = [range(n), q[j, :].tolist()]
                qmat.append(row)
            cpx.objective.set_quadratic(qmat)

            cpx.solve()
            if not cpx.solution.get_status() == cpx.solution.status.optimal:
                raise Exception("No optimal solution found")

            lambda_ = np.array(cpx.solution.get_values())
            self._logger.info("Cplex solution value: {0}".format(cpx.solution.get_objective_value()))
            self._logger.info("lambda={0}".format(lambda_))
            return lambda_
        except self.CplexError as e:
            # BUG FIX: `BaseException.message` was removed in Python 3; `str(e)`
            # is portable.
            self._logger.exception(str(e))
            return None
class IncrApprenticeshipLearner(ApprenticeshipLearner):
    """Incremental apprenticeship learner.

    The model under which the apprenticeship is operating is updated incrementally
    while learning a policy that emulates the expert's demonstrations.

    Parameters
    ----------
    obs : array_like, shape (`n`, `nfeatures`, `ni`)
        List of trajectories provided by demonstrator, which the learner
        is trying to emulate, where `n` is the number of sequences, `ni` is
        the length of the i_th demonstration, and each demonstration has
        `nfeatures` features.
    planner : IPlanner
        The planner to use to determine the best action.
    method : {'projection', 'maxmargin'}, optional
        The IRL method to employ. Default is `projection`.
    max_iter : int, optional
        The maximum number of iterations after which learning
        will be terminated. It is assumed that a policy close enough to
        the experts demonstrations was found. Default is `sys.maxsize`.
    thresh : float, optional
        The learning is considered having converged to the
        demonstrations once the threshold has been reached. Default is `eps`.
    gamma : float, optional
        The discount factor. Default is 0.9.
    nsamples : int, optional
        The number of samples taken during Monte Carlo sampling. Default is 100.
    max_steps : int, optional
        The maximum number of steps in an iteration (during Monte Carlo sampling).
        Default is 100.
    filename : str, optional
        The name of the file to save the learner state to after each iteration.
        If None is given, the learner state is not saved. Default is None.

    Other Parameters
    ----------------
    mix_policies : bool
        Whether to create a new policy by mixing from policies seen so far or by
        considering the best valued action. Default is False.
    rescale : bool
        If set to True, the feature expectations are rescaled to be between 0 and 1.
        Default is False.
    visualize : bool
        Visualize each iteration of the IRL step if set to True. Default is False.

    Notes
    -----
    Inverse reinforcement learning assumes knowledge of the underlying model. However,
    this is not always feasible. The incremental apprenticeship learner updates its model
    after every iteration by executing the current policy. Thus, it provides an extension
    to the original apprenticeship learner.

    See Also
    --------
    :class:`ApprenticeshipLearner`

    """

    def __init__(self, obs, planner, method=None, max_iter=None, thresh=None, gamma=None,
                 nsamples=None, max_steps=None, filename=None, **kwargs):
        super(IncrApprenticeshipLearner, self).__init__(obs, planner, method, max_iter, thresh, gamma, nsamples,
                                                        max_steps, filename, **kwargs)
        # Number of steps taken with the current policy since the last reset.
        self._step_iter = 0

    def __getstate__(self):
        return super(IncrApprenticeshipLearner, self).__getstate__()

    def __setstate__(self, d):
        super(IncrApprenticeshipLearner, self).__setstate__(d)
        # Resume the iteration counter saved by the parent and restart stepping.
        self._i = self._iter
        self._step_iter = 0

    def reset(self, t, **kwargs):
        """Reset the apprenticeship learner.

        Parameters
        ----------
        t : float
            The current time (sec).
        kwargs : dict, optional
            Non-positional parameters, optional.

        """
        super(IncrApprenticeshipLearner, self).reset(t, **kwargs)
        self._step_iter = 0

    def execute(self, experience):
        """Execute learning specific updates.

        Learning specific updates are performed, e.g. model updates.

        Parameters
        ----------
        experience : Experience
            The actor's current experience consisting of previous state, the action
            performed in that state, the current state, and the reward awarded.

        """
        self._planner.model.update(experience)

    def learn(self):
        """Learn a policy from the experience.

        Learn the optimal policy using an apprenticeship learning algorithm
        incrementally.

        Returns
        -------
        bool :
            Whether the found policy is considered to have converged. The algorithm is
            considered to have converged on the optimal policy if either the performance
            is within a certain threshold or if the maximum number of iterations has been
            reached.

        """
        self._logger.info("Starting iteration {0}/{1} (error was {2})".format(self._i + 1, self._max_iter, self._t))

        # Exploit the current policy during the IRL step.
        self._planner.deactivate_exploration()
        converged = self._perform_irl(self._i)
        self._i += 1
        return converged or (self._i >= self._max_iter)

    def choose_action(self, state):
        """Choose the next action.

        The next action is chosen according to the current policy and the
        selected exploration strategy.

        Parameters
        ----------
        state : State
            The current state.

        Returns
        -------
        Action :
            The chosen action, or None once `max_steps` actions have been taken
            since the last reset.

        """
        action = None
        if self._step_iter < self._max_steps:
            self._planner.activate_exploration()
            action = self._planner.get_next_action(state)
            self._step_iter += 1
        return action